-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathhinawa-bebob-plug-parser
executable file
·135 lines (113 loc) · 4.26 KB
/
hinawa-bebob-plug-parser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2018 Takashi Sakamoto
import gi
gi.require_version('GLib', '2.0')
gi.require_version('Hinawa', '4.0')
from gi.repository import GLib, Hinawa
from hinawa_utils.bebob.plug_parser import PlugParser
from sys import argv, exit
from pathlib import Path
from threading import Thread
import json
from pprint import PrettyPrinter
def print_help(reason=""):
if len(reason) > 0:
print(reason)
print('')
print('Usage:')
print(' {} CDEV MODE'.format(argv[0]))
print('')
print(' where:')
print(' CDEV: the path to special file for Linux FireWire character device (e.g. /dev/fw[0-9]+)')
print(' MODE: dump mode (0: id-only, 1: whole as json, 2: pretty print)')
def dump_plug_info_to_stdio_as_json(fcp):
unit_plugs = PlugParser.parse_unit_plugs(fcp)
subunit_plugs = PlugParser.parse_subunit_plugs(fcp)
function_block_plugs = PlugParser.parse_function_block_plugs(fcp, subunit_plugs)
stream_formats = PlugParser.parse_stream_formats(fcp, unit_plugs)
info = {
'units': unit_plugs,
'subunits': subunit_plugs,
'function-blocks': function_block_plugs,
'stream-formats': stream_formats,
}
print(json.dumps(info))
def dump_plug_info_to_stdio_as_ids_only(fcp):
unit_plugs = PlugParser.parse_unit_plugs(fcp)
subunit_plugs = PlugParser.parse_subunit_plugs(fcp)
function_block_plugs = PlugParser.parse_function_block_plugs(fcp, subunit_plugs)
stream_formats = PlugParser.parse_stream_formats(fcp, unit_plugs)
for type, dir_plugs in unit_plugs.items():
for dir, plugs in dir_plugs.items():
for num, plug in plugs.items():
print(type, dir, num, plug['name'])
for type, dir_plugs in subunit_plugs.items():
for id, id_plugs in dir_plugs.items():
for dir, dir_plugs in id_plugs.items():
for plug_id, attrs in dir_plugs.items():
print(type, id, dir, plug_id, attrs['name'])
for type, type_plugs in function_block_plugs.items():
for id, id_plugs in type_plugs.items():
for fb_type, fb_type_plugs in id_plugs.items():
for fb_id, fb_id_plugs in fb_type_plugs.items():
for plug_id, attrs in fb_id_plugs['outputs'].items():
print(type, id, fb_type, fb_id, 'output', plug_id,
attrs['name'])
for plug_id, attrs in fb_id_plugs['inputs'].items():
print(type, id, fb_type, fb_id, 'input', plug_id,
attrs['name'])
for type, dir_fmts in stream_formats.items():
for dir, fmts in dir_fmts.items():
for i, entries in fmts.items():
for j, entry in enumerate(entries):
print(type, dir, i, j, entry['type'])
def dump_plug_info_to_stdio_by_pprinter(fcp):
unit_plugs = PlugParser.parse_unit_plugs(fcp)
subunit_plugs = PlugParser.parse_subunit_plugs(fcp)
function_block_plugs = PlugParser.parse_function_block_plugs(fcp, subunit_plugs)
stream_formats = PlugParser.parse_stream_formats(fcp, unit_plugs)
pp = PrettyPrinter()
pp.pprint(unit_plugs)
pp.pprint(subunit_plugs)
pp.pprint(function_block_plugs)
pp.pprint(stream_formats)
ops = {
'0': dump_plug_info_to_stdio_as_ids_only,
'1': dump_plug_info_to_stdio_as_json,
'2': dump_plug_info_to_stdio_by_pprinter,
}
if len(argv) < 3:
print_help("Missing arguments")
exit(1)
path = Path(argv[1])
if not path.exists():
print_help("{} is not found.¥n".format(path))
exit(1)
elif not path.is_char_device():
print_help("{} is not special file for character device.".format(path))
exit(1)
fullpath = str(path)
mode = argv[2]
if mode not in ops:
print_help('Invalid mode')
exit(1)
op = ops[mode]
node = Hinawa.FwNode.new()
fcp = Hinawa.FwFcp.new()
ctx = GLib.MainContext.new()
dispatcher = GLib.MainLoop.new(ctx, False)
try:
node.open(fullpath, 0)
_ = fcp.bind(node)
_, src = node.create_source()
src.attach(ctx)
th = Thread(target=lambda d: d.run(), args=(dispatcher,))
th.start()
op(fcp)
except Exception as e:
print(e)
finally:
fcp.unbind()
dispatcher.quit()
th.join()