-
Notifications
You must be signed in to change notification settings - Fork 3
/
fastdtw.py
70 lines (55 loc) · 2.17 KB
/
fastdtw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import defaultdict
from six.moves import xrange
def fastdtw(x, y, radius=1, dist=lambda a, b: abs(a - b)):
min_time_size = radius + 2
if len(x) < min_time_size or len(y) < min_time_size:
return dtw(x, y, dist=dist)
x_shrinked = __reduce_by_half(x)
y_shrinked = __reduce_by_half(y)
distance, path = fastdtw(x_shrinked, y_shrinked, radius=radius, dist=dist)
window = __expand_window(path, len(x), len(y), radius)
return dtw(x, y, window, dist=dist)
def dtw(x, y, window=None, dist=lambda a, b: abs(a - b)):
len_x, len_y = len(x), len(y)
if window is None:
window = [(i, j) for i in xrange(len_x) for j in xrange(len_y)]
window = ((i + 1, j + 1) for i, j in window)
D = defaultdict(lambda: (float('inf'),))
D[0, 0] = (0, 0, 0)
for i, j in window:
dt = dist(x[i-1], y[j-1])
D[i, j] = min((D[i-1, j][0]+dt, i-1, j), (D[i, j-1][0]+dt, i, j-1), (D[i-1, j-1][0]+dt, i-1, j-1), key=lambda a: a[0])
path = []
i, j = len_x, len_y
while not (i == j == 0):
path.append((i-1, j-1))
i, j = D[i, j][1], D[i, j][2]
path.reverse()
return (D[len_x, len_y][0], path)
def __reduce_by_half(x):
return [(x[i//2] + x[1+i//2]) / 2 for i in xrange(0, len(x), 2)]
def __expand_window(path, len_x, len_y, radius):
path_ = set(path)
for i, j in path:
for a, b in ((i + a, j + b) for a in xrange(-radius, radius+1) for b in xrange(-radius, radius+1)):
path_.add((a, b))
window_ = set()
for i, j in path_:
for a, b in ((i * 2, j * 2), (i * 2, j * 2 + 1), (i * 2 + 1, j * 2), (i * 2 + 1, j * 2 + 1)):
window_.add((a, b))
window = []
start_j = 0
for i in xrange(0, len_x):
new_start_j = None
for j in xrange(start_j, len_y):
if (i, j) in window_:
window.append((i, j))
if new_start_j is None:
new_start_j = j
elif new_start_j is not None:
break
start_j = new_start_j
return window