summaryrefslogtreecommitdiff
path: root/lora_pkt_fwd/src/jitqueue.c
blob: dbde8d2898d0dac2fd1ab5d0891c2f8b90ea68b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
/*
 / _____)             _              | |
( (____  _____ ____ _| |_ _____  ____| |__
 \____ \| ___ |    (_   _) ___ |/ ___)  _ \
 _____) ) ____| | | || |_| ____( (___| | | |
(______/|_____)_|_|_| \__)_____)\____)_| |_|
  (C)2013 Semtech-Cycleo

Description:
    LoRa concentrator : Just In Time TX scheduling queue

License: Revised BSD License, see LICENSE.TXT file include in the project
Maintainer: Michael Coracin
*/

/* -------------------------------------------------------------------------- */
/* --- DEPENDANCIES --------------------------------------------------------- */

#define _GNU_SOURCE     /* needed for qsort_r to be defined */
#include <stdlib.h>     /* qsort_r */
#include <stdio.h>      /* printf, fprintf, snprintf, fopen, fputs */
#include <string.h>     /* memset, memcpy */
#include <pthread.h>
#include <assert.h>
#include <math.h>

#include "trace.h"
#include "jitqueue.h"

/* -------------------------------------------------------------------------- */
/* --- PRIVATE MACROS ------------------------------------------------------- */

/* -------------------------------------------------------------------------- */
/* --- PRIVATE CONSTANTS & TYPES -------------------------------------------- */
#define TX_START_DELAY          1500    /* microseconds */
                                        /* TODO: get this value from HAL? */
#define TX_MARGIN_DELAY         1000    /* Packet overlap margin in microseconds */
                                        /* TODO: How much margin should we take? */
#define TX_JIT_DELAY            30000   /* Pre-delay to program packet for TX in microseconds */
#define TX_MAX_ADVANCE_DELAY    ((JIT_NUM_BEACON_IN_QUEUE + 1) * 128 * 1E6) /* Maximum advance delay accepted for a TX packet, compared to current time */

#define BEACON_GUARD            3000000 /* Interval where no ping slot can be placed,
                                            to ensure beacon can be sent */
#define BEACON_RESERVED         2120000 /* Time on air of the beacon, with some margin */

/* -------------------------------------------------------------------------- */
/* --- PRIVATE VARIABLES (GLOBAL) ------------------------------------------- */
static pthread_mutex_t mx_jit_queue = PTHREAD_MUTEX_INITIALIZER; /* control access to JIT queue */

/* -------------------------------------------------------------------------- */
/* --- PRIVATE FUNCTIONS DEFINITION ----------------------------------------- */

/* -------------------------------------------------------------------------- */
/* --- PUBLIC FUNCTIONS DEFINITION ----------------------------------------- */

bool jit_queue_is_full(struct jit_queue_s *queue) {
    bool result;

    pthread_mutex_lock(&mx_jit_queue);

    result = (queue->num_pkt == JIT_QUEUE_MAX)?true:false;

    pthread_mutex_unlock(&mx_jit_queue);

    return result;
}

bool jit_queue_is_empty(struct jit_queue_s *queue) {
    bool result;

    pthread_mutex_lock(&mx_jit_queue);

    result = (queue->num_pkt == 0)?true:false;

    pthread_mutex_unlock(&mx_jit_queue);

    return result;
}

void jit_queue_init(struct jit_queue_s *queue) {
    int i;

    pthread_mutex_lock(&mx_jit_queue);

    memset(queue, 0, sizeof(*queue));
    for (i=0; i<JIT_QUEUE_MAX; i++) {
        queue->nodes[i].pre_delay = 0;
        queue->nodes[i].post_delay = 0;
    }

    pthread_mutex_unlock(&mx_jit_queue);
}

int compare(const void *a, const void *b, void *arg)
{
    struct jit_node_s *p = (struct jit_node_s *)a;
    struct jit_node_s *q = (struct jit_node_s *)b;
    int *counter = (int *)arg;
    int p_count, q_count;

    p_count = p->pkt.count_us;
    q_count = q->pkt.count_us;

    if (p_count > q_count)
        *counter = *counter + 1;

    return p_count - q_count;
}

