Getting started
tommasoviciani edited this page Jun 1, 2020 · 3 revisions
Follow these steps to apply the framework correctly:
- Establish the job goals of our project and catalog them in an Enum class. We will need them so that specific modules perform actions when a given job goal appears in the queue.
from enum import Enum

class JobGoal(Enum):
    GOAL1 = 1
    GOAL2 = 2
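To illustrate why the goals are cataloged in an Enum, here is a minimal, framework-independent sketch. Enum members are hashable, so they can key a lookup table that maps each goal to an action. The `handlers` dictionary and the `handle` function are hypothetical names used only for illustration; they are not part of the framework.

```python
from enum import Enum


class JobGoal(Enum):
    GOAL1 = 1
    GOAL2 = 2


# Hypothetical lookup table: maps each goal to the action to perform.
handlers = {
    JobGoal.GOAL1: lambda data: f"goal 1 handled: {data}",
    JobGoal.GOAL2: lambda data: f"goal 2 handled: {data}",
}


def handle(goal, data):
    """Pick the handler registered for `goal` and apply it to `data`."""
    return handlers[goal](data)


if __name__ == "__main__":
    print(handle(JobGoal.GOAL1, {"x": "module1"}))
    # Members can also be looked up by value.
    print(handle(JobGoal(2), {"x": 2}))
```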
- Write the classes of our modules, which inherit from the Module class. In this case, we will write two modules.
class Module1(Module):
    # The pattern object must always be present in our modules because it stores our decorated methods
    mp = Module.Pattern()
    # The tag is a string that identifies a specific module for distributing jobs
    tag = 'm1'

    def __init__(self):
        # Call the constructor of the Module class first. It accepts the tag of the module being
        # initialized and the module instance, which is needed to reach the jobs of the other modules.
        super().__init__(instance=self, tag=self.tag)
        # Always register the module pattern right after calling the Module class constructor
        self.controller.register_pattern(self.mp)
        # Example state variables
        self.count = 2
        self.recevied_count = 0

    @mp.setup()
    def setup(self):
        """
        The method decorated with @setup() is executed only once, when the module starts. It is used to configure our
        variables
        """
        print(f'{self.tag}: setup')

    @mp.on_incoming_data()
    def on_incoming_data(self, job):
        """
        The method decorated with @on_incoming_data() is called when a new job is available in the queue. A method
        using this decorator must accept the job obtained from the queue as its argument
        """
        print(f'{self.tag}: received a new job')
        # Call this method to execute the job just obtained from the queue
        self.controller.run_job(job)

    @mp.timer(name='timer', interval=5)
    def timer(self):
        """
        The method decorated with @timer() is a subprocess that is executed cyclically. A module can have multiple
        timers; each needs a name to distinguish it from the others and a time interval expressed in seconds
        """
        print(f'{self.tag}[timer]: send job')
        self.controller.send_job(
            Job(data={'x': 'module1'}, goal=JobGoal.GOAL1, producer=self.tag, recipient='m2'))

    @mp.main_loop(interval=1)
    def main_loop(self):
        """
        The method decorated with @main_loop() is the subprocess that keeps running until the module process
        is killed. It is performed cyclically and is the last decorated method that is called
        """
        print(f'{self.tag}: count = {self.count + self.recevied_count}')
        self.count += self.recevied_count

class Module2(Module):
    # The pattern object must always be present in our modules because it stores our decorated methods
    mp = Module.Pattern()
    # The tag is a string that identifies a specific module for distributing jobs
    tag = 'm2'

    def __init__(self):
        # Call the constructor of the Module class first. It accepts the tag of the module being
        # initialized and the module instance, which is needed to reach the jobs of the other modules.
        super().__init__(instance=self, tag=self.tag)
        # Always register the module pattern right after calling the Module class constructor
        self.controller.register_pattern(self.mp)
        # Example state variable
        self.count = 0

    @mp.setup()
    def setup(self):
        """
        The method decorated with @setup() is executed only once, when the module starts. It is used to configure our
        variables
        """
        print(f'{self.tag}: setup')

    @mp.on_incoming_data()
    def on_incoming_data(self, job):
        """
        The method decorated with @on_incoming_data() is called when a new job is available in the queue. A method
        using this decorator must accept the job obtained from the queue as its argument
        """
        # Call this method to execute the job just obtained from the queue
        self.controller.run_job(job)

    @mp.timer(name='timer', interval=8)
    def timer(self):
        """
        The method decorated with @timer() is a subprocess that is executed cyclically. A module can have multiple
        timers; each needs a name to distinguish it from the others and a time interval expressed in seconds
        """
        print(f'{self.tag}[timer]: send job')
        self.controller.send_job(
            Job(data={'x': 2}, goal=JobGoal.GOAL2, producer=self.tag, recipient='m1'))
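The comments above say that the pattern object stores the decorated methods. How the framework does this internally is not shown on this page, so the following is only a sketch of the general technique, using a hypothetical stand-in `Pattern` class written for this example (not the framework's `Module.Pattern`). Each decorator factory records the function it receives and returns it unchanged.

```python
class Pattern:
    """Hypothetical stand-in: records decorated functions by role."""

    def __init__(self):
        self.setup_func = None
        self.timers = {}  # timer name -> (interval in seconds, function)

    def setup(self):
        def decorator(func):
            self.setup_func = func
            return func  # leave the function itself unchanged
        return decorator

    def timer(self, name, interval):
        def decorator(func):
            self.timers[name] = (interval, func)
            return func
        return decorator


class Demo:
    # Class-level attribute, so it already exists while the methods are defined.
    mp = Pattern()

    @mp.setup()
    def setup(self):
        return "setup done"

    @mp.timer(name="timer", interval=5)
    def timer(self):
        return "tick"


if __name__ == "__main__":
    demo = Demo()
    # The pattern holds plain functions, so the instance is passed explicitly.
    print(Demo.mp.setup_func(demo))
    interval, func = Demo.mp.timers["timer"]
    print(interval, func(demo))
```

Because the registry in this sketch holds plain functions rather than bound methods, whoever runs them needs a reference to the instance. That is one plausible reason a constructor would take an `instance` argument, although this page does not confirm it for the framework.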
- Establish goal solvers for both modules. The method decorated with @solve() defines how the module should behave when a job whose goal matches the JobGoal passed to the decorator is executed by the run_job() method of the controller. The decorated method must therefore accept this job as an argument.
Module1
    @mp.solve(JobGoal.GOAL2)
    def solve_goal1(self, job):
        print(f'{self.tag}: Received {job.data} from {job.producer}')
        self.recevied_count = job.data['x']

Module2
    @mp.solve(JobGoal.GOAL1)
    def solve_goal2(self, job):
        print(f'{self.tag}: Received {job.data} from {job.producer}')
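The relationship between @solve() and run_job() can be pictured as a lookup from goal to solver. The sketch below is an assumption about the general mechanism, not the framework's code: `Pattern`, `Worker`, and the dictionary-based job are hypothetical names introduced for this example.

```python
from enum import Enum


class JobGoal(Enum):
    GOAL1 = 1
    GOAL2 = 2


class Pattern:
    """Hypothetical stand-in that maps goals to solver functions."""

    def __init__(self):
        self.solvers = {}

    def solve(self, goal):
        def decorator(func):
            self.solvers[goal] = func
            return func
        return decorator


class Worker:
    mp = Pattern()

    def __init__(self):
        self.received = None

    @mp.solve(JobGoal.GOAL2)
    def solve_goal2(self, job):
        self.received = job["data"]

    def run_job(self, job):
        """Look up the solver registered for the job's goal and call it."""
        solver = self.mp.solvers.get(job["goal"])
        if solver is None:
            return False  # no solver registered for this goal
        solver(self, job)
        return True


if __name__ == "__main__":
    worker = Worker()
    print(worker.run_job({"goal": JobGoal.GOAL2, "data": {"x": 2}}))
    print(worker.received)
```

In this sketch a job with no registered solver is simply reported as unhandled; how the framework treats that case is not stated on this page.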
- Finally, write the class of our MCU, which assigns to the modules the jobs sent to them. To do this we need two methods, decorated with @on_receiver() and @assigning_job().
class MyMcu(Mcu):
    # The pattern object must always be present in our MCU because it stores our decorated methods
    mp = Mcu.Pattern()
    tag = 'mcu'

    def __init__(self):
        # Call the constructor of the Mcu class first, passing the MCU instance and its tag
        super().__init__(instance=self, tag=self.tag)
        # Always register the MCU pattern right after calling the Mcu class constructor
        self.controller.register_pattern(self.mp)

    @mp.on_receiver()
    def on_receiver(self, job):
        """
        The method decorated with @on_receiver() acts as a receiver: its input is the job sent by the sending module
        """
        if job is None:
            return
        self.shared_queue.put(job)

    @mp.assigning_job()
    def assigning_job(self, job):
        """
        The method decorated with @assigning_job() is called when a job shows up in the queue. In a nutshell, this
        decorated method is used to distribute shared queue jobs to modules
        """
        # Find the recipient module to which this job must be assigned
        module_recipient_result = self.controller.get_recipient_module(
            lambda module_recipient: module_recipient.tag == job.recipient)
        if module_recipient_result:
            module_recipient_result.queue.put(job)
        else:
            print(f'{self.tag}: the {job.recipient} destination of the job {job.goal} is unreachable')
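The distribution step performed by the MCU can be sketched without the framework, using only the standard library. Everything here is illustrative: the `Job` dataclass merely mirrors the keyword arguments used in the examples above, and `route` is a hypothetical helper, not a framework function.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class Job:
    """Hypothetical job record mirroring the keyword arguments used above."""
    data: dict
    goal: str
    producer: str
    recipient: str


def route(shared_queue, module_queues):
    """Move every job from the shared queue to its recipient's queue.

    Returns the jobs whose recipient tag is unknown.
    """
    undeliverable = []
    while not shared_queue.empty():
        job = shared_queue.get()
        target = module_queues.get(job.recipient)
        if target is None:
            undeliverable.append(job)
        else:
            target.put(job)
    return undeliverable


if __name__ == "__main__":
    shared = Queue()
    queues = {"m1": Queue(), "m2": Queue()}
    shared.put(Job({"x": "module1"}, "GOAL1", "m1", "m2"))
    shared.put(Job({"x": 2}, "GOAL2", "m2", "m3"))  # 'm3' does not exist
    lost = route(shared, queues)
    print(queues["m2"].qsize(), len(lost))
```

The sketch runs in a single thread, so checking `empty()` before `get()` is safe here; with concurrent producers a blocking `get()` would be the more robust choice.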
- Register the modules in the MCU by calling the register_modules() method. To start the whole application, call the start() method on the MCU instance. Let's add the following content to the bottom of the code.
if __name__ == '__main__':
    print('Start')
    mcu = MyMcu()
    mcu.register_modules([Module1(), Module2()])
    mcu.start()