// MTS Azure Telecommunicator - sends and receives messages to Microsoft Azure // Copyright 2019 Multi-Tech Systems Inc. - All Rights Reserved // Author: David Marcaccini // Contact: david.marcaccini@multitech.com #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#include "certs.h" #define MAX_CONNECTION_STRING_LENGTH 1023 static unsigned global_verbosity = 0; static size_t g_message_send_confirmations = 0; typedef enum msg_format_enum{ FORMAT_BINARY = 0, FORMAT_HEX, FORMAT_BASE64, }msg_format_t; static void send_confirm_callback(IOTHUB_CLIENT_CONFIRMATION_RESULT result, void* userContextCallback){ g_message_send_confirmations++; if(global_verbosity){ //printf("Confirmation callback received for message with result %s\n", ENUM_TO_STRING(IOTHUB_CLIENT_CONFIRMATION_RESULT, result)); fflush(stdout); printf("Confirmation callback received for message.\n"); fflush(stdout); } } static void connection_status_callback(IOTHUB_CLIENT_CONNECTION_STATUS result, IOTHUB_CLIENT_CONNECTION_STATUS_REASON reason, void* user_context){ if(global_verbosity){ if (result == IOTHUB_CLIENT_CONNECTION_AUTHENTICATED){ printf("The device client is connected to the IoTHub\n"); }else{ printf("The device client has been disconnected from the IoTHub\n"); } fflush(stdout); } } //static inline char hex_char_decode(const char hex_char){ static char hex_char_decode(const char hex_char){ if(hex_char >= '0' && hex_char <= '9'){ return (hex_char - '0'); }else if(hex_char >= 'A' && hex_char <= 'F'){ return (hex_char - 'A') + 10; }else if(hex_char >= 'a' && hex_char <= 'f'){ return (hex_char - 'a') + 10; } return 16; } //static inline char b64_char_decode(const char b64_char){ static char b64_char_decode(const char b64_char){ if(b64_char >= 'A' && b64_char <= 'Z'){ return (b64_char - 'A'); }else if(b64_char >= 'a' && b64_char <= 'z'){ return (b64_char - 'a') + 26; }else if(b64_char >= '0' && b64_char <= '9'){ return (b64_char - '0') + 52; }else if(b64_char == '+'){ return 62; }else if(b64_char == '/'){ return 63; } return 64; } // Converts a nul terminated ascii string of hex characters to a binary array // returns the number of bytes that are or would be written to binary_out if it were sufficiently long static size_t hexdecode(uint8_t* binary_out, const char* encoded_ascii_in, size_t binary_out_buflen){ size_t bytes_needed = 0; size_t i = 0; const size_t in_length = strlen(encoded_ascii_in); while(in_length > (i + 1)){ // ignore any number of 0x or 0X prefixes if(encoded_ascii_in[i] == '0' && (encoded_ascii_in[i + 1] == 'x' || encoded_ascii_in[i + 1] == 'X')){ i += 2; }else{ break; } } for(size_t b = 0; i < in_length; i++){ char decoded_value = hex_char_decode(encoded_ascii_in[i]); if(decoded_value < 16){ if((b & 7) == 0){ if(++bytes_needed <= binary_out_buflen){ binary_out[b >> 3] = (decoded_value << 4); } }else{ if(bytes_needed <= binary_out_buflen){ binary_out[b >> 3] |= decoded_value; } } b += 4; } } return bytes_needed; } // Converts a nul terminated ascii string of base64 characters (with trialing '=' characters) to a binary array // returns the number of bytes that are or would be written to binary_out if it were sufficiently long static size_t base64decode(uint8_t* binary_out, const char* encoded_ascii_in, size_t binary_out_buflen){ size_t bytes_needed = 0; for(size_t i = 0, b = 0; i < strlen(encoded_ascii_in); i++){ char decoded_value = b64_char_decode(encoded_ascii_in[i]); if(decoded_value < 64){ switch((b & 7) >> 1){ case(0): if((b >> 3) < binary_out_buflen){ binary_out[b >> 3] = (((uint8_t)decoded_value) << 2); } bytes_needed++; break; case(1): if((b >> 3) < binary_out_buflen){ binary_out[b >> 3] |= (((uint8_t)decoded_value) & 0x3F); } break; case(2): if((b >> 3) < binary_out_buflen){ binary_out[b >> 3] |= ((((uint8_t)decoded_value) >> 2) & 0xF); } if((i + 1) < strlen(encoded_ascii_in)){ if(b64_char_decode(encoded_ascii_in[i + 1]) >= 0){ if(((b >> 3) + 1) < binary_out_buflen){ binary_out[(b >> 3) + 1] = (((uint8_t)decoded_value) << 6); } bytes_needed++; } } break; default: // case(3): if((b >> 3) < binary_out_buflen){ binary_out[b >> 3] |= ((((uint8_t)decoded_value) >> 4) & 0x3); } if((i + 2) < strlen(encoded_ascii_in)){ if(b64_char_decode(encoded_ascii_in[i + 1]) >= 0){ if(((b >> 3) + 1) < binary_out_buflen){ binary_out[(b >> 3) + 1] = (((uint8_t)decoded_value) << 4); } bytes_needed++; } } break; } b += 6; } } return bytes_needed; } static inline void str_to_lowercase(char* str){ for(size_t s = 0; s < strlen(str); s++){ if(str[s] >= 'A' && str[s] <= 'Z'){ str[s] |= 0x20; } } } static void print_help_and_exit(const char* program_name){ const char* pname = program_name ? program_name : "mtsazure"; printf( "Usage:\n" "%s [OPTIONS]\n" "Options List:\n" "-h or --help : print this help message and exit\n" "-v or --verbose : run program in verbose mode, prints messages to stdout\n" "-vv or --extra-verbose : run program in extra verbose mode, prints even more messages to stdout\n" "-p or --protocol : used to specify protocol (default: MQTT)\n" " Valid options are : mqtt, mqtt-websocket, amqp, amqp-websocket, http\n" "-c or --connection-string : used to specify your Azure connection string\n" "-csp or --connection-string-path : used to specify your Azure connection string on a the first line of the\n" " file specified with \n" " Note: if BOTH -c and -csp options are specified,\n" " the command line (-c) connection string will be used\n" "-f or --format : specify the format of the source data\n" " Valid options are : binary, hex, base64\n" " Note: hex and base64 formats are decoded to\n" " binary before transmission\n" "-s or --source : used to specify a path to a file containing the message\n" "-m or --message : used to specify a message for transmission\n" "-b or --block-length : used to break the message into blocks of N bytes (default = full message)\n" "-n or --no-trailing-newline : remove 1 newline character (if present) from the end of the entire message\n" " Note: an argument of 0 for --block-length\n" " indicates the entire message length\n" " Note: if BOTH -s and -m options are specified,\n" " the -m message will take precidence\n" " Note: if NEITHER -s or -m options are specifed,\n" " the program will take data from stdin\n", pname ); fflush(stdout); exit(EXIT_SUCCESS); } int main(int argc, char** argv){ uint8_t stack_buf[4096]; // 4kB stack buffer to read in files and stdin 4k at a time uint8_t* mbuf = NULL; // message buffer - for intake of message data uint8_t* tbuf = NULL; char connection_string[MAX_CONNECTION_STRING_LENGTH + 1] = {0}; const char* connection_string_path = NULL; IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol = MQTT_Protocol; IOTHUB_MESSAGE_HANDLE message_handle; msg_format_t msg_format = FORMAT_BINARY; const char* cmd_line_msg = NULL; const char* msg_source = NULL; size_t msg_length = 0; // length in bytes of the message (possibly in encoded base64 or hex format) size_t transmission_length = 0; // length in bytes of the raw binary message to be transmitted size_t block_length = 0; // 0 indicates the entire message should be sent as a single block bool remove_trailing_newline = false; for(int i = 1; i < argc; i++){ if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--help")){ print_help_and_exit(argv[0]); }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){ if(global_verbosity < 1){ global_verbosity = 1; } }else if(!strcmp(argv[i], "-vv") || !strcmp(argv[i], "--extra-verbose")){ if(global_verbosity < 2){ global_verbosity = 2; } } } for(int i = 1; i < argc; i++){ if((i < (argc - 1)) && (!strcmp(argv[i], "-p") || !strcmp(argv[i], "--protocol"))){ char protocol_string[32] = {0}; strncpy(protocol_string, argv[++i], sizeof(protocol_string) - 1); str_to_lowercase(protocol_string); if(!strcmp(protocol_string, "mqtt")){ protocol = MQTT_Protocol; }else if(!strcmp(protocol_string, "mqtt-websocket")){ protocol = MQTT_WebSocket_Protocol; }else if(!strcmp(protocol_string, "amqp")){ protocol = AMQP_Protocol; }else if(!strcmp(protocol_string, "amqp-websocket")){ protocol = AMQP_Protocol_over_WebSocketsTls; }else if(!strcmp(protocol_string, "http")){ protocol = HTTP_Protocol; }else{ fprintf(stderr, "%s : unknown --protocol option '%s'. Aborting execution...\n", argv[0], protocol_string); fflush(stderr); exit(EXIT_FAILURE); } }else if((i < (argc - 1)) && (!strcmp(argv[i], "-c") || !strcmp(argv[i], "--connection-string"))){ memset((void*)connection_string, 0, sizeof(connection_string)); strncpy(connection_string, argv[++i], sizeof(connection_string) - 1); }else if((i < (argc - 1)) && (!strcmp(argv[i], "-csp") || !strcmp(argv[i], "--connection-string-path"))){ connection_string_path = argv[++i]; }else if((i < (argc - 1)) && (!strcmp(argv[i], "-f") || !strcmp(argv[i], "--format"))){ char format_string[32] = {0}; strncpy(format_string, argv[++i], sizeof(format_string) - 1); str_to_lowercase(format_string); if(!strcmp(format_string, "binary")){ msg_format = FORMAT_BINARY; }else if(!strcmp(format_string, "hex") || !strcmp(format_string, "hexadecimal")){ msg_format = FORMAT_HEX; }else if(!strcmp(format_string, "base64")){ msg_format = FORMAT_BASE64; }else{ fprintf(stderr, "%s : unknown --format option '%s'. Aborting execution...\n", argv[0], format_string); fflush(stderr); exit(EXIT_FAILURE); } }else if((i < (argc - 1)) && (!strcmp(argv[i], "-s") || !strcmp(argv[i], "--source"))){ msg_source = argv[++i]; }else if((i < (argc - 1)) && (!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message"))){ cmd_line_msg = argv[++i]; }else if((i < (argc - 1)) && (!strcmp(argv[i], "-b") || !strcmp(argv[i], "--block-length"))){ block_length = (size_t)strtoull(argv[++i], NULL, 0); }else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--no-trailing-newline")){ remove_trailing_newline = true; } } if(strlen(connection_string) == 0){ if(connection_string_path){ FILE* csf = fopen(connection_string_path, "r"); if(csf){ char* cs = fgets(connection_string, sizeof(connection_string), csf); fclose(csf); if(!cs){ fprintf(stderr, "%s : failed to read connection string from file at path: '%s'. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); exit(EXIT_FAILURE); } const size_t csl = strlen(connection_string); if(csl > 0){ if(connection_string[csl - 1] == '\n'){ connection_string[csl - 1] = 0; if(strlen(connection_string) == 0){ fprintf(stderr, "%s : connection string read from file at path '%s' is of zero length. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); exit(EXIT_FAILURE); } } }else{ fprintf(stderr, "%s : connection string read from file at path '%s' is of zero length. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); exit(EXIT_FAILURE); } }else{ fprintf(stderr, "%s : failed to open connection string file at path : '%s' for reading. Aborting execution...\n", argv[0], connection_string_path); fflush(stderr); exit(EXIT_FAILURE); } }else{ fprintf(stderr, "%s : empty connection string and no connection string filepath specified - cannot connect to Azure\n" "Use the option -c or --connection-string to specify an Azure connection string\n" "Alternatively, place the connection string on the first line of a file specified with -csp or --connection-string-path \n", argv[0]); fflush(stderr); exit(EXIT_FAILURE); } } if(cmd_line_msg){ msg_length = strlen(cmd_line_msg); if(msg_length == 0){ fprintf(stderr, "%s : command-line message is empty\n", argv[0]); fflush(stderr); exit(EXIT_FAILURE); } mbuf = (uint8_t*)malloc(msg_length + 1); if(!mbuf){ fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)msg_length + 1); fflush(stderr); exit(EXIT_FAILURE); } memcpy((void*)mbuf, (const void*)cmd_line_msg, msg_length); if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ msg_length--; } mbuf[msg_length] = 0; // explicitly add a nul byte }else if(msg_source){ FILE* sf = fopen(msg_source, "rb"); if(sf){ msg_length = 0; size_t bytes_read; do{ bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), sf); msg_length += bytes_read; }while(bytes_read == sizeof(stack_buf)); if(msg_length == 0){ fprintf(stderr, "%s : failed to read data from source file: '%s'\n", argv[0], msg_source); fflush(stderr); fclose(sf); exit(EXIT_FAILURE); } mbuf = (uint8_t*)malloc(msg_length + 1); if(!mbuf){ fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)msg_length + 1); fflush(stderr); fclose(sf); exit(EXIT_FAILURE); } rewind(sf); msg_length = 0; do{ bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), sf); memcpy((void*)(&mbuf[msg_length]), (const void*)stack_buf, bytes_read); msg_length += bytes_read; }while(bytes_read == sizeof(stack_buf)); if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ msg_length--; } mbuf[msg_length] = 0; // explicitly add a nul byte fclose(sf); }else{ fprintf(stderr, "%s : failed to open source file '%s' for reading\n", argv[0], msg_source); fflush(stderr); exit(EXIT_FAILURE); } }else{ // take data from stdin msg_length = 0; size_t bytes_read; size_t mbuf_length = sizeof(stack_buf); mbuf = (uint8_t*)malloc(mbuf_length + 1); if(!mbuf){ fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)mbuf_length + 1); fflush(stderr); exit(EXIT_FAILURE); } do{ bytes_read = fread((void*)stack_buf, sizeof(uint8_t), sizeof(stack_buf), stdin); if((msg_length + bytes_read) > mbuf_length){ // buffer is currently too short, extend it mbuf_length <<= 1; // double allocation void* new_mbuf = realloc((void*)mbuf, mbuf_length + 1); if(!new_mbuf){ fprintf(stderr, "%s : failed to reallocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)mbuf_length + 1); fflush(stderr); free(mbuf); exit(EXIT_FAILURE); } mbuf = (uint8_t*)new_mbuf; } memcpy((void*)(&mbuf[msg_length]), (const void*)stack_buf, bytes_read); msg_length += bytes_read; }while(bytes_read == sizeof(stack_buf)); if(remove_trailing_newline && msg_length > 0 && mbuf[msg_length - 1] == '\n'){ msg_length--; } mbuf[msg_length] = 0; // explicitly add a nul byte if(msg_length == 0){ fprintf(stderr, "%s : stdin received EOF before reading any bytes, no message to be transmitted. Aborting execution...\n", argv[0]); fflush(stderr); free(mbuf); exit(EXIT_FAILURE); } } if(msg_format == FORMAT_BASE64){ transmission_length = base64decode(NULL, (const char*)mbuf, 0); }else if(msg_format == FORMAT_HEX){ transmission_length = hexdecode(NULL, (const char*)mbuf, 0); }else{ // msg_format == FORMAT_BINARY transmission_length = msg_length; } if(transmission_length == 0){ fprintf(stderr, "%s : transmission message length is of zero length, check message\n", argv[0]); fflush(stderr); free(mbuf); exit(EXIT_FAILURE); } tbuf = (uint8_t*)malloc(transmission_length + 1); if(!tbuf){ fprintf(stderr, "%s : failed to allocate %lu bytes of heap memory. Aborting execution...\n", argv[0], (unsigned long)transmission_length + 1); fflush(stderr); free(mbuf); exit(EXIT_FAILURE); } if(msg_format == FORMAT_BASE64){ base64decode(tbuf, (const char*)mbuf, transmission_length); }else if(msg_format == FORMAT_HEX){ hexdecode(tbuf, (const char*)mbuf, transmission_length); }else{ // msg_format == FORMAT_BINARY memcpy((void*)tbuf, (const void*)mbuf, transmission_length); } tbuf[transmission_length] = 0; // explicitly add a nul byte free(mbuf); // At this point a vaild message will be present in the tbuf and its length will be in transmission_length // Initialize IoTHub SDK subsystem IoTHub_Init(); IOTHUB_DEVICE_CLIENT_LL_HANDLE device_ll_handle; if(global_verbosity){ printf("Device handle setup using connection string: '%s'\n", connection_string); fflush(stdout); } // Create the IoTHub handle here device_ll_handle = IoTHubDeviceClient_LL_CreateFromConnectionString(connection_string, protocol); if(!device_ll_handle){ fprintf(stderr, "%s : Failure createing IoTHub device. Check connection string : '%s'\n", argv[0], connection_string); fflush(stderr); }else{ if(global_verbosity > 1 && protocol != HTTP_Protocol){ // Can not set this options in HTTP bool traceOn = true; IoTHubDeviceClient_LL_SetOption(device_ll_handle, OPTION_LOG_TRACE, &traceOn); } // Setting connection status callback to get indication of connection to iothub IoTHubDeviceClient_LL_SetConnectionStatusCallback(device_ll_handle, connection_status_callback, NULL); size_t block_count = 0; if(block_length == 0){ block_length = transmission_length; } size_t n_blocks = ((transmission_length - 1) / block_length) + 1; if(global_verbosity && n_blocks > 1){ printf("The message entire message is %llu bytes long, and shall be divided into %llu blocks.\n" "The primary block length is %llu bytes with the finah block of length %llu bytes.\n", (unsigned long long)transmission_length, (unsigned long long)n_blocks, (unsigned long long)block_length, (unsigned long long)(transmission_length - ((n_blocks - 1) * block_length))); fflush(stdout); } do{ if(block_count == g_message_send_confirmations){ // Construct the iothub message from a string or a byte array size_t this_block_length = (block_count == (n_blocks - 1)) ? transmission_length - (block_count * block_length) : block_length; message_handle = IoTHubMessage_CreateFromByteArray((const unsigned char*)&tbuf[block_count * block_length], this_block_length); if(global_verbosity){ printf("Sending message: %llu of %llu to IoTHub\n", (unsigned long long)(block_count + 1), (unsigned long long)n_blocks); fflush(stdout); } IoTHubDeviceClient_LL_SendEventAsync(device_ll_handle, message_handle, send_confirm_callback, NULL); // The message is copied to the sdk so the we can destroy it IoTHubMessage_Destroy(message_handle); block_count++; } IoTHubDeviceClient_LL_DoWork(device_ll_handle); ThreadAPI_Sleep(1); } while(g_message_send_confirmations < n_blocks); // Clean up the iothub sdk handle IoTHubDeviceClient_LL_Destroy(device_ll_handle); } // Free all the sdk subsystem IoTHub_Deinit(); free(tbuf); return 0; }