EMQX 快速上手
EMQX 是一款「无限连接,任意集成,随处运行」的大规模分布式物联网接入平台,同时作为一个高性能、可扩展的 MQTT 消息服务器,它可以为物联网(IoT)应用提供可靠的实时消息传输和设备连接解决方案。
1 EMQX 的安装
以 CentOS7 为例,介绍 EMQX 的安装方法。
# 安装所需要的依赖包
yum install -y yum-utils device-mapper-persistent-data lvm2
# 使用以下命令设置稳定存储库,以 CentOS 7 为例
sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
# 安装最新版本的 EMQX
yum install emqx
2 EMQX 快速上手
2.1 启动 Broker 服务
使用如下命令启动 EMQX,EMQX 的 MQTT 协议默认运行在 TCP 1883 端口,如有需要可进行修改,配置文件路径一般为 /etc/emqx/emqx.conf。
emqx start
EMQX 带有一个控制台面板,通过浏览器访问 EMQX 所在 IP 的 18083端口可进入控制台。若需远程访问,需要在配置文件中将控制台地址修改为0.0.0.0:18083。
http://localhost:18083

2.2 通过 MQTTX 模拟发布和订阅消息
MQTTX 是一个开源、跨平台的 MQTT 桌面客户端,最初由 EMQ开发,可以在 Linux、Windows、MacOS 上运行。
下载对应版本的 MQTTX 并安装后打开,界面如下图所示。

点击上图标记出的加号分别创建两个连接,用于模拟消息发布者和消息订阅者。创建连接时只需修改下图所示的三个位置,其余保持默认即可。![image-20241023134048744]
之后对两个连接分别点击下图标记位置的按钮,与 Server 建立连接,后文为方便介绍将两个连接分别称为 1 号连接和 2 号连接。

两个连接成功建立后,在 2 号连接中创建一个订阅,Topic 设置为 test/lwg,QoS 等级设置为 2。
(QoS 等级分为 3 中,0 表示消息最多到达一次,1 表示消息至少到达一次,2 表示消息仅到达一次)

然后返回 1 号连接,在右下角分别输入刚才在 2 号连接中输入的 Topic 和 JSON 格式的报文,点击发送按钮即可发送消息。

回到 2 号连接中,在已接收的消息中看到 1 号连接发送的报文。

2.3 通过 Python 模拟消息发布和消息订阅
除了使用 MQTTX 模拟 MQTT 客户端,还可以使用多种高级语言实现 MQTT 客户端,这里以 Python 为例。
首先,需要安装 Paho 模块:
pip install paho-mqtt
然后分贝编写两个脚本,模拟消息发布者和消息订阅者。
消息发布者 pub_tcp.py 如下:
# python 3.x
import json
import logging
import random
import time
from paho.mqtt import client as mqtt_client
# 需要修改为 EMQX 服务所在 IP
BROKER = 'xx.xx.xx.xx'
PORT = 1883
# BROKER = 'ma10c211.cn-shenzhen.mqttce.com'
# PORT = 15428
TOPIC = "python-mqtt-test1/tcp"
# generate client ID with pub prefix randomly
CLIENT_ID = f'python-mqtt-tcp-pub-{random.randint(0, 1000)}'
# 鉴权信息
# USERNAME = 'emqx'
# PASSWORD = 'public'
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
FLAG_EXIT = False
def on_connect(client, userdata, flags, rc):
if rc == 0 and client.is_connected():
print("Connected to MQTT Broker!")
else:
print(f'Failed to connect, return code {rc}')
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
# 设置用户名和密码
# client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.connect(BROKER, PORT, keepalive=120)
client.on_disconnect = on_disconnect
return client
def publish(client):
msg_count = 0
while not FLAG_EXIT:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
if not client.is_connected():
logging.error("publish: MQTT client is not connected!")
time.sleep(1)
continue
result = client.publish(TOPIC, msg, qos=2)
# result: [0, 1]
status = result[0]
if status == 0:
print(f'Send `{msg}` to topic `{TOPIC}`')
else:
print(f'Failed to send message to topic {TOPIC}')
msg_count += 1
time.sleep(1)
def run():
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = connect_mqtt()
client.loop_start()
time.sleep(1)
if client.is_connected():
publish(client)
else:
client.loop_stop()
if __name__ == '__main__':
run()
消息订阅者 sub_tcp.py 如下:
# python 3.x
import logging
import random
import time
from paho.mqtt import client as mqtt_client
from pub_tcp import CLIENT_ID
# 需要修改为 EMQX 服务所在 IP
BROKER = 'xx.xx.xx.xx'
PORT = 1883
TOPIC = "python-mqtt-test1/tcp"
# generate client ID with pub prefix randomly
# CLIENT_ID = f'python-mqtt-tcp-sub-{random.randint(0, 1000)}'
CLIENT_ID = 'python-mqtt-tcp-sub-692'
print(CLIENT_ID)
# USERNAME = 'emqx'
# PASSWORD = 'public'
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
FLAG_EXIT = False
def on_connect(client, userdata, flags, rc):
if rc == 0 and client.is_connected():
print("Connected to MQTT Broker!")
client.subscribe(TOPIC, qos=2)
else:
print(f'Failed to connect, return code {rc}')
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
def on_message(client, userdata, msg):
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID, clean_session=False)
# client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=120)
client.on_disconnect = on_disconnect
return client
def run():
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = connect_mqtt()
client.loop_forever()
if __name__ == '__main__':
run()
分别运行两个脚本,发现消息订阅者可以收到消息。