void jit_sort_queue(struct jit_queue_s *queue) {
    int counter = 0;

    if (queue->num_pkt == 0) {
        return;
    }

    MSG_DEBUG(DEBUG_JIT, "sorting queue in ascending order packet timestamp - queue size:%u\n", queue->num_pkt);
    qsort_r(queue->nodes, queue->num_pkt, sizeof(queue->nodes[0]), compare, &counter);
    MSG_DEBUG(DEBUG_JIT, "sorting queue done - swapped:%d\n", counter);
}

bool jit_collision_test(uint32_t p1_count_us, uint32_t p1_pre_delay, uint32_t p1_post_delay, uint32_t p2_count_us, uint32_t p2_pre_delay, uint32_t p2_post_delay) {
    if (((p1_count_us - p2_count_us) <= (p1_pre_delay + p2_post_delay + TX_MARGIN_DELAY)) ||
        ((p2_count_us - p1_count_us) <= (p2_pre_delay + p1_post_delay + TX_MARGIN_DELAY))) {
        return true;
    } else {
        return false;
    }
}

enum jit_error_e jit_enqueue(struct jit_queue_s *queue, struct timeval *time, struct lgw_pkt_tx_s *packet, enum jit_pkt_type_e pkt_type) {
    int i = 0;
    uint32_t time_us = time->tv_sec * 1000000UL + time->tv_usec; /* convert time in µs */
    uint32_t packet_post_delay = 0;
    uint32_t packet_pre_delay = 0;
    uint32_t target_pre_delay = 0;
    enum jit_error_e err_collision;
    uint32_t asap_count_us;

    MSG_DEBUG(DEBUG_JIT, "Current concentrator time is %u, pkt_type=%d\n", time_us, pkt_type);

    if (packet == NULL) {
        MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: invalid parameter\n");
        return JIT_ERROR_INVALID;
    }

    if (jit_queue_is_full(queue)) {
        MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: cannot enqueue packet, JIT queue is full\n");
        return JIT_ERROR_FULL;
    }

    /* Compute packet pre/post delays depending on packet's type */
    switch (pkt_type) {
        case JIT_PKT_TYPE_DOWNLINK_CLASS_A:
        case JIT_PKT_TYPE_DOWNLINK_CLASS_B:
        case JIT_PKT_TYPE_DOWNLINK_CLASS_C:
            packet_pre_delay = TX_START_DELAY + TX_JIT_DELAY;
            packet_post_delay = lgw_time_on_air(packet) * 1000UL; /* in us */
            break;
        case JIT_PKT_TYPE_BEACON:
            /* As defined in LoRaWAN spec */
            packet_pre_delay = TX_START_DELAY + BEACON_GUARD + TX_JIT_DELAY;
            packet_post_delay = BEACON_RESERVED;
            break;
        default:
            break;
    }

    pthread_mutex_lock(&mx_jit_queue);

    /* An immediate downlink becomes a timestamped downlink "ASAP" */
    /* Set the packet count_us to the first available slot */
    if (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_C) {
        /* change tx_mode to timestamped */
        packet->tx_mode = TIMESTAMPED;

        /* Search for the ASAP timestamp to be given to the packet */
        asap_count_us = time_us + 1E6; /* TODO: Take 1 second margin, to be refined */
        if (queue->num_pkt == 0) {
            /* If the jit queue is empty, we can insert this packet */
            MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink, first in JiT queue (count_us=%u)\n", asap_count_us);
        } else {
            /* Else we can try to insert it:
                - ASAP meaning NOW + MARGIN
                - at the last index of the queue
                - between 2 downlinks in the queue
            */

            /* First, try if the ASAP time collides with an already enqueued downlink */
            for (i=0; i<queue->num_pkt; i++) {
                if (jit_collision_test(asap_count_us, packet_pre_delay, packet_post_delay, queue->nodes[i].pkt.count_us, queue->nodes[i].pre_delay, queue->nodes[i].post_delay) == true) {
                    MSG_DEBUG(DEBUG_JIT, "DEBUG: cannot insert IMMEDIATE downlink at count_us=%u, collides with %u (index=%d)\n", asap_count_us, queue->nodes[i].pkt.count_us, i);
                    break;
                }
            }
            if (i == queue->num_pkt) {
                /* No collision with ASAP time, we can insert it */
                MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink ASAP at %u (no collision)\n", asap_count_us);
            } else {
                /* Search for the best slot then */
                for (i=0; i<queue->num_pkt; i++) {
                    asap_count_us = queue->nodes[i].pkt.count_us + queue->nodes[i].post_delay + packet_pre_delay + TX_JIT_DELAY + TX_MARGIN_DELAY;
                    if (i == (queue->num_pkt - 1)) {
                        /* Last packet index, we can insert after this one */
                        MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink, last in JiT queue (count_us=%u)\n", asap_count_us);
                    } else {
                        /* Check if packet can be inserted between this index and the next one */
                        MSG_DEBUG(DEBUG_JIT, "DEBUG: try to insert IMMEDIATE downlink (count_us=%u) between index %d and index %d?\n", asap_count_us, i, i+1);
                        if (jit_collision_test(asap_count_us, packet_pre_delay, packet_post_delay, queue->nodes[i+1].pkt.count_us, queue->nodes[i+1].pre_delay, queue->nodes[i+1].post_delay) == true) {
                            MSG_DEBUG(DEBUG_JIT, "DEBUG: failed to insert IMMEDIATE downlink (count_us=%u), continue...\n", asap_count_us);
                            continue;
                        } else {
                            MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink (count_us=%u)\n", asap_count_us);
                            break;
                        }
                    }
                }
            }
        }
        /* Set packet with ASAP timestamp */
        packet->count_us = asap_count_us;
    }

