This repository has been archived by the owner on Jun 30, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
packet.c
416 lines (364 loc) · 11.5 KB
/
packet.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
// $Id: packet.c,v 1.25 2018/12/02 09:16:45 karn Exp $
// AFSK/FM packet demodulator
// Reads RTP PCM audio stream, emits decoded frames in multicast RTP
// Copyright 2018, Phil Karn, KA9Q
#define _GNU_SOURCE 1
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <locale.h>
#include <netdb.h>
#include "dsp.h"
#include "osc.h"
#include "filter.h"
#include "misc.h"
#include "multicast.h"
#include "ax25.h"
// Needs to be redone with common RTP receiver module
struct session {
struct session *next;
struct sockcache source;
struct rtp_state rtp_state_in;
struct rtp_state rtp_state_out;
int input_pointer;
struct filter_in *filter_in;
pthread_t decode_thread;
unsigned int decoded_packets;
};
// Config constants
#define MAX_MCAST 20 // Maximum number of multicast addresses
float const SCALE = 1./32768;
int const Bufsize = 2048;
int const AN = 2048; // Should be power of 2 for FFT efficiency
int const AL = 1000; // 25 bit times
//int const AM = AN - AL + 1; // should be >= Samppbit, i.e., samprate / bitrate
int const AM = 1049;
float const Samprate = 48000;
float const Bitrate = 1200;
//int const Samppbit = Samprate/Bitrate;
int const Samppbit = 40;
// Command line params
char *Mcast_address_text[MAX_MCAST];
char *Decode_mcast_address_text = "ax25.mcast.local";
int Verbose;
int Mcast_ttl = 10; // Very low intensity output
// Global variables
int Nfds; // Number of streams
int Input_fd = -1;
int Output_fd = -1;
struct session *Session;
extern float Kaiser_beta;
pthread_mutex_t Output_mutex;
struct session *lookup_session(const uint32_t ssrc);
struct session *make_session(uint32_t ssrc);
int close_session(struct session *sp);
void *decode_task(void *arg);
int main(int argc,char *argv[]){
// Drop root if we have it
if(seteuid(getuid()) != 0)
fprintf(stderr,"seteuid: %s\n",strerror(errno));
setlocale(LC_ALL,getenv("LANG"));
// Unlike aprs and aprsfeed, stdout is not line buffered because each packet
// generates a multi-line dump. So we have to be sure to fflush(stdout) after each
// packet in case we're redirected into a file
int c;
while((c = getopt(argc,argv,"I:R:vT:")) != EOF){
switch(c){
case 'v':
Verbose++;
break;
case 'I':
if(Nfds == MAX_MCAST){
fprintf(stderr,"Too many multicast addresses; max %d\n",MAX_MCAST);
} else
Mcast_address_text[Nfds++] = optarg;
break;
case 'R':
Decode_mcast_address_text = optarg;
break;
case 'T':
Mcast_ttl = strtol(optarg,NULL,0);
break;
default:
fprintf(stderr,"Usage: %s [-v] [-I input_mcast_address] [-R output_mcast_address] [-T mcast_ttl]\n",argv[0]);
fprintf(stderr,"Defaults: %s -I [none] -R %s -T %d\n",argv[0],Decode_mcast_address_text,Mcast_ttl);
exit(1);
}
}
if(Nfds == 0){
fprintf(stderr,"At least one -I option required\n");
exit(1);
}
// Set up multicast input, create mask for select()
fd_set fdset_template; // Mask for select()
FD_ZERO(&fdset_template);
int max_fd = 2; // Highest number fd for select()
int input_fd[Nfds]; // Multicast receive sockets
for(int i=0;i<Nfds;i++){
input_fd[i] = setup_mcast(Mcast_address_text[i],NULL,0,0,0);
if(input_fd[i] == -1){
fprintf(stderr,"Can't set up input %s\n",Mcast_address_text[i]);
continue;
}
if(input_fd[i] > max_fd)
max_fd = input_fd[i];
FD_SET(input_fd[i],&fdset_template);
}
Output_fd = setup_mcast(Decode_mcast_address_text,NULL,1,Mcast_ttl,0);
if(Output_fd == -10){
fprintf(stderr,"Can't set up output to %s\n",
Decode_mcast_address_text);
exit(1);
}
pthread_mutex_init(&Output_mutex,NULL);
struct rtp_header rtp_hdr;
struct sockaddr sender;
// audio input thread
// Receive audio multicasts, multiplex into sessions, execute filter front end (which wakes up decoder thread)
while(1){
// Wait for traffic to arrive
fd_set fdset = fdset_template;
int s = select(max_fd+1,&fdset,NULL,NULL,NULL);
if(s < 0 && errno != EAGAIN && errno != EINTR)
break;
if(s == 0)
continue; // Nothing arrived; probably just an ignored signal
for(int fd_index = 0;fd_index < Nfds;fd_index++){
if(input_fd[fd_index] == -1 || !FD_ISSET(input_fd[fd_index],&fdset))
continue;
unsigned char buffer[16384]; // Fix this
socklen_t socksize = sizeof(sender);
int size = recvfrom(input_fd[fd_index],buffer,sizeof(buffer),0,&sender,&socksize);
if(size == -1){
if(errno != EINTR){ // Happens routinely
perror("recvfrom");
usleep(1000);
}
continue;
}
if(size < RTP_MIN_SIZE){
usleep(1000); // Avoid tight loop
continue; // Too small to be valid RTP
}
// Extract RTP header
unsigned char *dp = buffer;
dp = ntoh_rtp(&rtp_hdr,dp);
size -= dp - buffer;
if(rtp_hdr.pad){
// Remove padding
size -= dp[size-1];
rtp_hdr.pad = 0;
}
if(rtp_hdr.type != PCM_MONO_PT)
continue; // Only mono PCM for now
struct session *sp = lookup_session(rtp_hdr.ssrc);
if(sp == NULL){
// Not found
if((sp = make_session(rtp_hdr.ssrc)) == NULL){
fprintf(stdout,"No room for new session!!\n");
fflush(stdout);
continue;
}
sp->rtp_state_out.ssrc = sp->rtp_state_in.ssrc = rtp_hdr.ssrc;
update_sockcache(&sp->source,&sender);
sp->input_pointer = 0;
sp->filter_in = create_filter_input(AL,AM,REAL);
pthread_create(&sp->decode_thread,NULL,decode_task,sp); // One decode thread per stream
if(Verbose){
fprintf(stdout,"New session from %s:%s, ssrc %x\n",sp->source.host,sp->source.port,sp->rtp_state_in.ssrc);
fflush(stdout);
}
}
int sample_count = size / sizeof(signed short); // 16-bit sample count
int skipped_samples = rtp_process(&sp->rtp_state_in,&rtp_hdr,sample_count);
if(skipped_samples < 0)
continue; // Drop probable duplicate(s)
// Ignore skipped_samples > 0; no real need to maintain sample count when squelch closes
// Even if its caused by dropped RTP packets there's no FEC to fix it anyway
signed short *samples = (signed short *)dp;
while(sample_count-- > 0){
// Swap sample to host order, convert to float
sp->filter_in->input.r[sp->input_pointer++] = ntohs(*samples++) * SCALE;
if(sp->input_pointer == sp->filter_in->ilen){
execute_filter_input(sp->filter_in); // Wakes up any threads waiting for data on this filter
sp->input_pointer = 0;
}
}
}
}
// Need to kill decoder threads? Or will ordinary signals reach them?
exit(0);
}
// Find existing session in table, if it exists
struct session *lookup_session(const uint32_t ssrc){
struct session *sp;
for(sp = Session; sp != NULL; sp = sp->next){
if(sp->rtp_state_in.ssrc == ssrc)
// Found it
return sp;
}
return NULL;
}
// Create a new session, partly initialize
struct session *make_session(uint32_t ssrc){
struct session *sp;
if((sp = calloc(1,sizeof(*sp))) == NULL)
return NULL; // Shouldn't happen on modern machines!
sp->rtp_state_in.ssrc = ssrc;
// Put at head of bucket chain
sp->next = Session;
Session = sp;
return sp;
}
int close_session(struct session *sp){
if(sp == NULL)
return -1;
// Remove from linked list
struct session *se,*se_prev = NULL;
for(se = Session; se && se != sp; se_prev = se,se = se->next)
;
if(!se)
return -1;
if(se == sp){
if(se_prev)
se_prev->next = sp->next;
else
Session = se_prev;
}
return 0;
}
// AFSK demod, HDLC decode
void *decode_task(void *arg){
pthread_setname("afsk");
struct session *sp = (struct session *)arg;
assert(sp != NULL);
struct filter_out *filter = create_filter_output(sp->filter_in,NULL,1,COMPLEX);
set_filter(filter,+100./Samprate,+4000./Samprate,3.0); // Creates analytic, band-limited signal
// Tone replica generators (-1200 and -2200 Hz)
struct osc mark;
memset(&mark,0,sizeof(mark));
pthread_mutex_init(&mark.mutex,NULL);
set_osc(&mark,-1200./Samprate, 0.0);
struct osc space;
memset(&space,0,sizeof(space));
pthread_mutex_init(&space.mutex,NULL);
set_osc(&space,-2200./Samprate, 0.0);
// Tone integrators
int symphase = 0;
float complex mark_accum = 0; // On-time
float complex space_accum = 0;
float complex mark_offset_accum = 0; // Straddles previous zero crossing
float complex space_offset_accum = 0;
float last_val = 0; // Last on-time symbol
float mid_val = 0; // Last zero crossing symbol
// hdlc state
unsigned char hdlc_frame[1024];
memset(hdlc_frame,0,sizeof(hdlc_frame));
int frame_bit = 0;
int flagsync = 0;
int ones = 0;
while(1){
execute_filter_output(filter); // Blocks until data appears
for(int n=0; n<filter->olen; n++){
// Spin down by 1200 and 2200 Hz, accumulate each in boxcar (comb) filters
// Mark and space each have in-phase and offset integrators for timing recovery
float complex s;
s = filter->output.c[n] * step_osc(&mark);
mark_accum += s;
mark_offset_accum += s;
s = filter->output.c[n] * step_osc(&space);
space_accum += s;
space_offset_accum += s;
if(++symphase == Samppbit/2){
// Finish offset integrator and reset
mid_val = cnrmf(mark_offset_accum) - cnrmf(space_offset_accum);
mark_offset_accum = space_offset_accum = 0;
}
if(symphase < Samppbit)
continue;
// Finished whole bit
symphase = 0;
float cur_val = cnrmf(mark_accum) - cnrmf(space_accum);
mark_accum = space_accum = 0;
assert(frame_bit >= 0);
if(cur_val * last_val < 0){
// Transition -- Gardner-style clock adjust
symphase += ((cur_val - last_val) * mid_val) > 0 ? +1 : -1;
// NRZI zero
if(ones == 6){
// Flag
if(flagsync){
frame_bit -= 7; // Remove 0111111
int bytes = frame_bit / 8;
if(bytes > 0 && crc_good(hdlc_frame,bytes)){
if(Verbose){
time_t t;
struct tm *tmp;
time(&t);
tmp = gmtime(&t);
// Lock output to prevent intermingled output
pthread_mutex_lock(&Output_mutex);
fprintf(stdout,"%d %s %04d %02d:%02d:%02d UTC ",tmp->tm_mday,Months[tmp->tm_mon],tmp->tm_year+1900,
tmp->tm_hour,tmp->tm_min,tmp->tm_sec);
fprintf(stdout,"ssrc %x packet %d len %d:\n",sp->rtp_state_in.ssrc,sp->decoded_packets++,bytes);
dump_frame(stdout,hdlc_frame,bytes);
fflush(stdout);
pthread_mutex_unlock(&Output_mutex);
}
struct rtp_header rtp_hdr;
memset(&rtp_hdr,0,sizeof(rtp_hdr));
rtp_hdr.version = 2;
rtp_hdr.type = AX25_PT;
rtp_hdr.seq = sp->rtp_state_out.seq++;
// RTP timestamp??
rtp_hdr.timestamp = sp->rtp_state_out.timestamp;
sp->rtp_state_out.timestamp += bytes;
rtp_hdr.ssrc = sp->rtp_state_out.ssrc;
unsigned char packet[2048],*dp;
dp = packet;
dp = hton_rtp(dp,&rtp_hdr);
memcpy(dp,hdlc_frame,bytes);
dp += bytes;
send(Output_fd,packet,dp - packet,0); // Check return code?
sp->rtp_state_out.packets++;
sp->rtp_state_out.bytes += bytes;
}
}
if(1 || frame_bit != 0){
memset(hdlc_frame,0,sizeof(hdlc_frame));
frame_bit = 0;
}
flagsync = 1;
} else if(ones == 5){
// Drop stuffed zero
} else if(ones < 5){
if(flagsync){
frame_bit++;
}
}
ones = 0;
} else {
// NRZI one
if(++ones == 7){
// Abort
if(1 || frame_bit != 0){
memset(hdlc_frame,0,sizeof(hdlc_frame));
frame_bit = 0;
}
flagsync = 0;
} else {
if(flagsync){
hdlc_frame[frame_bit/8] |= 1 << (frame_bit % 8);
frame_bit++;
}
}
}
last_val = cur_val;
}
}
return NULL;
}