forked from Katee/quietnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
quietnet.py
120 lines (96 loc) · 3.69 KB
/
quietnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import numpy as np
import struct
import math
# split something into chunks
def chunks(l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
def unpack(buffer):
return unpack_buffer(list(chunks(buffer, 2)))
def unpack_buffer(buffer):
return [struct.unpack('h', frame)[0] for frame in buffer]
def pack_buffer(buffer):
return [struct.pack('h', frame) for frame in buffer]
def fft(signal):
return np.abs(np.fft.rfft(signal))
def get_peak(hertz, rate, chunk):
return int(round((float(hertz) / rate) * chunk))
def weighted_values_around_peak(in_data, peak_index, offset):
period = math.pi / (offset * 2)
out_data = []
for i in range(len(in_data)):
if i >= peak_index - offset and i <= peak_index + offset:
out_data.append(in_data[i] * np.abs(math.sin((period * (i - peak_index + offset)) + (math.pi / 2.0))))
else:
out_data.append(0)
return out_data
def has_freq(fft_sample, freq_in_hertz, rate, chunk, offset=3):
peak_index = get_peak(freq_in_hertz, rate, chunk)
top = max(fft_sample[peak_index-1:peak_index+2])
avg_around_peak = np.average(weighted_values_around_peak(fft_sample, peak_index, offset))
if top > avg_around_peak:
return fft_sample[peak_index]
else:
return 0
def get_signal(buffer):
unpacked_buffer = unpack_buffer(list(chunks(buffer, 2)))
return np.array(unpacked_buffer, dtype=float)
def raw_has_freq(buffer, freq_in_hertz, rate, chunk):
fft_sample = fft(get_signal(buffer))
return has_freq(fft_sample, freq_in_hertz, rate, chunk)
def get_freq_over_time(ffts, search_freq, chunk=1024, rate=44100):
return [has_freq(fft, search_freq, rate, chunk) for fft in ffts]
def get_points(freq_samples, frame_length, threshold=None, last_point=0):
if threshold == None:
threshold = np.median(freq_samples)
points = []
for i in range(len(freq_samples)):
freq_value = freq_samples[i]
point = 0
if freq_value > threshold:
# ignore big changes in frequency when they aren't near the frame transition
if last_point == 1 or (i % frame_length) <= 2:
point = 1
else:
point = 0
points.append(point)
last_point = point
return points
def get_bits(points, frame_length):
return [int(round(sum(c) / float(frame_length))) for c in list(chunks(points, frame_length)) if len(c) == frame_length]
def get_bit(points, frame_length):
return int(round(sum(points) / float(frame_length)))
def get_bytes(bits, sigil):
i = 0
# scan for the first occurance of the sigil
while i < len(bits) - len(sigil):
if bits[i:i+len(sigil)] == sigil:
i += len(sigil)
break
i += 1
return [l for l in list(chunks(bits[i:], 8)) if len(l) == 8]
def decode_byte(l):
byte_string = ''.join([str(bit) for bit in l])
return chr(int(byte_string, base=2))
def decode(bytes):
string = ""
for byte in bytes:
byte = ''.join([str(bit) for bit in byte])
string += chr(int(byte, base=2))
return string
def tone(freq=400, datasize=4096, rate=44100, amp=12000.0, offset=0):
sine_list=[]
for x in range(datasize):
samp = math.sin(2*math.pi*freq*((x + offset)/float(rate)))
sine_list.append(int(samp*amp))
return sine_list
def envelope(in_data, left=True, right=True, rate=44100):
half = float(len(in_data)) / 2
freq = math.pi / (len(in_data) / 2)
out_data = []
for x in range(len(in_data)):
samp = in_data[x]
if (x < half and left) or (right and x >= half):
samp = samp * (1 + math.sin(freq*x - (math.pi / 2))) / 2
out_data.append(int(samp))
return out_data