-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathrealtime.c
240 lines (202 loc) · 7.49 KB
/
realtime.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/* Copyright 2013 Bliksem Labs. See the LICENSE file at the top-level directory of this distribution and at https://github.com/bliksemlabs/rrrr/. */
/* realtime.c */
/*
Fetch GTFS-RT updates over Websockets
Depends on https://github.com/warmcat/libwebsockets
compile with -lwebsockets -lprotobuf-c
protoc-c --c_out . gtfs-realtime.proto
clang -O2 -c gtfs-realtime.pb-c.c -o gtfs-realtime.pb-c.o
clang -O2 realtime.c gtfs-realtime.pb-c.o -o rrrrealtime -lwebsockets -lprotobuf-c
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <getopt.h>
#include <string.h>
#include <signal.h>
#include <libwebsockets.h>
#include "radixtree.h"
#include "tdata.h"
#include "config.h"
/*
Websockets exchange frames. Messages can be split across frames.
Libwebsockets does not aggregate frames into messages; you must do it manually.
"The configuration-time option MAX_USER_RX_BUFFER has been replaced by a
buffer size chosen per-protocol. For compatibility, there's a default
of 4096 rx buffer, but user code should set the appropriate size for
the protocol frames."
If your frames ever exceed the set size, you need to detect it with libwebsockets_remaining_packet_payload.
See README.coding on how to support fragmented messages (multiple frames per message).
Also libwebsockets-test-fraggle.
Test fragmented messages with /alerts endpoint, since it dumps around 23k in the first message.
http://stackoverflow.com/questions/13010354/chunking-websocket-transmission
http://autobahn.ws/
http://www.lenholgate.com/blog/2011/07/websockets-is-a-stream-not-a-message-based-protocol.html
*/
/* Receive buffer size handed to libwebsockets for the protocol (see the protocols[] table). */
#define MAX_FRAME_LENGTH (10 * 1024)
/* Capacity in bytes of the message assembly buffer `msg`. */
#define MAX_MESSAGE_LENGTH (10 * 1024 * 1024) // initial receives can be huge
/*
 * Statement prefix: `V stmt;` expands to `if (verbose) stmt;`.
 * Because it expands to a bare `if`, a following `else` would bind to it.
 */
#define V if (verbose)
/* Assembly buffer in which the frames of a multi-frame message are concatenated. */
uint8_t msg[MAX_MESSAGE_LENGTH];
/* Number of bytes currently stored in `msg`; 0 means no message is being assembled. */
size_t msg_len = 0;
/* Condition tested by the V macro. */
bool verbose = true;
/* Index built in main() from the trip ids of `tdata`; passed to tdata_apply_gtfsrt. */
RadixTree *tripid_index;
/* Timetable data loaded in main() from RRRR_INPUT_FILE; received messages are applied to it. */
tdata_t tdata;
/*
 * Append `len` bytes starting at `frame` to the message assembly buffer `msg`.
 *
 * If the bytes do not fit in the remaining space, an error is printed to
 * stderr, the buffer is emptied (msg_len = 0) and nothing is copied.
 * A zero-length frame leaves the buffer untouched.
 */
static void msg_add_frame (const uint8_t *frame, size_t len) {
    /*
     * Compare against the remaining space instead of computing msg_len + len:
     * the sum could wrap around for a huge len, the subtraction cannot as long
     * as msg_len <= MAX_MESSAGE_LENGTH, which every writer in this file maintains.
     */
    if (len > MAX_MESSAGE_LENGTH - msg_len) {
        fprintf (stderr, "message exceeded maximum message length\n");
        msg_len = 0;
        return;
    }
    /* Nothing to copy; also avoids calling memcpy with a possibly NULL source. */
    if (len == 0) return;
    memcpy(msg + msg_len, frame, len);
    msg_len += len;
}
/* Empty the message assembly buffer by resetting its fill level to zero. */
static void msg_reset (void) {
    msg_len = 0;
}
/*
 * Debug helper: print the contents of the message assembly buffer to stdout.
 * Bytes in the printable ASCII range (32..126) are printed as characters,
 * every other byte as a bracketed two-digit hex value such as [0A].
 * A trailer line marks the end of the message.
 */
static void msg_dump (void) {
    for (size_t i = 0; i < msg_len; ++i) {
        uint8_t byte = msg[i];
        if (byte >= 32 && byte < 127) putchar (byte);
        else printf ("[%02X]", byte);
    }
    printf ("\n=============== END OF MESSAGE ================\n");
}
/* Set by the protocol callback when libwebsockets reports the connection closed. */
static bool socket_closed = false;
/*
 * Set by the SIGINT handler to ask the service loop in main() to stop.
 * An object written from a signal handler and read by the main program must
 * be a volatile sig_atomic_t for the access to be well defined.
 */
static volatile sig_atomic_t force_exit = 0;
/* Protocol: Incremental GTFS-RT */
static int callback_gtfs_rt (struct libwebsocket_context *this,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user, void *in, size_t len) {
switch (reason) {
case LWS_CALLBACK_CLOSED:
fprintf(stderr, "LWS_CALLBACK_CLOSED\n");
socket_closed = 1;
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
fprintf(stderr, "rx %d bytes: ", (int)len);
if (libwebsockets_remaining_packet_payload (wsi)) {
fprintf (stderr, "frame exceeds maximum allowed frame length\n");
} else if (libwebsocket_is_final_fragment (wsi)) {
fprintf(stderr, "final frame, ");
if (msg_len == 0) {
/* single frame message, nothing in the buffer */
fprintf(stderr, "single-frame message. ");
if (len > 0) tdata_apply_gtfsrt (&tdata, tripid_index, in, len);
} else {
/* last frame in a multi-frame message */
fprintf(stderr, "had previous fragment frames. ");
msg_add_frame (in, len);
tdata_apply_gtfsrt (&tdata, tripid_index, msg, msg_len);
fprintf(stderr, "emptying message buffer. ");
msg_reset();
}
} else {
/* non-final fragment frame */
fprintf(stderr, "message fragment frame. ");
msg_add_frame (in, len);
}
fprintf(stderr, "\n");
break;
case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
return 0; // say everything is supported?
break;
default:
break;
}
return 0;
}
/*
 * List of supported protocols and their callbacks.
 * The enumerators are indices into the protocols[] table; PROTOCOL_COUNT
 * comes last and therefore equals the number of protocols.
 */
