一、前言
隨著物聯(lián)網(wǎng)技術(shù)的快速發(fā)展,越來越多的設(shè)備和系統(tǒng)需要通過網(wǎng)絡(luò)進(jìn)行連接和數(shù)據(jù)交換,以實(shí)現(xiàn)智能化管理和控制。華為云物聯(lián)網(wǎng)平臺(tái)作為業(yè)界領(lǐng)先的物聯(lián)網(wǎng)解決方案提供商,提供了穩(wěn)定可靠的MQTT服務(wù)器,使得設(shè)備能夠輕松接入云端,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)上傳和下發(fā)。
當(dāng)前決定開發(fā)一個(gè)基于Python的MQTT客戶端項(xiàng)目,利用paho-mqtt庫與華為云物聯(lián)網(wǎng)MQTT服務(wù)器進(jìn)行通信。該項(xiàng)目實(shí)現(xiàn)設(shè)備數(shù)據(jù)的定時(shí)上傳功能,確保設(shè)備狀態(tài)和數(shù)據(jù)能夠及時(shí)被云端系統(tǒng)獲取并處理。同時(shí),項(xiàng)目還具備接收服務(wù)器下發(fā)消息的能力,使得設(shè)備能夠根據(jù)云端指令進(jìn)行相應(yīng)的操作或響應(yīng)。
通過本項(xiàng)目,能夠構(gòu)建一個(gè)高效、穩(wěn)定的物聯(lián)網(wǎng)通信框架,為設(shè)備的遠(yuǎn)程監(jiān)控、控制和管理提供有力支持。通過定時(shí)上傳數(shù)據(jù),可以實(shí)時(shí)了解設(shè)備的運(yùn)行狀態(tài)、環(huán)境參數(shù)等關(guān)鍵信息,從而及時(shí)發(fā)現(xiàn)并解決潛在問題。同時(shí),接收服務(wù)器下發(fā)消息的功能也使得設(shè)備能夠靈活響應(yīng)云端指令,實(shí)現(xiàn)更加智能化的控制和管理。
本項(xiàng)目通過Python的paho-mqtt庫與華為云物聯(lián)網(wǎng)MQTT服務(wù)器進(jìn)行通信,實(shí)現(xiàn)設(shè)備數(shù)據(jù)的定時(shí)上傳和服務(wù)器消息的接收處理,為物聯(lián)網(wǎng)設(shè)備的遠(yuǎn)程監(jiān)控、控制和管理提供可靠的技術(shù)支持。
二、Python代碼編寫
2.1 安裝Paho MQTT庫
打開自己電腦的Python安裝目錄。
如果不知道自己的Python安裝目錄在哪里。
可以使用 which
或 where
命令
- 在Linux或macOS上,打開終端并輸入:
which python3
- 在Windows上,打開命令提示符并輸入:
where python
這些命令會(huì)顯示Python可執(zhí)行文件的路徑。
例如:
C:Users11266>where python
C:Users11266AppDataLocalProgramsPythonPython311python.exe
C:Users11266AppDataLocalMicrosoftWindowsAppspython.exe
找到pip.exe文件的路徑。
在文件路徑里輸入cmd
按下回車鍵,打開命令行。
在命令行輸入pip install paho-mqtt
安裝 paho-mqtt
庫。
例如:
C:Users11266AppDataLocalProgramsPythonPython311Scripts>pip install paho-mqtt
Collecting paho-mqtt
Downloading paho_mqtt-2.1.0-py3-none-any.whl (67 kB)
---------------------------------------- 67.2/67.2 kB 1.2 MB/s eta 0:00:00
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-2.1.0
[notice] A new release of pip is available: 23.1.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip
C:Users11266AppDataLocalProgramsPythonPython311Scripts>
2.2 paho-mqtt庫介紹
paho-mqtt
是一個(gè)Python客戶端庫,用于與MQTT代理服務(wù)器進(jìn)行通信。MQTT(Message Queuing Telemetry Transport)是一種輕量級(jí)的、發(fā)布-訂閱模式的消息傳輸協(xié)議,常用于物聯(lián)網(wǎng)(IoT)應(yīng)用和實(shí)時(shí)數(shù)據(jù)傳輸。paho-mqtt
庫提供了在Python中實(shí)現(xiàn)MQTT客戶端的功能,使你可以連接到MQTT代理服務(wù)器、訂閱主題、發(fā)布消息等。
特性
- 支持MQTT 3.1和3.1.1版本的協(xié)議規(guī)范。
- 提供同步和異步的消息發(fā)布和訂閱功能。
- 支持TLS/SSL加密連接,以確保安全的通信。
- 具有遺囑消息和保持活動(dòng)功能,以增強(qiáng)連接的穩(wěn)定性。
- 可以設(shè)置用戶名和密碼進(jìn)行連接認(rèn)證。
- 提供靈活的回調(diào)函數(shù)機(jī)制,用于處理連接、訂閱和接收消息等事件。
安裝
可以使用pip來安裝 paho-mqtt
庫:
pip install paho-mqtt
總結(jié)
paho-mqtt
庫提供了一個(gè)方便的方式來在Python應(yīng)用程序中實(shí)現(xiàn)MQTT客戶端功能。它支持多種MQTT特性,并且易于使用。通過這個(gè)庫,你可以輕松地構(gòu)建與MQTT代理服務(wù)器進(jìn)行通信的應(yīng)用程序,從而實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)傳輸、遠(yuǎn)程控制和監(jiān)控等功能。
2.3 我的華為云MQTT服務(wù)器信息
MQTT服務(wù)器IP地址: 117.78.5.125
MQTT端口號(hào): 1883
客戶端ID:65ec636771d845632aff9496_dev1_0_0_2024052901
用戶名:65ec636771d845632aff9496_dev1
登錄密碼:a6e312275a031e7629e3133fefeac555dbce6dc06b56c039dd8a224084ee5b44
訂閱主題:$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down
發(fā)布主題:$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report
發(fā)布的消息:{"services": [{"service_id": "stm32","properties":{"MQ135":60,"DHT11_T":24,"DHT11_H":60,"SOIL_H":50,"motor":1,"FLAME":0,"GPS":{"lon":120.21,"lat":30.19}}}]}
2.4 實(shí)現(xiàn)代碼
import paho.mqtt.client as mqtt
import json
import time
from threading import Timer
# MQTT 配置信息
MQTT_SERVER = "117.78.5.125"
MQTT_PORT = 1883
CLIENT_ID = "65ec636771d845632aff9496_dev1_0_0_2024052901"
USERNAME = "65ec636771d845632aff9496_dev1"
PASSWORD = "a6e312275a031e7629e3133fefeac555dbce6dc06b56c039dd8a224084ee5b44"
SUBSCRIBE_TOPIC = "$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down"
PUBLISH_TOPIC = "$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report"
PUBLISH_INTERVAL = 10 # 定時(shí)器間隔,單位:秒
# 要發(fā)布的消息內(nèi)容
message_payload = {
"services": [
{
"service_id": "stm32",
"properties": {
"MQ135": 60,
"DHT11_T": 24,
"DHT11_H": 60,
"SOIL_H": 50,
"motor": 1,
"FLAME": 0,
"GPS": {
"lon": 120.21,
"lat": 30.19
}
}
}
]
}
# 回調(diào)函數(shù) - 當(dāng)連接到服務(wù)器時(shí)被調(diào)用
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe(SUBSCRIBE_TOPIC)
# 回調(diào)函數(shù) - 當(dāng)收到服務(wù)器下發(fā)的消息時(shí)被調(diào)用
def on_message(client, userdata, msg):
print(f"Received message from topic '{msg.topic}': {msg.payload.decode()}")
# 定時(shí)發(fā)布消息
def publish_message():
client.publish(PUBLISH_TOPIC, json.dumps(message_payload))
print(f"Published message: {json.dumps(message_payload)}")
# 設(shè)置定時(shí)器,以繼續(xù)定期發(fā)布消息
Timer(PUBLISH_INTERVAL, publish_message).start()
# 創(chuàng)建MQTT客戶端并設(shè)置回調(diào)函數(shù)
client = mqtt.Client(CLIENT_ID, protocol=mqtt.MQTTv311)
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
# 連接到MQTT服務(wù)器
client.connect(MQTT_SERVER, MQTT_PORT, 60)
# 啟動(dòng)網(wǎng)絡(luò)循環(huán)
client.loop_start()
# 啟動(dòng)定時(shí)發(fā)布消息
publish_message()
# 主線程保持運(yùn)行狀態(tài)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Disconnecting from MQTT server...")
client.loop_stop()
client.disconnect()
代碼說明
- 配置部分:設(shè)置了MQTT服務(wù)器信息、客戶端ID、用戶名、密碼、訂閱主題和發(fā)布主題等。
- 回調(diào)函數(shù):
on_connect
: 在連接成功后訂閱指定的主題。on_message
: 收到消息時(shí)打印消息內(nèi)容。
- 定時(shí)發(fā)布消息:使用
Timer
類定時(shí)發(fā)布消息到指定的主題。 - 主程序:創(chuàng)建并配置MQTT客戶端,連接到MQTT服務(wù)器,啟動(dòng)網(wǎng)絡(luò)循環(huán)和定時(shí)發(fā)布消息的功能。
運(yùn)行代碼
運(yùn)行此代碼后,將:
- 連接到指定的MQTT服務(wù)器。
- 訂閱
$oc/devices/65ec636771d845632aff9496_dev1/sys/messages/down
主題。 - 每10秒鐘向
$oc/devices/65ec636771d845632aff9496_dev1/sys/properties/report
主題發(fā)布一次消息。 - 打印所有從服務(wù)器接收到的消息。