Takai Eddine Kennouche edited this page Dec 12, 2019 · 5 revisions

Table of Contents

  1. What about the Clock?
    1. Discrete Event-Driven Simulation
      1. The problem with simpy
    2. Realtime Event-Driven node with asyncio
      1. asyncio and latency
  2. What about RPC?
    1. RPyC objects latency
  3. Dynamic Method Insertion

What about the Clock?

Discrete Event-Driven Simulation

The framework was meant to be event-driven and to run in both simulation and realtime modes. The initial design therefore envisioned relying on simpy as the core provider of both the simulation clock and the realtime clock. Problems arose with simpy (and, fundamentally, with any similar discrete-event simulator).

The problem with simpy

The current implementation of RealtimeEnvironment.step() sleeps for delta = real_time - time() to stay in sync with Python's monotonic clock. This becomes a problem when simpy interacts with the outside world and events are scheduled in RealtimeEnvironment as a consequence of these interactions. More specifically, the problem appears when RealtimeEnvironment.run() runs for long enough while a separate process or thread listens for some system activity (such as network packets), based on which events will be queued in simpy. The following code is an example:

import simpy
from concurrent.futures import ThreadPoolExecutor as Executor
executor = Executor(max_workers=1)

env = simpy.rt.RealtimeEnvironment(strict=True)

def event1(env):
    yield env.timeout(1)
    print("event1")

def event2(env):
    yield env.timeout(2)
    print("event2")

def event3(env):
    yield env.timeout(3)
    print("event3")

env.process(event1(env))
executor.submit(env.run, 5)
env.process(event2(env))
env.process(event3(env))

The output would be:

event1

event2 and event3 will never be executed: from the moment BaseEnvironment's until event is hit and simpy goes into its 5 s sleep, there is no way to avoid calling StopSimulation.callback. One way to fix this is to interrupt the realtime sleep loop whenever the event we are sleeping for is no longer at the head of the event queue. This could be achieved, for example, by repeatedly checking in the while loop whether the event we are sleeping for is still at the head of the queue, while sleeping as briefly as possible with sleep(). This, however, comes close to a busy loop (using sleep(1e-6)) and produces too much CPU load. A better way could be to use threading.Event's clear/wait/set to implement an interruptible sleep.
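The interruptible sleep mentioned above could look roughly like the following. This is a minimal sketch using only the standard library, not code from simpy or from this project; the InterruptibleSleep class and the demo function are hypothetical names introduced for illustration.

```python
import threading
import time


class InterruptibleSleep:
    """Sleep that another thread can cut short (hypothetical helper)."""

    def __init__(self):
        self._wakeup = threading.Event()

    def sleep(self, delay):
        """Block for up to `delay` seconds; return True if interrupted."""
        interrupted = self._wakeup.wait(timeout=delay)
        self._wakeup.clear()  # re-arm for the next sleep
        return interrupted

    def interrupt(self):
        """Wake the sleeper, e.g. right after scheduling an earlier event."""
        self._wakeup.set()


def demo():
    sleeper = InterruptibleSleep()
    # Simulate an outside thread scheduling a new event after 0.1 s.
    threading.Timer(0.1, sleeper.interrupt).start()
    start = time.monotonic()
    interrupted = sleeper.sleep(5.0)  # would block 5 s if never interrupted
    elapsed = time.monotonic() - start
    return interrupted, elapsed
```

In a realtime step loop, the caller would peek at the head of the event queue again after every return from sleep(), whether it was interrupted or timed out, so no CPU is spent polling.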

  • Another issue is the thread safety of simpy's internals, in particular when calling schedule, incrementing the eid counter, and peeking, since heapq is not thread-safe. The snippet below shows how to replace heapq with Python's thread-safe PriorityQueue.
from queue import PriorityQueue as Queue, Empty
...
def __init__(self, initial_time=0):
...
    self._queue = Queue()  # The list of all currently scheduled events.
...

def now(self):
    """The current simulation time."""
    with self._queue.mutex:
        now = self._now
    return now
    
def schedule(self, event, priority=NORMAL, delay=0):
    """Schedule an *event* with a given *priority* and a *delay*."""
    with self._queue.mutex:
        eid = next(self._eid)
    self._queue.put((self._now + delay, priority, eid, event))

def peek(self):
    with self._queue.mutex:
        try:
            return self._queue.queue[0][0]
        except IndexError:
            return Infinity

def step(self):
    """Process the next event."""
    try:
        self._now, _, _, event = self._queue.get(block=False)
    except (IndexError, Empty):
        raise EmptySchedule()
    # Process callbacks of the event. Set the events callbacks to None
    # immediately to prevent concurrent modifications.
    callbacks, event.callbacks = event.callbacks, None

Still, these issues and their fixes would force us to maintain a custom patched version of simpy, which is not a desirable situation.

Another alternative, still hacky but relying on the official simpy code base, is to use the simulated-time Environment and handle the realtime sleep and wake-up outside of simpy, as shown in the snippet below.

import simpy, queue, threading, time

Infinity = float('inf')

def event1(env):
    yield env.timeout(1)
    print("event1")

def event2(env):
    yield env.timeout(2)
    print("event2")

def event3(env):
    yield env.timeout(3)
    print("event3")


def simthread(tasks):
    env = simpy.Environment(time.time())

    while True:
        delay = env.peek() - time.time()
        if delay <= 0:
            env.step()
            continue

        try:
            func = tasks.get(timeout=None if delay == Infinity else delay)
        except queue.Empty:
            continue

        # Use a falsy object (such as None) as the exit signal.
        if not func:
            return
        func(env)

tasks = queue.Queue()
thread = threading.Thread(target=simthread, args=(tasks,))
thread.start()
tasks.put(lambda env: env.process(event1(env)))
tasks.put(lambda env: env.process(event2(env)))
tasks.put(lambda env: env.process(event3(env)))
time.sleep(3)
# Signal simulation thread to exit.
tasks.put(None)
thread.join()

Finally, there is the queue explosion effect: when the simulation produces many events and timeouts, the queue keeps growing, because an event stays in the queue even after it has been canceled.

The ultimate solution is to move away from simpy.

Realtime Event-Driven node with asyncio

Oh asyncio! It took some time, but here we are. asyncio proved to be an ideal alternative for implementing our event-driven node in a memory-efficient way.

asyncio does a lot of stuff. The core functions that our scheduling mechanism relies on are:

  1. asyncio.sleep(delay): With this, a coroutine (event) can sleep asynchronously in realtime without blocking other coroutines, thus avoiding the uninterruptible sleep we faced with simpy.
  2. loop.call_soon: This schedules the provided action, which defines what the scheduled event does, to be invoked on the next iteration of the event loop.
  3. asyncio.run_coroutine_threadsafe: (1) and (2) produce a coroutine object, which this function submits thread-safely to an asyncio loop for handling. It returns a concurrent.futures.Future that we can use to cancel the event as long as it is not yet being processed or already processed.
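The three functions above can be combined roughly as follows. This is a minimal sketch under stated assumptions, not the project's actual scheduler: the names _delayed, schedule and demo are hypothetical, and the event loop is assumed to run in its own thread.

```python
import asyncio
import threading


async def _delayed(delay, action, *args):
    """Sleep without blocking other coroutines, then hand the action to the loop."""
    await asyncio.sleep(delay)
    asyncio.get_running_loop().call_soon(action, *args)


def schedule(loop, delay, action, *args):
    """Thread-safely schedule `action`; returns a concurrent.futures.Future."""
    return asyncio.run_coroutine_threadsafe(_delayed(delay, action, *args), loop)


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    fired = []
    first = schedule(loop, 0.05, fired.append, "event1")
    second = schedule(loop, 5.0, fired.append, "event2")
    second.cancel()          # cancel the event while it is still pending
    first.result(timeout=2)  # block until the first event's coroutine is done

    # Stop the loop from this thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return fired, second.cancelled()
```

Calling demo() from the main thread fires event1 after 0.05 s, while the canceled event2 never runs. Because each pending event is just a sleeping coroutine, scheduling a new event from another thread never has to interrupt a blocking sleep.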

asyncio and latency

The ASGriDS design and current implementation rely heavily on the assumption that nodes' operations are as asynchronous as possible and that their internal event loops can execute scheduled events as fast as possible. asyncio's default event loop implementation seems sufficient for simple local deployments on modern hardware, and in situations where the simulation does not generate a huge number of events. But when the simulation is distributed over a network on low-resource hardware (such as a Raspberry Pi) with a huge number of events, the asyncio event loop shows some latency in processing events, which leads to noticeable time drift between nodes.

In general, this behavior is expected, since the nodes have no explicit time synchronization mechanism; they only implicitly assume that they are all keeping up with their realtime clocks (which are supposed to be synchronized with universal time), given that they can process events as soon as possible. Still, asyncio was chosen in the hope that it would provide a stable, efficient backend.

Fortunately, uvloop is a drop-in replacement for asyncio's default event loop. It provides a much faster implementation that proved, in our case, capable of greatly reducing event-processing latency. Below is a snippet that shows how simple it is to deploy:

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

What about RPC?

A straightforward way to implement a central point of simulation control and deployment on remote nodes is to use a Remote Procedure Call (RPC) approach. For this we relied on RPyC, which provides a pythonic implementation, an easy-to-use API, and, most importantly, seamless object proxying that allows us to deploy objects remotely and interact with them as if they were local.

Immediately after implementing our design on top of RPyC, problems appeared.

RPyC objects latency

The nature of RPC and object proxying means that local interaction with proxies of remote objects needs to generate traffic to synchronize object state and preserve consistency between local mirrors and the actual objects.

This also means that a remote object that holds handles to local ones will need to communicate over the network to get actual data or needed functionality from these objects.

When used blindly, RPyC-wrapped objects in our event-driven, callback-based code trigger a complex chain of dependencies between local data/objects and remote ones, with so much traffic that the whole simulation slows down and simulation nodes lose sync and consistency.

Fortunately, RPyC provides the following functions that, put together, can remedy this:

  1. teleport_function: "“Teleports” a function (including nested functions/closures) over the RPyC connection. The function is passed in bytecode form and reconstructed on the other side."
  2. deliver: "delivers (recreates) a local object on the other party. the object is pickled locally and unpickled on the remote side, thus moved by value. changes made to the remote object will not reflect locally."
  3. execute: allows executing arbitrary statements on the remote RPyC server containing the remote node.

So basically, one can implement a certain behavior (function) locally, deploy it remotely with teleport_function, upload the necessary data if needed with deliver, then hook the function and data onto the remote node with execute. The remote object will not hold any handles to local data/objects and will not need any network communication to execute its function. If this is done in a pre-simulation phase, it removes this proxying latency from the simulation itself.

Dynamic Method Insertion

This is not implemented, but it could be an interesting functionality to have. It basically means the ability to dynamically interact with single objects by adding methods, replacing existing ones, etc., without modifying the underlying class implementation, all at runtime.

A use case would be, for example, modifying deployed nodes on the fly by changing their behavior (a production/consumption profile, a control algorithm, etc.), either manually or programmatically, whether the nodes are deployed locally or remotely with RPyC.

Implementing this relies on MethodType in Python's types module.
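Since this feature is not implemented, the following is only a sketch of how MethodType could be used for it. The Node class, its profile method and the constant_load function are hypothetical stand-ins, not part of the project.

```python
from types import MethodType


class Node:
    """Stand-in for a deployed simulation node (hypothetical class)."""

    def __init__(self, name):
        self.name = name

    def profile(self, t):
        """Default consumption profile: no load."""
        return 0.0


def constant_load(self, t):
    """Replacement behavior: a flat consumption profile."""
    return 1.5


a = Node("a")
b = Node("b")

# Replace an existing method on instance `a` only; the class is untouched.
a.profile = MethodType(constant_load, a)

# Add a brand-new method to instance `a` at runtime.
a.describe = MethodType(lambda self: f"node {self.name}", a)
```

Only the instance a is affected: b keeps the class-level profile and has no describe method. This per-object scope is what would allow changing the behavior of one deployed node without touching the others.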