首页 > 编程开发 > EMQX 快速上手
2024
10-23

EMQX 快速上手

EMQX 是一款「无限连接,任意集成,随处运行」的大规模分布式物联网接入平台,同时作为一个高性能、可扩展的 MQTT 消息服务器,它可以为物联网(IoT)应用提供可靠的实时消息传输和设备连接解决方案。

1 EMQX 的安装

以 CentOS7 为例,介绍 EMQX 的安装方法。

# 安装所需要的依赖包
yum install -y yum-utils device-mapper-persistent-data lvm2

# 使用以下命令设置稳定存储库,以 CentOS 7 为例
sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo

# 安装最新版本的 EMQX
yum install emqx

2 EMQX 快速上手

2.1 启动 Broker 服务

使用如下命令启动 EMQX,EMQX 的 MQTT 协议默认运行在 TCP 1883 端口,如有需要可进行修改,配置文件路径一般为 /etc/emqx/emqx.conf。

emqx start

EMQX 带有一个控制台面板,通过浏览器访问 EMQX 所在 IP 的 18083端口可进入控制台。若需远程访问,需要在配置文件中将控制台地址修改为0.0.0.0:18083。

http://localhost:18083

EMQX 快速上手 - 第1张  | Weiguang的博客

2.2 通过 MQTTX 模拟发布和订阅消息

MQTTX 是一个开源、跨平台的 MQTT 桌面客户端,最初由 EMQ开发,可以在 Linux、Windows、MacOS 上运行。

下载对应版本的 MQTTX 并安装后打开,界面如下图所示。

EMQX 快速上手 - 第2张  | Weiguang的博客

点击上图标记出的加号分别创建两个连接,用于模拟消息发布者和消息订阅者。创建连接时只需修改下图所示的三个位置,其余保持默认即可。![image-20241023134048744]
EMQX 快速上手 - 第3张  | Weiguang的博客

之后对两个连接分别点击下图标记位置的按钮,与 Server 建立连接,后文为方便介绍将两个连接分别称为 1 号连接和 2 号连接。

EMQX 快速上手 - 第4张  | Weiguang的博客

两个连接成功建立后,在 2 号连接中创建一个订阅,Topic 设置为 test/lwg,QoS 等级设置为 2。

(QoS 等级分为 3 中,0 表示消息最多到达一次,1 表示消息至少到达一次,2 表示消息仅到达一次)

EMQX 快速上手 - 第5张  | Weiguang的博客

然后返回 1 号连接,在右下角分别输入刚才在 2 号连接中输入的 Topic 和 JSON 格式的报文,点击发送按钮即可发送消息。

EMQX 快速上手 - 第6张  | Weiguang的博客

回到 2 号连接中,在已接收的消息中看到 1 号连接发送的报文。

EMQX 快速上手 - 第7张  | Weiguang的博客

2.3 通过 Python 模拟消息发布和消息订阅

除了使用 MQTTX 模拟 MQTT 客户端,还可以使用多种高级语言实现 MQTT 客户端,这里以 Python 为例。

首先,需要安装 Paho 模块:

pip install paho-mqtt

然后分贝编写两个脚本,模拟消息发布者和消息订阅者。

消息发布者 pub_tcp.py 如下:

# python 3.x

import json
import logging
import random
import time

from paho.mqtt import client as mqtt_client

# 需要修改为 EMQX 服务所在 IP
BROKER = 'xx.xx.xx.xx'
PORT = 1883
# BROKER = 'ma10c211.cn-shenzhen.mqttce.com'
# PORT = 15428
TOPIC = "python-mqtt-test1/tcp"
# generate client ID with pub prefix randomly
CLIENT_ID = f'python-mqtt-tcp-pub-{random.randint(0, 1000)}'
# 鉴权信息
# USERNAME = 'emqx'
# PASSWORD = 'public'

FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60

FLAG_EXIT = False


def on_connect(client, userdata, flags, rc):
    if rc == 0 and client.is_connected():
        print("Connected to MQTT Broker!")
    else:
        print(f'Failed to connect, return code {rc}')


def on_disconnect(client, userdata, rc):
    logging.info("Disconnected with result code: %s", rc)
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)

        try:
            client.reconnect()
            logging.info("Reconnected successfully!")
            return
        except Exception as err:
            logging.error("%s. Reconnect failed. Retrying...", err)

        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    global FLAG_EXIT
    FLAG_EXIT = True


def connect_mqtt():
    client = mqtt_client.Client(CLIENT_ID)
    # 设置用户名和密码
    # client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.connect(BROKER, PORT, keepalive=120)
    client.on_disconnect = on_disconnect
    return client


def publish(client):
    msg_count = 0
    while not FLAG_EXIT:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        if not client.is_connected():
            logging.error("publish: MQTT client is not connected!")
            time.sleep(1)
            continue
        result = client.publish(TOPIC, msg, qos=2)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f'Send `{msg}` to topic `{TOPIC}`')
        else:
            print(f'Failed to send message to topic {TOPIC}')
        msg_count += 1
        time.sleep(1)


def run():
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    client = connect_mqtt()
    client.loop_start()
    time.sleep(1)
    if client.is_connected():
        publish(client)
    else:
        client.loop_stop()


if __name__ == '__main__':
    run()

消息订阅者 sub_tcp.py 如下:

# python 3.x

import logging
import random
import time

from paho.mqtt import client as mqtt_client

from pub_tcp import CLIENT_ID

# 需要修改为 EMQX 服务所在 IP
BROKER = 'xx.xx.xx.xx'
PORT = 1883
TOPIC = "python-mqtt-test1/tcp"
# generate client ID with pub prefix randomly
# CLIENT_ID = f'python-mqtt-tcp-sub-{random.randint(0, 1000)}'
CLIENT_ID = 'python-mqtt-tcp-sub-692'
print(CLIENT_ID)
# USERNAME = 'emqx'
# PASSWORD = 'public'

FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60

FLAG_EXIT = False


def on_connect(client, userdata, flags, rc):
    if rc == 0 and client.is_connected():
        print("Connected to MQTT Broker!")
        client.subscribe(TOPIC, qos=2)
    else:
        print(f'Failed to connect, return code {rc}')


def on_disconnect(client, userdata, rc):
    logging.info("Disconnected with result code: %s", rc)
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)

        try:
            client.reconnect()
            logging.info("Reconnected successfully!")
            return
        except Exception as err:
            logging.error("%s. Reconnect failed. Retrying...", err)

        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    global FLAG_EXIT
    FLAG_EXIT = True


def on_message(client, userdata, msg):
    print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')


def connect_mqtt():
    client = mqtt_client.Client(CLIENT_ID, clean_session=False)
    # client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, PORT, keepalive=120)
    client.on_disconnect = on_disconnect
    return client


def run():
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    client = connect_mqtt()
    client.loop_forever()


if __name__ == '__main__':
    run()

分别运行两个脚本,发现消息订阅者可以收到消息。

EMQX 快速上手 - 第8张  | Weiguang的博客

最后编辑:
作者:lwg0452
这个作者貌似有点懒,什么都没有留下。
捐 赠如果您觉得这篇文章有用处,请支持作者!鼓励作者写出更好更多的文章!

留下一个回复

你的email不会被公开。