From 871acccb98f51c848e67decf242e3cce2119e558 Mon Sep 17 00:00:00 2001 From: Zip <76966589@qq.com> Date: Wed, 13 Mar 2019 11:10:26 +0800 Subject: [PATCH] =?UTF-8?q?add:udp=E5=8F=91=E9=80=81=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E7=A1=AE=E8=AE=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TC1/user_udp.c | 109 ++++++++++++++++++++++++++++++++++++++++++------- TC1/user_udp.h | 3 +- 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/TC1/user_udp.c b/TC1/user_udp.c index 04fe33a..93ea29b 100644 --- a/TC1/user_udp.c +++ b/TC1/user_udp.c @@ -3,12 +3,19 @@ #include "main.h" #define LOCAL_UDP_PORT 10182 - +#define REMOTE_UDP_PORT 10181 #define MAX_UDP_DATA_SIZE (1024) #define MAX_UDP_SEND_QUEUE_SIZE (5) mico_queue_t udp_msg_send_queue = NULL; +typedef struct +{ + uint32_t datalen; + uint8_t data[MAX_UDP_DATA_SIZE]; +} udp_send_msg_t, *p_udp_send_msg_t; + +static OSStatus udp_msg_send( int socket, const unsigned char* msg, uint32_t msg_len ); void udp_thread( void *arg ); OSStatus user_udp_init( void ) @@ -35,15 +42,21 @@ void udp_thread( void *arg ) OSStatus err; struct sockaddr_in addr; fd_set readfds; + struct timeval t = { 0, 5000 * 1000 }; //5s socklen_t addrLen = sizeof(addr); int udp_fd = -1, len; + p_udp_send_msg_t p_send_msg = NULL; + int msg_send_event_fd = -1; char ip_address[16]; uint8_t *buf = NULL; -// /* create udp msg send queue */ -// err = mico_rtos_init_queue( &udp_msg_send_queue, "uqp_msg_send_queue", MAX_UDP_DATA_SIZE/*sizeof(p_mqtt_send_msg_t)*/, -// MAX_UDP_SEND_QUEUE_SIZE ); -// require_noerr_action( err, exit, os_log( "ERROR: create mqtt msg send queue err=%d.", err ) ); + /* create udp msg send queue */ + err = mico_rtos_init_queue( &udp_msg_send_queue, "uqp_msg_send_queue", sizeof(p_udp_send_msg_t), + MAX_UDP_SEND_QUEUE_SIZE ); + require_noerr_action( err, exit, os_log( "ERROR: create udp msg send queue err=%d.", err ) ); + /* create msg send queue event fd */ + msg_send_event_fd = mico_create_event_fd( udp_msg_send_queue ); + require_action( msg_send_event_fd >= 0, exit, os_log( "ERROR: create msg send queue event fd failed!!!" ) ); buf = malloc( 1024 ); require_action( buf, exit, err = kNoMemoryErr ); @@ -63,9 +76,9 @@ void udp_thread( void *arg ) while ( 1 ) { FD_ZERO( &readfds ); + FD_SET( msg_send_event_fd, &readfds ); FD_SET( udp_fd, &readfds ); - - require_action( select( udp_fd + 1, &readfds, NULL, NULL, NULL ) >= 0, exit, err = kConnectionErr ); + select( udp_fd + 1, &readfds, NULL, NULL, NULL ); /*Read data from udp and send data back */ if ( FD_ISSET( udp_fd, &readfds ) ) @@ -73,17 +86,29 @@ void udp_thread( void *arg ) len = recvfrom( udp_fd, buf, 1024, 0, (struct sockaddr *) &addr, &addrLen ); require_action( len >= 0, exit, err = kConnectionErr ); -// os_log( "udp s_type:0x%x",((struct sockaddr )addr).s_type); -// os_log( "udp s_port:0x%x",((struct sockaddr )addr).s_port); -// os_log( "udp s_ip:0x%x",((struct sockaddr )addr).s_type); -// os_log( "udp s_spares: %x %x %x %x %x %x",((struct sockaddr )addr).s_spares[0],((struct sockaddr )addr).s_spares[1], -// ((struct sockaddr )addr).s_spares[2],((struct sockaddr )addr).s_spares[3], -// ((struct sockaddr )addr).s_spares[4],((struct sockaddr )addr).s_spares[5],); - strcpy( ip_address, inet_ntoa( addr.sin_addr ) ); -// os_log( "udp recv from %s:%d, len:%d :%s", ip_address,addr.sin_port, len ,buf); + os_log( "udp recv from %s:%d, len:%d :%s", ip_address,addr.sin_port, len ,buf); sendto( udp_fd, buf, len, 0, (struct sockaddr *) &addr, sizeof(struct sockaddr_in) ); } + + /* recv msg from user worker thread to be sent to server */ + if ( FD_ISSET( msg_send_event_fd, &readfds ) ) + { + while ( mico_rtos_is_queue_empty( &udp_msg_send_queue ) == false ) + { + // get msg from send queue + mico_rtos_pop_from_queue( &udp_msg_send_queue, &p_send_msg, 0 ); + require_string( p_send_msg, exit, "Wrong data point" ); + + // send message to server + err = udp_msg_send( udp_fd, p_send_msg->data, p_send_msg->datalen ); +// require_noerr_string( err, MQTT_reconnect, "ERROR: udp publish data err" ); + + os_log( "udp send data success! msg=[%ld][%s].\r\n", p_send_msg->datalen, p_send_msg->data ); + free( p_send_msg ); + p_send_msg = NULL; + } + } } exit: @@ -93,4 +118,58 @@ void udp_thread( void *arg ) mico_rtos_delete_thread( NULL ); } +// send msg to udp +static OSStatus udp_msg_send( int socket, const unsigned char* msg, uint32_t msg_len ) +{ + OSStatus err = kUnknownErr; + int ret = 0; + require( msg_len && msg, exit ); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons( LOCAL_UDP_PORT ); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_BROADCAST; + addr.sin_port = htons( REMOTE_UDP_PORT ); + /*the receiver should bind at port=20000*/ + sendto( socket, msg, msg_len, 0, (struct sockaddr *) &addr, sizeof(addr) ); + + exit: + return err; +} + +/* Application collect data and seng them to udp send queue */ +OSStatus user_udp_send( char *arg ) +{ + OSStatus err = kUnknownErr; + p_udp_send_msg_t p_send_msg = NULL; + +// app_log("======App prepare to send ![%d]======", MicoGetMemoryInfo()->free_memory); + + /* Send queue is full, pop the oldest */ + if ( mico_rtos_is_queue_full( &udp_msg_send_queue ) == true ) + { + mico_rtos_pop_from_queue( &udp_msg_send_queue, &p_send_msg, 0 ); + free( p_send_msg ); + p_send_msg = NULL; + } + + /* Push the latest data into send queue*/ + p_send_msg = calloc( 1, sizeof(udp_send_msg_t) ); + require_action( p_send_msg, exit, err = kNoMemoryErr ); + + p_send_msg->datalen = strlen( arg ); + memcpy( p_send_msg->data, arg, p_send_msg->datalen ); + + err = mico_rtos_push_to_queue( &udp_msg_send_queue, &p_send_msg, 0 ); + require_noerr( err, exit ); + + //app_log("Push user msg into send queue success!"); + + exit: + if ( err != kNoErr && p_send_msg ) free( p_send_msg ); + return err; + +} diff --git a/TC1/user_udp.h b/TC1/user_udp.h index 6d17bab..3f60311 100644 --- a/TC1/user_udp.h +++ b/TC1/user_udp.h @@ -6,7 +6,8 @@ #include "mico.h" #include "MiCOKit_EXT.h" - +OSStatus user_udp_init( void ); +OSStatus user_udp_send( char *arg );