    /* Check criteria_1: is it already too late to send this packet ?
     *  The packet should arrive at least at (tmst - TX_START_DELAY) to be programmed into concentrator
     *  Note: - Also add some margin, to be checked how much is needed, if needed
     *        - Valid for both Downlinks and Beacon packets
     *
     *  Warning: unsigned arithmetic (handle roll-over)
     *      t_packet < t_current + TX_START_DELAY + MARGIN
     */
    if ((packet->count_us - time_us) <= (TX_START_DELAY + TX_MARGIN_DELAY + TX_JIT_DELAY)) {
        MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet REJECTED, already too late to send it (current=%u, packet=%u, type=%d)\n", time_us, packet->count_us, pkt_type);
        pthread_mutex_unlock(&mx_jit_queue);
        return JIT_ERROR_TOO_LATE;
    }

    /* Check criteria_2: Does packet timestamp seem plausible compared to current time
     *  We do not expect the server to program a downlink too early compared to current time
     *  Class A: downlink has to be sent in a 1s or 2s time window after RX
     *  Class B: downlink has to occur in a 128s time window
     *  Class C: no check needed, departure time has been calculated previously
     *  So let's define a safe delay above which we can say that the packet is out of bound: TX_MAX_ADVANCE_DELAY
     *  Note: - Valid for Downlinks only, not for Beacon packets
     *
     *  Warning: unsigned arithmetic (handle roll-over)
                t_packet > t_current + TX_MAX_ADVANCE_DELAY
     */
    if ((pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_A) || (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_B)) {
        if ((packet->count_us - time_us) > TX_MAX_ADVANCE_DELAY) {
            MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet REJECTED, timestamp seems wrong, too much in advance (current=%u, packet=%u, type=%d)\n", time_us, packet->count_us, pkt_type);
            pthread_mutex_unlock(&mx_jit_queue);
            return JIT_ERROR_TOO_EARLY;
        }
    }

