From fdf5d6669d5132da9016840dfe2c1c99f525e4bc Mon Sep 17 00:00:00 2001 From: Jason Reiss Date: Tue, 29 Sep 2015 12:01:12 -0500 Subject: [PATCH] add-queue to pkt-fwd --- basic_pkt_fwd/src/basic_pkt_fwd.c | 93 +++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/basic_pkt_fwd/src/basic_pkt_fwd.c b/basic_pkt_fwd/src/basic_pkt_fwd.c index e20dfc1..4d0bb35 100644 --- a/basic_pkt_fwd/src/basic_pkt_fwd.c +++ b/basic_pkt_fwd/src/basic_pkt_fwd.c @@ -162,8 +162,19 @@ static int parse_gateway_configuration(const char * conf_file); static double difftimespec(struct timespec end, struct timespec beginning); +typedef struct tx_queue_s { + struct lgw_pkt_tx_s pkt; + struct tx_queue_s* next; +} tx_queue_s; + + +struct tx_queue_s* tx_queue = NULL; +struct tx_queue_s* tx_queue_end = NULL; + + /* threads */ void thread_up(void); +void thread_queue(void); void thread_down(void); /* -------------------------------------------------------------------------- */ @@ -479,6 +490,7 @@ int main(void) /* threads */ pthread_t thrid_up; + pthread_t thrid_queue; pthread_t thrid_down; /* network socket creation */ @@ -649,6 +661,11 @@ int main(void) MSG("ERROR: [main] impossible to create downstream thread\n"); exit(EXIT_FAILURE); } + i = pthread_create( &thrid_queue, NULL, (void * (*)(void *))thread_queue, NULL); + if (i != 0) { + MSG("ERROR: [main] impossible to create queue thread\n"); + exit(EXIT_FAILURE); + } /* configure signal handling */ sigemptyset(&sigact.sa_mask); @@ -744,6 +761,7 @@ int main(void) /* wait for upstream thread to finish (1 fetch cycle max) */ pthread_join(thrid_up, NULL); + pthread_cancel(thrid_queue); pthread_cancel(thrid_down); /* don't wait for downstream thread */ /* if an exit signal was received, try to quit properly */ @@ -1121,9 +1139,9 @@ void thread_up(void) { } /* -------------------------------------------------------------------------- */ -/* --- THREAD 2: POLLING SERVER AND EMITTING PACKETS ------------------------ */ +/* --- THREAD 2: POLLING SERVER AND QUEUEING PACKETS ------------------------ */ -void thread_down(void) { +void thread_queue(void) { int i; /* loop variables */ /* configuration and metadata for an outbound packet */ @@ -1462,11 +1480,48 @@ void thread_down(void) { meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */ meas_dw_network_byte += msg_len; /* meas_dw_network_byte */ meas_dw_payload_byte += txpkt.size; + pthread_mutex_unlock(&mx_meas_dw); + + pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ + if (tx_queue == NULL) { + tx_queue = malloc(sizeof(tx_queue_s)); + tx_queue_end = tx_queue; + tx_queue->pkt = txpkt; + tx_queue->next = NULL; + } else { + struct tx_queue_s* item = malloc(sizeof(tx_queue_s)); + item->next = NULL; + item->pkt = txpkt; + tx_queue_end->next = item; + tx_queue_end = item; + } + pthread_mutex_unlock(&mx_concent); + } + wait_ms(1); + } + MSG("\nINFO: End of queue thread\n"); +} + +/* -------------------------------------------------------------------------- */ +/* --- THREAD 3: POLLING QUEUE AND EMITTING PACKETS ------------------------ */ + +void thread_down(void) { + MSG("\nINFO: Start of downstream thread\n"); + struct lgw_pkt_tx_s txpkt; + + while (!exit_sig && !quit_sig) { + pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ + if (tx_queue != NULL) { + struct tx_queue_s* del = tx_queue; + txpkt = tx_queue->pkt; + tx_queue = tx_queue->next; + free(del); /* transfer data and metadata to the concentrator, and schedule TX */ - pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ - i = lgw_send(txpkt); + int i = lgw_send(txpkt); pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */ + + pthread_mutex_lock(&mx_meas_dw); if (i == LGW_HAL_ERROR) { meas_nb_tx_fail += 1; pthread_mutex_unlock(&mx_meas_dw); @@ -1476,7 +1531,35 @@ void thread_down(void) { meas_nb_tx_ok += 1; pthread_mutex_unlock(&mx_meas_dw); } - } + + uint8_t tx_status_var = TX_STATUS_UNKNOWN; + // wait for 200 ms and ensure packet is transmitted + for (i=0; (i < 20) && (tx_status_var != TX_EMITTING); ++i) { + wait_ms(10); + pthread_mutex_lock(&mx_concent); + lgw_status(TX_STATUS, &tx_status_var); + pthread_mutex_unlock(&mx_concent); + } + if (tx_status_var != TX_EMITTING) { + MSG("WARNING: [down] packet was scheduled but failed to TX\n"); + } else { + // if packet is transmitting then wait for end of TX or timeout after 4 seconds + for (i=0; (i < 400) && (tx_status_var != TX_FREE); ++i) { + wait_ms(10); + pthread_mutex_lock(&mx_concent); + lgw_status(TX_STATUS, &tx_status_var); + pthread_mutex_unlock(&mx_concent); + } + + if (tx_status_var != TX_FREE) { + MSG("WARNING: [down] timedout waiting for end of TX\n"); + } + } + + } else { + pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */ + } + wait_ms(1); } MSG("\nINFO: End of downstream thread\n"); } iff --git a/gps_pkt_fwd/src/gps_pkt_fwd.c b/gps_pkt_fwd/src/gps_pkt_fwd.c index 79f7584..3d1cbf8 100644 --- a/gps_pkt_fwd/src/gps_pkt_fwd.c +++ b/gps_pkt_fwd/src/gps_pkt_fwd.c @@ -192,8 +192,18 @@ static int parse_gateway_configuration(const char * conf_file); static double difftimespec(struct timespec end, struct timespec beginning); +typedef struct tx_queue_s { + struct lgw_pkt_tx_s pkt; + struct tx_queue_s* next; +} tx_queue_s; + + +struct tx_queue_s* tx_queue = NULL; +struct tx_queue_s* tx_queue_end = NULL; + /* threads */ void thread_up(void); +void thread_queue(void); void thread_down(void); void thread_gps(void); void thread_valid(void); @@ -546,6 +556,7 @@ int main(void) /* threads */ pthread_t thrid_up; + pthread_t thrid_queue; pthread_t thrid_down; pthread_t thrid_gps; pthread_t thrid_valid; @@ -739,6 +750,12 @@ int main(void) exit(EXIT_FAILURE); } + i = pthread_create( &thrid_queue, NULL, (void * (*)(void *))thread_queue, NULL); + if (i != 0) { + MSG("ERROR: [main] impossible to create queue thread\n"); + exit(EXIT_FAILURE); + } + /* spawn thread to manage GPS */ if (gps_enabled == true) { i = pthread_create( &thrid_gps, NULL, (void * (*)(void *))thread_gps, NULL); @@ -891,6 +908,7 @@ int main(void) /* wait for upstream thread to finish (1 fetch cycle max) */ pthread_join(thrid_up, NULL); + pthread_cancel(thrid_queue); pthread_cancel(thrid_down); /* don't wait for downstream thread */ pthread_cancel(thrid_gps); /* don't wait for GPS thread */ pthread_cancel(thrid_valid); /* don't wait for validation thread */ @@ -1325,9 +1343,9 @@ void thread_up(void) { } /* -------------------------------------------------------------------------- */ -/* --- THREAD 2: POLLING SERVER AND EMITTING PACKETS ------------------------ */ +/* --- THREAD 2: POLLING SERVER AND QUEUE PACKETS ------------------------ */ -void thread_down(void) { +void thread_queue(void) { int i; /* loop variables */ /* configuration and metadata for an outbound packet */ @@ -1719,11 +1737,49 @@ void thread_down(void) { meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */ meas_dw_network_byte += msg_len; /* meas_dw_network_byte */ meas_dw_payload_byte += txpkt.size; + pthread_mutex_unlock(&mx_meas_dw); + + pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ + if (tx_queue == NULL) { + tx_queue = malloc(sizeof(tx_queue_s)); + tx_queue_end = tx_queue; + tx_queue->pkt = txpkt; + tx_queue->next = NULL; + } else { + struct tx_queue_s* item = malloc(sizeof(tx_queue_s)); + item->next = NULL; + item->pkt = txpkt; + tx_queue_end->next = item; + tx_queue_end = item; + } + pthread_mutex_unlock(&mx_concent); + } + wait_ms(1); + } + MSG("\nINFO: End of queue thread\n"); +} + +/* -------------------------------------------------------------------------- */ +/* --- THREAD 3: POLLING QUEUE AND EMITTING PACKETS ------------------------ */ + +void thread_down(void) { + MSG("\nINFO: Start of downstream thread\n"); + struct lgw_pkt_tx_s txpkt; + + while (!exit_sig && !quit_sig) { + pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ + if (tx_queue != NULL) { + struct tx_queue_s* del = tx_queue; + txpkt = tx_queue->pkt; + tx_queue = tx_queue->next; + free(del); + /* transfer data and metadata to the concentrator, and schedule TX */ - pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */ - i = lgw_send(txpkt); + int i = lgw_send(txpkt); pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */ + + pthread_mutex_lock(&mx_meas_dw); if (i == LGW_HAL_ERROR) { meas_nb_tx_fail += 1; pthread_mutex_unlock(&mx_meas_dw); @@ -1733,7 +1789,35 @@ void thread_down(void) { meas_nb_tx_ok += 1; pthread_mutex_unlock(&mx_meas_dw); } - } + + uint8_t tx_status_var = TX_STATUS_UNKNOWN; + // wait for 200 ms and ensure packet is transmitted + for (i=0; (i < 20) && (tx_status_var != TX_EMITTING); ++i) { + wait_ms(10); + pthread_mutex_lock(&mx_concent); + lgw_status(TX_STATUS, &tx_status_var); + pthread_mutex_unlock(&mx_concent); + } + if (tx_status_var != TX_EMITTING) { + MSG("WARNING: [down] packet was scheduled but failed to TX\n"); + } else { + // if packet is transmitting then wait for end of TX or timeout after 4 seconds + for (i=0; (i < 400) && (tx_status_var != TX_FREE); ++i) { + wait_ms(10); + pthread_mutex_lock(&mx_concent); + lgw_status(TX_STATUS, &tx_status_var); + pthread_mutex_unlock(&mx_concent); + } + + if (tx_status_var != TX_FREE) { + MSG("WARNING: [down] timedout waiting for end of TX\n"); + } + } + + } else { + pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */ + } + wait_ms(1); } MSG("\nINFO: End of downstream thread\n"); }