-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncMultiProcessing.py
144 lines (115 loc) · 5.15 KB
/
AsyncMultiProcessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import importlib
import os
import pathlib
import aioconsole
from lib.Process import Process
from typing import Union
class AsyncMultiProcessing:
def __init__(self, **kwargs) -> None:
self._processes: dict[str, Process, str] = {}
self.modules_path: Union[str, pathlib.Path]
self.modules_path = kwargs.get('modules_path')
if self.modules_path is None:
raise ValueError('modules_path must be specified.')
asyncio.run(self.load_processes()) # Converts Python files with classes to 'Processes' in a dictionary.
asyncio.run(self.run())
async def load_processes(self) -> None:
"""
How you want to load processes is only your decision. This is an overridable method where you can convert
Python files with classes to asyncio processes in a dictionary.
I would recommend to use 'import_module' from 'importlib' to load modules.
This is the quickest and cleanest way dynamically.
:return: dict[str, lib.Process]
"""
for entry in os.scandir(self.modules_path):
if entry.is_file() and entry.name.endswith('.py') and entry.name != '__init__.py':
split_entry = os.path.splitext(entry.name)
module_name = split_entry[0]
if module_name is None:
continue
process = self.python_file_to_process(module_name, self.modules_path)
await self.add_process(process, module_name)
return None
def python_file_to_process(self, python_file_path: str, package: str = None) -> Process:
"""
Converts a Python file with classes to a lib.Process object.
:param package: Module
:param python_file_path: path to Python file with classes
:return: lib.Process
"""
module_full = os.path.splitext(python_file_path)
if len(module_full) < 2 or module_full[1] == '.py':
raise TypeError('python_file_path must have a Python extension.')
module_name = module_full[0]
module_pythonic_path = os.path.join(self.modules_path, module_name).replace(os.sep, '.')
imported_module = importlib.import_module(module_pythonic_path, package)
module = getattr(imported_module, module_name)
return Process(target=module)
async def add_process(self, process_object: Process, process_name: str) -> None:
"""
Adds a lib.Process object to the dictionary.
:param process_name:
:param process_object: object to add to the dictionary
:return: None if successful
"""
if not isinstance(process_object, Process):
raise TypeError('process_object must be a lib.Process object.')
if process_name in self._processes:
raise ValueError('{} is already in self._processes'.format(process_name))
self._processes[process_name] = process_object
return None
@property
def processes(self) -> dict[str, Process]:
"""
Returns the processes dictionary.
:return: dict[str, lib.Process]
"""
return self._processes
def get_process(self, process_name: str) -> Process:
"""
Gets a lib.Process object from the dictionary.
:param process_name: name of process.
:return: lib.Process
"""
return self._processes[process_name]
async def start_process(self, process_name: str) -> None:
"""
Starts a lib.Process object.
:param process_name: has to be the name of an object you want to start
:return: None
"""
process = self._processes[process_name]
process.start()
process.join(timeout=1)
async def terminate_proc_add(self, process_name: str) -> None:
"""
Terminates a lib.Process object and adds it back to the dictionary as a new process.
:param process_name: Has to be the name of an object you want to terminate.
:return: None
"""
process = self._processes[process_name]
process.terminate()
self._processes[process_name] = Process(target=process.target_class)
return None
async def run(self) -> None:
"""
This method is overridable. But you must return None as the result will be ignored.
:return: None
"""
while True:
command_input = await aioconsole.ainput('Enter command: ')
if not command_input:
continue
command_arguments = command_input.split()
command_name = command_arguments[0]
command_excess_arguments = command_arguments[1:]
command_excess_name = command_excess_arguments[0]
if command_name == 'enable':
await self.start_process(command_excess_name)
elif command_name == 'disable':
await self.terminate_proc_add(command_excess_name)
else:
print('Unknown command.')
if __name__ == '__main__':
AsyncMultiProcessing(modules_path='')