From 8e799fec27f9c3b6a9f13c5002168025a00fbfc7 Mon Sep 17 00:00:00 2001 From: James Maki Date: Fri, 19 Nov 2010 10:04:58 -0600 Subject: allow multiple consumers with pthreads --- configure.in | 2 + src/gps_test.rb | 2 +- src/venus_api.c | 21 +- src/venus_gps.c | 957 +++++++++++++++++++++++++++++++++++++++++--------------- 4 files changed, 718 insertions(+), 264 deletions(-) diff --git a/configure.in b/configure.in index be3bba3..f715123 100644 --- a/configure.in +++ b/configure.in @@ -12,6 +12,8 @@ AC_CHECK_HEADERS([unistd.h getopt.h errno.h time.h termios.h sys/types.h \ sys/stat.h fcntl.h stdarg.h asm/byteorder.h netdb.h stdint.h signal.h], [],AC_MSG_ERROR([ required header missing])) +AC_CHECK_LIB([pthread], [pthread_create], [], [], []) + AC_DEFINE([DEBUG], 0, [set to 1 to enable debug]) AC_DEFINE([CONFIG_USE_SYSLOG], 0, [set to 1 to use syslog for logging]) diff --git a/src/gps_test.rb b/src/gps_test.rb index 8f64112..97532a6 100755 --- a/src/gps_test.rb +++ b/src/gps_test.rb @@ -7,7 +7,6 @@ GPS_SERVER_PORT = 5445 MODE = :tcp_server #MODE = :tcp_client #MODE = :udp_server -#MODE = :serial def tcp_read(sock) loop { @@ -25,6 +24,7 @@ def udp_read(sock) loop { begin msg, sender = sock.recvfrom(1024) + p sender p msg rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e puts "recvfrom error: #{e.class}: #{e}" diff --git a/src/venus_api.c b/src/venus_api.c index 04ad78d..40a52ea 100644 --- a/src/venus_api.c +++ b/src/venus_api.c @@ -267,19 +267,24 @@ int venus_write_msg_read_ack(int fd, struct venus_msg *msg) ssize_t venus_read_nmea_sentence(int fd, void *buf, size_t count) { - int err; - char c; ssize_t total = 0; char *cp = buf; - int start = 0; + char c; + int err; - while (1) { + if (!count) { + return 0; + } + count--; + + while (count > 0) { err = safe_read(fd, &c, 1); if (err <= 0) { log_error("read from gps failed: %m"); return -1; } + if (!start) { if (c != '$') { continue; @@ -287,16 +292,16 @@ ssize_t venus_read_nmea_sentence(int fd, void *buf, size_t count) start = 1; } - if (total < count) { - *cp++ = c; - total++; - } + cp[total++] = c; + count--; if (c == '\n') { break; } } + cp[total] = '\0'; + return total; } diff --git a/src/venus_gps.c b/src/venus_gps.c index 28491f7..9a26e4f 100644 --- a/src/venus_gps.c +++ b/src/venus_gps.c @@ -23,6 +23,7 @@ #define _GNU_SOURCE +#include #include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include "log.h" #include "cbuffer.h" @@ -46,28 +48,42 @@ #include "venus_gps.h" #include "venus_api.h" -static sig_atomic_t timer_expired; -static void sigalarm_handler(int arg) -{ - timer_expired = 1; -} +#define BIT(nr) (1UL << (nr)) -static sig_atomic_t sigpipe = 0; -static void sigpipe_handler(int arg) -{ - sigpipe = 1; -} +#define ID_PREFIX_MAX 10 +#define ID_MAX 20 + +#define SLEEP_AFTER_FAILURE 1 + +static char id[ID_MAX + 1]; +static char id_prefix[ID_PREFIX_MAX + 1]; static char *device = "/dev/ttyS3"; static speed_t baud_rate = B9600; -static uint8_t gga = 1; -static uint8_t gsa = 1; -static uint8_t gsv = 1; -static uint8_t gll = 1; -static uint8_t rmc = 1; -static uint8_t vtg = 1; -static uint8_t zda = 1; +#define SOCKET_CLIENTS_MAX 8 +struct socket_client_args { + char *host; + int port; + int type; + pthread_t tid; +}; +int n_socket_clients = 0; +struct socket_client_args socket_clients[SOCKET_CLIENTS_MAX]; + +static int gps_tcp_server_port = -1; +static char *gps_filename = NULL; +static char *gps_serial_port = NULL; + +static uint8_t gpgga = 1; +static uint8_t gpgsa = 1; +static uint8_t gpgsv = 1; +static uint8_t gpgll = 1; +static uint8_t gprmc = 1; +static uint8_t gpvtg = 1; +#if CONFIG_HAVE_ZDA +static uint8_t gpzda = 1; +#endif #if CONFIG_CAN_RESET /* @@ -80,167 +96,466 @@ static int16_t longitude = -93; static int16_t altitude = 256; #endif +struct gps_msg { + char gpgga[NMEA_SENTENCE_MAX + 1]; + char gpgsa[NMEA_SENTENCE_MAX + 1]; + char gpgsv[3][NMEA_SENTENCE_MAX + 1]; + char gpgll[NMEA_SENTENCE_MAX + 1]; + char gprmc[NMEA_SENTENCE_MAX + 1]; + char gpvtg[NMEA_SENTENCE_MAX + 1]; + struct timeval timestamp; +}; + +static struct { + pthread_cond_t msg_cond; + pthread_mutex_t msg_mutex; + struct gps_msg *msg; + uint32_t consumed; + uint32_t consumers; + + struct gps_msg msgs_[2]; +} producer = { + .msg_cond = PTHREAD_COND_INITIALIZER, + .msg_mutex = PTHREAD_MUTEX_INITIALIZER, + .msg = NULL, + .consumed = 0xFFFFFFFF, + .consumers = 0, +}; -static int exchange_data(int fd0, int fd1) +static int fill_msg(int fd, struct gps_msg *msg) { - int i; - int err; - struct pollfd fds[2]; - nfds_t nfds = 2; - struct cbuffer bufs[2]; + char buf[NMEA_SENTENCE_MAX + 1]; + int tmp; + + memset(msg, 0, sizeof(*msg)); - log_debug("starting"); + do { + tmp = venus_read_nmea_sentence(fd, buf, sizeof(buf)); + if (tmp <= 0) { + log_error("venus_read_nmea_sentence failed"); + return -1; + } + } while (strncmp(buf, "$GPGGA", 6)); - memset(fds, 0, sizeof(fds)); + strcpy(msg->gpgga, buf); - for (i = 0; i < nfds; i++) { - memset(&fds[i], 0, sizeof(fds[i])); - bufs[i].read_index = bufs[i].write_index = 0; + while (1) { + tmp = venus_read_nmea_sentence(fd, buf, sizeof(buf)); + if (tmp <= 0) { + log_error("venus_read_nmea_sentence failed"); + return -1; + } + if (!strncmp(buf, "$GPGGA", 6)) { + strcpy(msg->gpgga, buf); + } else if (!strncmp(buf, "$GPGSA", 6)) { + strcpy(msg->gpgsa, buf); + } else if (!strncmp(buf, "$GPGSV", 6)) { + if (buf[9] == '1') { + strcpy(msg->gpgsv[0], buf); + } else if (buf[9] == '2') { + strcpy(msg->gpgsv[1], buf); + } else if (buf[9] == '3') { + strcpy(msg->gpgsv[2], buf); + } + } else if (!strncmp(buf, "$GPGLL", 6)) { + strcpy(msg->gpgll, buf); + } else if (!strncmp(buf, "$GPRMC", 6)) { + strcpy(msg->gprmc, buf); + } else if (!strncmp(buf, "$GPVTG", 6)) { + strcpy(msg->gpvtg, buf); + break; + } } - fds[0].fd = fd0; - fds[1].fd = fd1; + return 0; +} + +static void *gps_msg_producer(void *arg) +{ + int fd; + int tmp; + struct gps_msg *msg; + int msg_no; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + + log_notice("Venus GPS NMEA Producer starting"); + fd = venus_open(device, baud_rate); + if (fd < 0) { + log_error("failed to open %s", device); + goto error; + } + + msg_no = 0; while (1) { - for (i = 0; i < nfds; i++) { - fds[i].events = 0; + msg = &producer.msgs_[msg_no]; + + tmp = fill_msg(fd, msg); + if (tmp < 0) { + log_error("fill_msg failed"); + goto error; } - for (i = 0; i < nfds; i++) { - if (buffer_free(&bufs[i])) { - fds[i].events |= POLLIN; - } - if (buffer_used(&bufs[i])) { - fds[DIR_REV(i)].events |= POLLOUT; - } + tmp = gettimeofday(&msg->timestamp, NULL); + if (tmp < 0) { + log_error("gettimeofday failed: %m"); + goto error; } - err = poll(fds, nfds, -1); - if (err <= 0) { - log_error("poll returned %d %m", err); - break; + tmp = pthread_mutex_lock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_lock failed: %d", tmp); + exit(1); } - log_debug("poll returned %d", err); + producer.msg = msg; + producer.consumed = 0; - for (i = 0; i < nfds; i++) { - if (fds[i].revents) { - log_debug("fds[%d]: %d revents: %04X", i, fds[i].fd, fds[i].revents); + tmp = pthread_cond_broadcast(&producer.msg_cond); + if (tmp) { + log_error("pthread_cond_broadcast failed: %d", tmp); + exit(1); + } + + tmp = pthread_mutex_unlock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_unlock failed: %d", tmp); + exit(1); + } - if (fds[i].revents & POLLHUP) { - log_error("fds[%d]: %d POLLHUP", i, fds[i].fd); - return 0; - } else if (fds[i].revents & POLLOUT) { - log_debug("fds[%d]: %d POLLOUT buffer_used: %lu", - i, fds[i].fd, (unsigned long) buffer_used(&bufs[DIR_REV(i)])); + msg_no = !msg_no; + } - err = buffered_write(fds[i].fd, &bufs[DIR_REV(i)]); - if (err < 0) { - log_error("fds[%d]: %d safe_write failed: %m", i, fds[i].fd); - return -1; - } - } else if (fds[i].revents & POLLIN) { - log_debug("fds[%d]: %d POLLIN buffer_free: %lu", - i, fds[i].fd, (unsigned long) buffer_free(&bufs[i])); +error: - err = buffered_read(fds[i].fd, &bufs[i]); - if (err <= 0) { - log_debug("fds[%d]: %d safe_read failed: %m", i, fds[i].fd); - return -1; - } - } - } + if (fd >= 0) { + close(fd); + } + + log_notice("exiting"); + + pthread_exit((void *) 1); +} + +struct gps_timeval { + struct timeval gpgga; + struct timeval gpgsa; + struct timeval gpgsv; + struct timeval gpgll; + struct timeval gprmc; + struct timeval gpvtg; +}; + +enum { + WRITE_MSG_SOCKET, + WRITE_MSG_FILE, +}; + +int write_sentence(int fd, const char *buf, size_t len, int where) +{ + int tmp; + + if (!*buf || len <= 0) { + return 0; + } + + if (where == WRITE_MSG_SOCKET) { + if (*id_prefix) { + tmp = send(fd, id_prefix, strlen(id_prefix), MSG_MORE); + } + if (*id) { + tmp = send(fd, id, strlen(id), MSG_MORE); } + tmp = send(fd, buf, len, MSG_MORE); + } else { + if (*id_prefix) { + tmp = full_write(fd, id_prefix, strlen(id_prefix)); + } + if (*id) { + tmp = full_write(fd, id, strlen(id)); + } + tmp = full_write(fd, buf, len); + } + if (tmp < 0) { + log_error("write failed: %m"); } - return -1; + return tmp; } -static int udp_send_msgs(int sd, int tty) +int write_msg(int fd, struct gps_msg *msg, struct gps_timeval *gpstv, int where) { - char buf[NMEA_SENTENCE_MAX]; - int count; - int err; + struct timeval tmptv; + int tmp; + int count = 0; - while (1) { - count = venus_read_nmea_sentence(tty, buf, sizeof(buf)); - if (count <= 0) { - log_error("venus_read_nmea_sentence failed: quiting"); + if (gpgga && timercmp(&gpstv->gpgga, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gpgga, strlen(msg->gpgga), where); + if (tmp < 0) { + log_error("write_sentence failed"); return -1; } - err = send(sd, buf, count, 0); - if (err < 0) { - log_debug("send failed: %m"); + + tmptv.tv_sec = gpgga - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gpgga); + count++; + } + if (gpgsa && timercmp(&gpstv->gpgsa, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gpgsa, strlen(msg->gpgsa), where); + if (tmp < 0) { + log_error("write_sentence failed"); return -1; } + + tmptv.tv_sec = gpgsa - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gpgsa); + count++; } + if (gpgsv && timercmp(&gpstv->gpgsv, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gpgsv[0], strlen(msg->gpgsv[0]), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } + tmp = write_sentence(fd, msg->gpgsv[1], strlen(msg->gpgsv[1]), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } + tmp = write_sentence(fd, msg->gpgsv[2], strlen(msg->gpgsv[2]), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } - return -1; + tmptv.tv_sec = gpgsv - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gpgsv); + count++; + } + if (gpgll && timercmp(&gpstv->gpgll, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gpgll, strlen(msg->gpgll), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } + + tmptv.tv_sec = gpgll - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gpgll); + count++; + } + if (gprmc && timercmp(&gpstv->gprmc, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gprmc, strlen(msg->gprmc), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } + + tmptv.tv_sec = gprmc - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gprmc); + count++; + } + if (gpvtg && timercmp(&gpstv->gpvtg, &msg->timestamp, <=)) { + tmp = write_sentence(fd, msg->gpvtg, strlen(msg->gpvtg), where); + if (tmp < 0) { + log_error("write_sentence failed"); + return -1; + } + + tmptv.tv_sec = gpvtg - 1; + tmptv.tv_usec = 750000; + timeradd(&msg->timestamp, &tmptv, &gpstv->gpvtg); + count++; + } + + if (count && where == WRITE_MSG_SOCKET) { + tmp = send(fd, NULL, 0, 0); + if (tmp < 0) { + log_error("send failed: %m"); + return -1; + } + } + + return count; } -static int gps_to_udp_client(const char *host, int port) +static int gps_client_consume(int fd, int consumer, int where) { - int tty; - int sd; + struct gps_msg msg; + struct gps_timeval gpstv; + int tmp; - log_notice("Venus GPS UDP Client connecting to %s:%d", host, port); + memset(&gpstv, 0, sizeof(gpstv)); while (1) { - sd = inet_conn_str(host, port, SOCK_DGRAM); - if (sd < 0) { - log_error("inet_conn_str failed: %m"); - sleep(60); - } else { - log_debug("connection opened"); + tmp = pthread_mutex_lock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_lock failed: %d", tmp); + exit(1); + } - tty = venus_open(device, baud_rate); + while (producer.consumed & BIT(consumer)) { + tmp = pthread_cond_wait(&producer.msg_cond, &producer.msg_mutex); + if (tmp) { + log_emerg("pthread_cond_wait failed: %d", tmp); + exit(1); + } + } - set_nonblocking(sd); + memcpy(&msg, producer.msg, sizeof(msg)); + producer.consumed |= BIT(consumer); - udp_send_msgs(sd, tty); + tmp = pthread_mutex_unlock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_unlock failed: %d", tmp); + exit(1); + } - venus_close(tty); - close(sd); + tmp = write_msg(fd, &msg, &gpstv, where); + if (tmp < 0) { + log_error("write_msg failed"); + return -1; } } return -1; } -static int gps_to_tcp_client(const char *host, int port) +static int consumer_add(void) +{ + int tmp; + int consumer; + + tmp = pthread_mutex_lock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_lock failed: %d", tmp); + exit(1); + } + + consumer = ffs(~producer.consumers); + if (consumer) { + producer.consumers |= BIT(consumer - 1); + } + + tmp = pthread_mutex_unlock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_unlock failed: %d", tmp); + exit(1); + } + + return consumer - 1; +} + +static void consumer_delete(int consumer) +{ + int tmp; + + tmp = pthread_mutex_lock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_lock failed: %d", tmp); + exit(1); + } + + producer.consumers &= ~BIT(consumer); + + tmp = pthread_mutex_unlock(&producer.msg_mutex); + if (tmp) { + log_emerg("pthread_mutex_unlock failed: %d", tmp); + exit(1); + } +} + +static void *gps_socket_client(void *arg) { - int tty; int sd; + int consumer; + char *type_name = NULL; + struct socket_client_args *sc = (struct socket_client_args *) arg; + + switch (sc->type) { + case SOCK_STREAM: + type_name = "TCP"; + break; + case SOCK_DGRAM: + type_name = "UDP"; + break; + } + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - log_notice("Venus GPS TCP Client connecting to %s:%d", host, port); + log_notice("Venus GPS %s Client connecting to %s:%d", + type_name, sc->host, sc->port); + + consumer = consumer_add(); + if (consumer < 0) { + log_error("no more free consumers: exiting"); + goto error; + } while (1) { - sd = inet_conn_str(host, port, SOCK_STREAM); + sd = inet_conn_str(sc->host, sc->port, sc->type); if (sd < 0) { log_error("inet_conn_str failed: %m"); - sleep(60); + sleep(SLEEP_AFTER_FAILURE); } else { - log_debug("connection opened"); + log_info("connection opened"); + gps_client_consume(sd, consumer, WRITE_MSG_SOCKET); + close(sd); + log_info("connection closed"); + } + } + +error: - tty = venus_open(device, baud_rate); + if (consumer >= 0) { + consumer_delete(consumer); + } - set_nonblocking(tty); - set_nonblocking(sd); + log_notice("exiting"); - exchange_data(sd, tty); + pthread_exit((void *) 1); +} - venus_close(tty); - close(sd); - } +static void *gps_tcp_server_conn(void *arg) +{ + int sd = (int) (long) arg; + int consumer; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + + log_notice("Venus GPS TCP Server Conn connected"); + + consumer = consumer_add(); + if (consumer < 0) { + log_error("no more free consumers: exiting"); + goto error; } - return -1; + gps_client_consume(sd, consumer, WRITE_MSG_SOCKET); + +error: + if (consumer >= 0) { + consumer_delete(consumer); + } + + close(sd); + + log_notice("exiting"); + + pthread_exit((void *) 1); } -static int gps_to_tcp_server(int port) +static void *gps_tcp_server(void *arg) { - int err; - int tty; + int tmp; int lsd; int sd; int cli_addr_len; @@ -250,38 +565,41 @@ static int gps_to_tcp_server(int port) struct sockaddr_in serv_addr; struct sockaddr_in cli_addr; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + lsd = socket(PF_INET, SOCK_STREAM, 0); if (lsd == -1) { log_error("socket failed: %m"); - exit(1); + goto error; } memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); - serv_addr.sin_port = htons(port); + serv_addr.sin_port = htons(gps_tcp_server_port); - err = setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - if (err == -1) { + tmp = setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + if (tmp == -1) { log_error("setsockopt failed: %m"); - exit(1); + goto error; } serv_addr_len = sizeof(serv_addr); - err = bind(lsd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); - if (err == -1) { + tmp = bind(lsd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); + if (tmp == -1) { log_error("bind failed: %m"); - exit(1); + goto error; } - err = listen(lsd, 1); - if (err == -1) { + tmp = listen(lsd, 4); + if (tmp == -1) { log_error("listen failed: %m"); - exit(1); + goto error; } - log_notice("Venus GPS TCP Server listening on port %d", port); + log_notice("Venus GPS TCP Server listening on port %d", gps_tcp_server_port); while (1) { cli_addr_len = sizeof(cli_addr); @@ -290,69 +608,89 @@ static int gps_to_tcp_server(int port) if (sd == -1) { log_error("accept failed: %m"); } else { - log_debug("accepted"); - - tty = venus_open(device, baud_rate); + pthread_t tid; - set_nonblocking(tty); - set_nonblocking(sd); - - exchange_data(sd, tty); + tmp = pthread_create(&tid, NULL, gps_tcp_server_conn, (void *) (long) sd); + if (tmp) { + log_error("pthread_create gps_tcp_server_conn failed: %d", tmp); + close(sd); + continue; + } - venus_close(tty); - close(sd); + tmp = pthread_detach(tid); + if (tmp) { + log_error("pthread_detach failed: %d", tmp); + } } } - close(lsd); +error: + if (lsd >= 0) { + close(lsd); + } - return -1; + log_notice("exiting"); + + pthread_exit((void *) 1); } -static int gps_to_file(const char *file) +static void *gps_file_client(void *arg) { - int tty; int fd; + int consumer = -1; - log_notice("Venus GPS to file %s", file); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - if (!strcmp(file, "-")) { + log_notice("Venus GPS to file %s", gps_filename); + + if (!strcmp(gps_filename, "-")) { fd = fileno(stdout); } else { - fd = open(file, O_WRONLY); + fd = open(gps_filename, O_WRONLY); if (fd < 0) { - log_error("failed to open %s: %m", file); - return -1; + log_error("failed to open %s: %m", gps_filename); + goto error; } } - set_nonblocking(fd); - - while (1) { - tty = venus_open(device, baud_rate); + consumer = consumer_add(); + if (consumer < 0) { + log_error("no more free consumers: exiting"); + goto error; + } - set_nonblocking(tty); + gps_client_consume(fd, consumer, WRITE_MSG_FILE); - exchange_data(fd, tty); +error: + if (consumer >= 0) { + consumer_delete(consumer); + } - venus_close(tty); + if (fd >= 0) { + close(fd); } - return -1; + log_notice("exiting"); + + pthread_exit((void *) 1); } -int gps_to_serial(const char *port) +static void *gps_serial_client(void *arg) { - int tty; int fd; struct termios tio; + int consumer = -1; - log_notice("Venus GPS to serial %s", port); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - fd = open(port, O_RDWR | O_NOCTTY); + log_notice("Venus GPS to serial %s", gps_serial_port); + + fd = open(gps_serial_port, O_RDWR | O_NOCTTY); if (fd < 0) { - log_error("failed to open %s: %m", port); - return -1; + log_error("failed to open %s: %m", gps_serial_port); + goto error; } tcgetattr(fd, &tio); @@ -374,19 +712,26 @@ int gps_to_serial(const char *port) tcsetattr(fd, TCSANOW, &tio); tcflush(fd, TCIOFLUSH); - set_nonblocking(fd); - - while (1) { - tty = venus_open(device, baud_rate); + consumer = consumer_add(); + if (consumer < 0) { + log_error("no more free consumers: exiting"); + goto error; + } - set_nonblocking(tty); + gps_client_consume(fd, consumer, WRITE_MSG_FILE); - exchange_data(fd, tty); +error: + if (consumer >= 0) { + consumer_delete(consumer); + } - venus_close(tty); + if (fd >= 0) { + close(fd); } - return -1; + log_notice("exiting"); + + pthread_exit((void *) 1); } static void print_version(const char *name) @@ -404,14 +749,16 @@ static void usage(FILE * out) fprintf(out, "usage: venus-gps [ OPTIONS ... ]\n"); fprintf(out, "where OPTIONS := { \n"); fprintf(out, " --daemonize |\n"); - fprintf(out, " -d --device (default: /dev/ttyS3) |\n"); + fprintf(out, " -d, --device (default: /dev/ttyS3) |\n"); fprintf(out, " --file |\n"); fprintf(out, " --tcp-server |\n"); fprintf(out, " --tcp-client |\n"); fprintf(out, " --udp-client |\n"); fprintf(out, " --serial |\n"); + fprintf(out, " --id ID |\n"); + fprintf(out, " --id-prefix PREFIX |\n"); #if CONFIG_CAN_DEFAULT - fprintf(out, " -f --factory-defaults |\n"); + fprintf(out, " -f, --factory-defaults |\n"); #endif #if CONFIG_CAN_RESET fprintf(out, "\n"); @@ -421,15 +768,15 @@ static void usage(FILE * out) fprintf(out, " --altitude VAL (default: ) |\n"); #endif fprintf(out, "\n"); - fprintf(out, " --gga 0-255 (default: 1) |\n"); - fprintf(out, " --gsa 0-255 (default: 1) |\n"); - fprintf(out, " --gsv 0-255 (default: 1) |\n"); - fprintf(out, " --gll 0-255 (default: 1) |\n"); - fprintf(out, " --rmc 0-255 (default: 1) |\n"); + fprintf(out, " --gpgga 0-255 (default: 1) |\n"); + fprintf(out, " --gpgsa 0-255 (default: 1) |\n"); + fprintf(out, " --gpgsv 0-255 (default: 1) |\n"); + fprintf(out, " --gpgll 0-255 (default: 1) |\n"); + fprintf(out, " --gprmc 0-255 (default: 1) |\n"); #if CONFIG_HAVE_ZDA - fprintf(out, " --zda 0-255 (default: 1) |\n"); + fprintf(out, " --gpzda 0-255 (default: 1) |\n"); #endif - fprintf(out, " --vtg 0-255 (default: 1)\n"); + fprintf(out, " --gpvtg 0-255 (default: 1)\n"); fprintf(out, " }\n"); fprintf(out, "\n"); } @@ -442,13 +789,13 @@ static void usage(FILE * out) enum { OPT_VERSION = 128, OPT_HELP, - OPT_GGA, - OPT_GSA, - OPT_GSV, - OPT_GLL, - OPT_RMC, - OPT_VTG, - OPT_ZDA, + OPT_GPGGA, + OPT_GPGSA, + OPT_GPGSV, + OPT_GPGLL, + OPT_GPRMC, + OPT_GPVTG, + OPT_GPZDA, OPT_START_MODE, OPT_LATITUDE, OPT_LONGITUDE, @@ -458,7 +805,9 @@ enum { OPT_TCP_SERVER, OPT_TCP_CLIENT, OPT_UDP_CLIENT, - OPT_SERIAL, + OPT_SERIAL_CLIENT, + OPT_ID_PREFIX, + OPT_ID, }; static char *short_options = "b:d:p:f"; @@ -471,18 +820,20 @@ static struct option long_options[] = { {"tcp-server", 0, 0, OPT_TCP_SERVER}, {"tcp-client", 1, 0, OPT_TCP_CLIENT}, {"udp-client", 1, 0, OPT_UDP_CLIENT}, - {"serial", 1, 0, OPT_SERIAL}, + {"serial-client", 1, 0, OPT_SERIAL_CLIENT}, + {"id", 1, 0, OPT_ID}, + {"id-prefix", 1, 0, OPT_ID_PREFIX}, #if CONFIG_CAN_DEFAULT {"factory-defaults", 0, 0, OPT_FACTORY_DEFAULTS}, #endif - {"gga", 1, 0, OPT_GGA}, - {"gsa", 1, 0, OPT_GSA}, - {"gsv", 1, 0, OPT_GSV}, - {"gll", 1, 0, OPT_GLL}, - {"rmc", 1, 0, OPT_RMC}, - {"vtg", 1, 0, OPT_VTG}, + {"gpgga", 1, 0, OPT_GPGGA}, + {"gpgsa", 1, 0, OPT_GPGSA}, + {"gpgsv", 1, 0, OPT_GPGSV}, + {"gpgll", 1, 0, OPT_GPGLL}, + {"gprmc", 1, 0, OPT_GPRMC}, + {"gpvtg", 1, 0, OPT_GPVTG}, #if CONFIG_HAVE_ZDA - {"zda", 1, 0, OPT_ZDA}, + {"gpzda", 1, 0, OPT_GPZDA}, #endif #if CONFIG_CAN_RESET {"start-mode", 1, 0, OPT_START_MODE}, @@ -504,33 +855,14 @@ int main(int argc, char *argv[]) int tty; struct venus_msg msg; + struct socket_client_args *sc; + #if CONFIG_CAN_DEFAULT int factory_defaults = 0; #endif int daemonize = 0; int port = 5445; - char *file = "-"; - char tcp_server_enabled = 0; - int tcp_server_port = port; - char *tcp_client_host = NULL; - int tcp_client_port = port; - char *udp_client_host = NULL; - int udp_client_port = port; - char *serial_port = NULL; - - struct sigaction sa; - - sa.sa_handler = sigalarm_handler; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SIGALRM, &sa, NULL); - - sa.sa_handler = sigpipe_handler; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SIGPIPE, &sa, NULL); - while ((i = getopt_long(argc, argv, short_options, long_options, &option_index)) >= 0) { switch (i) { case 0: @@ -554,54 +886,75 @@ int main(int argc, char *argv[]) break; case OPT_FILE: - file = optarg; + gps_filename = optarg; break; - case OPT_SERIAL: - serial_port = optarg; + case OPT_SERIAL_CLIENT: + gps_serial_port = optarg; break; case OPT_TCP_SERVER: - tcp_server_enabled = 1; - tcp_server_port = port; + gps_tcp_server_port = port; break; case OPT_TCP_CLIENT: + if (n_socket_clients >= SOCKET_CLIENTS_MAX) { + log_error("max clients exceeded: %d", + SOCKET_CLIENTS_MAX); + usage(stderr); + exit(1); + } + sc = &socket_clients[n_socket_clients++]; + cp = strchr(optarg, ':'); if (cp) { - tcp_client_host = strndup(optarg, cp - optarg); + sc->host = strndup(optarg, cp - optarg); cp++; - tcp_client_port = atoi(cp); - if (tcp_client_port <= 0 || tcp_client_port >= (1 << 16)) { + sc->port = atoi(cp); + if (sc->port <= 0 || sc->port >= (1 << 16)) { log_error("invalid port"); usage(stderr); exit(1); } } else { - tcp_client_host = strdup(optarg); - tcp_client_port = port; + sc->host = strdup(optarg); + sc->port = port; } + + sc->type = SOCK_STREAM; + break; case OPT_UDP_CLIENT: + if (n_socket_clients >= SOCKET_CLIENTS_MAX) { + log_error("max clients exceeded: %d", + SOCKET_CLIENTS_MAX); + usage(stderr); + exit(1); + } + sc = &socket_clients[n_socket_clients++]; + cp = strchr(optarg, ':'); if (cp) { - udp_client_host = strndup(optarg, cp - optarg); + sc->host = strndup(optarg, cp - optarg); cp++; - udp_client_port = atoi(cp); - if (udp_client_port <= 0 || udp_client_port >= (1 << 16)) { + sc->port = atoi(cp); + if (sc->port <= 0 || sc->port >= (1 << 16)) { log_error("invalid port"); usage(stderr); exit(1); } } else { - udp_client_host = strdup(optarg); - udp_client_port = port; + sc->host = strdup(optarg); + sc->port = port; } + + sc->type = SOCK_DGRAM; + break; case OPT_BAUD_RATE: @@ -613,33 +966,60 @@ int main(int argc, char *argv[]) } break; - case OPT_GGA: - gga = atoi(optarg); + case OPT_ID_PREFIX: + tmp = strlen(optarg); + if (tmp > ID_PREFIX_MAX) { + log_error("id-prefix is too long max: %d > %d", + tmp, ID_PREFIX_MAX); + usage(stderr); + exit(1); + } + snprintf(id_prefix, sizeof(id_prefix), "%s", optarg); break; - case OPT_GSA: - gsa = atoi(optarg); + case OPT_ID: + tmp = strlen(optarg); + if(tmp > ID_MAX) { + log_error("id is too long max: %d > %d", + tmp, ID_MAX); + usage(stderr); + exit(1); + } + if(strcspn(optarg, "$&") != tmp) { + log_error("id contains invalid char $&"); + usage(stderr); + exit(1); + } + snprintf(id, sizeof(id), "%s", optarg); + break; + + case OPT_GPGGA: + gpgga = atoi(optarg); + break; + + case OPT_GPGSA: + gpgsa = atoi(optarg); break; - case OPT_GSV: - gsv = atoi(optarg); + case OPT_GPGSV: + gpgsv = atoi(optarg); break; - case OPT_GLL: - gll = atoi(optarg); + case OPT_GPGLL: + gpgll = atoi(optarg); break; - case OPT_RMC: - rmc = atoi(optarg); + case OPT_GPRMC: + gprmc = atoi(optarg); break; - case OPT_VTG: - vtg = atoi(optarg); + case OPT_GPVTG: + gpvtg = atoi(optarg); break; #if CONFIG_HAVE_ZDA - case OPT_ZDA: - zda = atoi(optarg); + case OPT_GPZDA: + gpzda = atoi(optarg); break; #endif @@ -744,7 +1124,8 @@ int main(int argc, char *argv[]) #if CONFIG_CAN_RESET if (start_mode != (typeof(start_mode)) - 1) { log_notice("issuing system restart"); - tmp = venus_system_restart(tty, start_mode, time(NULL), latitude, longitude, altitude); + tmp = venus_system_restart(tty, start_mode, time(NULL), + latitude, longitude, altitude); if (tmp < 0) { log_error("system restart failed"); exit(1); @@ -753,7 +1134,7 @@ int main(int argc, char *argv[]) } #endif - tmp = venus_conf_nmea(tty, gga, gsa, gsv, gll, rmc, vtg, zda, UPDATE_ATTR_SRAM); + tmp = venus_conf_nmea(tty, 1, 1, 1, 1, 1, 1, 0, UPDATE_ATTR_SRAM); if (tmp < 0) { log_error("failed to configure nmea sentences"); exit(1); @@ -767,19 +1148,85 @@ int main(int argc, char *argv[]) venus_close(tty); - /* - * FIXME: Support more than one mode at a time - */ - if (tcp_server_enabled) { - gps_to_tcp_server(tcp_server_port); - } else if (tcp_client_host) { - gps_to_tcp_client(tcp_client_host, tcp_client_port); - } else if (udp_client_host) { - gps_to_udp_client(udp_client_host, udp_client_port); - } else if (serial_port) { - gps_to_serial(serial_port); - } else if (file) { - gps_to_file(file); + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGTERM); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + pthread_t gps_msg_producer_tid; + pthread_t gps_serial_client_tid; + pthread_t gps_file_client_tid; + pthread_t gps_tcp_server_tid; + + tmp = pthread_create(&gps_msg_producer_tid, NULL, gps_msg_producer, NULL); + if (tmp) { + log_error("pthread_create gps_msg_producer failed: %d", tmp); + exit(1); + } + + for (i = 0; i < n_socket_clients; i++) { + tmp = pthread_create(&socket_clients[i].tid, NULL, + gps_socket_client, (void *) &socket_clients[i]); + if (tmp) { + log_error("pthread_create gps_socket_client failed: %d", tmp); + } + } + + if (gps_serial_port) { + tmp = pthread_create(&gps_serial_client_tid, NULL, + gps_serial_client, NULL); + if (tmp) { + log_error("pthread_create gps_serial_client failed: %d", tmp); + } + } + if (gps_filename) { + tmp = pthread_create(&gps_file_client_tid, NULL, + gps_file_client, NULL); + if (tmp) { + log_error("pthread_create gps_file_client failed: %d", tmp); + } + } + if (gps_tcp_server_port >= 0) { + tmp = pthread_create(&gps_tcp_server_tid, NULL, + gps_tcp_server, NULL); + if (tmp) { + log_error("pthread_create gps_tcp_server failed: %d", tmp); + } + } + + while (1) { + siginfo_t info; + struct timespec timeout = { + .tv_sec = 1, + .tv_nsec = 0, + }; + + tmp = sigtimedwait(&sigset, &info, &timeout); + switch (tmp) { + case -1: + if (errno != EAGAIN) { + log_error("sigtimedwait failed: %m"); + } + break; + case SIGINT: + log_notice("SIGINT caught"); + exit(0); + break; + case SIGTERM: + log_notice("SIGTERM caught"); + exit(0); + break; + case SIGPIPE: + log_notice("SIGPIPE caught"); + break; + default: + log_error("unhandled signal %d caught", tmp); + break; + } + + /* Check for exited threads */ } return 0; -- cgit v1.2.3