diff options
author | David Marcaccini <david.marcaccini@multitech.com> | 2019-04-29 13:33:15 -0500 |
---|---|---|
committer | David Marcaccini <david.marcaccini@multitech.com> | 2019-04-29 13:33:15 -0500 |
commit | 92b102013fe546a85b309bacc16736c0b2a37cb7 (patch) | |
tree | e1d1475b6afc5e9e1c2434f5f318ef63caf5926a /src | |
download | mtsazure-92b102013fe546a85b309bacc16736c0b2a37cb7.tar.gz mtsazure-92b102013fe546a85b309bacc16736c0b2a37cb7.tar.bz2 mtsazure-92b102013fe546a85b309bacc16736c0b2a37cb7.zip |
Initial commit of mtsazure codebase
Diffstat (limited to 'src')
-rw-r--r-- | src/mtsazure.c | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/src/mtsazure.c b/src/mtsazure.c new file mode 100644 index 0000000..a0a6366 --- /dev/null +++ b/src/mtsazure.c @@ -0,0 +1,493 @@ +// MTS Azure Telecommunicator - sends and receives messages to Microsoft Azure +// Copyright 2019 Multi-Tech Systems Inc. - All Rights Reserved +// Author: David Marcaccini +// Contact: david.marcaccini@multitech.com + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> + +#include <iothub.h> +#include <iothub_device_client_ll.h> +#include <iothub_client_options.h> +#include <iothub_message.h> +#include <azure_c_shared_utility/threadapi.h> +#include <azure_c_shared_utility/crt_abstractions.h> +#include <azure_c_shared_utility/shared_util_options.h> +#include <iothubtransportmqtt.h> +#include <iothubtransportmqtt_websockets.h> +#include <iothubtransportamqp.h> +#include <iothubtransportamqp_websockets.h> +#include <iothubtransporthttp.h> + +//#include "certs.h" + +#define MAX_CONNECTION_STRING_LENGTH 1023 + +static unsigned global_verbosity = 0; +static size_t g_message_send_confirmations = 0; + +typedef enum msg_format_enum{ + FORMAT_BINARY = 0, + FORMAT_HEX, + FORMAT_BASE64, +}msg_format_t; + +static void send_confirm_callback(IOTHUB_CLIENT_CONFIRMATION_RESULT result, void* userContextCallback){ + g_message_send_confirmations++; + if(global_verbosity){ + //printf("Confirmation callback received for message with result %s\n", ENUM_TO_STRING(IOTHUB_CLIENT_CONFIRMATION_RESULT, result)); fflush(stdout); + printf("Confirmation callback received for message.\n"); fflush(stdout); + } +} + +static void connection_status_callback(IOTHUB_CLIENT_CONNECTION_STATUS result, IOTHUB_CLIENT_CONNECTION_STATUS_REASON reason, void* user_context){ + if(global_verbosity){ + if (result == IOTHUB_CLIENT_CONNECTION_AUTHENTICATED){ + printf("The device client is connected to the IoTHub\n"); + }else{ + printf("The device client has been disconnected from the IoTHub\n"); + } + fflush(stdout); + } +} + +//static inline char hex_char_decode(const char hex_char){ +static char hex_char_decode(const char hex_char){ + if(hex_char >= '0' && hex_char <= '9'){ + return (hex_char - '0'); + }else if(hex_char >= 'A' && hex_char <= 'F'){ + return (hex_char - 'A') + 10; + }else if(hex_char >= 'a' && hex_char <= 'f'){ + return (hex_char - 'a') + 10; + } + return 16; +} + +//static inline char b64_char_decode(const char b64_char){ +static char b64_char_decode(const char b64_char){ + if(b64_char >= 'A' && b64_char <= 'Z'){ + return (b64_char - 'A'); + }else if(b64_char >= 'a' && b64_char <= 'z'){ + return (b64_char - 'a') + 26; + }else if(b64_char >= '0' && b64_char <= '9'){ + return (b64_char - '0') + 52; + }else if(b64_char == '+'){ + return 62; + }else if(b64_char == '/'){ + return 63; + } + return 64; +} + +// Converts a nul terminated ascii string of hex characters to a binary array +// returns the number of bytes that are or would be written to binary_out if it were sufficiently long +static size_t hexdecode(uint8_t* binary_out, const char* encoded_ascii_in, size_t binary_out_buflen){ + size_t bytes_needed = 0; + size_t i = 0; + const size_t in_length = strlen(encoded_ascii_in); + while(in_length > (i + 1)){ // ignore any number of 0x or 0X prefixes + if(encoded_ascii_in[i] == '0' && (encoded_ascii_in[i + 1] == 'x' || encoded_ascii_in[i + 1] == 'X')){ + i += 2; + }else{ + break; + } + } + for(size_t b = 0; i < in_length; i++){ + char decoded_value = hex_char_decode(encoded_ascii_in[i]); + if(decoded_value < 16){ + if((b & 7) == 0){ + if(++bytes_needed <= binary_out_buflen){ + binary_out[b >> 3] = (decoded_value << 4); + } + }else{ + if(bytes_needed <= binary_out_buflen){ + binary_out[b >> 3] |= decoded_value; + } + } + b += 4; + } + } + return bytes_needed; +} + +// Converts a nul terminated ascii string of base64 characters (with trialing '=' characters) to a binary array +// returns the number of bytes that are or would be written to binary_out if it were sufficiently long +static size_t base64decode(uint8_t* binary_out, const char* encoded_ascii_in, size_t binary_out_buflen){ + size_t bytes_needed = 0; + for(size_t i = 0, b = 0; i < strlen(encoded_ascii_in); i++){ + char decoded_value = b64_char_decode(encoded_ascii_in[i]); + if(decoded_value < 64){ + switch((b & 7) >> 1){ + case(0): + if((b >> 3) < binary_out_buflen){ + binary_out[b >> 3] = (((uint8_t)decoded_value) << 2); + } + bytes_needed++; + break; + case(1): + if((b >> 3) < binary_out_buflen){ + binary_out[b >> 3] |= (((uint8_t)decoded_value) & 0x3F); + } + break; + case(2): + if((b >> 3) < binary_out_buflen){ + binary_out[b >> 3] |= ((((uint8_t)decoded_value) >> 2) & 0xF); + } + if((i + 1) < strlen(encoded_ascii_in)){ + if(b64_char_decode(encoded_ascii_in[i + 1]) >= 0){ + if(((b >> 3) + 1) < binary_out_buflen){ + binary_out[(b >> 3) + 1] = (((uint8_t)decoded_value) << 6); + } + bytes_needed++; + } + } + break; + default: // case(3): + if((b >> 3) < binary_out_buflen){ + binary_out[b >> 3] |= ((((uint8_t)decoded_value) >> 4) & 0x3); + } + if((i + 2) < strlen(encoded_ascii_in)){ + if(b64_char_decode(encoded_ascii_in[i + 1]) >= 0){ + if(((b >> 3) + 1) < binary_out_buflen){ + binary_out[(b >> 3) + 1] = (((uint8_t)decoded_value) << 4); + } + bytes_needed++; + } + } + break; + } + b += 6; + } + } + return bytes_needed; +} + +static inline void str_to_lowercase(char* str){ + for(size_t s = 0; s < strlen(str); s++){ + if(str[s] >= 'A' && str[s] <= 'Z'){ + str[s] |= 0x20; + } + } +} + +static void print_help_and_exit(const char* program_name){ + const char* pname = program_name ? program_name : "mtsazure"; + printf( + "Usage:\n" + "%s [OPTIONS]\n" + "Options List:\n" + "-h or --help : print this help message and exit\n" + "-v or --verbose : run program in verbose mode, prints messages to stdout\n" + "-vv or --extra-verbose : run program in extra verbose mode, prints even more messages to stdout\n" + "-p or --protocol <protocol_name> : used to specify protocol (default: MQTT)\n" + " Valid <protocol_name> options are : mqtt, mqtt-websocket, amqp, amqp-websocket, http\n" + "-c or --connection-string <connection_string> : used to specify your Azure connection string\n" + "-csp or --connection-string-path <path> : used to specify your Azure connection string on a the first line of the\n" + " file specified with <path>\n" + " Note: if BOTH -c and -csp options are specified,\n" + " the command line (-c) connection string will be used\n" + "-f or --format <format> : specify the format of the source data\n" + " Valid <format> options are : binary, hex, base64\n" + " Note: hex and base64 formats are decoded to\n" + " binary before transmission\n" + "-s or --source <message_source_file_path> : used to specify a path to a file containing the message\n" + "-m or --message <message> : used to specify a message for transmission\n" + "-b or --block-length <length_in_bytes> : used to break the message into blocks of N bytes (default = full message)\n" + "-n or --no-trailing-newline : remove 1 newline character (if present) from the end of the entire message\n" + " Note: an argument of 0 for --block-length\n" + " indicates the entire message length\n" + " Note: if BOTH -s and -m options are specified,\n" + " the -m message will take precidence\n" + " Note: if NEITHER -s or -m options are specifed,\n" + " the program will take data from stdin\n", + pname + ); + fflush(stdout); + exit(EXIT_SUCCESS); +} + +int main(int argc, char** argv){ + uint8_t stack_buf[4096]; // 4kB stack buffer to read in files and stdin 4k at a time + uint8_t* mbuf = NULL; // message buffer - for intake of message data + uint8_t* tbuf = NULL; + char connection_string[MAX_CONNECTION_STRING_LENGTH + 1] = {0}; + const char* connection_string_path = NULL; + IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol = MQTT_Protocol; + IOTHUB_MESSAGE_HANDLE message_handle; + msg_format_t msg_format = FORMAT_BINARY; + const char* cmd_line_msg = NULL; + const char* msg_source = NULL; + size_t msg_length = 0; // length in bytes of the message (possibly in encoded base64 or hex format) + size_t transmission_length = 0; // length in bytes of the raw binary message to be transmitted + size_t block_length = 0; // 0 indicates the entire message should be sent as a single block + bool remove_trailing_newline = false; + for(int i = 1; i < argc; i++){ + if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--help")){ + print_help_and_exit(argv[0]); + }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){ + if(global_verbosity < 1){ + global_verbosity = 1; + } + }else if(!strcmp(argv[i], "-vv") || !strcmp(argv[i], "--extra-verbose")){ + if(global_verbosity < 2){ + global_verbosity = 2; + } + } + } + for(int i = 1; i < argc; i++){ + if((i < (argc - 1)) && (!strcmp(argv[i], "-p") || !strcmp(argv[i], "--protocol"))){ + char protocol_string[32] = {0}; + strncpy(protocol_string, argv[++i], sizeof(protocol_string) - 1); + str_to_lowercase(protocol_string); + if(!strcmp(protocol_string, "mqtt")){ + protocol = MQTT_Protocol; + }else if(!strcmp(protocol_string, "mqtt-websocket")){ + protocol = MQTT_WebSocket_Protocol; + }else if(!strcmp(protocol_string, "amqp")){ + protocol = AMQP_Protocol; + }else if(!strcmp(protocol_string, "amqp-websocket")){ + protocol = AMQP_Protocol_over_WebSocketsTls; + }else if(!strcmp(protocol_string, "http")){ + protocol = HTTP_Protocol; + }else{ + fprintf(stderr, "%s : unknown --protocol option '%s'. Aborting execution...\n", argv[0], protocol_string); fflush(stderr); + exit(EXIT_FAILURE); + } + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-c") || !strcmp(argv[i], "--connection-string"))){ + memset((void*)connection_string, 0, sizeof(connection_string)); + strncpy(connection_string, argv[++i], sizeof(connection_string) - 1); + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-csp") || !strcmp(argv[i], "--connection-string-path"))){ + connection_string_path = argv[++i]; + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-f") || !strcmp(argv[i], "--format"))){ + char format_string[32] = {0}; + strncpy(format_string, argv[++i], sizeof(format_string) - 1); + str_to_lowercase(format_string); + if(!strcmp(format_string, "binary")){ + msg_format = FORMAT_BINARY; + }else if(!strcmp(format_string, "hex") || !strcmp(format_string, "hexadecimal")){ + msg_format = FORMAT_HEX; + }else if(!strcmp(format_string, "base64")){ + msg_format = FORMAT_BASE64; + }else{ + fprintf(stderr, "%s : unknown --format option '%s'. Aborting execution...\n", argv[0], format_string); fflush(stderr); + exit(EXIT_FAILURE); + } + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-s") || !strcmp(argv[i], "--source"))){ + msg_source = argv[++i]; + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message"))){ + cmd_line_msg = argv[++i]; + }else if((i < (argc - 1)) && (!strcmp(argv[i], "-b") || !strcmp(argv[i], "--block-length"))){ + block_length = (size_t)strtoull(argv[++i], NULL, 0); + }else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--no-trailing-newline")){ + remove_trailing_newline = true; + } + } + if(strlen(connection_string) == 0){ + if(connection_string_path){ + FILE* csf = fopen(connection_string_path, "r"); + if(csf){ + char* cs = fgets(connection_string, sizeof(connection_string), csf); + fclose(csf); + if(!cs){ + fprintf(stderr, "%s : failed to read connection string from file at path: '%s'. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); + exit(EXIT_FAILURE); + } + const size_t csl = strlen(connection_string); + if(csl > 0){ + if(connection_string[csl - 1] == '\n'){ + connection_string[csl - 1] = 0; + if(strlen(connection_string) == 0){ + fprintf(stderr, "%s : connection string read from file at path '%s' is of zero length. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); + exit(EXIT_FAILURE); + } + } + }else{ + fprintf(stderr, "%s : connection string read from file at path '%s' is of zero length. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); + exit(EXIT_FAILURE); + } + }else{ + fprintf(stderr, "%s : failed to open connection string file at path : '%s' for reading. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); + exit(EXIT_FAILURE); + } + }else{ + fprintf(stderr, "%s : empty connection string and no connection string filepath specified - cannot connect to Azure\n" + "Use the option -c <connection_string> or --connection-string <connection_string> to specify an Azure connection string\n" + "Alternatively, place the connection string on the first line of a file specified with -csp <path> or --connection-string-path <path>\n", + argv[0]); fflush(stderr); + exit(EXIT_FAILURE); + } + } + if(cmd_line_msg){ + msg_length = strlen(cmd_line_msg); + if(msg_length == 0){ + fprintf(stderr, "%s : command-line message is empty\n", argv[0]); fflush(stderr); + exit(EXIT_FAILURE); + } + mbuf = (uint8_t*)malloc(msg_length + 1); + if(!mbuf){ + fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)msg_length + 1); fflush(stderr); + exit(EXIT_FAILURE); + } + memcpy((void*)mbuf, (const void*)cmd_line_msg, msg_length); + if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ + msg_length--; + } + mbuf[msg_length] = 0; // explicitly add a nul byte + }else if(msg_source){ + FILE* sf = fopen(msg_source, "rb"); + if(sf){ + msg_length = 0; + size_t bytes_read; + do{ + bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), sf); + msg_length += bytes_read; + }while(bytes_read == sizeof(stack_buf)); + if(msg_length == 0){ + fprintf(stderr, "%s : failed to read data from source file: '%s'\n", argv[0], msg_source); fflush(stderr); + fclose(sf); + exit(EXIT_FAILURE); + } + mbuf = (uint8_t*)malloc(msg_length + 1); + if(!mbuf){ + fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)msg_length + 1); fflush(stderr); + fclose(sf); + exit(EXIT_FAILURE); + } + rewind(sf); + msg_length = 0; + do{ + bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), sf); + memcpy((void*)(&mbuf[msg_length]), (const void*)stack_buf, bytes_read); + msg_length += bytes_read; + }while(bytes_read == sizeof(stack_buf)); + if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ + msg_length--; + } + mbuf[msg_length] = 0; // explicitly add a nul byte + fclose(sf); + }else{ + fprintf(stderr, "%s : failed to open source file '%s' for reading\n", argv[0], msg_source); fflush(stderr); + exit(EXIT_FAILURE); + } + }else{ // take data from stdin + msg_length = 0; + size_t bytes_read; + size_t mbuf_length = sizeof(stack_buf); + mbuf = (uint8_t*)malloc(mbuf_length + 1); + if(!mbuf){ + fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)mbuf_length + 1); fflush(stderr); + exit(EXIT_FAILURE); + } + do{ + bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), stdin); + if((msg_length + bytes_read) > mbuf_length){ // buffer is currently too short, extend it + mbuf_length <<= 1; // double allocation + void* new_mbuf = realloc((void*)mbuf, mbuf_length + 1); + if(!new_mbuf){ + fprintf(stderr, "%s : failed to reallocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)mbuf_length + 1); fflush(stderr); + free(mbuf); + exit(EXIT_FAILURE); + } + mbuf = (uint8_t*)new_mbuf; + } + memcpy((void*)(&mbuf[msg_length]), (const void*)stack_buf, bytes_read); + msg_length += bytes_read; + }while(bytes_read == sizeof(stack_buf)); + if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ + msg_length--; + } + mbuf[msg_length] = 0; // explicitly add a nul byte + if(msg_length == 0){ + fprintf(stderr, "%s : stdin received EOF before reading any bytes, no message to be transmitted. Aborting execution...\n", argv[0]); fflush(stderr); + free(mbuf); + exit(EXIT_FAILURE); + } + } + if(msg_format == FORMAT_BASE64){ + transmission_length = base64decode(NULL, (const char*)mbuf, 0); + }else if(msg_format == FORMAT_HEX){ + transmission_length = hexdecode(NULL, (const char*)mbuf, 0); + }else{ // msg_format == FORMAT_BINARY + transmission_length = msg_length; + } + if(transmission_length == 0){ + fprintf(stderr, "%s : transmission message length is of zero length, check message\n", argv[0]); fflush(stderr); + free(mbuf); + exit(EXIT_FAILURE); + } + tbuf = (uint8_t*)malloc(transmission_length + 1); + if(!tbuf){ + fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)transmission_length + 1); fflush(stderr); + free(mbuf); + exit(EXIT_FAILURE); + } + if(msg_format == FORMAT_BASE64){ + base64decode(tbuf, (const char*)mbuf, transmission_length); + }else if(msg_format == FORMAT_HEX){ + hexdecode(tbuf, (const char*)mbuf, transmission_length); + }else{ // msg_format == FORMAT_BINARY + memcpy((void*)tbuf, (const void*)mbuf, transmission_length); + } + tbuf[transmission_length] = 0; // explicitly add a nul byte + free(mbuf); + // At this point a vaild message will be present in the tbuf and its length will be in transmission_length + // Initialize IoTHub SDK subsystem + IoTHub_Init(); + IOTHUB_DEVICE_CLIENT_LL_HANDLE device_ll_handle; + if(global_verbosity){ + printf("Device handle setup using connection string: '%s'\n", connection_string); fflush(stdout); + } + // Create the IoTHub handle here + device_ll_handle = IoTHubDeviceClient_LL_CreateFromConnectionString(connection_string, protocol); + if(!device_ll_handle){ + fprintf(stderr, "%s : Failure createing IoTHub device. Check connection string : '%s'\n", argv[0], connection_string); fflush(stderr); + }else{ + if(global_verbosity > 1 && protocol != HTTP_Protocol){ // Can not set this options in HTTP + bool traceOn = true; + IoTHubDeviceClient_LL_SetOption(device_ll_handle, OPTION_LOG_TRACE, &traceOn); + } + // Setting connection status callback to get indication of connection to iothub + IoTHubDeviceClient_LL_SetConnectionStatusCallback(device_ll_handle, connection_status_callback, NULL); + size_t block_count = 0; + if(block_length == 0){ + block_length = transmission_length; + } + size_t n_blocks = ((transmission_length - 1) / block_length) + 1; + if(global_verbosity && n_blocks > 1){ + printf("The message entire message is %llu bytes long, and shall be divided into %llu blocks.\n" + "The primary block length is %llu bytes with the finah block of length %llu bytes.\n", + (unsigned long long)transmission_length, (unsigned long long)n_blocks, + (unsigned long long)block_length, (unsigned long long)(transmission_length - ((n_blocks - 1) * block_length))); fflush(stdout); + } + do{ + if(block_count == g_message_send_confirmations){ + // Construct the iothub message from a string or a byte array + size_t this_block_length = (block_count == (n_blocks - 1)) ? transmission_length - (block_count * block_length) : block_length; + message_handle = IoTHubMessage_CreateFromByteArray((const unsigned char*)&tbuf[block_count * block_length], this_block_length); + + if(global_verbosity){ + printf("Sending message: %llu of %llu to IoTHub\n", (unsigned long long)(block_count + 1), (unsigned long long)n_blocks); fflush(stdout); + } + IoTHubDeviceClient_LL_SendEventAsync(device_ll_handle, message_handle, send_confirm_callback, NULL); + + // The message is copied to the sdk so the we can destroy it + IoTHubMessage_Destroy(message_handle); + + block_count++; + } + + IoTHubDeviceClient_LL_DoWork(device_ll_handle); + ThreadAPI_Sleep(1); + + } while(g_message_send_confirmations < n_blocks); + + // Clean up the iothub sdk handle + IoTHubDeviceClient_LL_Destroy(device_ll_handle); + } + // Free all the sdk subsystem + IoTHub_Deinit(); + free(tbuf); + return 0; +} |