-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtests.py
264 lines (238 loc) · 12.2 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import unittest
from time import perf_counter_ns
import neo
import numpy as np
import quantities as pq
from elephant.conversion import BinnedSpikeTrain
from elephant.spike_train_correlation import correlation_coefficient
from elephant.spike_train_generation import homogeneous_poisson_process
from elephant.statistics import mean_firing_rate, isi
from online_statistics import OnlineMeanFiringRate, OnlineInterSpikeInterval, \
OnlinePearsonCorrelationCoefficient
class TestOnlineMeanFiringRate(unittest.TestCase):
    """Compare OnlineMeanFiringRate with elephant's mean_firing_rate()."""

    @classmethod
    def setUpClass(cls):
        """Set the comparison tolerances and the size of the simulated data."""
        cls.rtol, cls.atol = 1e-15, 1e-15
        cls.num_neurons, cls.num_buffers = 10, 100
        cls.buff_size = 1  # in sec

    def test_correctness_multiple_neurons(self):
        """Online and offline mean firing rates must agree for every neuron.

        The spikes of each neuron are passed to an OnlineMeanFiringRate
        object buffer by buffer, once as neo.SpikeTrain objects and once as
        their bare magnitudes. The final online rate is then compared with
        mean_firing_rate() applied to the concatenation of all buffers.
        """
        # One list of consecutive, buffer-sized spiketrains per neuron
        # (outer loop: neurons, inner loop: buffers).
        buffers_per_neuron = []
        for _ in range(self.num_neurons):
            neuron_buffers = []
            for i in range(self.num_buffers):
                window_start = self.buff_size*i*pq.s
                window_stop = (self.buff_size*i+self.buff_size)*pq.s
                neuron_buffers.append(homogeneous_poisson_process(
                    50*pq.Hz, t_start=window_start, t_stop=window_stop))
            buffers_per_neuron.append(neuron_buffers)

        # Simulate buffered transport: feed every buffer to update_mfr(),
        # first with neo.SpikeTrain input ...
        estimators_neo = [OnlineMeanFiringRate()
                          for _ in range(self.num_neurons)]
        neo_started = perf_counter_ns()
        for estimator, neuron_buffers in zip(estimators_neo,
                                             buffers_per_neuron):
            for spike_buffer in neuron_buffers:
                estimator.update_mfr(spike_buffer=spike_buffer)
        online_mfr_neo = [estimator.current_mfr
                          for estimator in estimators_neo]
        neo_finished = perf_counter_ns()

        # ... then with the magnitudes of the same buffers.
        estimators_np = [OnlineMeanFiringRate()
                         for _ in range(self.num_neurons)]
        np_started = perf_counter_ns()
        for estimator, neuron_buffers in zip(estimators_np,
                                             buffers_per_neuron):
            for spike_buffer in neuron_buffers:
                estimator.update_mfr(spike_buffer=spike_buffer.magnitude)
        online_mfr_np = [estimator.current_mfr for estimator in estimators_np]
        np_finished = perf_counter_ns()

        # Reference: merge all buffers of a neuron into a single spiketrain
        # and evaluate the 'offline' mean_firing_rate() on it. The time
        # limits are taken from the first neuron's first and last buffer.
        first_start = buffers_per_neuron[0][0].t_start
        last_stop = buffers_per_neuron[0][-1].t_stop
        merged_trains = []
        for neuron_buffers in buffers_per_neuron:
            spike_times = np.concatenate(
                [np.asarray(st) for st in neuron_buffers])
            merged_trains.append(neo.SpikeTrain(
                spike_times*pq.s, t_start=first_start, t_stop=last_stop))
        offline_started = perf_counter_ns()
        offline_mfr = [mean_firing_rate(train).rescale(pq.Hz)
                       for train in merged_trains]
        offline_finished = perf_counter_ns()

        # Elapsed times in nanoseconds, used for the run-time report.
        neo_ns = neo_finished - neo_started
        np_ns = np_finished - np_started
        offline_ns = offline_finished - offline_started

        # Compare the online results with the offline reference.
        with self.subTest(msg="neo.Spiketrain input"):
            print(f"neo.Spiketrain input\n"
                  f"online_mfr: | run-time: {neo_ns*1e-9}sec\n"
                  f"normal_mfr: | run-time: {offline_ns*1e-9}sec\n"
                  f"(t_online_neo/t_normal)={neo_ns/offline_ns}")
            for online, offline in zip(online_mfr_neo, offline_mfr):
                np.testing.assert_allclose(
                    online.magnitude, offline.magnitude,
                    rtol=self.rtol, atol=self.atol)
        with self.subTest(msg="numpy.ndarray input"):
            print(f"numpy.ndarray input\n"
                  f"online_mfr: | run-time: {np_ns*1e-9}sec\n"
                  f"normal_mfr: | run-time: {offline_ns*1e-9}sec\n"
                  f"(t_online_np/t_normal)={np_ns/offline_ns}")
            for online, offline in zip(online_mfr_np, offline_mfr):
                np.testing.assert_allclose(
                    online.magnitude, offline.magnitude,
                    rtol=self.rtol, atol=self.atol)
