-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #21 from wearable-motion-capture/summary_paper
Summary paper
- Loading branch information
Showing
53 changed files
with
1,725 additions
and
1,464 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
global-include *.json *.xml *.pt *.pkl | ||
global-include *.json *.xml *.pt *.pkl SW-v3.8-model-436400 | ||
|
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import atexit | ||
import logging | ||
import queue | ||
import signal | ||
import threading | ||
import wear_mocap_ape.config as config | ||
|
||
from wear_mocap_ape.stream.publisher.watch_only_udp import WatchOnlyNnUDP | ||
from wear_mocap_ape.stream.listener.imu import ImuListener | ||
from wear_mocap_ape.data_types import messaging | ||
|
||
import argparse | ||
|
||
|
||
def run_watch_only_nn_udp(ip, smooth, stream_mc): | ||
# listener and publisher run in separate threads. Listener fills the queue, publisher empties it | ||
q = queue.Queue() | ||
|
||
# the listener fills the que with received and parsed smartwatch data | ||
imu_l = ImuListener( | ||
ip=ip, | ||
msg_size=messaging.watch_only_imu_msg_len, | ||
port=config.PORT_LISTEN_WATCH_IMU_LEFT # the smartwatch app sends on this PORT | ||
) | ||
imu_w_l = threading.Thread( | ||
target=imu_l.listen, | ||
args=(q,) | ||
) | ||
|
||
# the publisher estimates arm poses from queued data and publishes them via UDP to given IP and PORT | ||
joints_p = WatchOnlyNnUDP( | ||
ip=ip, | ||
port=config.PORT_PUB_LEFT_ARM, | ||
smooth=smooth, | ||
stream_mc=stream_mc | ||
) | ||
udp_publisher = threading.Thread( | ||
target=joints_p.processing_loop, | ||
args=(q,) | ||
) | ||
|
||
imu_w_l.start() | ||
udp_publisher.start() | ||
|
||
def terminate_all(*args): | ||
imu_l.terminate() | ||
joints_p.terminate() | ||
|
||
# make sure all handler exit on termination | ||
atexit.register(terminate_all) | ||
signal.signal(signal.SIGTERM, terminate_all) | ||
signal.signal(signal.SIGINT, terminate_all) | ||
return joints_p | ||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
# Instantiate the parser | ||
parser = argparse.ArgumentParser(description='streams data from the watch in standalone mode') | ||
|
||
# Required IP argument | ||
parser.add_argument('ip', type=str, | ||
help=f'put your local IP here. ' | ||
f'The script will publish arm ' | ||
f'pose data on PORT {config.PORT_PUB_LEFT_ARM}') | ||
args = parser.parse_args() | ||
|
||
run_watch_only_nn_udp(ip=args.ip, smooth=5, stream_mc=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import argparse | ||
import atexit | ||
import logging | ||
import queue | ||
import signal | ||
import threading | ||
|
||
from wear_mocap_ape import config | ||
from wear_mocap_ape.data_deploy.nn import deploy_models | ||
from wear_mocap_ape.data_types import messaging | ||
from wear_mocap_ape.stream.listener.imu import ImuListener | ||
from wear_mocap_ape.stream.publisher.watch_phone_pocket_nn_udp import WatchPhonePocketNnUDP | ||
|
||
|
||
def run_watch_phone_pocket_nn_udp(ip: str, smooth: int, stream_mc: bool) -> WatchPhonePocketNnUDP: | ||
# data for left-hand mode | ||
q = queue.Queue() | ||
|
||
# listen for imu data from phone and watch | ||
imu_listener = ImuListener( | ||
ip=ip, | ||
msg_size=messaging.watch_phone_imu_msg_len, | ||
port=config.PORT_LISTEN_WATCH_PHONE_IMU_LEFT | ||
) | ||
imu_thread = threading.Thread( | ||
target=imu_listener.listen, | ||
args=(q,) | ||
) | ||
|
||
# process into arm pose and body orientation | ||
estimator = WatchPhonePocketNnUDP(ip=ip, | ||
port=config.PORT_PUB_LEFT_ARM, | ||
smooth=smooth, | ||
model_hash=deploy_models.LSTM.WATCH_PHONE_POCKET.value, | ||
stream_mc=stream_mc, | ||
mc_samples=60) | ||
udp_thread = threading.Thread( | ||
target=estimator.processing_loop, | ||
args=(q,) | ||
) | ||
|
||
imu_thread.start() | ||
udp_thread.start() | ||
|
||
def terminate_all(*args): | ||
imu_listener.terminate() | ||
estimator.terminate() | ||
|
||
# make sure all handler exit on termination | ||
atexit.register(terminate_all) | ||
signal.signal(signal.SIGTERM, terminate_all) | ||
signal.signal(signal.SIGINT, terminate_all) | ||
|
||
return estimator | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Parse arguments | ||
parser = argparse.ArgumentParser(description='') | ||
|
||
# Required IP argument | ||
parser.add_argument('ip', type=str, help=f'put your local IP here.') | ||
parser.add_argument('smooth', nargs='?', type=int, default=5, help=f'smooth predicted trajectories') | ||
parser.add_argument('--stream_mc', action='store_true') | ||
parser.add_argument('--no-stream_mc', dest='stream_mc', action='store_false') | ||
parser.set_defaults(stream_mc=True) | ||
|
||
args = parser.parse_args() | ||
|
||
ip_arg = args.ip | ||
smooth_arg = args.smooth | ||
stream_mc_arg = args.stream_mc | ||
|
||
# run the predictions | ||
run_watch_phone_pocket_nn_udp(ip_arg, smooth_arg, stream_mc_arg) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import argparse | ||
import atexit | ||
import logging | ||
import queue | ||
import signal | ||
import threading | ||
|
||
import wear_mocap_ape.config as config | ||
from wear_mocap_ape.stream.listener.imu import ImuListener | ||
from wear_mocap_ape.data_types import messaging | ||
from wear_mocap_ape.stream.publisher.watch_phone_uarm_udp import WatchPhoneUarmUDP | ||
|
||
|
||
def run_watch_phone_uarm_udp(ip, smooth): | ||
# data processing happens in independent threads. | ||
# We exchange data via queues. | ||
left_q = queue.Queue() # data for left-hand mode | ||
|
||
# left listener | ||
imu_l = ImuListener( | ||
ip=ip, | ||
msg_size=messaging.watch_phone_imu_msg_len, | ||
port=config.PORT_LISTEN_WATCH_PHONE_IMU_LEFT, | ||
) | ||
l_thread = threading.Thread( | ||
target=imu_l.listen, | ||
args=(left_q,) | ||
) | ||
|
||
# left publisher | ||
wp2ul = WatchPhoneUarmUDP( | ||
ip=ip, | ||
port=config.PORT_PUB_LEFT_ARM, | ||
smooth=smooth | ||
) | ||
ul_thread = threading.Thread( | ||
target=wp2ul.processing_loop, | ||
args=(left_q,) | ||
) | ||
l_thread.start() | ||
ul_thread.start() | ||
|
||
def terminate_all(*args): | ||
imu_l.terminate() | ||
wp2ul.terminate() | ||
|
||
# make sure all handler exit on termination | ||
atexit.register(terminate_all) | ||
signal.signal(signal.SIGTERM, terminate_all) | ||
signal.signal(signal.SIGINT, terminate_all) | ||
|
||
return wp2ul | ||
|
||
|
||
if __name__ == "__main__": | ||
# enable basic logging | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Instantiate the parser | ||
parser = argparse.ArgumentParser(description='streams data from the watch in standalone mode') | ||
|
||
# Required IP argument | ||
parser.add_argument('ip', type=str, | ||
help=f'put your local IP here. ' | ||
f'The script will publish arm ' | ||
f'pose data on PORT {config.PORT_PUB_LEFT_ARM} (left hand)') | ||
parser.add_argument('smooth', nargs='?', type=int, default=5, help=f'smooth predicted trajectories') | ||
args = parser.parse_args() | ||
|
||
run_watch_phone_uarm_udp(ip=args.ip, smooth=args.smooth) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import argparse | ||
import atexit | ||
import logging | ||
import queue | ||
import signal | ||
import threading | ||
|
||
import wear_mocap_ape.config as config | ||
from wear_mocap_ape.stream.listener.imu import ImuListener | ||
from wear_mocap_ape.stream.publisher.watch_phone_uarm_nn_udp import WatchPhoneUarmNnUDP | ||
from wear_mocap_ape.data_types import messaging | ||
|
||
|
||
def run_watch_phone_uarm_nn_udp(ip, smooth, stream_mc): | ||
# data processing happens in independent threads. | ||
# We exchange data via queues. | ||
left_q = queue.Queue() # data for left-hand mode | ||
|
||
# left listener | ||
imu_l = ImuListener( | ||
ip=ip, | ||
msg_size=messaging.watch_phone_imu_msg_len, | ||
port=config.PORT_LISTEN_WATCH_PHONE_IMU_LEFT, | ||
) | ||
l_thread = threading.Thread( | ||
target=imu_l.listen, | ||
args=(left_q,) | ||
) | ||
|
||
# left publisher | ||
wp2ul = WatchPhoneUarmNnUDP( | ||
ip=ip, | ||
port=config.PORT_PUB_LEFT_ARM, | ||
smooth=smooth, | ||
stream_mc=stream_mc, | ||
) | ||
ul_thread = threading.Thread( | ||
target=wp2ul.processing_loop, | ||
args=(left_q,) | ||
) | ||
l_thread.start() | ||
ul_thread.start() | ||
|
||
def terminate_all(*args): | ||
imu_l.terminate() | ||
wp2ul.terminate() | ||
|
||
# make sure all handler exit on termination | ||
atexit.register(terminate_all) | ||
signal.signal(signal.SIGTERM, terminate_all) | ||
signal.signal(signal.SIGINT, terminate_all) | ||
|
||
return wp2ul | ||
|
||
|
||
if __name__ == "__main__": | ||
# enable basic logging | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Instantiate the parser | ||
parser = argparse.ArgumentParser(description='streams data from the watch in standalone mode') | ||
|
||
# Required IP argument | ||
parser.add_argument('ip', type=str, | ||
help=f'put your local IP here. ' | ||
f'The script will publish arm ' | ||
f'pose data on PORT {config.PORT_PUB_LEFT_ARM} (left hand)') | ||
parser.add_argument('smooth', nargs='?', type=int, default=5, help=f'smooth predicted trajectories') | ||
args = parser.parse_args() | ||
|
||
run_watch_phone_uarm_nn_udp(ip=args.ip, smooth=args.smooth, stream_mc=False) |
Oops, something went wrong.