-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamic_predictor.py
113 lines (88 loc) · 3.5 KB
/
dynamic_predictor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import math
from utils import calculateNewCoords
class DynamicPredictor:
def __init__(self):
self.speed_buffer = []
self.direction_buffer = []
self.time_stamps = []
self.speed_grads = []
self.direction_grads = []
self.avg_speed_grad = 0
self.avg_direction_grad = 0
self.mem_size = 10
self.max_speed = 0
self.max_direction = 0
# intention vector calculated by dynamic regression
self.intention_angle = 0
self.alpha = 0.1
self.beta = 0.05
self.p_lr = 0.1
def sigmoid(self, val):
return (1 / (1 + math.exp(-val)))
def learn(self, pred_angle):
if len(self.direction_buffer) < 2:
return
if math.fabs(self.direction_buffer[-1] - self.intention_angle) < math.fabs(self.direction_buffer[-1] - pred_angle):
self.beta += self.p_lr
else:
self.beta -= self.p_lr
if self.beta > 1:
self.beta = 1
if self.beta < 0:
self.beta = 0
def eval_grads(self):
if len(self.time_stamps) < 2:
return
for i in range(len(self.time_stamps) - 1):
self.avg_speed_grad += (self.speed_buffer[i+1] - self.speed_buffer[i])
self.avg_direction_grad = (self.direction_buffer[i+1] - self.direction_buffer[i])
self.avg_speed_grad /= (self.time_stamps[-1] - self.time_stamps[0])
self.avg_direction_grad /= (self.time_stamps[-1] - self.time_stamps[0])
def update_intention(self):
if len(self.time_stamps) == 0:
return
self.intention_angle = self.alpha * self.direction_buffer[-1] + (1 - self.alpha) * self.intention_angle
def predict(self, time_elapsed):
if len(self.time_stamps) < 2:
return
self.eval_grads()
# returns new speed and direction
predicted_direction = self.direction_buffer[-1] + (self.avg_direction_grad * time_elapsed)
predicted_speed = self.speed_buffer[-1] + (self.avg_speed_grad * time_elapsed)
if predicted_direction > self.max_direction:
predicted_direction = self.max_direction
if predicted_speed > self.max_speed:
predicted_speed = self.max_speed
npd = self.beta * self.intention_angle + (1 - self.beta) * predicted_direction
return predicted_speed, npd
def feed(self, speed, direction, time_stamp):
print(speed,direction,time_stamp)
if speed > self.max_speed:
self.max_speed = speed
if direction > self.max_direction:
self.max_direction = direction
self.time_stamps.append(time_stamp)
self.speed_buffer.append(speed)
self.direction_buffer.append(direction)
if len(self.time_stamps) > self.mem_size:
del self.time_stamps[0]
del self.speed_buffer[0]
del self.direction_buffer[0]
self.update_intention()
if len(self.time_stamps) > 2:
_, pd = self.predict(time_stamp - self.time_stamps[-1])
self.learn(pd)
def predict_coords(self, coords, t):
ps, pd = self.predict(t)
return calculateNewCoords(coords, ps, pd, t)
if __name__ == '__main__':
dypre = DynamicPredictor()
dypre.feed(7, 1, 1)
dypre.feed(7, 2, 2)
dypre.feed(7, 3, 3)
dypre.feed(8, 4, 4)
dypre.feed(9, 5, 5)
dypre.feed(8, 6, 10)
dypre.feed(7, 10, 15)
print(dypre.predict(1))
print(dypre.predict(4))