From c3d9db335ec309b28f85545bb87db392614920eb Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 25 Apr 2025 11:46:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E8=AF=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TC1/main.c | 5 +- TC1/mqtt_server/user_mqtt_client.c | 99 +++++++++++++++++------------- TC1/mqtt_server/user_mqtt_client.h | 8 +-- TC1/user_wifi.c | 8 ++- 4 files changed, 69 insertions(+), 51 deletions(-) diff --git a/TC1/main.c b/TC1/main.c index 30b3838..00a047b 100644 --- a/TC1/main.c +++ b/TC1/main.c @@ -190,7 +190,10 @@ int application_start(void) { require_noerr(err, exit); PowerInit(); AppHttpdStart(); // start http server thread - udp_server_start(); +// udp_server_start(); +//if (!(MQTT_SERVER[0] < 0x20 || MQTT_SERVER[0] > 0x7f || MQTT_SERVER_PORT < 1)){ +// UserMqttInit(); +// } UserLedSet(user_config->power_led_enabled); err = mico_rtos_create_thread(NULL, MICO_APPLICATION_PRIORITY, "p_count", diff --git a/TC1/mqtt_server/user_mqtt_client.c b/TC1/mqtt_server/user_mqtt_client.c index 77264c1..20198ee 100644 --- a/TC1/mqtt_server/user_mqtt_client.c +++ b/TC1/mqtt_server/user_mqtt_client.c @@ -36,14 +36,18 @@ typedef struct { uint32_t datalen; } mqtt_recv_msg_t, *p_mqtt_recv_msg_t, mqtt_send_msg_t, *p_mqtt_send_msg_t; +typedef struct { + Client client; + Network network; + // 其他状态、配置参数…… +} mqtt_context_t; + static void MqttClientThread(mico_thread_arg_t arg); static void MessageArrived(MessageData *md); static void MqttClientThread2(mico_thread_arg_t arg); -static void MessageArrived2(MessageData *md); - static OSStatus MqttMsgPublish(Client *c, const char *topic, char qos, char retained, const unsigned char *msg, uint32_t msg_len); @@ -57,14 +61,9 @@ bool isconnect2 = false; mico_queue_t mqtt_msg_send_queue = NULL; mico_queue_t mqtt_msg_send_queue2 = NULL; -Client c; // mqtt client object -Network n; // socket network for mqtt client -Client c2; // mqtt client object -Network n2; // socket network for mqtt client volatile bool mqtt_thread_should_exit = false; static mico_worker_thread_t mqtt_client_worker_thread; /* Worker thread to manage send/recv events */ -static mico_worker_thread_t mqtt_client_worker_thread2; /* Worker thread to manage send/recv events */ //static mico_timed_event_t mqtt_client_send_event; @@ -175,20 +174,28 @@ OSStatus UserMqttInit(void) { require_noerr_action(err, exit, mqtt_log("ERROR: create mqtt msg send queue err=%d.", err)); /* start mqtt client */ - err = mico_rtos_create_thread(NULL, MICO_APPLICATION_PRIORITY, "mqtt_client", - (mico_thread_function_t) MqttClientThread, - mqtt_thread_stack_size, 0); + mqtt_context_t *ctx1 = malloc(sizeof(mqtt_context_t)); + memset(ctx1, 0, sizeof(mqtt_context_t)); + + err = mico_rtos_create_thread(NULL, MICO_APPLICATION_PRIORITY, "mqtt_client_1", + (mico_thread_function_t)MqttClientThread, + mqtt_thread_stack_size, ctx1); require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread."); - /* start mqtt client */ - err = mico_rtos_create_thread(NULL, MICO_APPLICATION_PRIORITY, "mqtt_client_2", - (mico_thread_function_t) MqttClientThread2, - mqtt_thread_stack_size, 0); + // 第二个客户端 + mqtt_context_t *ctx2 = malloc(sizeof(mqtt_context_t)); + memset(ctx2, 0, sizeof(mqtt_context_t)); + + err = mico_rtos_create_thread(NULL, MICO_APPLICATION_PRIORITY, "mqtt_client_2", + (mico_thread_function_t)MqttClientThread2, + mqtt_thread_stack_size, ctx2); + + require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread2."); /* Create a worker thread for user handling MQTT data event */ err = mico_rtos_create_worker_thread(&mqtt_client_worker_thread, MICO_APPLICATION_PRIORITY, - 0x800, 5); + 0x1000, 5); require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client worker thread."); exit: @@ -242,13 +249,16 @@ static OSStatus MqttMsgPublish(Client *c, const char *topic, char qos, char reta void registerMqttEvents(void) { timer_status = 0; mico_start_timer(&timer_handle); +} +void registerMqttEvents2(void) { timer_status_2 = 0; mico_start_timer(&timer_handle2); } void MqttClientThread(mico_thread_arg_t arg) { OSStatus err = kUnknownErr; - + mqtt_context_t *ctx = (mqtt_context_t *)arg; + // 后续一直用 ctx->client 和 ctx->network int rc = -1; fd_set readfds; struct timeval t = {0, MQTT_YIELD_TMIE * 1000}; @@ -262,8 +272,8 @@ void MqttClientThread(mico_thread_arg_t arg) { mqtt_log("MQTT client thread started..."); - memset(&c, 0, sizeof(c)); - memset(&n, 0, sizeof(n)); + memset(&ctx->client, 0, sizeof(ctx->client)); + memset(&ctx->network, 0, sizeof(ctx->network)); /* create msg send queue event fd */ msg_send_event_fd = mico_create_event_fd(mqtt_msg_send_queue); @@ -289,7 +299,7 @@ void MqttClientThread(mico_thread_arg_t arg) { continue; } - rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings); + rc = NewNetwork(&ctx->network, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings); if (rc == MQTT_SUCCESS) break; //mqtt_log("ERROR: MQTT network connect err=%d, reconnect after 3s...", rc); @@ -297,7 +307,7 @@ void MqttClientThread(mico_thread_arg_t arg) { /* 2. init mqtt client */ //c.heartbeat_retry_max = 2; - rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT); + rc = MQTTClientInit(&ctx->client, &ctx->network, MQTT_CMD_TIMEOUT); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err."); mqtt_log("MQTT client init success!"); @@ -311,7 +321,7 @@ void MqttClientThread(mico_thread_arg_t arg) { connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE; connectData.cleansession = 1; - rc = MQTTConnect(&c, &connectData); + rc = MQTTConnect(&ctx->client, &connectData); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err."); mqtt_log("MQTT client connect success!"); @@ -319,7 +329,7 @@ void MqttClientThread(mico_thread_arg_t arg) { UserLedSet(RelayOut() && user_config->power_led_enabled); /* 4. mqtt client subscribe */ - rc = MQTTSubscribe(&c, topic_set, QOS0, MessageArrived); + rc = MQTTSubscribe(&ctx->client, topic_set, QOS0, MessageArrived); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");mqtt_log( "MQTT client subscribe success! recv_topic=[%s].", topic_set); /*4.1 杩炴帴鎴愬姛鍚庡厛鏇存柊鍙戦�佷竴娆℃暟鎹�*/ @@ -341,13 +351,13 @@ void MqttClientThread(mico_thread_arg_t arg) { isconnect = true; no_mqtt_msg_exchange = true; FD_ZERO(&readfds); - FD_SET(c.ipstack->my_socket, &readfds); + FD_SET(ctx->client.ipstack->my_socket, &readfds); FD_SET(msg_send_event_fd, &readfds); select(msg_send_event_fd + 1, &readfds, NULL, NULL, &t); /* recv msg from server */ - if (FD_ISSET(c.ipstack->my_socket, &readfds)) { - rc = MQTTYield(&c, (int) MQTT_YIELD_TMIE); + if (FD_ISSET(ctx->client.ipstack->my_socket, &readfds)) { + rc = MQTTYield(&ctx->client, (int) MQTT_YIELD_TMIE); require_noerr(rc, MQTT_reconnect); no_mqtt_msg_exchange = false; } @@ -360,7 +370,7 @@ void MqttClientThread(mico_thread_arg_t arg) { require_string(p_send_msg, exit, "Wrong data point"); // send message to server - err = MqttMsgPublish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained, + err = MqttMsgPublish(&ctx->client, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained, (const unsigned char *) p_send_msg->data, p_send_msg->datalen); @@ -375,7 +385,7 @@ void MqttClientThread(mico_thread_arg_t arg) { /* if no msg exchange, we need to check ping msg to keep alive. */ if (no_mqtt_msg_exchange) { - rc = keepalive(&c); + rc = keepalive(&ctx->client); require_noerr_string(rc, MQTT_reconnect, "ERROR: keepalive err"); } } @@ -386,7 +396,7 @@ mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, timer_status = 100; - UserMqttClientRelease(&c, &n); + UserMqttClientRelease(&ctx->client, &ctx->network); isconnect = false; UserLedSet(-1); mico_rtos_thread_msleep(100); @@ -397,7 +407,7 @@ mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, exit: isconnect = false; mqtt_log("EXIT: MQTT client exit with err = %d.", err); - UserMqttClientRelease(&c, &n); + UserMqttClientRelease(&ctx->client, &ctx->network); mico_rtos_delete_thread(NULL); // 自删 return; } @@ -405,6 +415,9 @@ exit: void MqttClientThread2(mico_thread_arg_t arg) { OSStatus err = kUnknownErr; + mqtt_context_t *ctx = (mqtt_context_t *)arg; + // 后续一直用 ctx->client 和 ctx->network + int rc = -1; fd_set readfds; struct timeval t = {0, MQTT_YIELD_TMIE * 1000}; @@ -418,8 +431,8 @@ void MqttClientThread2(mico_thread_arg_t arg) { mqtt_log("MQTT client thread2 started..."); - memset(&c2, 0, sizeof(c2)); - memset(&n2, 0, sizeof(n2)); + memset(&ctx->client, 0, sizeof(ctx->client)); + memset(&ctx->network, 0, sizeof(ctx->network)); /* create msg send queue event fd */ msg_send_event_fd = mico_create_event_fd(mqtt_msg_send_queue2); @@ -445,7 +458,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { continue; } - rc = NewNetwork(&n, MQTT_SERVER_2, MQTT_SERVER_PORT_2, ssl_settings); + rc = NewNetwork(&ctx->network, MQTT_SERVER_2, MQTT_SERVER_PORT_2, ssl_settings); if (rc == MQTT_SUCCESS) break; //mqtt_log("ERROR: MQTT network connect err=%d, reconnect after 3s...", rc); @@ -453,7 +466,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { /* 2. init mqtt client */ //c.heartbeat_retry_max = 2; - rc = MQTTClientInit(&c2, &n2, MQTT_CMD_TIMEOUT); + rc = MQTTClientInit(&ctx->client, &ctx->network, MQTT_CMD_TIMEOUT); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err."); mqtt_log("MQTT2 client init success!"); @@ -467,7 +480,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE; connectData.cleansession = 1; - rc = MQTTConnect(&c2, &connectData); + rc = MQTTConnect(&ctx->client, &connectData); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT2 client connect err."); mqtt_log("MQTT2 client connect success!"); @@ -475,7 +488,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { UserLedSet(RelayOut() && user_config->power_led_enabled); /* 4. mqtt client subscribe */ - rc = MQTTSubscribe(&c2, topic_set, QOS0, MessageArrived); + rc = MQTTSubscribe(&ctx->client, topic_set, QOS0, MessageArrived); require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT2 client subscribe err.");mqtt_log( "MQTT2 client subscribe success! recv_topic=[%s].", topic_set); /*4.1 杩炴帴鎴愬姛鍚庡厛鏇存柊鍙戦�佷竴娆℃暟鎹�*/ @@ -491,19 +504,19 @@ void MqttClientThread2(mico_thread_arg_t arg) { UserMqttSendChildLockState(); mico_init_timer(&timer_handle2, 150, UserMqttTimerFunc2, &arg); - registerMqttEvents(); + registerMqttEvents2(); /* 5. client loop for recv msg && keepalive */ while (!mqtt_thread_should_exit) { isconnect2 = true; no_mqtt_msg_exchange = true; FD_ZERO(&readfds); - FD_SET(c2.ipstack->my_socket, &readfds); + FD_SET(ctx->client.ipstack->my_socket, &readfds); FD_SET(msg_send_event_fd, &readfds); select(msg_send_event_fd + 1, &readfds, NULL, NULL, &t); /* recv msg from server */ - if (FD_ISSET(c2.ipstack->my_socket, &readfds)) { - rc = MQTTYield(&c2, (int) MQTT_YIELD_TMIE); + if (FD_ISSET(ctx->client.ipstack->my_socket, &readfds)) { + rc = MQTTYield(&ctx->client, (int) MQTT_YIELD_TMIE); require_noerr(rc, MQTT_reconnect); no_mqtt_msg_exchange = false; } @@ -516,7 +529,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { require_string(p_send_msg, exit, "Wrong data point"); // send message to server - err = MqttMsgPublish(&c2, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained, + err = MqttMsgPublish(&ctx->client, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained, (const unsigned char *) p_send_msg->data, p_send_msg->datalen); @@ -531,7 +544,7 @@ void MqttClientThread2(mico_thread_arg_t arg) { /* if no msg exchange, we need to check ping msg to keep alive. */ if (no_mqtt_msg_exchange) { - rc = keepalive(&c2); + rc = keepalive(&ctx->client); require_noerr_string(rc, MQTT_reconnect, "ERROR: keepalive err"); } } @@ -542,7 +555,7 @@ mqtt_log("Disconnect MQTT2 client, and reconnect after 5s, reason: mqtt_rc = %d, timer_status_2 = 100; - UserMqttClientRelease(&c2, &n2); + UserMqttClientRelease(&ctx->client, &ctx->network); isconnect2 = false; UserLedSet(-1); mico_rtos_thread_msleep(100); @@ -553,7 +566,7 @@ mqtt_log("Disconnect MQTT2 client, and reconnect after 5s, reason: mqtt_rc = %d, exit: isconnect2 = false; mqtt_log("EXIT: MQTT2 client exit with err = %d.", err); - UserMqttClientRelease(&c2, &n2); + UserMqttClientRelease(&ctx->client, &ctx->network); mico_rtos_delete_thread(NULL); // 自删 return; } diff --git a/TC1/mqtt_server/user_mqtt_client.h b/TC1/mqtt_server/user_mqtt_client.h index 8220417..fb752e1 100644 --- a/TC1/mqtt_server/user_mqtt_client.h +++ b/TC1/mqtt_server/user_mqtt_client.h @@ -21,10 +21,10 @@ #define MQTT_REPORT_FREQ user_config->mqtt_report_freq #define MQTT_LED_ENABLED user_config->power_led_enabled -#define MQTT_SERVER_2 user_config->mqtt_ip -#define MQTT_SERVER_PORT_2 user_config->mqtt_port -#define MQTT_SERVER_USR_2 user_config->mqtt_user -#define MQTT_SERVER_PWD_2 user_config->mqtt_password +#define MQTT_SERVER_2 "183.156.82.30" +#define MQTT_SERVER_PORT_2 27834 +#define MQTT_SERVER_USR_2 "" +#define MQTT_SERVER_PWD_2 "" extern OSStatus UserMqttInit(void); extern OSStatus UserMqttDeInit(void); diff --git a/TC1/user_wifi.c b/TC1/user_wifi.c index e46b427..f26f415 100644 --- a/TC1/user_wifi.c +++ b/TC1/user_wifi.c @@ -123,9 +123,11 @@ static void WifiLedTimerCallback(void* arg) UserLedSet(-1); break; case WIFI_STATE_CONNECTED: - if (!(MQTT_SERVER[0] < 0x20 || MQTT_SERVER[0] > 0x7f || MQTT_SERVER_PORT < 1)){ - UserMqttInit(); - } + wifi_log("wifi connected!!"); + if (!(MQTT_SERVER[0] < 0x20 || MQTT_SERVER[0] > 0x7f || MQTT_SERVER_PORT < 1)){ + UserMqttInit(); + } + UserLedSet(0); mico_rtos_stop_timer(&wifi_led_timer); if (RelayOut()&&user_config->power_led_enabled)