python应用实现LoRaWAN节点数据收发
使用Python语言实现一个MQTT客户端的应用,用于接收LoRaWAN节点的数据,同时向节点发送数据。数据使用了16进制方式来处理。
下面应用,网关,节点设备的交换流程图
这里建议使用uv作为python项目的管理工具。创建项目后,添加mqtt-paho依赖库。
uv init lorawanpy
cd lorawanpy
uv add paho-mqtt
复制python代码替换到默认的main.py文件内。
import paho.mqtt.client as mqtt
import json
import base64
# define mqtt server, deveui
ChirpStackServer = "192.168.30.50"
devEUI = "eb03290421c69c6a"
ApplicationID = "b3525049-5fea-4f22-a60d-6510ce7004e1"
# MQTT回调函数
def on_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}")
# 订阅主题
client.subscribe("application/+/device/+/event/up")
def on_message(client, userdata, msg):
try:
# 解析JSON数据
payload = json.loads(msg.payload.decode())
print(f"Received message on topic {msg.topic}: {payload}")
# 获取并解码data字段
if 'data' in payload:
# 打印原始Base64编码数据
print(f"Base64 data: {payload['data']}")
# Base64解码
decoded_bytes = base64.b64decode(payload['data'])
# 将解码后的字节数据转换为十六进制字符串
hex_data = decoded_bytes.hex()
print(f"Decoded data (hex): {hex_data}")
else:
print("No data field in the message.")
except json.JSONDecodeError:
print("Failed to decode JSON from message.")
except Exception as e:
print(f"An error occurred: {e}")
def parse_command(command, client):
try:
# 分割命令字符串
parts = command.split(',')
if len(parts) != 3:
raise ValueError("Invalid command format")
# 解析命令
action = parts[0]
fport = int(parts[1])
data_hex = parts[2]
# 检查动作是否为发送
if action.lower() != 's':
raise ValueError("Unsupported action")
# 将十六进制数据转换为字节
data_bytes = bytes.fromhex(data_hex[2:]) # 去掉0x前缀
# 将字节数据编码为Base64
data_base64 = base64.b64encode(data_bytes).decode('utf-8')
# 构建JSON对象
json_data = {
"devEui": devEUI,
"confirmed": False,
"fPort": fport,
"data": data_base64
}
# 将JSON数据转换为字符串
json_str = json.dumps(json_data)
# 发布到MQTT主题
topic = f"application/{ApplicationID}/device/{devEUI}/command/down"
client.publish(topic, json_str)
print(f"Published JSON to topic {topic}: {json_str}")
except Exception as e:
print(f"Error parsing command: {e}")
def main():
# 创建MQTT客户端
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
# 连接到ChirpStack的MQTT broker
client.connect(ChirpStackServer, 1883, 60)
# 开始循环以处理网络流量和回调
client.loop_start()
# 从终端接收输入命令
while True:
command = input("")
parse_command(command, client)
if __name__ == "__main__":
main()
程序执行如下
uv run main.py
s,12,0x22
Published JSON to topic application/b3525049-5fea-4f22-a60d-6510ce7004e1/device/eb03290421c69c6a/command/down: {"devEui": "eb03290421c69c6a", "confirmed": false, "fPort": 12, "data": "Ig=="}
这里使用"s,12,0x22"发送一个数据包,含义为fPort为12,数据内容为0x22。可以在chirpstack上查看到已经进入发送队列了,当节点再次发送数据时,打开接收窗口,就能收到这个数据包。
作者:SteveChen 创建时间:2025-04-30 10:35
最后编辑:SteveChen 更新时间:2025-05-08 17:19
最后编辑:SteveChen 更新时间:2025-05-08 17:19