-
Notifications
You must be signed in to change notification settings - Fork 3
/
dataset.py
139 lines (117 loc) · 4.84 KB
/
dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os;
import numpy as np;
import random;
import utils as U;
from tensorflow import keras;
class Generator(keras.utils.Sequence):
    """Serve batches of preprocessed sounds and labels to Keras.

    The behaviour depends on ``train`` and ``options.BC``:

    * ``train`` and ``options.BC`` both truthy ("mix" mode): every example
      is a mix of two randomly drawn sounds with different labels, and its
      label is the matching blend of the two one-hot vectors.
    * otherwise: examples are read from the stored data in order, one
      contiguous slice per batch index.

    When ``train`` is false the first two axes of the stacked sounds and
    labels are additionally flattened into one (see ``generate_batch``).

    This class never reorders ``self.data``, so outside mix mode the batches
    always follow the order in which the samples were supplied.
    """

    def __init__(self, samples, labels, options, train=True):
        """Store the data and derive the batch configuration.

        Args:
            samples: indexable collection of raw sounds.
            labels: indexable collection of labels, aligned with ``samples``.
            options: settings object; this class reads ``batchSize``,
                ``nCrops``, ``BC``, ``nClasses``, ``fs``, ``inputLength``
                and ``strongAugment`` from it.
            train: True for the training generator, False for evaluation.
        """
        # Newer Keras releases expect Sequence subclasses to run the base
        # initializer; on older releases this resolves to object.__init__.
        super().__init__()
        self.data = [(sample, labels[i]) for i, sample in enumerate(samples)]
        self.opt = options
        self.train = train
        # In evaluation the per-batch sample count is divided by nCrops.
        if train:
            self.batch_size = options.batchSize
        else:
            self.batch_size = options.batchSize // options.nCrops
        self.mix = (options.BC and train)
        self.preprocess_funcs = self.preprocess_setup()

    def __len__(self):
        """Return the number of full batches per epoch (a remainder is dropped)."""
        return len(self.data) // self.batch_size

    def __getitem__(self, batchIndex):
        """Return batch ``batchIndex`` as ``(batchX, batchY)``.

        ``batchX`` is the sound array from ``generate_batch`` with new
        length-1 axes inserted at positions 1 and 3.
        """
        batchX, batchY = self.generate_batch(batchIndex)
        batchX = np.expand_dims(batchX, axis=1)
        batchX = np.expand_dims(batchX, axis=3)
        return batchX, batchY

    def generate_batch(self, batchIndex):
        """Build the sounds and labels of one batch as NumPy arrays.

        In mix mode ``batchIndex`` is ignored and ``batch_size`` random
        mixtures are produced. Otherwise the batch is the slice
        ``data[batchIndex * batch_size:(batchIndex + 1) * batch_size]``.

        Raises:
            IndexError: outside mix mode, when the slice selected by
                ``batchIndex`` is empty.
        """
        if self.mix:
            examples = (self._mixed_example() for _ in range(self.batch_size))
        else:
            start = batchIndex * self.batch_size
            chunk = self.data[start:start + self.batch_size]
            if not chunk:
                raise IndexError('batch index out of range')
            examples = (self._plain_example(sound, target)
                        for sound, target in chunk)

        apply_gain = self.train and self.opt.strongAugment
        sounds = []
        labels = []
        # The generators above are consumed lazily, so each example is fully
        # built (preprocess, mix, gain) before the next one is started.
        for sound, label in examples:
            if apply_gain:
                sound = U.random_gain(6)(sound).astype(np.float32)
            sounds.append(sound)
            labels.append(label)

        sounds = np.asarray(sounds)
        labels = np.asarray(labels)
        if not self.train:
            # Merge the first two axes, presumably (sample, crop), so every
            # crop becomes its own row -- TODO confirm against U.multi_crop.
            sounds = sounds.reshape(sounds.shape[0] * sounds.shape[1], sounds.shape[2])
            labels = labels.reshape(labels.shape[0] * labels.shape[1], labels.shape[2])

        return sounds, labels

    def _mixed_example(self):
        """Draw two differently-labelled sounds and return their mix and label."""
        # NOTE(review): this loop never ends if every stored label is equal.
        while True:
            sound1, label1 = self.data[random.randint(0, len(self.data) - 1)]
            sound2, label2 = self.data[random.randint(0, len(self.data) - 1)]
            if label1 != label2:
                break
        sound1 = self.preprocess(sound1)
        sound2 = self.preprocess(sound2)

        # r is the random ratio handed to U.mix and also used as the weight
        # of the first label (the second label gets 1 - r).
        r = np.array(random.random())
        sound = U.mix(sound1, sound2, r, self.opt.fs).astype(np.float32)
        eye = np.eye(self.opt.nClasses)
        label = (eye[label1] * r + eye[label2] * (1 - r)).astype(np.float32)
        return sound, label

    def _plain_example(self, sound, target):
        """Preprocess one stored sound and build its one-hot label rows."""
        sound = self.preprocess(sound).astype(np.float32)
        # NOTE(review): the label always has nCrops rows, including in
        # training where the pipeline uses U.random_crop rather than
        # U.multi_crop -- confirm the consumer expects that shape.
        label = np.zeros((self.opt.nCrops, self.opt.nClasses))
        label[:, target] = 1
        return sound, label

    def preprocess_setup(self):
        """Return the ordered list of preprocessing callables for this mode."""
        half_length = self.opt.inputLength // 2
        if not self.train:
            return [
                U.padding(half_length),
                U.normalize(32768.0),
                U.multi_crop(self.opt.inputLength, self.opt.nCrops),
            ]

        funcs = []
        if self.opt.strongAugment:
            funcs.append(U.random_scale(1.25))
        funcs.extend([
            U.padding(half_length),
            U.random_crop(self.opt.inputLength),
            U.normalize(32768.0),
        ])
        return funcs

    def preprocess(self, sound):
        """Apply every function in ``self.preprocess_funcs`` to ``sound``, in order."""
        for func in self.preprocess_funcs:
            sound = func(sound)
        return sound
