1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
|
From fdf5d6669d5132da9016840dfe2c1c99f525e4bc Mon Sep 17 00:00:00 2001
From: Jason Reiss <jreiss@multitech.com>
Date: Tue, 29 Sep 2015 12:01:12 -0500
Subject: [PATCH] add-queue to pkt-fwd
---
basic_pkt_fwd/src/basic_pkt_fwd.c | 93 +++++++++++++++++++++++++++++++++++--
1 file changed, 88 insertions(+), 5 deletions(-)
diff --git a/basic_pkt_fwd/src/basic_pkt_fwd.c b/basic_pkt_fwd/src/basic_pkt_fwd.c
index e20dfc1..4d0bb35 100644
--- a/basic_pkt_fwd/src/basic_pkt_fwd.c
+++ b/basic_pkt_fwd/src/basic_pkt_fwd.c
@@ -162,8 +162,19 @@ static int parse_gateway_configuration(const char * conf_file);
static double difftimespec(struct timespec end, struct timespec beginning);
+typedef struct tx_queue_s {
+ struct lgw_pkt_tx_s pkt;
+ struct tx_queue_s* next;
+} tx_queue_s;
+
+
+struct tx_queue_s* tx_queue = NULL;
+struct tx_queue_s* tx_queue_end = NULL;
+
+
/* threads */
void thread_up(void);
+void thread_queue(void);
void thread_down(void);
/* -------------------------------------------------------------------------- */
@@ -479,6 +490,7 @@ int main(void)
/* threads */
pthread_t thrid_up;
+ pthread_t thrid_queue;
pthread_t thrid_down;
/* network socket creation */
@@ -649,6 +661,11 @@ int main(void)
MSG("ERROR: [main] impossible to create downstream thread\n");
exit(EXIT_FAILURE);
}
+ i = pthread_create( &thrid_queue, NULL, (void * (*)(void *))thread_queue, NULL);
+ if (i != 0) {
+ MSG("ERROR: [main] impossible to create queue thread\n");
+ exit(EXIT_FAILURE);
+ }
/* configure signal handling */
sigemptyset(&sigact.sa_mask);
@@ -744,6 +761,7 @@ int main(void)
/* wait for upstream thread to finish (1 fetch cycle max) */
pthread_join(thrid_up, NULL);
+ pthread_cancel(thrid_queue);
pthread_cancel(thrid_down); /* don't wait for downstream thread */
/* if an exit signal was received, try to quit properly */
@@ -1121,9 +1139,9 @@ void thread_up(void) {
}
/* -------------------------------------------------------------------------- */
-/* --- THREAD 2: POLLING SERVER AND EMITTING PACKETS ------------------------ */
+/* --- THREAD 2: POLLING SERVER AND QUEUEING PACKETS ------------------------ */
-void thread_down(void) {
+void thread_queue(void) {
int i; /* loop variables */
/* configuration and metadata for an outbound packet */
@@ -1462,11 +1480,48 @@ void thread_down(void) {
meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */
meas_dw_network_byte += msg_len; /* meas_dw_network_byte */
meas_dw_payload_byte += txpkt.size;
+ pthread_mutex_unlock(&mx_meas_dw);
+
+ pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+ if (tx_queue == NULL) {
+ tx_queue = malloc(sizeof(tx_queue_s));
+ tx_queue_end = tx_queue;
+ tx_queue->pkt = txpkt;
+ tx_queue->next = NULL;
+ } else {
+ struct tx_queue_s* item = malloc(sizeof(tx_queue_s));
+ item->next = NULL;
+ item->pkt = txpkt;
+ tx_queue_end->next = item;
+ tx_queue_end = item;
+ }
+ pthread_mutex_unlock(&mx_concent);
+ }
+ wait_ms(1);
+ }
+ MSG("\nINFO: End of queue thread\n");
+}
+
+/* -------------------------------------------------------------------------- */
+/* --- THREAD 3: POLLING QUEUE AND EMITTING PACKETS ------------------------ */
+
+void thread_down(void) {
+ MSG("\nINFO: Start of downstream thread\n");
+ struct lgw_pkt_tx_s txpkt;
+
+ while (!exit_sig && !quit_sig) {
+ pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+ if (tx_queue != NULL) {
+ struct tx_queue_s* del = tx_queue;
+ txpkt = tx_queue->pkt;
+ tx_queue = tx_queue->next;
+ free(del);
/* transfer data and metadata to the concentrator, and schedule TX */
- pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
- i = lgw_send(txpkt);
+ int i = lgw_send(txpkt);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+
+ pthread_mutex_lock(&mx_meas_dw);
if (i == LGW_HAL_ERROR) {
meas_nb_tx_fail += 1;
pthread_mutex_unlock(&mx_meas_dw);
@@ -1476,7 +1531,35 @@ void thread_down(void) {
meas_nb_tx_ok += 1;
pthread_mutex_unlock(&mx_meas_dw);
}
- }
+
+ uint8_t tx_status_var = TX_STATUS_UNKNOWN;
+ // wait for 200 ms and ensure packet is transmitted
+ for (i=0; (i < 20) && (tx_status_var != TX_EMITTING); ++i) {
+ wait_ms(10);
+ pthread_mutex_lock(&mx_concent);
+ lgw_status(TX_STATUS, &tx_status_var);
+ pthread_mutex_unlock(&mx_concent);
+ }
+ if (tx_status_var != TX_EMITTING) {
+ MSG("WARNING: [down] packet was scheduled but failed to TX\n");
+ } else {
+ // if packet is transmitting then wait for end of TX or timeout after 4 seconds
+ for (i=0; (i < 400) && (tx_status_var != TX_FREE); ++i) {
+ wait_ms(10);
+ pthread_mutex_lock(&mx_concent);
+ lgw_status(TX_STATUS, &tx_status_var);
+ pthread_mutex_unlock(&mx_concent);
+ }
+
+ if (tx_status_var != TX_FREE) {
+ MSG("WARNING: [down] timedout waiting for end of TX\n");
+ }
+ }
+
+ } else {
+ pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+ }
+ wait_ms(1);
}
MSG("\nINFO: End of downstream thread\n");
}
iff --git a/gps_pkt_fwd/src/gps_pkt_fwd.c b/gps_pkt_fwd/src/gps_pkt_fwd.c
index 79f7584..3d1cbf8 100644
--- a/gps_pkt_fwd/src/gps_pkt_fwd.c
+++ b/gps_pkt_fwd/src/gps_pkt_fwd.c
@@ -192,8 +192,18 @@ static int parse_gateway_configuration(const char * conf_file);
static double difftimespec(struct timespec end, struct timespec beginning);
+typedef struct tx_queue_s {
+ struct lgw_pkt_tx_s pkt;
+ struct tx_queue_s* next;
+} tx_queue_s;
+
+
+struct tx_queue_s* tx_queue = NULL;
+struct tx_queue_s* tx_queue_end = NULL;
+
/* threads */
void thread_up(void);
+void thread_queue(void);
void thread_down(void);
void thread_gps(void);
void thread_valid(void);
@@ -546,6 +556,7 @@ int main(void)
/* threads */
pthread_t thrid_up;
+ pthread_t thrid_queue;
pthread_t thrid_down;
pthread_t thrid_gps;
pthread_t thrid_valid;
@@ -739,6 +750,12 @@ int main(void)
exit(EXIT_FAILURE);
}
+ i = pthread_create( &thrid_queue, NULL, (void * (*)(void *))thread_queue, NULL);
+ if (i != 0) {
+ MSG("ERROR: [main] impossible to create queue thread\n");
+ exit(EXIT_FAILURE);
+ }
+
/* spawn thread to manage GPS */
if (gps_enabled == true) {
i = pthread_create( &thrid_gps, NULL, (void * (*)(void *))thread_gps, NULL);
@@ -891,6 +908,7 @@ int main(void)
/* wait for upstream thread to finish (1 fetch cycle max) */
pthread_join(thrid_up, NULL);
+ pthread_cancel(thrid_queue);
pthread_cancel(thrid_down); /* don't wait for downstream thread */
pthread_cancel(thrid_gps); /* don't wait for GPS thread */
pthread_cancel(thrid_valid); /* don't wait for validation thread */
@@ -1325,9 +1343,9 @@ void thread_up(void) {
}
/* -------------------------------------------------------------------------- */
-/* --- THREAD 2: POLLING SERVER AND EMITTING PACKETS ------------------------ */
+/* --- THREAD 2: POLLING SERVER AND QUEUE PACKETS ------------------------ */
-void thread_down(void) {
+void thread_queue(void) {
int i; /* loop variables */
/* configuration and metadata for an outbound packet */
@@ -1719,11 +1737,49 @@ void thread_down(void) {
meas_dw_dgram_rcv += 1; /* count only datagrams with no JSON errors */
meas_dw_network_byte += msg_len; /* meas_dw_network_byte */
meas_dw_payload_byte += txpkt.size;
+ pthread_mutex_unlock(&mx_meas_dw);
+
+ pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+ if (tx_queue == NULL) {
+ tx_queue = malloc(sizeof(tx_queue_s));
+ tx_queue_end = tx_queue;
+ tx_queue->pkt = txpkt;
+ tx_queue->next = NULL;
+ } else {
+ struct tx_queue_s* item = malloc(sizeof(tx_queue_s));
+ item->next = NULL;
+ item->pkt = txpkt;
+ tx_queue_end->next = item;
+ tx_queue_end = item;
+ }
+ pthread_mutex_unlock(&mx_concent);
+ }
+ wait_ms(1);
+ }
+ MSG("\nINFO: End of queue thread\n");
+}
+
+/* -------------------------------------------------------------------------- */
+/* --- THREAD 3: POLLING QUEUE AND EMITTING PACKETS ------------------------ */
+
+void thread_down(void) {
+ MSG("\nINFO: Start of downstream thread\n");
+ struct lgw_pkt_tx_s txpkt;
+
+ while (!exit_sig && !quit_sig) {
+ pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
+ if (tx_queue != NULL) {
+ struct tx_queue_s* del = tx_queue;
+ txpkt = tx_queue->pkt;
+ tx_queue = tx_queue->next;
+ free(del);
+
/* transfer data and metadata to the concentrator, and schedule TX */
- pthread_mutex_lock(&mx_concent); /* may have to wait for a fetch to finish */
- i = lgw_send(txpkt);
+ int i = lgw_send(txpkt);
pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+
+ pthread_mutex_lock(&mx_meas_dw);
if (i == LGW_HAL_ERROR) {
meas_nb_tx_fail += 1;
pthread_mutex_unlock(&mx_meas_dw);
@@ -1733,7 +1789,35 @@ void thread_down(void) {
meas_nb_tx_ok += 1;
pthread_mutex_unlock(&mx_meas_dw);
}
- }
+
+ uint8_t tx_status_var = TX_STATUS_UNKNOWN;
+ // wait for 200 ms and ensure packet is transmitted
+ for (i=0; (i < 20) && (tx_status_var != TX_EMITTING); ++i) {
+ wait_ms(10);
+ pthread_mutex_lock(&mx_concent);
+ lgw_status(TX_STATUS, &tx_status_var);
+ pthread_mutex_unlock(&mx_concent);
+ }
+ if (tx_status_var != TX_EMITTING) {
+ MSG("WARNING: [down] packet was scheduled but failed to TX\n");
+ } else {
+ // if packet is transmitting then wait for end of TX or timeout after 4 seconds
+ for (i=0; (i < 400) && (tx_status_var != TX_FREE); ++i) {
+ wait_ms(10);
+ pthread_mutex_lock(&mx_concent);
+ lgw_status(TX_STATUS, &tx_status_var);
+ pthread_mutex_unlock(&mx_concent);
+ }
+
+ if (tx_status_var != TX_FREE) {
+ MSG("WARNING: [down] timedout waiting for end of TX\n");
+ }
+ }
+
+ } else {
+ pthread_mutex_unlock(&mx_concent); /* free concentrator ASAP */
+ }
+ wait_ms(1);
}
MSG("\nINFO: End of downstream thread\n");
}
|