第 2 章 - 安装和使用 Azure RTOS NetX Duo MQTT 客户端

本章介绍了与安装、设置和使用 Azure RTOS NetX Duo MQTT 客户端组件相关的各种问题。

产品分发

可从 https://github.com/azure-rtos/netxduo 获取 NetX Duo 的 MQTT 客户端。 此程序包包含两个源文件、一个 include 文件,以及一个包含本文档的文件,如下所示:

  • nxd_mqtt_client.h:用于 NetX Duo 的 MQTT 客户端的头文件
  • nxd_mqtt_client.c:用于 NetX Duo 的 MQTT 客户端的 C 源文件
  • nxd_mqtt_client.pdf:用于 NetX Duo 的 MQTT 客户端的说明
  • demo_mqtt_client.c:NetX Duo MQTT 演示

MQTT 客户端安装

若要使用 NetX Duo 的 MQTT 客户端,应将之前提到的全部分发文件复制到安装了 NetX Duo 的目录。 例如,如果 NetX Duo 安装在“\threadx\arm7\green”目录中,则需要将用于 NetX Duo MQTT 客户端的 nxd_mqtt_client.h 和 nxd_mqtt_client.c 复制到该目录中 。

使用 MQTT 客户端

使用 NetX Duo 的 MQTT 客户端非常容易。 基本上来说,应用程序代码必须先包含 tx_api.h 和 nx_api.h,然后必须包含 nxd_mqtt_client.h,才能分别使用 ThreadX 和 NetX Duo。 包含 MQTT 客户端头文件后,应用程序代码将能够使用本指南后面部分所述的 MQTT 服务。 应用程序还必须在生成过程中包含 nxd_mqtt_client.c。 这些文件的编译方式必须与其他应用程序文件相同,并且其对象窗体必须与应用程序的文件链接起来。 这就是使用 NetX Duo MQTT 客户端所需的全部内容。

将 MQTT 客户端与 NetX Secure TLS 配合使用

若要将 MQTT 客户端与 NetX Secure TLS 模块配合使用,应用程序必须已安装 NetX Secure TLS 模块,并且必须包含 nx_secure_tls_api.h 和 nx_crypto.h。 构建 MQTT 库时必须定义符号 NX_SECURE_ENABLE。

配置选项

构建 NetX Duo 的 MQTT 客户端时,有多个配置选项可用。 下面是所有选项的列表,其中包含每个选项的详细说明。 此处列出了默认值,但可以在包含 nxd_mqtt_client.h 之前重新定义这些值。

  • NX_DISABLE_ERROR_CHECKING:定义后,此选项会删除基本的 MQTT 客户端错误检查。 通常会在调试应用程序后使用此选项。
  • NX_SECURE_ENABLE:定义此项后,构建 MQTT 客户端时将支持 TLS。 定义此符号需要安装 NetX Secure TLS 模块。 默认情况下不会启用 NX_SECURE_ENABLE。**
  • NXD_MQTT_REQUIRE_TLS:定义了此项后,应用程序必须使用 TLS 连接到 MQTT 中转站。 此功能要求定义 NX_SECURE_ENABLE。 默认情况下未定义此符号。
  • NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH:如果已启用,则会启用 MQTT 传输队列。 它必须为正整数。
  • NXD_MQTT_MAX_TOPIC_NAME_LENGTH:已弃用。
  • NXD_MQTT_MAX_MESSAGE_LENGTH:已弃用。
  • NXD_MQTT_KEEPALIVE_TIMER_RATE:定义 MQTT 计时器速率(以 ThreadX 计时器时钟周期为单位)。 此计时器用于跟踪自上次发送 MQTT 控制消息以来过去的时间,并在保持活动状态时间到期之前发送 MQTT PINGREQ 消息。 如果客户端连接到设置了保持活动状态计时器值的中转站,则会激活此计时器。 默认值为 TX_TIMER_TICKS_PER_SECOND,这是一个一秒计时器。
  • NXD_MQTT_PING_TIMEOUT_DELAY:定义 MQTT 客户端在发出 MQTT PINGREQ 之后等待来自中转站的 PINGRESP 的时间。 如果在此超时延迟之后未收到 PINGRESP,则客户端会认为该中转站无响应,因此会断开其与中转站的连接。 默认的 PING 超时延迟为 TX_TIMER_TICKS_PER_SECOND,即 1 秒。
  • NXD_MQTT_SOCKET_TIMEOUT:定义与 MQTT 服务器断开连接时 TCP 套接字断开连接调用中的超时(以计时器时钟周期为单位)。 默认值为 NX_WAIT_FOREVER。

