summaryrefslogtreecommitdiff
path: root/recipes-connectivity/lora/lora-packet-forwarder/lora-packet-forwarder-add-queue.patch
blob: 318b6ea80e690538b7443d20f31e94a3851ceef0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
From fdf5d6669d5132da9016840dfe2c1c99f525e4bc Mon Sep 17 00:00:00 2001
From: Jason Reiss <jreiss@multitech.com>
Date: Tue, 29 Sep 2015 12:01:12 -0500
Subject: [PATCH] add-queue to pkt-fwd

---
 basic_pkt_fwd/src/basic_pkt_fwd.c |   93 +++++++++++++++++++++++++++++++++++--
 1 file changed, 88 insertions(+), 5 deletions(-)

diff --git a/basic_pkt_fwd/src/basic_pkt_fwd.c b/basic_pkt_fwd/src/basic_pkt_fwd.c
index e20dfc1..4d0bb35 100644
--- a/basic_pkt_fwd/src/basic_pkt_fwd.c
+++ b/basic_pkt_fwd/src/basic_pkt_fwd.c
@@ -162,8 +162,19 @@ static int parse_gateway_configuration(const char * conf_file);
 
 static double difftimespec(struct timespec end, struct timespec beginning);
 
+typedef struct tx_queue_s {
+    struct lgw_pkt_tx_s pkt;
+    struct tx_queue_s* next;
+} tx_queue_s;
+   
+    
+struct tx_queue_s* tx_queue = NULL;
+struct tx_queue_s* tx_queue_end = NULL;
+    
+    
 /* threads */
 void thread_up(void);
+void thread_queue(void);
 void thread_down(void);
 
 /* -------------------------------------------------------------------------- */
@@ -479,6 +490,7 @@ int main(void)
 	
 	/* threads */
 	pthread_t thrid_up;
+	pthread_t thrid_queue;
 	pthread_t thrid_down;
 	
 	/* network socket creation */
@@ -649,6 +661,11 @@ int main(void)
 		MSG("ERROR: [main] impossible to create downstream thread\n");
 		exit(EXIT_FAILURE);
 	}
+	i = pthread_create( &thrid_queue, NULL, (void * (*)(void *))thread_queue, NULL);
+	if (i != 0) {
+		MSG("ERROR: [main] impossible to create queue thread\n");
+		exit(EXIT_FAILURE);
+	}
 	
 	/* configure signal handling */
 	sigemptyset(&sigact.sa_mask);
@@ -744,6 +761,7 @@ int main(void)
 	
 	/* wait for upstream thread to finish (1 fetch cycle max) */
 	pthread_join(thrid_up, NULL);
+	pthread_cancel(thrid_queue);
 	pthread_cancel(thrid_down); /* don't wait for downstream thread */
 	
 	/* if an exit signal was received, try to quit properly */
@@ -1121,9 +1139,9 @@ void thread_up(void) {
 }
 
 /* -------------------------------------------------------------------------- */
-/* --- THREAD 2: POLLING SERVER AND EMITTING PACKETS ------------------------ */
+/* --- THREAD 2: POLLING SERVER AND QUEUEING PACKETS ------------------------ */
 
-void thread_down(void) {
+void thread_queue(void) {
 	int i; /* loop variables */
 	
 	/* configuration and metadata for an outbound packet */
@@ -1462,11 +1480,48 @@ void thread_down(void) {
 			meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */
 			meas_dw_network_byte += msg_len; /* meas_dw_network_byte */
 			meas_dw_payload_byte += txpkt.size;
+			pthread_mutex_unlock(&mx_meas_dw);
+
+			pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+            if (tx_queue == NULL) {
+                tx_queue = malloc(sizeof(tx_queue_s));
+                tx_queue_end = tx_queue;
+                tx_queue->pkt = txpkt;
+                tx_queue->next = NULL;
+            } else {
+                struct tx_queue_s* item = malloc(sizeof(tx_queue_s));
+                item->next = NULL;
+                item->pkt = txpkt;
+                tx_queue_end->next = item;
+                tx_queue_end = item;
+            }
+			pthread_mutex_unlock(&mx_concent);
+        }
+        wait_ms(1);
+    }
+	MSG("\nINFO: End of queue thread\n");
+}
+
+/* -------------------------------------------------------------------------- */
+/* --- THREAD 3: POLLING QUEUE AND EMITTING PACKETS ------------------------ */
+
+void thread_down(void) {
+	MSG("\nINFO: Start of downstream thread\n");
 			
+	struct lgw_pkt_tx_s txpkt;
+
+	while (!exit_sig && !quit_sig) {
+		pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+        if (tx_queue != NULL) {
+            struct tx_queue_s* del = tx_queue;
+            txpkt = tx_queue->pkt;
+            tx_queue = tx_queue->next;
+            free(del);
 			/* transfer data and metadata to the concentrator, and schedule TX */
-			pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
-			i = lgw_send(txpkt);
+            int	i = lgw_send(txpkt);
 			pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+
+			pthread_mutex_lock(&mx_meas_dw);
 			if (i == LGW_HAL_ERROR) {
 				meas_nb_tx_fail += 1;
 				pthread_mutex_unlock(&mx_meas_dw);
@@ -1476,7 +1531,35 @@ void thread_down(void) {
 				meas_nb_tx_ok += 1;
 				pthread_mutex_unlock(&mx_meas_dw);
 			}
-		}
+			
+            uint8_t tx_status_var = TX_STATUS_UNKNOWN;
+            // wait for 200 ms and ensure packet is transmitted
+            for (i=0; (i < 20) && (tx_status_var != TX_EMITTING); ++i) {
+                wait_ms(10);
+                pthread_mutex_lock(&mx_concent);
+                lgw_status(TX_STATUS, &tx_status_var);
+                pthread_mutex_unlock(&mx_concent);
+            }   
+            if (tx_status_var != TX_EMITTING) {
+               MSG("WARNING: [down] packet was scheduled but failed to TX\n");
+            } else {
+                // if packet is transmitting then wait for end of TX or timeout after 4 seconds
+                for (i=0; (i < 400) && (tx_status_var != TX_FREE); ++i) {
+                   wait_ms(10);
+                   pthread_mutex_lock(&mx_concent);
+                   lgw_status(TX_STATUS, &tx_status_var);
+                   pthread_mutex_unlock(&mx_concent);
+                }   
+
+                if (tx_status_var != TX_FREE) {
+                    MSG("WARNING: [down] timedout waiting for end of TX\n");
+                }
+            }
+			
+		} else {
+			pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+        }
+        wait_ms(1);
 	}
 	MSG("\nINFO: End of downstream thread\n");
 }
-- 
1.7.10.4