使用Python语言实现一个MQTT客户端的应用,用于接收LoRaWAN节点的数据,同时向节点发送数据。数据使用了16进制方式来处理。

下面应用,网关,节点设备的交换流程图
gw102-lorawan-app-txrx.png

这里建议使用uv作为python项目的管理工具。创建项目后,添加mqtt-paho依赖库。

uv init lorawanpy
cd lorawanpy
uv add paho-mqtt

复制python代码替换到默认的main.py文件内。

import paho.mqtt.client as mqtt
import json
import base64

# define mqtt server, deveui
ChirpStackServer = "192.168.30.50"
devEUI = "eb03290421c69c6a"
ApplicationID = "b3525049-5fea-4f22-a60d-6510ce7004e1"

# MQTT回调函数
def on_connect(client, userdata, flags, reason_code, properties):
    print(f"Connected with result code {reason_code}")
    # 订阅主题
    client.subscribe("application/+/device/+/event/up")

def on_message(client, userdata, msg):
    try:
        # 解析JSON数据
        payload = json.loads(msg.payload.decode())
        print(f"Received message on topic {msg.topic}: {payload}")

        # 获取并解码data字段
        if 'data' in payload:
            # 打印原始Base64编码数据
            print(f"Base64 data: {payload['data']}")
            
            # Base64解码
            decoded_bytes = base64.b64decode(payload['data'])
            
            # 将解码后的字节数据转换为十六进制字符串
            hex_data = decoded_bytes.hex()
            print(f"Decoded data (hex): {hex_data}")
        else:
            print("No data field in the message.")
    except json.JSONDecodeError:
        print("Failed to decode JSON from message.")
    except Exception as e:
        print(f"An error occurred: {e}")

def parse_command(command, client):
    try:
        # 分割命令字符串
        parts = command.split(',')
        if len(parts) != 3:
            raise ValueError("Invalid command format")

        # 解析命令
        action = parts[0]
        fport = int(parts[1])
        data_hex = parts[2]

        # 检查动作是否为发送
        if action.lower() != 's':
            raise ValueError("Unsupported action")

        # 将十六进制数据转换为字节
        data_bytes = bytes.fromhex(data_hex[2:])  # 去掉0x前缀

        # 将字节数据编码为Base64
        data_base64 = base64.b64encode(data_bytes).decode('utf-8')

        # 构建JSON对象
        json_data = {
            "devEui": devEUI,
            "confirmed": False,
            "fPort": fport,
            "data": data_base64
        }

        # 将JSON数据转换为字符串
        json_str = json.dumps(json_data)

        # 发布到MQTT主题
        topic = f"application/{ApplicationID}/device/{devEUI}/command/down"
        client.publish(topic, json_str)
        print(f"Published JSON to topic {topic}: {json_str}")

    except Exception as e:
        print(f"Error parsing command: {e}")

def main():
    # 创建MQTT客户端
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message

    # 连接到ChirpStack的MQTT broker
    client.connect(ChirpStackServer, 1883, 60)

    # 开始循环以处理网络流量和回调
    client.loop_start()

    # 从终端接收输入命令
    while True:
        command = input("")
        parse_command(command, client)

if __name__ == "__main__":
    main()

程序执行如下

uv run main.py
s,12,0x22
Published JSON to topic application/b3525049-5fea-4f22-a60d-6510ce7004e1/device/eb03290421c69c6a/command/down: {"devEui": "eb03290421c69c6a", "confirmed": false, "fPort": 12, "data": "Ig=="}

这里使用"s,12,0x22"发送一个数据包,含义为fPort为12,数据内容为0x22。可以在chirpstack上查看到已经进入发送队列了,当节点再次发送数据时,打开接收窗口,就能收到这个数据包。

作者:SteveChen  创建时间:2025-04-30 10:35
最后编辑:SteveChen  更新时间:2025-05-08 17:19