EMQX 是一款「无限连接,任意集成,随处运行」的大规模分布式物联网接入平台,同时作为一个高性能、可扩展的 MQTT 消息服务器,它可以为物联网(IoT)应用提供可靠的实时消息传输和设备连接解决方案。
1 EMQX 的安装
以 CentOS7 为例,介绍 EMQX 的安装方法。
# 安装所需要的依赖包 yum install -y yum-utils device-mapper-persistent-data lvm2 # 使用以下命令设置稳定存储库,以 CentOS 7 为例 sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo # 安装最新版本的 EMQX yum install emqx
2 EMQX 快速上手
2.1 启动 Broker 服务
使用如下命令启动 EMQX,EMQX 的 MQTT 协议默认运行在 TCP 1883 端口,如有需要可进行修改,配置文件路径一般为 /etc/emqx/emqx.conf。
emqx start
EMQX 带有一个控制台面板,通过浏览器访问 EMQX 所在 IP 的 18083端口可进入控制台。若需远程访问,需要在配置文件中将控制台地址修改为0.0.0.0:18083。
http://localhost:18083
2.2 通过 MQTTX 模拟发布和订阅消息
MQTTX 是一个开源、跨平台的 MQTT 桌面客户端,最初由 EMQ开发,可以在 Linux、Windows、MacOS 上运行。
下载对应版本的 MQTTX 并安装后打开,界面如下图所示。
点击上图标记出的加号分别创建两个连接,用于模拟消息发布者和消息订阅者。创建连接时只需修改下图所示的三个位置,其余保持默认即可。![image-20241023134048744]
之后对两个连接分别点击下图标记位置的按钮,与 Server 建立连接,后文为方便介绍将两个连接分别称为 1 号连接和 2 号连接。
两个连接成功建立后,在 2 号连接中创建一个订阅,Topic 设置为 test/lwg,QoS 等级设置为 2。
(QoS 等级分为 3 中,0 表示消息最多到达一次,1 表示消息至少到达一次,2 表示消息仅到达一次)
然后返回 1 号连接,在右下角分别输入刚才在 2 号连接中输入的 Topic 和 JSON 格式的报文,点击发送按钮即可发送消息。
回到 2 号连接中,在已接收的消息中看到 1 号连接发送的报文。
2.3 通过 Python 模拟消息发布和消息订阅
除了使用 MQTTX 模拟 MQTT 客户端,还可以使用多种高级语言实现 MQTT 客户端,这里以 Python 为例。
首先,需要安装 Paho 模块:
pip install paho-mqtt
然后分贝编写两个脚本,模拟消息发布者和消息订阅者。
消息发布者 pub_tcp.py 如下:
# python 3.x import json import logging import random import time from paho.mqtt import client as mqtt_client # 需要修改为 EMQX 服务所在 IP BROKER = 'xx.xx.xx.xx' PORT = 1883 # BROKER = 'ma10c211.cn-shenzhen.mqttce.com' # PORT = 15428 TOPIC = "python-mqtt-test1/tcp" # generate client ID with pub prefix randomly CLIENT_ID = f'python-mqtt-tcp-pub-{random.randint(0, 1000)}' # 鉴权信息 # USERNAME = 'emqx' # PASSWORD = 'public' FIRST_RECONNECT_DELAY = 1 RECONNECT_RATE = 2 MAX_RECONNECT_COUNT = 12 MAX_RECONNECT_DELAY = 60 FLAG_EXIT = False def on_connect(client, userdata, flags, rc): if rc == 0 and client.is_connected(): print("Connected to MQTT Broker!") else: print(f'Failed to connect, return code {rc}') def on_disconnect(client, userdata, rc): logging.info("Disconnected with result code: %s", rc) reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY while reconnect_count < MAX_RECONNECT_COUNT: logging.info("Reconnecting in %d seconds...", reconnect_delay) time.sleep(reconnect_delay) try: client.reconnect() logging.info("Reconnected successfully!") return except Exception as err: logging.error("%s. Reconnect failed. Retrying...", err) reconnect_delay *= RECONNECT_RATE reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) reconnect_count += 1 logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) global FLAG_EXIT FLAG_EXIT = True def connect_mqtt(): client = mqtt_client.Client(CLIENT_ID) # 设置用户名和密码 # client.username_pw_set(USERNAME, PASSWORD) client.on_connect = on_connect client.connect(BROKER, PORT, keepalive=120) client.on_disconnect = on_disconnect return client def publish(client): msg_count = 0 while not FLAG_EXIT: msg_dict = { 'msg': msg_count } msg = json.dumps(msg_dict) if not client.is_connected(): logging.error("publish: MQTT client is not connected!") time.sleep(1) continue result = client.publish(TOPIC, msg, qos=2) # result: [0, 1] status = result[0] if status == 0: print(f'Send `{msg}` to topic `{TOPIC}`') else: print(f'Failed to send message to topic {TOPIC}') msg_count += 1 time.sleep(1) def run(): logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s', level=logging.DEBUG) client = connect_mqtt() client.loop_start() time.sleep(1) if client.is_connected(): publish(client) else: client.loop_stop() if __name__ == '__main__': run()
消息订阅者 sub_tcp.py 如下:
# python 3.x import logging import random import time from paho.mqtt import client as mqtt_client from pub_tcp import CLIENT_ID # 需要修改为 EMQX 服务所在 IP BROKER = 'xx.xx.xx.xx' PORT = 1883 TOPIC = "python-mqtt-test1/tcp" # generate client ID with pub prefix randomly # CLIENT_ID = f'python-mqtt-tcp-sub-{random.randint(0, 1000)}' CLIENT_ID = 'python-mqtt-tcp-sub-692' print(CLIENT_ID) # USERNAME = 'emqx' # PASSWORD = 'public' FIRST_RECONNECT_DELAY = 1 RECONNECT_RATE = 2 MAX_RECONNECT_COUNT = 12 MAX_RECONNECT_DELAY = 60 FLAG_EXIT = False def on_connect(client, userdata, flags, rc): if rc == 0 and client.is_connected(): print("Connected to MQTT Broker!") client.subscribe(TOPIC, qos=2) else: print(f'Failed to connect, return code {rc}') def on_disconnect(client, userdata, rc): logging.info("Disconnected with result code: %s", rc) reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY while reconnect_count < MAX_RECONNECT_COUNT: logging.info("Reconnecting in %d seconds...", reconnect_delay) time.sleep(reconnect_delay) try: client.reconnect() logging.info("Reconnected successfully!") return except Exception as err: logging.error("%s. Reconnect failed. Retrying...", err) reconnect_delay *= RECONNECT_RATE reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) reconnect_count += 1 logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) global FLAG_EXIT FLAG_EXIT = True def on_message(client, userdata, msg): print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic') def connect_mqtt(): client = mqtt_client.Client(CLIENT_ID, clean_session=False) # client.username_pw_set(USERNAME, PASSWORD) client.on_connect = on_connect client.on_message = on_message client.connect(BROKER, PORT, keepalive=120) client.on_disconnect = on_disconnect return client def run(): logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s', level=logging.DEBUG) client = connect_mqtt() client.loop_forever() if __name__ == '__main__': run()
分别运行两个脚本,发现消息订阅者可以收到消息。
- 本文固定链接: https://weiguangli.com/archives/715
- 转载请注明: lwg0452 于 Weiguang的博客 发表