enum protocols {
    PROTOCOL_INCREMENTAL_GTFS_RT = 0,
    PROTOCOL_COUNT /* end */
};
static struct libwebsocket_protocols protocols[] = {
{
"incremental-gtfs-rt",
callback_gtfs_rt,
0, // shared protocol data size
MAX_FRAME_LENGTH, // frame buffer size
},
{ NULL, NULL, 0, 0 } /* end */
};
/* Signal handler: only raises the force_exit flag that the service loop in main() polls. */
void sighandler(int sig) {
    (void) sig; /* the signal number itself is not needed */
    force_exit = true;
}
/* Long command line options for getopt_long; each maps to the short option character in .val. */
static struct option long_options[] = {
    { .name = "port",    .has_arg = required_argument, .flag = NULL, .val = 'p' },
    { .name = "address", .has_arg = required_argument, .flag = NULL, .val = 'a' },
    { .name = "path",    .has_arg = required_argument, .flag = NULL, .val = 't' },
    { .name = "help",    .has_arg = no_argument,       .flag = NULL, .val = 'h' },
    { .name = NULL,      .has_arg = 0,                 .flag = NULL, .val = 0 } /* end */
};
/*
 * Entry point of the realtime client.
 *
 * Parses the command line, loads the timetable and builds the trip id index,
 * opens a client websocket to the GTFS-RT distributor and services it until
 * the socket closes, the service call fails or SIGINT is received.
 *
 * Options: --port/-p <port>, --address/-a <host>, --path/-t <path>, --help/-h.
 * Returns 0 after a normal run, 1 on a usage error, when help is requested,
 * or when the websocket context or connection cannot be created.
 */
int main(int argc, char **argv) {
    int ret = 0;
    int port = 8088;
    bool use_ssl = false;
    struct libwebsocket_context *context;
    const char *address = "ovapi.nl";
    /* default endpoint; another one such as "/tripUpdates" can be chosen with --path */
    const char *path = "/vehiclePositions";
    struct libwebsocket *wsi_gtfsrt;
    int ietf_version = -1; /* latest */
    struct lws_context_creation_info cc_info;
    int opt;
    int n = 0;

    /* "t:" is listed so that -t is accepted just like its long form --path */
    while ((opt = getopt_long(argc, argv, "p:a:t:h", long_options, NULL)) >= 0) {
        switch (opt) {
        case 'p': {
            /* strtol instead of atoi: rejects garbage, trailing text and out-of-range values */
            char *end = NULL;
            long value = strtol(optarg, &end, 10);
            if (end == optarg || *end != '\0' || value < 1 || value > 65535) {
                fprintf (stderr, "invalid port '%s'\n", optarg);
                goto usage;
            }
            port = (int) value;
            break;
        }
        case 'a':
            address = optarg;
            printf ("address set to %s\n", address);
            break;
        case 't':
            path = optarg;
            printf ("path set to %s\n", path);
            break;
        case 'h':
        default: /* '?': unknown option or missing argument */
            goto usage;
        }
    }

    signal(SIGINT, sighandler);
    tdata_load (RRRR_INPUT_FILE, &tdata);
    tripid_index = rxt_load_strings_from_tdata (tdata.trip_ids, tdata.trip_id_width, tdata.n_trips);

    /*
     * create the websockets context. This tracks open connections and
     * knows how to route any traffic and which protocol version to use,
     * and if each connection is client or server side.
     * This is client-only, so we tell it to not listen on any port.
     */
    memset (&cc_info, 0, sizeof cc_info);
    cc_info.port = CONTEXT_PORT_NO_LISTEN;
    cc_info.protocols = protocols;
    cc_info.extensions = libwebsocket_get_internal_extensions();
    cc_info.gid = -1;
    cc_info.uid = -1;
    context = libwebsocket_create_context (&cc_info);
    if (context == NULL) {
        fprintf (stderr, "creating libwebsocket context failed\n");
        return 1;
    }

    /* create a client websocket to incremental gtfs-rt distributor */
    wsi_gtfsrt = libwebsocket_client_connect (context, address, port, use_ssl, path, address, address,
                                              protocols[PROTOCOL_INCREMENTAL_GTFS_RT].name, ietf_version);
    if (wsi_gtfsrt == NULL) {
        fprintf(stderr, "libwebsocket gtfs-rt connect failed\n");
        ret = 1;
        goto bail;
    }
    fprintf(stderr, "websocket connections opened\n");

    /* service the websocket context to handle incoming packets */
    while (n >= 0 && !socket_closed && !force_exit) n = libwebsocket_service(context, 500);

bail:
    fprintf(stderr, "Exiting\n");
    libwebsocket_context_destroy(context);
    return ret;

usage:
    fprintf(stderr, "Usage: rrrrealtime [--address=<server address>] [--port=<p>] [--path=/<path>]\n");
    return 1;
}