class TestOnlineInterSpikeInterval(unittest.TestCase):
    """Compare OnlineInterSpikeInterval with a histogram of elephant's isi()."""

    @classmethod
    def setUpClass(cls):
        """Set the comparison tolerances and the size of the simulated data."""
        cls.rtol, cls.atol = 1e-15, 1e-15
        cls.num_neurons, cls.num_buffers = 10, 100
        cls.buff_size = 1  # in sec

    def test_correctness_multiple_neurons(self):
        """Online and offline ISI histograms must agree for every neuron.

        The spikes of each neuron are passed to an OnlineInterSpikeInterval
        object buffer by buffer, once as neo.SpikeTrain objects and once as
        their bare magnitudes. The resulting histogram is compared with a
        numpy histogram of isi() applied to the concatenated spiketrain.
        """
        # One list of consecutive, buffer-sized spiketrains per neuron
        # (outer loop: neurons, inner loop: buffers).
        buffers_per_neuron = []
        for _ in range(self.num_neurons):
            neuron_buffers = []
            for i in range(self.num_buffers):
                window_start = self.buff_size*i*pq.s
                window_stop = (self.buff_size*i+self.buff_size)*pq.s
                neuron_buffers.append(homogeneous_poisson_process(
                    50*pq.Hz, t_start=window_start, t_stop=window_stop))
            buffers_per_neuron.append(neuron_buffers)

        # Simulate buffered transport: feed every buffer to update_isi(),
        # first with neo.SpikeTrain input ...
        estimators_neo = [OnlineInterSpikeInterval()
                          for _ in range(self.num_neurons)]
        neo_started = perf_counter_ns()
        for estimator, neuron_buffers in zip(estimators_neo,
                                             buffers_per_neuron):
            for spike_buffer in neuron_buffers:
                estimator.update_isi(spike_buffer=spike_buffer)
        online_histograms_neo = [estimator.current_isi_histogram
                                 for estimator in estimators_neo]
        neo_finished = perf_counter_ns()

        # ... then with the magnitudes of the same buffers.
        estimators_np = [OnlineInterSpikeInterval()
                         for _ in range(self.num_neurons)]
        np_started = perf_counter_ns()
        for estimator, neuron_buffers in zip(estimators_np,
                                             buffers_per_neuron):
            for spike_buffer in neuron_buffers:
                estimator.update_isi(spike_buffer=spike_buffer.magnitude)
        online_histograms_np = [estimator.current_isi_histogram
                                for estimator in estimators_np]
        np_finished = perf_counter_ns()

        # Reference: merge all buffers of a neuron into a single spiketrain.
        # The time limits are taken from the first neuron's first and last
        # buffer.
        first_start = buffers_per_neuron[0][0].t_start
        last_stop = buffers_per_neuron[0][-1].t_stop
        merged_trains = []
        for neuron_buffers in buffers_per_neuron:
            spike_times = np.concatenate(
                [np.asarray(st) for st in neuron_buffers])
            merged_trains.append(neo.SpikeTrain(
                spike_times * pq.s, t_start=first_start, t_stop=last_stop))

        # The timed 'offline' part covers both isi() and the histogramming.
        offline_started = perf_counter_ns()
        offline_isi = [isi(train).magnitude.tolist()
                       for train in merged_trains]
        bin_size = 0.0005  # in sec
        max_isi_value = 1  # in sec
        num_bins = int(max_isi_value / bin_size)
        bin_edges = np.linspace(start=0, stop=max_isi_value, num=num_bins+1)
        # np.histogram returns (counts, edges); only the counts are compared.
        offline_histograms = [np.histogram(intervals, bins=bin_edges)[0]
                              for intervals in offline_isi]
        offline_finished = perf_counter_ns()

        # Elapsed times in nanoseconds, used for the run-time report.
        neo_ns = neo_finished - neo_started
        np_ns = np_finished - np_started
        offline_ns = offline_finished - offline_started

        # Compare the online results with the offline reference.
        with self.subTest(msg="neo.Spiketrain input"):
            print(f"online_isi: | run-time: {neo_ns*1e-9}sec\n"
                  f"normal_isi: | run-time: {offline_ns*1e-9}sec\n"
                  f"(t_online_neo/t_normal)={neo_ns/offline_ns}")
            for online, offline in zip(online_histograms_neo,
                                       offline_histograms):
                np.testing.assert_allclose(
                    online, offline, rtol=self.rtol, atol=self.atol)
        with self.subTest(msg="np.ndarray input"):
            print(f"online_isi: | run-time: {np_ns*1e-9}sec\n"
                  f"normal_isi: | run-time: {offline_ns*1e-9}sec\n"
                  f"(t_online_np/t_normal)={np_ns/offline_ns}")
            for online, offline in zip(online_histograms_np,
                                       offline_histograms):
                np.testing.assert_allclose(
                    online, offline, rtol=self.rtol, atol=self.atol)
