summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Maki <jmaki@multitech.com>2010-11-19 10:04:58 -0600
committerJames Maki <jmaki@multitech.com>2010-11-19 10:04:58 -0600
commit8e799fec27f9c3b6a9f13c5002168025a00fbfc7 (patch)
treeb02c42ebd1371d9552fbfc107bf9850ecad3dfef
parentf1b137da46178ff963506db378560023d272e97a (diff)
downloadvenus-gps-8e799fec27f9c3b6a9f13c5002168025a00fbfc7.tar.gz
venus-gps-8e799fec27f9c3b6a9f13c5002168025a00fbfc7.tar.bz2
venus-gps-8e799fec27f9c3b6a9f13c5002168025a00fbfc7.zip
allow multiple consumers with pthreads
-rw-r--r--configure.in2
-rwxr-xr-xsrc/gps_test.rb2
-rw-r--r--src/venus_api.c21
-rw-r--r--src/venus_gps.c957
4 files changed, 718 insertions, 264 deletions
diff --git a/configure.in b/configure.in
index be3bba3..f715123 100644
--- a/configure.in
+++ b/configure.in
@@ -12,6 +12,8 @@ AC_CHECK_HEADERS([unistd.h getopt.h errno.h time.h termios.h sys/types.h \
sys/stat.h fcntl.h stdarg.h asm/byteorder.h netdb.h stdint.h signal.h], [],AC_MSG_ERROR([
required header missing]))
+AC_CHECK_LIB([pthread], [pthread_create], [], [], [])
+
AC_DEFINE([DEBUG], 0, [set to 1 to enable debug])
AC_DEFINE([CONFIG_USE_SYSLOG], 0, [set to 1 to use syslog for logging])
diff --git a/src/gps_test.rb b/src/gps_test.rb
index 8f64112..97532a6 100755
--- a/src/gps_test.rb
+++ b/src/gps_test.rb
@@ -7,7 +7,6 @@ GPS_SERVER_PORT = 5445
MODE = :tcp_server
#MODE = :tcp_client
#MODE = :udp_server
-#MODE = :serial
def tcp_read(sock)
loop {
@@ -25,6 +24,7 @@ def udp_read(sock)
loop {
begin
msg, sender = sock.recvfrom(1024)
+ p sender
p msg
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
puts "recvfrom error: #{e.class}: #{e}"
diff --git a/src/venus_api.c b/src/venus_api.c
index 04ad78d..40a52ea 100644
--- a/src/venus_api.c
+++ b/src/venus_api.c
@@ -267,19 +267,24 @@ int venus_write_msg_read_ack(int fd, struct venus_msg *msg)
ssize_t venus_read_nmea_sentence(int fd, void *buf, size_t count)
{
- int err;
- char c;
ssize_t total = 0;
char *cp = buf;
-
int start = 0;
+ char c;
+ int err;
- while (1) {
+ if (!count) {
+ return 0;
+ }
+ count--;
+
+ while (count > 0) {
err = safe_read(fd, &c, 1);
if (err <= 0) {
log_error("read from gps failed: %m");
return -1;
}
+
if (!start) {
if (c != '$') {
continue;
@@ -287,16 +292,16 @@ ssize_t venus_read_nmea_sentence(int fd, void *buf, size_t count)
start = 1;
}
- if (total < count) {
- *cp++ = c;
- total++;
- }
+ cp[total++] = c;
+ count--;
if (c == '\n') {
break;
}
}
+ cp[total] = '\0';
+
return total;
}
diff --git a/src/venus_gps.c b/src/venus_gps.c
index 28491f7..9a26e4f 100644
--- a/src/venus_gps.c
+++ b/src/venus_gps.c
@@ -23,6 +23,7 @@
#define _GNU_SOURCE
+#include <pthread.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
@@ -39,6 +40,7 @@
#include <time.h>
#include <signal.h>
#include <string.h>
+#include <sys/time.h>
#include "log.h"
#include "cbuffer.h"
@@ -46,28 +48,42 @@
#include "venus_gps.h"
#include "venus_api.h"
-static sig_atomic_t timer_expired;
-static void sigalarm_handler(int arg)
-{
- timer_expired = 1;
-}
+#define BIT(nr) (1UL << (nr))
-static sig_atomic_t sigpipe = 0;
-static void sigpipe_handler(int arg)
-{
- sigpipe = 1;
-}
+#define ID_PREFIX_MAX 10
+#define ID_MAX 20
+
+#define SLEEP_AFTER_FAILURE 1
+
+static char id[ID_MAX + 1];
+static char id_prefix[ID_PREFIX_MAX + 1];
static char *device = "/dev/ttyS3";
static speed_t baud_rate = B9600;
-static uint8_t gga = 1;
-static uint8_t gsa = 1;
-static uint8_t gsv = 1;
-static uint8_t gll = 1;
-static uint8_t rmc = 1;
-static uint8_t vtg = 1;
-static uint8_t zda = 1;
+#define SOCKET_CLIENTS_MAX 8
+struct socket_client_args {
+ char *host;
+ int port;
+ int type;
+ pthread_t tid;
+};
+int n_socket_clients = 0;
+struct socket_client_args socket_clients[SOCKET_CLIENTS_MAX];
+
+static int gps_tcp_server_port = -1;
+static char *gps_filename = NULL;
+static char *gps_serial_port = NULL;
+
+static uint8_t gpgga = 1;
+static uint8_t gpgsa = 1;
+static uint8_t gpgsv = 1;
+static uint8_t gpgll = 1;
+static uint8_t gprmc = 1;
+static uint8_t gpvtg = 1;
+#if CONFIG_HAVE_ZDA
+static uint8_t gpzda = 1;
+#endif
#if CONFIG_CAN_RESET
/*
@@ -80,167 +96,466 @@ static int16_t longitude = -93;
static int16_t altitude = 256;
#endif
+struct gps_msg {
+ char gpgga[NMEA_SENTENCE_MAX + 1];
+ char gpgsa[NMEA_SENTENCE_MAX + 1];
+ char gpgsv[3][NMEA_SENTENCE_MAX + 1];
+ char gpgll[NMEA_SENTENCE_MAX + 1];
+ char gprmc[NMEA_SENTENCE_MAX + 1];
+ char gpvtg[NMEA_SENTENCE_MAX + 1];
+ struct timeval timestamp;
+};
+
+static struct {
+ pthread_cond_t msg_cond;
+ pthread_mutex_t msg_mutex;
+ struct gps_msg *msg;
+ uint32_t consumed;
+ uint32_t consumers;
+
+ struct gps_msg msgs_[2];
+} producer = {
+ .msg_cond = PTHREAD_COND_INITIALIZER,
+ .msg_mutex = PTHREAD_MUTEX_INITIALIZER,
+ .msg = NULL,
+ .consumed = 0xFFFFFFFF,
+ .consumers = 0,
+};
-static int exchange_data(int fd0, int fd1)
+static int fill_msg(int fd, struct gps_msg *msg)
{
- int i;
- int err;
- struct pollfd fds[2];
- nfds_t nfds = 2;
- struct cbuffer bufs[2];
+ char buf[NMEA_SENTENCE_MAX + 1];
+ int tmp;
+
+ memset(msg, 0, sizeof(*msg));
- log_debug("starting");
+ do {
+ tmp = venus_read_nmea_sentence(fd, buf, sizeof(buf));
+ if (tmp <= 0) {
+ log_error("venus_read_nmea_sentence failed");
+ return -1;
+ }
+ } while (strncmp(buf, "$GPGGA", 6));
- memset(fds, 0, sizeof(fds));
+ strcpy(msg->gpgga, buf);
- for (i = 0; i < nfds; i++) {
- memset(&fds[i], 0, sizeof(fds[i]));
- bufs[i].read_index = bufs[i].write_index = 0;
+ while (1) {
+ tmp = venus_read_nmea_sentence(fd, buf, sizeof(buf));
+ if (tmp <= 0) {
+ log_error("venus_read_nmea_sentence failed");
+ return -1;
+ }
+ if (!strncmp(buf, "$GPGGA", 6)) {
+ strcpy(msg->gpgga, buf);
+ } else if (!strncmp(buf, "$GPGSA", 6)) {
+ strcpy(msg->gpgsa, buf);
+ } else if (!strncmp(buf, "$GPGSV", 6)) {
+ if (buf[9] == '1') {
+ strcpy(msg->gpgsv[0], buf);
+ } else if (buf[9] == '2') {
+ strcpy(msg->gpgsv[1], buf);
+ } else if (buf[9] == '3') {
+ strcpy(msg->gpgsv[2], buf);
+ }
+ } else if (!strncmp(buf, "$GPGLL", 6)) {
+ strcpy(msg->gpgll, buf);
+ } else if (!strncmp(buf, "$GPRMC", 6)) {
+ strcpy(msg->gprmc, buf);
+ } else if (!strncmp(buf, "$GPVTG", 6)) {
+ strcpy(msg->gpvtg, buf);
+ break;
+ }
}
- fds[0].fd = fd0;
- fds[1].fd = fd1;
+ return 0;
+}
+
+static void *gps_msg_producer(void *arg)
+{
+ int fd;
+ int tmp;
+ struct gps_msg *msg;
+ int msg_no;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+
+ log_notice("Venus GPS NMEA Producer starting");
+ fd = venus_open(device, baud_rate);
+ if (fd < 0) {
+ log_error("failed to open %s", device);
+ goto error;
+ }
+
+ msg_no = 0;
while (1) {
- for (i = 0; i < nfds; i++) {
- fds[i].events = 0;
+ msg = &producer.msgs_[msg_no];
+
+ tmp = fill_msg(fd, msg);
+ if (tmp < 0) {
+ log_error("fill_msg failed");
+ goto error;
}
- for (i = 0; i < nfds; i++) {
- if (buffer_free(&bufs[i])) {
- fds[i].events |= POLLIN;
- }
- if (buffer_used(&bufs[i])) {
- fds[DIR_REV(i)].events |= POLLOUT;
- }
+ tmp = gettimeofday(&msg->timestamp, NULL);
+ if (tmp < 0) {
+ log_error("gettimeofday failed: %m");
+ goto error;
}
- err = poll(fds, nfds, -1);
- if (err <= 0) {
- log_error("poll returned %d %m", err);
- break;
+ tmp = pthread_mutex_lock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_lock failed: %d", tmp);
+ exit(1);
}
- log_debug("poll returned %d", err);
+ producer.msg = msg;
+ producer.consumed = 0;
- for (i = 0; i < nfds; i++) {
- if (fds[i].revents) {
- log_debug("fds[%d]: %d revents: %04X", i, fds[i].fd, fds[i].revents);
+ tmp = pthread_cond_broadcast(&producer.msg_cond);
+ if (tmp) {
+ log_error("pthread_cond_broadcast failed: %d", tmp);
+ exit(1);
+ }
+
+ tmp = pthread_mutex_unlock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_unlock failed: %d", tmp);
+ exit(1);
+ }
- if (fds[i].revents & POLLHUP) {
- log_error("fds[%d]: %d POLLHUP", i, fds[i].fd);
- return 0;
- } else if (fds[i].revents & POLLOUT) {
- log_debug("fds[%d]: %d POLLOUT buffer_used: %lu",
- i, fds[i].fd, (unsigned long) buffer_used(&bufs[DIR_REV(i)]));
+ msg_no = !msg_no;
+ }
- err = buffered_write(fds[i].fd, &bufs[DIR_REV(i)]);
- if (err < 0) {
- log_error("fds[%d]: %d safe_write failed: %m", i, fds[i].fd);
- return -1;
- }
- } else if (fds[i].revents & POLLIN) {
- log_debug("fds[%d]: %d POLLIN buffer_free: %lu",
- i, fds[i].fd, (unsigned long) buffer_free(&bufs[i]));
+error:
- err = buffered_read(fds[i].fd, &bufs[i]);
- if (err <= 0) {
- log_debug("fds[%d]: %d safe_read failed: %m", i, fds[i].fd);
- return -1;
- }
- }
- }
+ if (fd >= 0) {
+ close(fd);
+ }
+
+ log_notice("exiting");
+
+ pthread_exit((void *) 1);
+}
+
+struct gps_timeval {
+ struct timeval gpgga;
+ struct timeval gpgsa;
+ struct timeval gpgsv;
+ struct timeval gpgll;
+ struct timeval gprmc;
+ struct timeval gpvtg;
+};
+
+enum {
+ WRITE_MSG_SOCKET,
+ WRITE_MSG_FILE,
+};
+
+int write_sentence(int fd, const char *buf, size_t len, int where)
+{
+ int tmp;
+
+ if (!*buf || len <= 0) {
+ return 0;
+ }
+
+ if (where == WRITE_MSG_SOCKET) {
+ if (*id_prefix) {
+ tmp = send(fd, id_prefix, strlen(id_prefix), MSG_MORE);
+ }
+ if (*id) {
+ tmp = send(fd, id, strlen(id), MSG_MORE);
}
+ tmp = send(fd, buf, len, MSG_MORE);
+ } else {
+ if (*id_prefix) {
+ tmp = full_write(fd, id_prefix, strlen(id_prefix));
+ }
+ if (*id) {
+ tmp = full_write(fd, id, strlen(id));
+ }
+ tmp = full_write(fd, buf, len);
+ }
+ if (tmp < 0) {
+ log_error("write failed: %m");
}
- return -1;
+ return tmp;
}
-static int udp_send_msgs(int sd, int tty)
+int write_msg(int fd, struct gps_msg *msg, struct gps_timeval *gpstv, int where)
{
- char buf[NMEA_SENTENCE_MAX];
- int count;
- int err;
+ struct timeval tmptv;
+ int tmp;
+ int count = 0;
- while (1) {
- count = venus_read_nmea_sentence(tty, buf, sizeof(buf));
- if (count <= 0) {
- log_error("venus_read_nmea_sentence failed: quiting");
+ if (gpgga && timercmp(&gpstv->gpgga, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gpgga, strlen(msg->gpgga), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
return -1;
}
- err = send(sd, buf, count, 0);
- if (err < 0) {
- log_debug("send failed: %m");
+
+ tmptv.tv_sec = gpgga - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gpgga);
+ count++;
+ }
+ if (gpgsa && timercmp(&gpstv->gpgsa, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gpgsa, strlen(msg->gpgsa), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
return -1;
}
+
+ tmptv.tv_sec = gpgsa - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gpgsa);
+ count++;
}
+ if (gpgsv && timercmp(&gpstv->gpgsv, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gpgsv[0], strlen(msg->gpgsv[0]), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
+ tmp = write_sentence(fd, msg->gpgsv[1], strlen(msg->gpgsv[1]), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
+ tmp = write_sentence(fd, msg->gpgsv[2], strlen(msg->gpgsv[2]), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
- return -1;
+ tmptv.tv_sec = gpgsv - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gpgsv);
+ count++;
+ }
+ if (gpgll && timercmp(&gpstv->gpgll, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gpgll, strlen(msg->gpgll), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
+
+ tmptv.tv_sec = gpgll - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gpgll);
+ count++;
+ }
+ if (gprmc && timercmp(&gpstv->gprmc, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gprmc, strlen(msg->gprmc), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
+
+ tmptv.tv_sec = gprmc - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gprmc);
+ count++;
+ }
+ if (gpvtg && timercmp(&gpstv->gpvtg, &msg->timestamp, <=)) {
+ tmp = write_sentence(fd, msg->gpvtg, strlen(msg->gpvtg), where);
+ if (tmp < 0) {
+ log_error("write_sentence failed");
+ return -1;
+ }
+
+ tmptv.tv_sec = gpvtg - 1;
+ tmptv.tv_usec = 750000;
+ timeradd(&msg->timestamp, &tmptv, &gpstv->gpvtg);
+ count++;
+ }
+
+ if (count && where == WRITE_MSG_SOCKET) {
+ tmp = send(fd, NULL, 0, 0);
+ if (tmp < 0) {
+ log_error("send failed: %m");
+ return -1;
+ }
+ }
+
+ return count;
}
-static int gps_to_udp_client(const char *host, int port)
+static int gps_client_consume(int fd, int consumer, int where)
{
- int tty;
- int sd;
+ struct gps_msg msg;
+ struct gps_timeval gpstv;
+ int tmp;
- log_notice("Venus GPS UDP Client connecting to %s:%d", host, port);
+ memset(&gpstv, 0, sizeof(gpstv));
while (1) {
- sd = inet_conn_str(host, port, SOCK_DGRAM);
- if (sd < 0) {
- log_error("inet_conn_str failed: %m");
- sleep(60);
- } else {
- log_debug("connection opened");
+ tmp = pthread_mutex_lock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_lock failed: %d", tmp);
+ exit(1);
+ }
- tty = venus_open(device, baud_rate);
+ while (producer.consumed & BIT(consumer)) {
+ tmp = pthread_cond_wait(&producer.msg_cond, &producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_cond_wait failed: %d", tmp);
+ exit(1);
+ }
+ }
- set_nonblocking(sd);
+ memcpy(&msg, producer.msg, sizeof(msg));
+ producer.consumed |= BIT(consumer);
- udp_send_msgs(sd, tty);
+ tmp = pthread_mutex_unlock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_unlock failed: %d", tmp);
+ exit(1);
+ }
- venus_close(tty);
- close(sd);
+ tmp = write_msg(fd, &msg, &gpstv, where);
+ if (tmp < 0) {
+ log_error("write_msg failed");
+ return -1;
}
}
return -1;
}
-static int gps_to_tcp_client(const char *host, int port)
+static int consumer_add(void)
+{
+ int tmp;
+ int consumer;
+
+ tmp = pthread_mutex_lock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_lock failed: %d", tmp);
+ exit(1);
+ }
+
+ consumer = ffs(~producer.consumers);
+ if (consumer) {
+ producer.consumers |= BIT(consumer - 1);
+ }
+
+ tmp = pthread_mutex_unlock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_unlock failed: %d", tmp);
+ exit(1);
+ }
+
+ return consumer - 1;
+}
+
+static void consumer_delete(int consumer)
+{
+ int tmp;
+
+ tmp = pthread_mutex_lock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_lock failed: %d", tmp);
+ exit(1);
+ }
+
+ producer.consumers &= ~BIT(consumer);
+
+ tmp = pthread_mutex_unlock(&producer.msg_mutex);
+ if (tmp) {
+ log_emerg("pthread_mutex_unlock failed: %d", tmp);
+ exit(1);
+ }
+}
+
+static void *gps_socket_client(void *arg)
{
- int tty;
int sd;
+ int consumer;
+ char *type_name = NULL;
+ struct socket_client_args *sc = (struct socket_client_args *) arg;
+
+ switch (sc->type) {
+ case SOCK_STREAM:
+ type_name = "TCP";
+ break;
+ case SOCK_DGRAM:
+ type_name = "UDP";
+ break;
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- log_notice("Venus GPS TCP Client connecting to %s:%d", host, port);
+ log_notice("Venus GPS %s Client connecting to %s:%d",
+ type_name, sc->host, sc->port);
+
+ consumer = consumer_add();
+ if (consumer < 0) {
+ log_error("no more free consumers: exiting");
+ goto error;
+ }
while (1) {
- sd = inet_conn_str(host, port, SOCK_STREAM);
+ sd = inet_conn_str(sc->host, sc->port, sc->type);
if (sd < 0) {
log_error("inet_conn_str failed: %m");
- sleep(60);
+ sleep(SLEEP_AFTER_FAILURE);
} else {
- log_debug("connection opened");
+ log_info("connection opened");
+ gps_client_consume(sd, consumer, WRITE_MSG_SOCKET);
+ close(sd);
+ log_info("connection closed");
+ }
+ }
+
+error:
- tty = venus_open(device, baud_rate);
+ if (consumer >= 0) {
+ consumer_delete(consumer);
+ }
- set_nonblocking(tty);
- set_nonblocking(sd);
+ log_notice("exiting");
- exchange_data(sd, tty);
+ pthread_exit((void *) 1);
+}
- venus_close(tty);
- close(sd);
- }
+static void *gps_tcp_server_conn(void *arg)
+{
+ int sd = (int) (long) arg;
+ int consumer;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+
+ log_notice("Venus GPS TCP Server Conn connected");
+
+ consumer = consumer_add();
+ if (consumer < 0) {
+ log_error("no more free consumers: exiting");
+ goto error;
}
- return -1;
+ gps_client_consume(sd, consumer, WRITE_MSG_SOCKET);
+
+error:
+ if (consumer >= 0) {
+ consumer_delete(consumer);
+ }
+
+ close(sd);
+
+ log_notice("exiting");
+
+ pthread_exit((void *) 1);
}
-static int gps_to_tcp_server(int port)
+static void *gps_tcp_server(void *arg)
{
- int err;
- int tty;
+ int tmp;
int lsd;
int sd;
int cli_addr_len;
@@ -250,38 +565,41 @@ static int gps_to_tcp_server(int port)
struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+
lsd = socket(PF_INET, SOCK_STREAM, 0);
if (lsd == -1) {
log_error("socket failed: %m");
- exit(1);
+ goto error;
}
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- serv_addr.sin_port = htons(port);
+ serv_addr.sin_port = htons(gps_tcp_server_port);
- err = setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
- if (err == -1) {
+ tmp = setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
+ if (tmp == -1) {
log_error("setsockopt failed: %m");
- exit(1);
+ goto error;
}
serv_addr_len = sizeof(serv_addr);
- err = bind(lsd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
- if (err == -1) {
+ tmp = bind(lsd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
+ if (tmp == -1) {
log_error("bind failed: %m");
- exit(1);
+ goto error;
}
- err = listen(lsd, 1);
- if (err == -1) {
+ tmp = listen(lsd, 4);
+ if (tmp == -1) {
log_error("listen failed: %m");
- exit(1);
+ goto error;
}
- log_notice("Venus GPS TCP Server listening on port %d", port);
+ log_notice("Venus GPS TCP Server listening on port %d", gps_tcp_server_port);
while (1) {
cli_addr_len = sizeof(cli_addr);
@@ -290,69 +608,89 @@ static int gps_to_tcp_server(int port)
if (sd == -1) {
log_error("accept failed: %m");
} else {
- log_debug("accepted");
-
- tty = venus_open(device, baud_rate);
+ pthread_t tid;
- set_nonblocking(tty);
- set_nonblocking(sd);
-
- exchange_data(sd, tty);
+ tmp = pthread_create(&tid, NULL, gps_tcp_server_conn, (void *) (long) sd);
+ if (tmp) {
+ log_error("pthread_create gps_tcp_server_conn failed: %d", tmp);
+ close(sd);
+ continue;
+ }
- venus_close(tty);
- close(sd);
+ tmp = pthread_detach(tid);
+ if (tmp) {
+ log_error("pthread_detach failed: %d", tmp);
+ }
}
}
- close(lsd);
+error:
+ if (lsd >= 0) {
+ close(lsd);
+ }
- return -1;
+ log_notice("exiting");
+
+ pthread_exit((void *) 1);
}
-static int gps_to_file(const char *file)
+static void *gps_file_client(void *arg)
{
- int tty;
int fd;
+ int consumer = -1;
- log_notice("Venus GPS to file %s", file);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- if (!strcmp(file, "-")) {
+ log_notice("Venus GPS to file %s", gps_filename);
+
+ if (!strcmp(gps_filename, "-")) {
fd = fileno(stdout);
} else {
- fd = open(file, O_WRONLY);
+ fd = open(gps_filename, O_WRONLY);
if (fd < 0) {
- log_error("failed to open %s: %m", file);
- return -1;
+ log_error("failed to open %s: %m", gps_filename);
+ goto error;
}
}
- set_nonblocking(fd);
-
- while (1) {
- tty = venus_open(device, baud_rate);
+ consumer = consumer_add();
+ if (consumer < 0) {
+ log_error("no more free consumers: exiting");
+ goto error;
+ }
- set_nonblocking(tty);
+ gps_client_consume(fd, consumer, WRITE_MSG_FILE);
- exchange_data(fd, tty);
+error:
+ if (consumer >= 0) {
+ consumer_delete(consumer);
+ }
- venus_close(tty);
+ if (fd >= 0) {
+ close(fd);
}
- return -1;
+ log_notice("exiting");
+
+ pthread_exit((void *) 1);
}
-int gps_to_serial(const char *port)
+static void *gps_serial_client(void *arg)
{
- int tty;
int fd;
struct termios tio;
+ int consumer = -1;
- log_notice("Venus GPS to serial %s", port);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- fd = open(port, O_RDWR | O_NOCTTY);
+ log_notice("Venus GPS to serial %s", gps_serial_port);
+
+ fd = open(gps_serial_port, O_RDWR | O_NOCTTY);
if (fd < 0) {
- log_error("failed to open %s: %m", port);
- return -1;
+ log_error("failed to open %s: %m", gps_serial_port);
+ goto error;
}
tcgetattr(fd, &tio);
@@ -374,19 +712,26 @@ int gps_to_serial(const char *port)
tcsetattr(fd, TCSANOW, &tio);
tcflush(fd, TCIOFLUSH);
- set_nonblocking(fd);
-
- while (1) {
- tty = venus_open(device, baud_rate);
+ consumer = consumer_add();
+ if (consumer < 0) {
+ log_error("no more free consumers: exiting");
+ goto error;
+ }
- set_nonblocking(tty);
+ gps_client_consume(fd, consumer, WRITE_MSG_FILE);
- exchange_data(fd, tty);
+error:
+ if (consumer >= 0) {
+ consumer_delete(consumer);
+ }
- venus_close(tty);
+ if (fd >= 0) {
+ close(fd);
}
- return -1;
+ log_notice("exiting");
+
+ pthread_exit((void *) 1);
}
static void print_version(const char *name)
@@ -404,14 +749,16 @@ static void usage(FILE * out)
fprintf(out, "usage: venus-gps [ OPTIONS ... ]\n");
fprintf(out, "where OPTIONS := { \n");
fprintf(out, " --daemonize |\n");
- fprintf(out, " -d --device <device> (default: /dev/ttyS3) |\n");
+ fprintf(out, " -d, --device <device> (default: /dev/ttyS3) |\n");
fprintf(out, " --file <file> |\n");
fprintf(out, " --tcp-server |\n");
fprintf(out, " --tcp-client <host> |\n");
fprintf(out, " --udp-client <host> |\n");
fprintf(out, " --serial <device> |\n");
+ fprintf(out, " --id ID |\n");
+ fprintf(out, " --id-prefix PREFIX |\n");
#if CONFIG_CAN_DEFAULT
- fprintf(out, " -f --factory-defaults |\n");
+ fprintf(out, " -f, --factory-defaults |\n");
#endif
#if CONFIG_CAN_RESET
fprintf(out, "\n");
@@ -421,15 +768,15 @@ static void usage(FILE * out)
fprintf(out, " --altitude VAL (default: ) |\n");
#endif
fprintf(out, "\n");
- fprintf(out, " --gga 0-255 (default: 1) |\n");
- fprintf(out, " --gsa 0-255 (default: 1) |\n");
- fprintf(out, " --gsv 0-255 (default: 1) |\n");
- fprintf(out, " --gll 0-255 (default: 1) |\n");
- fprintf(out, " --rmc 0-255 (default: 1) |\n");
+ fprintf(out, " --gpgga 0-255 (default: 1) |\n");
+ fprintf(out, " --gpgsa 0-255 (default: 1) |\n");
+ fprintf(out, " --gpgsv 0-255 (default: 1) |\n");
+ fprintf(out, " --gpgll 0-255 (default: 1) |\n");
+ fprintf(out, " --gprmc 0-255 (default: 1) |\n");
#if CONFIG_HAVE_ZDA
- fprintf(out, " --zda 0-255 (default: 1) |\n");
+ fprintf(out, " --gpzda 0-255 (default: 1) |\n");
#endif
- fprintf(out, " --vtg 0-255 (default: 1)\n");
+ fprintf(out, " --gpvtg 0-255 (default: 1)\n");
fprintf(out, " }\n");
fprintf(out, "\n");
}
@@ -442,13 +789,13 @@ static void usage(FILE * out)
enum {
OPT_VERSION = 128,
OPT_HELP,
- OPT_GGA,
- OPT_GSA,
- OPT_GSV,
- OPT_GLL,
- OPT_RMC,
- OPT_VTG,
- OPT_ZDA,
+ OPT_GPGGA,
+ OPT_GPGSA,
+ OPT_GPGSV,
+ OPT_GPGLL,
+ OPT_GPRMC,
+ OPT_GPVTG,
+ OPT_GPZDA,
OPT_START_MODE,
OPT_LATITUDE,
OPT_LONGITUDE,
@@ -458,7 +805,9 @@ enum {
OPT_TCP_SERVER,
OPT_TCP_CLIENT,
OPT_UDP_CLIENT,
- OPT_SERIAL,
+ OPT_SERIAL_CLIENT,
+ OPT_ID_PREFIX,
+ OPT_ID,
};
static char *short_options = "b:d:p:f";
@@ -471,18 +820,20 @@ static struct option long_options[] = {
{"tcp-server", 0, 0, OPT_TCP_SERVER},
{"tcp-client", 1, 0, OPT_TCP_CLIENT},
{"udp-client", 1, 0, OPT_UDP_CLIENT},
- {"serial", 1, 0, OPT_SERIAL},
+ {"serial-client", 1, 0, OPT_SERIAL_CLIENT},
+ {"id", 1, 0, OPT_ID},
+ {"id-prefix", 1, 0, OPT_ID_PREFIX},
#if CONFIG_CAN_DEFAULT
{"factory-defaults", 0, 0, OPT_FACTORY_DEFAULTS},
#endif
- {"gga", 1, 0, OPT_GGA},
- {"gsa", 1, 0, OPT_GSA},
- {"gsv", 1, 0, OPT_GSV},
- {"gll", 1, 0, OPT_GLL},
- {"rmc", 1, 0, OPT_RMC},
- {"vtg", 1, 0, OPT_VTG},
+ {"gpgga", 1, 0, OPT_GPGGA},
+ {"gpgsa", 1, 0, OPT_GPGSA},
+ {"gpgsv", 1, 0, OPT_GPGSV},
+ {"gpgll", 1, 0, OPT_GPGLL},
+ {"gprmc", 1, 0, OPT_GPRMC},
+ {"gpvtg", 1, 0, OPT_GPVTG},
#if CONFIG_HAVE_ZDA
- {"zda", 1, 0, OPT_ZDA},
+ {"gpzda", 1, 0, OPT_GPZDA},
#endif
#if CONFIG_CAN_RESET
{"start-mode", 1, 0, OPT_START_MODE},
@@ -504,33 +855,14 @@ int main(int argc, char *argv[])
int tty;
struct venus_msg msg;
+ struct socket_client_args *sc;
+
#if CONFIG_CAN_DEFAULT
int factory_defaults = 0;
#endif
int daemonize = 0;
int port = 5445;
- char *file = "-";
- char tcp_server_enabled = 0;
- int tcp_server_port = port;
- char *tcp_client_host = NULL;
- int tcp_client_port = port;
- char *udp_client_host = NULL;
- int udp_client_port = port;
- char *serial_port = NULL;
-
- struct sigaction sa;
-
- sa.sa_handler = sigalarm_handler;
- sigemptyset(&sa.sa_mask);
- sa.sa_flags = 0;
- sigaction(SIGALRM, &sa, NULL);
-
- sa.sa_handler = sigpipe_handler;
- sigemptyset(&sa.sa_mask);
- sa.sa_flags = 0;
- sigaction(SIGPIPE, &sa, NULL);
-
while ((i = getopt_long(argc, argv, short_options, long_options, &option_index)) >= 0) {
switch (i) {
case 0:
@@ -554,54 +886,75 @@ int main(int argc, char *argv[])
break;
case OPT_FILE:
- file = optarg;
+ gps_filename = optarg;
break;
- case OPT_SERIAL:
- serial_port = optarg;
+ case OPT_SERIAL_CLIENT:
+ gps_serial_port = optarg;
break;
case OPT_TCP_SERVER:
- tcp_server_enabled = 1;
- tcp_server_port = port;
+ gps_tcp_server_port = port;
break;
case OPT_TCP_CLIENT:
+ if (n_socket_clients >= SOCKET_CLIENTS_MAX) {
+ log_error("max clients exceeded: %d",
+ SOCKET_CLIENTS_MAX);
+ usage(stderr);
+ exit(1);
+ }
+ sc = &socket_clients[n_socket_clients++];
+
cp = strchr(optarg, ':');
if (cp) {
- tcp_client_host = strndup(optarg, cp - optarg);
+ sc->host = strndup(optarg, cp - optarg);
cp++;
- tcp_client_port = atoi(cp);
- if (tcp_client_port <= 0 || tcp_client_port >= (1 << 16)) {
+ sc->port = atoi(cp);
+ if (sc->port <= 0 || sc->port >= (1 << 16)) {
log_error("invalid port");
usage(stderr);
exit(1);
}
} else {
- tcp_client_host = strdup(optarg);
- tcp_client_port = port;
+ sc->host = strdup(optarg);
+ sc->port = port;
}
+
+ sc->type = SOCK_STREAM;
+
break;
case OPT_UDP_CLIENT:
+ if (n_socket_clients >= SOCKET_CLIENTS_MAX) {
+ log_error("max clients exceeded: %d",
+ SOCKET_CLIENTS_MAX);
+ usage(stderr);
+ exit(1);
+ }
+ sc = &socket_clients[n_socket_clients++];
+
cp = strchr(optarg, ':');
if (cp) {
- udp_client_host = strndup(optarg, cp - optarg);
+ sc->host = strndup(optarg, cp - optarg);
cp++;
- udp_client_port = atoi(cp);
- if (udp_client_port <= 0 || udp_client_port >= (1 << 16)) {
+ sc->port = atoi(cp);
+ if (sc->port <= 0 || sc->port >= (1 << 16)) {
log_error("invalid port");
usage(stderr);
exit(1);
}
} else {
- udp_client_host = strdup(optarg);
- udp_client_port = port;
+ sc->host = strdup(optarg);
+ sc->port = port;
}
+
+ sc->type = SOCK_DGRAM;
+
break;
case OPT_BAUD_RATE:
@@ -613,33 +966,60 @@ int main(int argc, char *argv[])
}
break;
- case OPT_GGA:
- gga = atoi(optarg);
+ case OPT_ID_PREFIX:
+ tmp = strlen(optarg);
+ if (tmp > ID_PREFIX_MAX) {
+ log_error("id-prefix is too long max: %d > %d",
+ tmp, ID_PREFIX_MAX);
+ usage(stderr);
+ exit(1);
+ }
+ snprintf(id_prefix, sizeof(id_prefix), "%s", optarg);
break;
- case OPT_GSA:
- gsa = atoi(optarg);
+ case OPT_ID:
+ tmp = strlen(optarg);
+ if(tmp > ID_MAX) {
+ log_error("id is too long max: %d > %d",
+ tmp, ID_MAX);
+ usage(stderr);
+ exit(1);
+ }
+ if(strcspn(optarg, "$&") != tmp) {
+ log_error("id contains invalid char $&");
+ usage(stderr);
+ exit(1);
+ }
+ snprintf(id, sizeof(id), "%s", optarg);
+ break;
+
+ case OPT_GPGGA:
+ gpgga = atoi(optarg);
+ break;
+
+ case OPT_GPGSA:
+ gpgsa = atoi(optarg);
break;
- case OPT_GSV:
- gsv = atoi(optarg);
+ case OPT_GPGSV:
+ gpgsv = atoi(optarg);
break;
- case OPT_GLL:
- gll = atoi(optarg);
+ case OPT_GPGLL:
+ gpgll = atoi(optarg);
break;
- case OPT_RMC:
- rmc = atoi(optarg);
+ case OPT_GPRMC:
+ gprmc = atoi(optarg);
break;
- case OPT_VTG:
- vtg = atoi(optarg);
+ case OPT_GPVTG:
+ gpvtg = atoi(optarg);
break;
#if CONFIG_HAVE_ZDA
- case OPT_ZDA:
- zda = atoi(optarg);
+ case OPT_GPZDA:
+ gpzda = atoi(optarg);
break;
#endif
@@ -744,7 +1124,8 @@ int main(int argc, char *argv[])
#if CONFIG_CAN_RESET
if (start_mode != (typeof(start_mode)) - 1) {
log_notice("issuing system restart");
- tmp = venus_system_restart(tty, start_mode, time(NULL), latitude, longitude, altitude);
+ tmp = venus_system_restart(tty, start_mode, time(NULL),
+ latitude, longitude, altitude);
if (tmp < 0) {
log_error("system restart failed");
exit(1);
@@ -753,7 +1134,7 @@ int main(int argc, char *argv[])
}
#endif
- tmp = venus_conf_nmea(tty, gga, gsa, gsv, gll, rmc, vtg, zda, UPDATE_ATTR_SRAM);
+ tmp = venus_conf_nmea(tty, 1, 1, 1, 1, 1, 1, 0, UPDATE_ATTR_SRAM);
if (tmp < 0) {
log_error("failed to configure nmea sentences");
exit(1);
@@ -767,19 +1148,85 @@ int main(int argc, char *argv[])
venus_close(tty);
- /*
- * FIXME: Support more than one mode at a time
- */
- if (tcp_server_enabled) {
- gps_to_tcp_server(tcp_server_port);
- } else if (tcp_client_host) {
- gps_to_tcp_client(tcp_client_host, tcp_client_port);
- } else if (udp_client_host) {
- gps_to_udp_client(udp_client_host, udp_client_port);
- } else if (serial_port) {
- gps_to_serial(serial_port);
- } else if (file) {
- gps_to_file(file);
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGTERM);
+ sigaddset(&sigset, SIGINT);
+ sigaddset(&sigset, SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+ pthread_t gps_msg_producer_tid;
+ pthread_t gps_serial_client_tid;
+ pthread_t gps_file_client_tid;
+ pthread_t gps_tcp_server_tid;
+
+ tmp = pthread_create(&gps_msg_producer_tid, NULL, gps_msg_producer, NULL);
+ if (tmp) {
+ log_error("pthread_create gps_msg_producer failed: %d", tmp);
+ exit(1);
+ }
+
+ for (i = 0; i < n_socket_clients; i++) {
+ tmp = pthread_create(&socket_clients[i].tid, NULL,
+ gps_socket_client, (void *) &socket_clients[i]);
+ if (tmp) {
+ log_error("pthread_create gps_socket_client failed: %d", tmp);
+ }
+ }
+
+ if (gps_serial_port) {
+ tmp = pthread_create(&gps_serial_client_tid, NULL,
+ gps_serial_client, NULL);
+ if (tmp) {
+ log_error("pthread_create gps_serial_client failed: %d", tmp);
+ }
+ }
+ if (gps_filename) {
+ tmp = pthread_create(&gps_file_client_tid, NULL,
+ gps_file_client, NULL);
+ if (tmp) {
+ log_error("pthread_create gps_file_client failed: %d", tmp);
+ }
+ }
+ if (gps_tcp_server_port >= 0) {
+ tmp = pthread_create(&gps_tcp_server_tid, NULL,
+ gps_tcp_server, NULL);
+ if (tmp) {
+ log_error("pthread_create gps_tcp_server failed: %d", tmp);
+ }
+ }
+
+ while (1) {
+ siginfo_t info;
+ struct timespec timeout = {
+ .tv_sec = 1,
+ .tv_nsec = 0,
+ };
+
+ tmp = sigtimedwait(&sigset, &info, &timeout);
+ switch (tmp) {
+ case -1:
+ if (errno != EAGAIN) {
+ log_error("sigtimedwait failed: %m");
+ }
+ break;
+ case SIGINT:
+ log_notice("SIGINT caught");
+ exit(0);
+ break;
+ case SIGTERM:
+ log_notice("SIGTERM caught");
+ exit(0);
+ break;
+ case SIGPIPE:
+ log_notice("SIGPIPE caught");
+ break;
+ default:
+ log_error("unhandled signal %d caught", tmp);
+ break;
+ }
+
+ /* Check for exited threads */
}
return 0;