-
Notifications
You must be signed in to change notification settings - Fork 2
/
pipe_reader.py
40 lines (32 loc) · 1.06 KB
/
pipe_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import sys
import torch
import numpy as np
import mpc
# Pipe endpoints: state frames are read from read_path and the resulting
# actions are written back to write_path (presumably FIFOs shared with the
# simulator process, going by the names -- confirm against the sim side).
read_path = "/tmp/pipe_sim2mpc"
write_path = "/tmp/pipe_mpc2sim"

# State vector handed to the optimizer. Each incoming frame overwrites every
# element except the last two, which this script never assigns.
state_cur = torch.zeros(8, dtype=torch.float32)
# Actions produced for the previous frame; fed back into the optimizer.
actions_last = torch.zeros(3, dtype=torch.float32)
actions_cur = torch.zeros(3, dtype=torch.float32)


def _decode_state(data, expected_len):
    """Decode one raw frame into a float32 tensor of ``expected_len`` values.

    Args:
        data: Raw bytes read from the input pipe.
        expected_len: Number of float32 values the frame must contain.

    Returns:
        A 1-D float32 tensor holding a copy of the decoded values.

    Raises:
        ValueError: If ``data`` is not exactly ``expected_len`` float32 values.
            Without this check a 4-byte payload would be silently broadcast
            into every state slot, and other sizes would fail with an opaque
            shape or buffer-size error.
    """
    expected_bytes = expected_len * np.dtype(np.float32).itemsize
    if len(data) != expected_bytes:
        raise ValueError(
            f"malformed state frame from {read_path}: expected "
            f"{expected_bytes} bytes ({expected_len} float32 values), "
            f"got {len(data)} bytes"
        )
    return torch.tensor(np.frombuffer(data, dtype=np.float32))


while True:
    # read() with no size returns everything up to end-of-file, so one
    # open/read/close cycle yields one frame.
    with open(read_path, "rb") as read_fifo:
        data = read_fifo.read()
    if not data:
        # Nothing was sent on this open; try again without replying.
        continue

    state_cur[:-2] = _decode_state(data, state_cur[:-2].numel())

    try:
        # NOTE(review): 0.1 and 3 are passed through as-is; their meaning is
        # defined by mpc.optimize_next_frame -- TODO name them as constants.
        actions_cur = mpc.optimize_next_frame(state_cur, actions_last, 0.1, 3)
    except Exception as e:
        # Best-effort fallback: report the failure and answer with zero
        # actions so a reply is still written for this frame.
        print(f"mpc.optimize_next_frame failed: {e!r}", file=sys.stderr)
        # Use a fresh tensor instead of zeroing the previous result in place:
        # that tensor is shared with actions_last, and an in-place write can
        # itself raise if the tensor requires grad.
        actions_cur = torch.zeros(3, dtype=torch.float32)
        print(state_cur)
    else:
        print("state_cur")
        print(state_cur)
        print("actions_cur")
        print(actions_cur)
        print("\n\n")

    with open(write_path, "wb") as write_fifo:
        write_fifo.write(actions_cur.detach().numpy().tobytes())

    # Keep only the values for the next frame; detaching avoids carrying any
    # autograd history from this optimization into the next one.
    actions_last = actions_cur.detach()