    /* Check criteria_3: does this new packet overlap with a packet already enqueued ?
     *  Note: - need to take into account packet's pre_delay and post_delay of each packet
     *        - Valid for both Downlinks and beacon packets
     *        - Beacon guard can be ignored if we try to queue a Class A downlink
     */
    for (i=0; i<queue->num_pkt; i++) {
        /* We ignore Beacon Guard for Class A/C downlinks */
        if (((pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_A) || (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_C)) && (queue->nodes[i].pkt_type == JIT_PKT_TYPE_BEACON)) {
            target_pre_delay = TX_START_DELAY;
        } else {
            target_pre_delay = queue->nodes[i].pre_delay;
        }

        /* Check if there is a collision
         *  Warning: unsigned arithmetic (handle roll-over)
         *      t_packet_new - pre_delay_packet_new < t_packet_prev + post_delay_packet_prev (OVERLAP on post delay)
         *      t_packet_new + post_delay_packet_new > t_packet_prev - pre_delay_packet_prev (OVERLAP on pre delay)
         */
        if (jit_collision_test(packet->count_us, packet_pre_delay, packet_post_delay, queue->nodes[i].pkt.count_us, target_pre_delay, queue->nodes[i].post_delay) == true) {
            switch (queue->nodes[i].pkt_type) {
                case JIT_PKT_TYPE_DOWNLINK_CLASS_A:
                case JIT_PKT_TYPE_DOWNLINK_CLASS_B:
                case JIT_PKT_TYPE_DOWNLINK_CLASS_C:
                    MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet (type=%d) REJECTED, collision with packet already programmed at %u (%u)\n", pkt_type, queue->nodes[i].pkt.count_us, packet->count_us);
                    err_collision = JIT_ERROR_COLLISION_PACKET;
                    break;
                case JIT_PKT_TYPE_BEACON:
                    if (pkt_type != JIT_PKT_TYPE_BEACON) {
                        /* do not overload logs for beacon/beacon collision, as it is expected to happen with beacon pre-scheduling algorith used */
                        MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet (type=%d) REJECTED, collision with beacon already programmed at %u (%u)\n", pkt_type, queue->nodes[i].pkt.count_us, packet->count_us);
                    }
                    err_collision = JIT_ERROR_COLLISION_BEACON;
                    break;
                default:
                    MSG("ERROR: Unknown packet type, should not occur, BUG?\n");
                    assert(0);
                    break;
            }
            pthread_mutex_unlock(&mx_jit_queue);
            return err_collision;
        }
    }

    /* Finally enqueue it */
    /* Insert packet at the end of the queue */
    memcpy(&(queue->nodes[queue->num_pkt].pkt), packet, sizeof(struct lgw_pkt_tx_s));
    queue->nodes[queue->num_pkt].pre_delay = packet_pre_delay;
    queue->nodes[queue->num_pkt].post_delay = packet_post_delay;
    queue->nodes[queue->num_pkt].pkt_type = pkt_type;
    if (pkt_type == JIT_PKT_TYPE_BEACON) {
        queue->num_beacon++;
    }
    queue->num_pkt++;
    /* Sort the queue in ascending order of packet timestamp */
    jit_sort_queue(queue);

    /* Done */
    pthread_mutex_unlock(&mx_jit_queue);

    jit_print_queue(queue, false, DEBUG_JIT);

    MSG_DEBUG(DEBUG_JIT, "enqueued packet with count_us=%u (size=%u bytes, toa=%u us, type=%u)\n", packet->count_us, packet->size, packet_post_delay, pkt_type);

    return JIT_ERROR_OK;
}

enum jit_error_e jit_dequeue(struct jit_queue_s *queue, int index, struct lgw_pkt_tx_s *packet, enum jit_pkt_type_e *pkt_type) {
    if (packet == NULL) {
        MSG("ERROR: invalid parameter\n");
        return JIT_ERROR_INVALID;
    }

    if ((index < 0) || (index >= JIT_QUEUE_MAX)) {
        MSG("ERROR: invalid parameter\n");
        return JIT_ERROR_INVALID;
    }

    if (jit_queue_is_empty(queue)) {
        MSG("ERROR: cannot dequeue packet, JIT queue is empty\n");
        return JIT_ERROR_EMPTY;
    }

    pthread_mutex_lock(&mx_jit_queue);

    /* Dequeue requested packet */
    memcpy(packet, &(queue->nodes[index].pkt), sizeof(struct lgw_pkt_tx_s));
    queue->num_pkt--;
    *pkt_type = queue->nodes[index].pkt_type;
    if (*pkt_type == JIT_PKT_TYPE_BEACON) {
        queue->num_beacon--;
        MSG_DEBUG(DEBUG_BEACON, "--- Beacon dequeued ---\n");
    }

    /* Replace dequeued packet with last packet of the queue */
    memcpy(&(queue->nodes[index]), &(queue->nodes[queue->num_pkt]), sizeof(struct jit_node_s));
    memset(&(queue->nodes[queue->num_pkt]), 0, sizeof(struct jit_node_s));

    /* Sort queue in ascending order of packet timestamp */
    jit_sort_queue(queue);

    /* Done */
    pthread_mutex_unlock(&mx_jit_queue);

    jit_print_queue(queue, false, DEBUG_JIT);

    MSG_DEBUG(DEBUG_JIT, "dequeued packet with count_us=%u from index %d\n", packet->count_us, index);

    return JIT_ERROR_OK;
}

