From 2a02a3a7dfda2679ebda86fa830023fe996a06c9 Mon Sep 17 00:00:00 2001 From: Harsh Sharma <92harshsharma@gmail.com> Date: Wed, 13 Jun 2018 13:26:38 -0500 Subject: Initial commit --- lora_pkt_fwd/src/jitqueue.c | 465 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 lora_pkt_fwd/src/jitqueue.c (limited to 'lora_pkt_fwd/src/jitqueue.c') diff --git a/lora_pkt_fwd/src/jitqueue.c b/lora_pkt_fwd/src/jitqueue.c new file mode 100644 index 0000000..dbde8d2 --- /dev/null +++ b/lora_pkt_fwd/src/jitqueue.c @@ -0,0 +1,465 @@ +/* + / _____) _ | | +( (____ _____ ____ _| |_ _____ ____| |__ + \____ \| ___ | (_ _) ___ |/ ___) _ \ + _____) ) ____| | | || |_| ____( (___| | | | +(______/|_____)_|_|_| \__)_____)\____)_| |_| + (C)2013 Semtech-Cycleo + +Description: + LoRa concentrator : Just In Time TX scheduling queue + +License: Revised BSD License, see LICENSE.TXT file include in the project +Maintainer: Michael Coracin +*/ + +/* -------------------------------------------------------------------------- */ +/* --- DEPENDANCIES --------------------------------------------------------- */ + +#define _GNU_SOURCE /* needed for qsort_r to be defined */ +#include /* qsort_r */ +#include /* printf, fprintf, snprintf, fopen, fputs */ +#include /* memset, memcpy */ +#include +#include +#include + +#include "trace.h" +#include "jitqueue.h" + +/* -------------------------------------------------------------------------- */ +/* --- PRIVATE MACROS ------------------------------------------------------- */ + +/* -------------------------------------------------------------------------- */ +/* --- PRIVATE CONSTANTS & TYPES -------------------------------------------- */ +#define TX_START_DELAY 1500 /* microseconds */ + /* TODO: get this value from HAL? */ +#define TX_MARGIN_DELAY 1000 /* Packet overlap margin in microseconds */ + /* TODO: How much margin should we take? */ +#define TX_JIT_DELAY 30000 /* Pre-delay to program packet for TX in microseconds */ +#define TX_MAX_ADVANCE_DELAY ((JIT_NUM_BEACON_IN_QUEUE + 1) * 128 * 1E6) /* Maximum advance delay accepted for a TX packet, compared to current time */ + +#define BEACON_GUARD 3000000 /* Interval where no ping slot can be placed, + to ensure beacon can be sent */ +#define BEACON_RESERVED 2120000 /* Time on air of the beacon, with some margin */ + +/* -------------------------------------------------------------------------- */ +/* --- PRIVATE VARIABLES (GLOBAL) ------------------------------------------- */ +static pthread_mutex_t mx_jit_queue = PTHREAD_MUTEX_INITIALIZER; /* control access to JIT queue */ + +/* -------------------------------------------------------------------------- */ +/* --- PRIVATE FUNCTIONS DEFINITION ----------------------------------------- */ + +/* -------------------------------------------------------------------------- */ +/* --- PUBLIC FUNCTIONS DEFINITION ----------------------------------------- */ + +bool jit_queue_is_full(struct jit_queue_s *queue) { + bool result; + + pthread_mutex_lock(&mx_jit_queue); + + result = (queue->num_pkt == JIT_QUEUE_MAX)?true:false; + + pthread_mutex_unlock(&mx_jit_queue); + + return result; +} + +bool jit_queue_is_empty(struct jit_queue_s *queue) { + bool result; + + pthread_mutex_lock(&mx_jit_queue); + + result = (queue->num_pkt == 0)?true:false; + + pthread_mutex_unlock(&mx_jit_queue); + + return result; +} + +void jit_queue_init(struct jit_queue_s *queue) { + int i; + + pthread_mutex_lock(&mx_jit_queue); + + memset(queue, 0, sizeof(*queue)); + for (i=0; inodes[i].pre_delay = 0; + queue->nodes[i].post_delay = 0; + } + + pthread_mutex_unlock(&mx_jit_queue); +} + +int compare(const void *a, const void *b, void *arg) +{ + struct jit_node_s *p = (struct jit_node_s *)a; + struct jit_node_s *q = (struct jit_node_s *)b; + int *counter = (int *)arg; + int p_count, q_count; + + p_count = p->pkt.count_us; + q_count = q->pkt.count_us; + + if (p_count > q_count) + *counter = *counter + 1; + + return p_count - q_count; +} + +void jit_sort_queue(struct jit_queue_s *queue) { + int counter = 0; + + if (queue->num_pkt == 0) { + return; + } + + MSG_DEBUG(DEBUG_JIT, "sorting queue in ascending order packet timestamp - queue size:%u\n", queue->num_pkt); + qsort_r(queue->nodes, queue->num_pkt, sizeof(queue->nodes[0]), compare, &counter); + MSG_DEBUG(DEBUG_JIT, "sorting queue done - swapped:%d\n", counter); +} + +bool jit_collision_test(uint32_t p1_count_us, uint32_t p1_pre_delay, uint32_t p1_post_delay, uint32_t p2_count_us, uint32_t p2_pre_delay, uint32_t p2_post_delay) { + if (((p1_count_us - p2_count_us) <= (p1_pre_delay + p2_post_delay + TX_MARGIN_DELAY)) || + ((p2_count_us - p1_count_us) <= (p2_pre_delay + p1_post_delay + TX_MARGIN_DELAY))) { + return true; + } else { + return false; + } +} + +enum jit_error_e jit_enqueue(struct jit_queue_s *queue, struct timeval *time, struct lgw_pkt_tx_s *packet, enum jit_pkt_type_e pkt_type) { + int i = 0; + uint32_t time_us = time->tv_sec * 1000000UL + time->tv_usec; /* convert time in µs */ + uint32_t packet_post_delay = 0; + uint32_t packet_pre_delay = 0; + uint32_t target_pre_delay = 0; + enum jit_error_e err_collision; + uint32_t asap_count_us; + + MSG_DEBUG(DEBUG_JIT, "Current concentrator time is %u, pkt_type=%d\n", time_us, pkt_type); + + if (packet == NULL) { + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: invalid parameter\n"); + return JIT_ERROR_INVALID; + } + + if (jit_queue_is_full(queue)) { + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: cannot enqueue packet, JIT queue is full\n"); + return JIT_ERROR_FULL; + } + + /* Compute packet pre/post delays depending on packet's type */ + switch (pkt_type) { + case JIT_PKT_TYPE_DOWNLINK_CLASS_A: + case JIT_PKT_TYPE_DOWNLINK_CLASS_B: + case JIT_PKT_TYPE_DOWNLINK_CLASS_C: + packet_pre_delay = TX_START_DELAY + TX_JIT_DELAY; + packet_post_delay = lgw_time_on_air(packet) * 1000UL; /* in us */ + break; + case JIT_PKT_TYPE_BEACON: + /* As defined in LoRaWAN spec */ + packet_pre_delay = TX_START_DELAY + BEACON_GUARD + TX_JIT_DELAY; + packet_post_delay = BEACON_RESERVED; + break; + default: + break; + } + + pthread_mutex_lock(&mx_jit_queue); + + /* An immediate downlink becomes a timestamped downlink "ASAP" */ + /* Set the packet count_us to the first available slot */ + if (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_C) { + /* change tx_mode to timestamped */ + packet->tx_mode = TIMESTAMPED; + + /* Search for the ASAP timestamp to be given to the packet */ + asap_count_us = time_us + 1E6; /* TODO: Take 1 second margin, to be refined */ + if (queue->num_pkt == 0) { + /* If the jit queue is empty, we can insert this packet */ + MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink, first in JiT queue (count_us=%u)\n", asap_count_us); + } else { + /* Else we can try to insert it: + - ASAP meaning NOW + MARGIN + - at the last index of the queue + - between 2 downlinks in the queue + */ + + /* First, try if the ASAP time collides with an already enqueued downlink */ + for (i=0; inum_pkt; i++) { + if (jit_collision_test(asap_count_us, packet_pre_delay, packet_post_delay, queue->nodes[i].pkt.count_us, queue->nodes[i].pre_delay, queue->nodes[i].post_delay) == true) { + MSG_DEBUG(DEBUG_JIT, "DEBUG: cannot insert IMMEDIATE downlink at count_us=%u, collides with %u (index=%d)\n", asap_count_us, queue->nodes[i].pkt.count_us, i); + break; + } + } + if (i == queue->num_pkt) { + /* No collision with ASAP time, we can insert it */ + MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink ASAP at %u (no collision)\n", asap_count_us); + } else { + /* Search for the best slot then */ + for (i=0; inum_pkt; i++) { + asap_count_us = queue->nodes[i].pkt.count_us + queue->nodes[i].post_delay + packet_pre_delay + TX_JIT_DELAY + TX_MARGIN_DELAY; + if (i == (queue->num_pkt - 1)) { + /* Last packet index, we can insert after this one */ + MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink, last in JiT queue (count_us=%u)\n", asap_count_us); + } else { + /* Check if packet can be inserted between this index and the next one */ + MSG_DEBUG(DEBUG_JIT, "DEBUG: try to insert IMMEDIATE downlink (count_us=%u) between index %d and index %d?\n", asap_count_us, i, i+1); + if (jit_collision_test(asap_count_us, packet_pre_delay, packet_post_delay, queue->nodes[i+1].pkt.count_us, queue->nodes[i+1].pre_delay, queue->nodes[i+1].post_delay) == true) { + MSG_DEBUG(DEBUG_JIT, "DEBUG: failed to insert IMMEDIATE downlink (count_us=%u), continue...\n", asap_count_us); + continue; + } else { + MSG_DEBUG(DEBUG_JIT, "DEBUG: insert IMMEDIATE downlink (count_us=%u)\n", asap_count_us); + break; + } + } + } + } + } + /* Set packet with ASAP timestamp */ + packet->count_us = asap_count_us; + } + + /* Check criteria_1: is it already too late to send this packet ? + * The packet should arrive at least at (tmst - TX_START_DELAY) to be programmed into concentrator + * Note: - Also add some margin, to be checked how much is needed, if needed + * - Valid for both Downlinks and Beacon packets + * + * Warning: unsigned arithmetic (handle roll-over) + * t_packet < t_current + TX_START_DELAY + MARGIN + */ + if ((packet->count_us - time_us) <= (TX_START_DELAY + TX_MARGIN_DELAY + TX_JIT_DELAY)) { + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet REJECTED, already too late to send it (current=%u, packet=%u, type=%d)\n", time_us, packet->count_us, pkt_type); + pthread_mutex_unlock(&mx_jit_queue); + return JIT_ERROR_TOO_LATE; + } + + /* Check criteria_2: Does packet timestamp seem plausible compared to current time + * We do not expect the server to program a downlink too early compared to current time + * Class A: downlink has to be sent in a 1s or 2s time window after RX + * Class B: downlink has to occur in a 128s time window + * Class C: no check needed, departure time has been calculated previously + * So let's define a safe delay above which we can say that the packet is out of bound: TX_MAX_ADVANCE_DELAY + * Note: - Valid for Downlinks only, not for Beacon packets + * + * Warning: unsigned arithmetic (handle roll-over) + t_packet > t_current + TX_MAX_ADVANCE_DELAY + */ + if ((pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_A) || (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_B)) { + if ((packet->count_us - time_us) > TX_MAX_ADVANCE_DELAY) { + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet REJECTED, timestamp seems wrong, too much in advance (current=%u, packet=%u, type=%d)\n", time_us, packet->count_us, pkt_type); + pthread_mutex_unlock(&mx_jit_queue); + return JIT_ERROR_TOO_EARLY; + } + } + + /* Check criteria_3: does this new packet overlap with a packet already enqueued ? + * Note: - need to take into account packet's pre_delay and post_delay of each packet + * - Valid for both Downlinks and beacon packets + * - Beacon guard can be ignored if we try to queue a Class A downlink + */ + for (i=0; inum_pkt; i++) { + /* We ignore Beacon Guard for Class A/C downlinks */ + if (((pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_A) || (pkt_type == JIT_PKT_TYPE_DOWNLINK_CLASS_C)) && (queue->nodes[i].pkt_type == JIT_PKT_TYPE_BEACON)) { + target_pre_delay = TX_START_DELAY; + } else { + target_pre_delay = queue->nodes[i].pre_delay; + } + + /* Check if there is a collision + * Warning: unsigned arithmetic (handle roll-over) + * t_packet_new - pre_delay_packet_new < t_packet_prev + post_delay_packet_prev (OVERLAP on post delay) + * t_packet_new + post_delay_packet_new > t_packet_prev - pre_delay_packet_prev (OVERLAP on pre delay) + */ + if (jit_collision_test(packet->count_us, packet_pre_delay, packet_post_delay, queue->nodes[i].pkt.count_us, target_pre_delay, queue->nodes[i].post_delay) == true) { + switch (queue->nodes[i].pkt_type) { + case JIT_PKT_TYPE_DOWNLINK_CLASS_A: + case JIT_PKT_TYPE_DOWNLINK_CLASS_B: + case JIT_PKT_TYPE_DOWNLINK_CLASS_C: + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet (type=%d) REJECTED, collision with packet already programmed at %u (%u)\n", pkt_type, queue->nodes[i].pkt.count_us, packet->count_us); + err_collision = JIT_ERROR_COLLISION_PACKET; + break; + case JIT_PKT_TYPE_BEACON: + if (pkt_type != JIT_PKT_TYPE_BEACON) { + /* do not overload logs for beacon/beacon collision, as it is expected to happen with beacon pre-scheduling algorith used */ + MSG_DEBUG(DEBUG_JIT_ERROR, "ERROR: Packet (type=%d) REJECTED, collision with beacon already programmed at %u (%u)\n", pkt_type, queue->nodes[i].pkt.count_us, packet->count_us); + } + err_collision = JIT_ERROR_COLLISION_BEACON; + break; + default: + MSG("ERROR: Unknown packet type, should not occur, BUG?\n"); + assert(0); + break; + } + pthread_mutex_unlock(&mx_jit_queue); + return err_collision; + } + } + + /* Finally enqueue it */ + /* Insert packet at the end of the queue */ + memcpy(&(queue->nodes[queue->num_pkt].pkt), packet, sizeof(struct lgw_pkt_tx_s)); + queue->nodes[queue->num_pkt].pre_delay = packet_pre_delay; + queue->nodes[queue->num_pkt].post_delay = packet_post_delay; + queue->nodes[queue->num_pkt].pkt_type = pkt_type; + if (pkt_type == JIT_PKT_TYPE_BEACON) { + queue->num_beacon++; + } + queue->num_pkt++; + /* Sort the queue in ascending order of packet timestamp */ + jit_sort_queue(queue); + + /* Done */ + pthread_mutex_unlock(&mx_jit_queue); + + jit_print_queue(queue, false, DEBUG_JIT); + + MSG_DEBUG(DEBUG_JIT, "enqueued packet with count_us=%u (size=%u bytes, toa=%u us, type=%u)\n", packet->count_us, packet->size, packet_post_delay, pkt_type); + + return JIT_ERROR_OK; +} + +enum jit_error_e jit_dequeue(struct jit_queue_s *queue, int index, struct lgw_pkt_tx_s *packet, enum jit_pkt_type_e *pkt_type) { + if (packet == NULL) { + MSG("ERROR: invalid parameter\n"); + return JIT_ERROR_INVALID; + } + + if ((index < 0) || (index >= JIT_QUEUE_MAX)) { + MSG("ERROR: invalid parameter\n"); + return JIT_ERROR_INVALID; + } + + if (jit_queue_is_empty(queue)) { + MSG("ERROR: cannot dequeue packet, JIT queue is empty\n"); + return JIT_ERROR_EMPTY; + } + + pthread_mutex_lock(&mx_jit_queue); + + /* Dequeue requested packet */ + memcpy(packet, &(queue->nodes[index].pkt), sizeof(struct lgw_pkt_tx_s)); + queue->num_pkt--; + *pkt_type = queue->nodes[index].pkt_type; + if (*pkt_type == JIT_PKT_TYPE_BEACON) { + queue->num_beacon--; + MSG_DEBUG(DEBUG_BEACON, "--- Beacon dequeued ---\n"); + } + + /* Replace dequeued packet with last packet of the queue */ + memcpy(&(queue->nodes[index]), &(queue->nodes[queue->num_pkt]), sizeof(struct jit_node_s)); + memset(&(queue->nodes[queue->num_pkt]), 0, sizeof(struct jit_node_s)); + + /* Sort queue in ascending order of packet timestamp */ + jit_sort_queue(queue); + + /* Done */ + pthread_mutex_unlock(&mx_jit_queue); + + jit_print_queue(queue, false, DEBUG_JIT); + + MSG_DEBUG(DEBUG_JIT, "dequeued packet with count_us=%u from index %d\n", packet->count_us, index); + + return JIT_ERROR_OK; +} + +enum jit_error_e jit_peek(struct jit_queue_s *queue, struct timeval *time, int *pkt_idx) { + /* Return index of node containing a packet inline with given time */ + int i = 0; + int idx_highest_priority = -1; + uint32_t time_us; + + if ((time == NULL) || (pkt_idx == NULL)) { + MSG("ERROR: invalid parameter\n"); + return JIT_ERROR_INVALID; + } + + if (jit_queue_is_empty(queue)) { + return JIT_ERROR_EMPTY; + } + + time_us = time->tv_sec * 1000000UL + time->tv_usec; + + pthread_mutex_lock(&mx_jit_queue); + + /* Search for highest priority packet to be sent */ + for (i=0; inum_pkt; i++) { + /* First check if that packet is outdated: + * If a packet seems too much in advance, and was not rejected at enqueue time, + * it means that we missed it for peeking, we need to drop it + * + * Warning: unsigned arithmetic + * t_packet > t_current + TX_MAX_ADVANCE_DELAY + */ + if ((queue->nodes[i].pkt.count_us - time_us) >= TX_MAX_ADVANCE_DELAY) { + /* We drop the packet to avoid lock-up */ + queue->num_pkt--; + if (queue->nodes[i].pkt_type == JIT_PKT_TYPE_BEACON) { + queue->num_beacon--; + MSG("WARNING: --- Beacon dropped (current_time=%u, packet_time=%u) ---\n", time_us, queue->nodes[i].pkt.count_us); + } else { + MSG("WARNING: --- Packet dropped (current_time=%u, packet_time=%u) ---\n", time_us, queue->nodes[i].pkt.count_us); + } + + /* Replace dropped packet with last packet of the queue */ + memcpy(&(queue->nodes[i]), &(queue->nodes[queue->num_pkt]), sizeof(struct jit_node_s)); + memset(&(queue->nodes[queue->num_pkt]), 0, sizeof(struct jit_node_s)); + + /* Sort queue in ascending order of packet timestamp */ + jit_sort_queue(queue); + + /* restart loop after purge to find packet to be sent */ + i = 0; + continue; + } + + /* Then look for highest priority packet to be sent: + * Warning: unsigned arithmetic (handle roll-over) + * t_packet < t_highest + */ + if ((idx_highest_priority == -1) || (((queue->nodes[i].pkt.count_us - time_us) < (queue->nodes[idx_highest_priority].pkt.count_us - time_us)))) { + idx_highest_priority = i; + } + } + + /* Peek criteria 1: look for a packet to be sent in next TX_JIT_DELAY ms timeframe + * Warning: unsigned arithmetic (handle roll-over) + * t_packet < t_current + TX_JIT_DELAY + */ + if ((queue->nodes[idx_highest_priority].pkt.count_us - time_us) < TX_JIT_DELAY) { + *pkt_idx = idx_highest_priority; + MSG_DEBUG(DEBUG_JIT, "peek packet with count_us=%u at index %d\n", + queue->nodes[idx_highest_priority].pkt.count_us, idx_highest_priority); + } else { + *pkt_idx = -1; + } + + pthread_mutex_unlock(&mx_jit_queue); + + return JIT_ERROR_OK; +} + +void jit_print_queue(struct jit_queue_s *queue, bool show_all, int debug_level) { + int i = 0; + int loop_end; + + if (jit_queue_is_empty(queue)) { + MSG_DEBUG(debug_level, "INFO: [jit] queue is empty\n"); + } else { + pthread_mutex_lock(&mx_jit_queue); + + MSG_DEBUG(debug_level, "INFO: [jit] queue contains %d packets:\n", queue->num_pkt); + MSG_DEBUG(debug_level, "INFO: [jit] queue contains %d beacons:\n", queue->num_beacon); + loop_end = (show_all == true) ? JIT_QUEUE_MAX : queue->num_pkt; + for (i=0; inodes[i].pkt.count_us, + queue->nodes[i].pkt_type); + } + + pthread_mutex_unlock(&mx_jit_queue); + } +} + -- cgit v1.2.3