示例 MQTT 程序

以下程序演示了一个简单的 MQTT 应用程序。 为简单起见,将假定返回代码是成功的,因此不会执行进一步的错误检查。

#define LOCAL_SERVER_ADDRESS (IP_ADDRESS(192, 168, 1, 81))

/*******************************************************/
/* IOT MQTT Client Example                             */
/*******************************************************/
#define DEMO_STACK_SIZE 2048
#define CLIENT_ID_STRING "mytestclient"
#define MQTT_CLIENT_STACK_SIZE 4096
#define STRLEN(p) (sizeof(p) - 1)

/* Declare the MQTT thread stack space. */
static ULONG mqtt_client_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];

/* Declare the MQTT client control block. */
static NXD_MQTT_CLIENT mqtt_client;

/* Define the symbol for signaling a received message. */

/* Define the test threads. */

#define TOPIC_NAME "topic"

#define MESSAGE_STRING "This is a message. "

/* Define the priority of the MQTT internal thread. */
#define MQTT_THREAD_PRIORTY 2

/* Define the MQTT keep alive timer for 5 minutes */
#define MQTT_KEEP_ALIVE_TIMER 300
#define QOS0 0
#define QOS1 1

/* Declare event flag, which is used in this demo. */
TX_EVENT_FLAGS_GROUP mqtt_app_flag;
#define DEMO_MESSAGE_EVENT 1
#define DEMO_ALL_EVENTS 3

/* Declare buffers to hold message and topic. */
static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];

/* Declare the disconnect notify function. */
static VOID my_disconnect_func(NXD_MQTT_CLIENT *client_ptr)
{
    printf("client disconnected from server\n");
}

static VOID my_notify_func(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
{
    tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
    return;
}

static ULONG error_counter;
void demo_mqtt_client_local(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr)
{
    UINT status;
    NXD_ADDRESS server_ip;
    ULONG events;
    UINT topic_length, message_length;

    /* Create MQTT client instance. */
    nxd_mqtt_client_create(&mqtt_client, "my_client",
        CLIENT_ID_STRING, STRLEN(CLIENT_ID_STRING), ip_ptr, pool_ptr,
        (VOID*)mqtt_client_stack, sizeof(mqtt_client_stack),
        MQTT_THREAD_PRIORTY, NX_NULL, 0);

    /* Register the disconnect notification function. */
    nxd_mqtt_client_disconnect_notify_set(&mqtt_client, my_disconnect_func);

    /* Create an event flag for this demo. */
    status = tx_event_flags_create(&mqtt_app_flag, "my app event");
    server_ip.nxd_ip_version = 4;
    server_ip.nxd_ip_address.v4 = LOCAL_SERVER_ADDRESS;

    /* Start the connection to the server. */
    nxd_mqtt_client_connect(&mqtt_client, &server_ip, NXD_MQTT_PORT, 
        MQTT_KEEP_ALIVE_TIMER, 0, NX_WAIT_FOREVER);

    /* Subscribe to the topic with QoS level 0. */
    nxd_mqtt_client_subscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME),
        QOS0);

    /* Set the receive notify function. */
    nxd_mqtt_client_receive_notify_set(&mqtt_client, my_notify_func);

    /* Publish a message with QoS Level 1. */
    nxd_mqtt_client_publish(&mqtt_client, TOPIC_NAME,
        STRLEN(TOPIC_NAME), (CHAR*)MESSAGE_STRING, 
        STRLEN(MESSAGE_STRING), 0, QOS1, NX_WAIT_FOREVER);

    /* Now wait for the broker to publish the message. */
    tx_event_flags_get(&mqtt_app_flag, DEMO_ALL_EVENTS,
        TX_OR_CLEAR, &events, TX_WAIT_FOREVER);

    if(events & DEMO_MESSAGE_EVENT)
    {
        nxd_mqtt_client_message_get(&mqtt_client, topic_buffer,
            sizeof(topic_buffer), &topic_length, message_buffer,
            sizeof(message_buffer), &message_length);

        topic_buffer[topic_length] = 0;

        message_buffer[message_length] = 0;

        printf("topic = %s, message = %s\n", topic_buffer, message_buffer);
    }

    /* Now unsubscribe the topic. */
    nxd_mqtt_client_unsubscribe(&mqtt_client, TOPIC_NAME,
        STRLEN(TOPIC_NAME));

    /* Disconnect from the broker. */
    nxd_mqtt_client_disconnect(&mqtt_client);

    /* Delete the client instance, release all the resources. */
    nxd_mqtt_client_delete(&mqtt_client);
    return;
}