def setup(opt, split):
    """Load the folded dataset and build the train and validation generators.

    The archive ``wav<opt.fs // 1000>.npz`` is read from the directory
    ``opt.data/opt.dataset``. It is expected to hold one entry per fold,
    named ``fold1`` .. ``fold<opt.nFolds>``, each wrapping a mapping with
    ``'sounds'`` and ``'labels'`` sequences.

    Args:
        opt: settings object; ``data``, ``dataset``, ``fs`` and ``nFolds``
            are read here and the object is passed on to ``Generator``.
        split: number of the fold (1-based) held out for validation; all
            other folds form the training set.

    Returns:
        Tuple ``(train_data, val_data)`` of ``Generator`` instances.
    """
    archive_path = os.path.join(
        opt.data, opt.dataset, 'wav{}.npz'.format(opt.fs // 1000))

    train_sounds = []
    train_labels = []
    val_sounds = []
    val_labels = []

    # Each fold is a pickled Python object inside a NumPy object array (hence
    # the .item() call below); NumPy refuses to load those unless
    # allow_pickle=True is given explicitly.
    # SECURITY: unpickling can execute arbitrary code -- only load archives
    # from a trusted source.
    with np.load(archive_path, allow_pickle=True) as dataset:
        for i in range(1, opt.nFolds + 1):
            # Read the fold once; every access to an NpzFile entry re-reads
            # it from the archive.
            fold = dataset['fold{}'.format(i)].item()
            sounds = fold['sounds']
            labels = fold['labels']
            if i == split:
                val_sounds.extend(sounds)
                val_labels.extend(labels)
            else:
                train_sounds.extend(sounds)
                train_labels.extend(labels)

    # Iterator setup
    train_data = Generator(train_sounds, train_labels, opt, train=True)
    val_data = Generator(val_sounds, val_labels, opt, train=False)

    return train_data, val_data