class TestOnlinePearsonCorrelationCoefficient(unittest.TestCase):
    """Compare OnlinePearsonCorrelationCoefficient with elephant's
    correlation_coefficient()."""

    @classmethod
    def setUpClass(cls):
        """Set the comparison tolerances and the size of the simulated data."""
        cls.rtol, cls.atol = 1e-15, 1e-15
        cls.num_neurons, cls.num_buffers = 10, 100
        cls.buff_size = 1  # in sec

    def test_correctness_with_two_neurons_and_single_loop(self):
        """Online and offline Pearson correlation coefficients must agree
        for two neurons when all spikes arrive in a single buffer call."""
        # Two independent spiketrains plus their binned representation,
        # which serves as input for the offline reference.
        duration = 1*pq.s
        st1 = homogeneous_poisson_process(rate=50*pq.Hz, t_stop=duration)
        st2 = homogeneous_poisson_process(rate=50*pq.Hz, t_stop=duration)
        binned_pair = BinnedSpikeTrain([st1, st2], bin_size=5*pq.ms)

        # Online estimate from a single update_pcc() call.
        estimator = OnlinePearsonCorrelationCoefficient()
        online_started = perf_counter_ns()
        estimator.update_pcc(spike_buffer1=st1, spike_buffer2=st2)
        online_finished = perf_counter_ns()
        final_online_pcc = estimator.R_xy

        # Offline reference via correlation_coefficient().
        offline_started = perf_counter_ns()
        normal_pcc = correlation_coefficient(binned_spiketrain=binned_pair)
        offline_finished = perf_counter_ns()

        # Report the run-times and compare both results; entry [0][1] of
        # the offline result is the one compared with the online value.
        online_ns = online_finished - online_started
        offline_ns = offline_finished - offline_started
        print(f"online pcc:{final_online_pcc}|run-time:{online_ns*1e-9}sec\n"
              f"normal pcc:{normal_pcc[0][1]}|run-time:{offline_ns*1e-9}sec\n"
              f"PCC1: (t_online / t_normal)={online_ns/offline_ns}")
        np.testing.assert_allclose(final_online_pcc, normal_pcc[0][1],
                                   rtol=self.rtol, atol=self.atol)

    def test_correctness_with_two_neurons_and_multiple_loops(self):
        """Online and offline Pearson correlation coefficients must agree
        for two neurons when the spikes arrive over many buffer calls."""

        def generate_buffers():
            # Consecutive, buffer-sized spiketrains of one neuron.
            return [homogeneous_poisson_process(
                50*pq.Hz, t_start=self.buff_size*i*pq.s,
                t_stop=(self.buff_size*i+self.buff_size)*pq.s)
                for i in range(self.num_buffers)]

        def merge_buffers(buffers):
            # Join all buffers of one neuron into a single spiketrain that
            # spans from the first buffer's t_start to the last one's t_stop.
            first_start = buffers[0].t_start
            last_stop = buffers[-1].t_stop
            spike_times = np.concatenate([np.asarray(st) for st in buffers])
            return neo.SpikeTrain(
                spike_times*pq.s, t_start=first_start, t_stop=last_stop)

        # All buffers of the first neuron are generated before those of the
        # second one.
        st1_list = generate_buffers()
        st2_list = generate_buffers()

        # Simulate buffered transport: one update_pcc() call per pair of
        # simultaneous buffers.
        estimator = OnlinePearsonCorrelationCoefficient()
        online_started = perf_counter_ns()
        for buffer1, buffer2 in zip(st1_list, st2_list):
            estimator.update_pcc(spike_buffer1=buffer1,
                                 spike_buffer2=buffer2)
        final_online_pcc = estimator.R_xy
        online_finished = perf_counter_ns()

        # Offline reference: bin the two concatenated spiketrains together
        # and pass them to correlation_coefficient().
        concatenated_binned_st = BinnedSpikeTrain(
            [merge_buffers(st1_list), merge_buffers(st2_list)],
            bin_size=5 * pq.ms)
        offline_started = perf_counter_ns()
        normal_pcc = correlation_coefficient(
            binned_spiketrain=concatenated_binned_st)
        offline_finished = perf_counter_ns()

        # Report the run-times and compare both results; entry [0][1] of
        # the offline result is the one compared with the online value.
        online_ns = online_finished - online_started
        offline_ns = offline_finished - offline_started
        print(f"online pcc:{final_online_pcc}|run-time:{online_ns*1e-9}sec\n"
              f"normal pcc:{normal_pcc[0][1]}|run-time:{offline_ns*1e-9}sec\n"
              f"PCC2: (t_online / t_normal)={online_ns/offline_ns}\n")
        np.testing.assert_allclose(final_online_pcc, normal_pcc[0][1],
                                   rtol=self.rtol, atol=self.atol)
# Run all test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()