enum jit_error_e jit_peek(struct jit_queue_s *queue, struct timeval *time, int *pkt_idx) {
    /* Return index of node containing a packet inline with given time */
    int i = 0;
    int idx_highest_priority = -1;
    uint32_t time_us;

    if ((time == NULL) || (pkt_idx == NULL)) {
        MSG("ERROR: invalid parameter\n");
        return JIT_ERROR_INVALID;
    }

    if (jit_queue_is_empty(queue)) {
        return JIT_ERROR_EMPTY;
    }

    time_us = time->tv_sec * 1000000UL + time->tv_usec;

    pthread_mutex_lock(&mx_jit_queue);

    /* Search for highest priority packet to be sent */
    for (i=0; i<queue->num_pkt; i++) {
        /* First check if that packet is outdated:
         *  If a packet seems too much in advance, and was not rejected at enqueue time,
         *  it means that we missed it for peeking, we need to drop it
         *
         *  Warning: unsigned arithmetic
         *      t_packet > t_current + TX_MAX_ADVANCE_DELAY
         */
        if ((queue->nodes[i].pkt.count_us - time_us) >= TX_MAX_ADVANCE_DELAY) {
            /* We drop the packet to avoid lock-up */
            queue->num_pkt--;
            if (queue->nodes[i].pkt_type == JIT_PKT_TYPE_BEACON) {
                queue->num_beacon--;
                MSG("WARNING: --- Beacon dropped (current_time=%u, packet_time=%u) ---\n", time_us, queue->nodes[i].pkt.count_us);
            } else {
                MSG("WARNING: --- Packet dropped (current_time=%u, packet_time=%u) ---\n", time_us, queue->nodes[i].pkt.count_us);
            }

            /* Replace dropped packet with last packet of the queue */
            memcpy(&(queue->nodes[i]), &(queue->nodes[queue->num_pkt]), sizeof(struct jit_node_s));
            memset(&(queue->nodes[queue->num_pkt]), 0, sizeof(struct jit_node_s));

            /* Sort queue in ascending order of packet timestamp */
            jit_sort_queue(queue);

            /* restart loop  after purge to find packet to be sent */
            i = 0;
            continue;
        }

        /* Then look for highest priority packet to be sent:
         *  Warning: unsigned arithmetic (handle roll-over)
         *      t_packet < t_highest
         */
        if ((idx_highest_priority == -1) || (((queue->nodes[i].pkt.count_us - time_us) < (queue->nodes[idx_highest_priority].pkt.count_us - time_us)))) {
            idx_highest_priority = i;
        }
    }

    /* Peek criteria 1: look for a packet to be sent in next TX_JIT_DELAY ms timeframe
     *  Warning: unsigned arithmetic (handle roll-over)
     *      t_packet < t_current + TX_JIT_DELAY
     */
    if ((queue->nodes[idx_highest_priority].pkt.count_us - time_us) < TX_JIT_DELAY) {
        *pkt_idx = idx_highest_priority;
        MSG_DEBUG(DEBUG_JIT, "peek packet with count_us=%u at index %d\n",
            queue->nodes[idx_highest_priority].pkt.count_us, idx_highest_priority);
    } else {
        *pkt_idx = -1;
    }

    pthread_mutex_unlock(&mx_jit_queue);

    return JIT_ERROR_OK;
}

void jit_print_queue(struct jit_queue_s *queue, bool show_all, int debug_level) {
    int i = 0;
    int loop_end;

    if (jit_queue_is_empty(queue)) {
        MSG_DEBUG(debug_level, "INFO: [jit] queue is empty\n");
    } else {
        pthread_mutex_lock(&mx_jit_queue);

        MSG_DEBUG(debug_level, "INFO: [jit] queue contains %d packets:\n", queue->num_pkt);
        MSG_DEBUG(debug_level, "INFO: [jit] queue contains %d beacons:\n", queue->num_beacon);
        loop_end = (show_all == true) ? JIT_QUEUE_MAX : queue->num_pkt;
        for (i=0; i<loop_end; i++) {
            MSG_DEBUG(debug_level, " - node[%d]: count_us=%u - type=%d\n",
                        i,
                        queue->nodes[i].pkt.count_us,
                        queue->nodes[i].pkt_type);
        }

        pthread_mutex_unlock(&mx_jit_queue);
    }
}