From 79eee888ccc79cc5bb3c309d4535b0bf6e7c4da6 Mon Sep 17 00:00:00 2001 From: "christopher.tubbs" Date: Fri, 30 Aug 2024 13:21:31 -0500 Subject: [PATCH] Merged with the master branch, implemented suggestions such as the removal of an unneeded RLock, added more messaging and error handling, removed the now unncessary TimedOccurenceWatcher, removed unneeded logging configurations that caused extra logging, added handling to the evaluation service logger to ensure that logged messages don't propagate to the root logger, wrote documentation for the runner, and updated the runner to use redis streams instead of redis pubsub. --- .../lib/core/dmod/core/common/collection.py | 165 ------- python/lib/core/dmod/core/context/base.py | 1 + python/lib/core/dmod/core/context/manager.py | 121 +++-- python/lib/core/dmod/core/context/monitor.py | 25 +- python/lib/core/dmod/core/context/scope.py | 1 + .../dmod/evaluations/backends/file.py | 1 - .../dmod/evaluations/backends/network.py | 2 - python/services/evaluationservice/README.md | 2 +- .../dmod/evaluationservice/runner.md | 115 +++++ .../dmod/evaluationservice/runner.py | 424 ++++++------------ .../service/service_logging.py | 6 +- .../evaluationservice/utilities/__init__.py | 1 - .../utilities/communication.py | 8 +- 13 files changed, 372 insertions(+), 500 deletions(-) create mode 100644 python/services/evaluationservice/dmod/evaluationservice/runner.md diff --git a/python/lib/core/dmod/core/common/collection.py b/python/lib/core/dmod/core/common/collection.py index 4c990a460..e841315b3 100644 --- a/python/lib/core/dmod/core/common/collection.py +++ b/python/lib/core/dmod/core/common/collection.py @@ -238,171 +238,6 @@ def __contains__(self, obj: object) -> bool: return obj in self.__data -class _OccurrenceTracker(typing.Generic[_T]): - """ - Keeps track of occurrences of a type of value that have been encountered within a duration - """ - def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]): - self.__key = key - self.__duration = duration - self.__threshold = threshold - self.__occurences: typing.List[datetime] = [] - self.__on_filled = on_filled - - def value_encountered(self): - """ - Inform the tracker that the value has been encountered again - """ - self.update_occurrences() - self.__occurences.append(datetime.now()) - if len(self.__occurences) >= self.__threshold: - self.__on_filled(self.__key) - - def update_occurrences(self) -> int: - """ - Update the list of occurrences to include only those within the current duration - - Returns: - The number of occurrences still being tracked - """ - cutoff: datetime = datetime.now() - self.__duration - self.__occurences = [ - occurrence - for occurrence in self.__occurences - if occurrence > cutoff - ] - return len(self.__occurences) - - @property - def key(self): - """ - The identifier that is being tracked - """ - return self.__key - - def __len__(self): - return len(self.__occurences) - - def __str__(self): - if len(self.__occurences) == 0: - occurrences_details = f"No Occurences within the last {self.__duration.total_seconds()} seconds." - else: - occurrences_details = (f"{len(self.__occurences)} occurrences since " - f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}") - return f"{self.key}: {occurrences_details}" - - -class TimedOccurrenceWatcher: - """ - Keeps track of the amount of occurrences of items within a range of time - """ - MINIMUM_TRACKING_SECONDS: typing.Final[float] = 0.1 - """ - The lowest number of seconds to watch for multiple occurrences. Only acting when multiple occurrences are tracked - in under 100ms would create a scenario where the watcher will most likely never trigger an action, rendering - this the wrong tool for the job. - """ - - @staticmethod - def default_key_function(obj: object) -> type: - """ - The function used to find a common identifier for an object if one is not provided - """ - return type(obj) - - def __init__( - self, - duration: timedelta, - threshold: int, - on_filled: typing.Callable[[_T], typing.Any], - key_function: typing.Callable[[_VT], _KT] = None - ): - if not isinstance(duration, timedelta): - raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object") - - if duration.total_seconds() < self.MINIMUM_TRACKING_SECONDS: - raise ValueError( - f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)" - ) - - self.__duration = duration - - if not isinstance(key_function, typing.Callable): - key_function = self.default_key_function - - self.__key_function = key_function - self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {} - self.__threshold = threshold - self.__on_filled = on_filled - - def value_encountered(self, value: _T): - """ - Add an occurrence of an object to track - - Args: - value: The item to track - """ - self.__update_trackers() - self._get_tracker(value).value_encountered() - - def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]: - """ - Get an occurrence tracker for the given value - - Args: - value: The value to track - - Returns: - A tracker for the value - """ - key = self.__key_function(value) - - for tracker in self.__entries.values(): - if tracker.key == key: - return tracker - - new_tracker = _OccurrenceTracker( - key=key, - duration=self.__duration, - threshold=self.__threshold, - on_filled=self.__on_filled - ) - self.__entries[uuid.uuid1()] = new_tracker - return new_tracker - - def __update_trackers(self): - """ - Update the amount of items in each tracker - - If a tracker becomes empty it will be removed - """ - for tracker_id, tracker in self.__entries.items(): - amount_left = tracker.update_occurrences() - if amount_left == 0: - del self.__entries[tracker_id] - - @property - def size(self) -> int: - """ - The number of items encountered within the duration - """ - self.__update_trackers() - return sum(len(tracker) for tracker in self.__entries.values()) - - @property - def duration(self) -> timedelta: - """ - The amount of time to track items for - """ - return self.__duration - - def __str__(self): - return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} Seconds" - - def __repr__(self): - return self.__str__() - - class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]): @abc.abstractmethod def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]: diff --git a/python/lib/core/dmod/core/context/base.py b/python/lib/core/dmod/core/context/base.py index e8c86379a..96b6df877 100644 --- a/python/lib/core/dmod/core/context/base.py +++ b/python/lib/core/dmod/core/context/base.py @@ -169,6 +169,7 @@ def end_scope(self): """ Override to add extra logic for when this scope is supposed to reach its end """ + self.logger.warning(f"Ending scope '{self.__scope_id}' for: {self}") self.drop_references() self.__scope_closed() diff --git a/python/lib/core/dmod/core/context/manager.py b/python/lib/core/dmod/core/context/manager.py index 52a4d4818..8945bc295 100644 --- a/python/lib/core/dmod/core/context/manager.py +++ b/python/lib/core/dmod/core/context/manager.py @@ -11,7 +11,6 @@ from multiprocessing import managers from multiprocessing import context -from multiprocessing import RLock from .base import ObjectManagerScope from .base import T @@ -23,8 +22,6 @@ TypeOfRemoteObject = typing.Union[typing.Type[managers.BaseProxy], type] """A wrapper object that is used to communicate to objects created by Managers""" -_PREPARATION_LOCK: RLock = RLock() - class DMODObjectManager(managers.BaseManager): """ @@ -132,45 +129,46 @@ def prepare( additional_proxy_types: A mapping between class types and the type of proxies used to operate upon them remotely """ - with _PREPARATION_LOCK: - if not cls.__initialized: - if not isinstance(additional_proxy_types, typing.Mapping): - additional_proxy_types = {} - - already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys()) - - for real_class, proxy_class in additional_proxy_types.items(): - name = real_class.__name__ if hasattr(real_class, "__name__") else None - - if name is None: - raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type") - - if name in already_registered_items: - print(f"'{name}' is already registered to {cls.__name__}") - continue - - cls.register_class(class_type=real_class, type_of_proxy=proxy_class) - already_registered_items.append(name) - - # Now find all proxies attached to the SyncManager and attach those - # This will ensure that this manager has proxies for objects and structures like dictionaries - registry_initialization_arguments = ( - { - "typeid": typeid, - "callable": attributes[0], - "exposed": attributes[1], - "method_to_typeid": attributes[2], - "proxytype": attributes[3] - } - for typeid, attributes in getattr(managers.SyncManager, "_registry").items() - if typeid not in already_registered_items - ) + if not cls.__initialized: + if not isinstance(additional_proxy_types, typing.Mapping): + additional_proxy_types = {} + + already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys()) + + for real_class, proxy_class in additional_proxy_types.items(): + name = real_class.__name__ if hasattr(real_class, "__name__") else None + + if name is None: + raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type") + + if name in already_registered_items: + print(f"'{name}' is already registered to {cls.__name__}") + continue + + cls.register_class(class_type=real_class, type_of_proxy=proxy_class) + already_registered_items.append(name) + + # Now find all proxies attached to the SyncManager and attach those + # This will ensure that this manager has proxies for objects and structures like dictionaries + registry_initialization_arguments = ( + { + "typeid": typeid, + "callable": attributes[0], + "exposed": attributes[1], + "method_to_typeid": attributes[2], + "proxytype": attributes[3] + } + for typeid, attributes in getattr(managers.SyncManager, "_registry").items() + if typeid not in already_registered_items + ) - for arguments in registry_initialization_arguments: - cls.register(**arguments) + for arguments in registry_initialization_arguments: + cls.register(**arguments) + else: + logging.warning(f"The '{cls.__name__}' cannot be prepared - it has already been done so.") - cls.__initialized = True - return cls + cls.__initialized = True + return cls def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args, **kwargs) -> T: """ @@ -197,6 +195,10 @@ def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args ) if __scope_name not in self.__scopes: + self.logger.warning( + f"Cannot track a '{__class_name}' object in the '{__scope_name}' " + f"scope if it has not been established. Establishing it now." + ) self.establish_scope(__scope_name) new_instance = self.create_object(__class_name, *args, **kwargs) @@ -228,12 +230,13 @@ def create_object(self, __class_name, /, *args, **kwargs) -> T: return value - def free(self, scope_name: str): + def free(self, scope_name: str, fail_on_missing_scope: bool = True): """ Remove all items associated with a given tracking key from the object manager Args: scope_name: The key used to keep track of like items + fail_on_missing_scope: Throw an exception if the scope doesn't exist Returns: The number of items that were deleted @@ -247,9 +250,13 @@ def free(self, scope_name: str): f"Received '{scope_name}' ({type(scope_name)}" ) - if scope_name not in self.__scopes: + if scope_name not in self.__scopes and fail_on_missing_scope: raise KeyError(f"Cannot free objects from {self} - no items are tracked by the key {scope_name}") + if scope_name not in self.__scopes: + self.logger.warning(f"Cannot remove scope by the name '{scope_name}' - it is not currently stored") + return + del self.__scopes[scope_name] def __inject_scope(self, scope: ObjectManagerScope): @@ -259,6 +266,9 @@ def __inject_scope(self, scope: ObjectManagerScope): Args: scope: The scope object to add """ + if scope is None: + raise ValueError(f"A scope for {self.__class__.__name__} cannot be added - it is None") + if scope.name in self.__scopes: raise KeyError( f"Cannot add a scope object '{scope.name}' to {self} - there is already a scope by that name. " @@ -297,7 +307,19 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes], scope: A scope object containing references to shared objects that need to be kept alive operation: The operation using the shared objects """ + if self.__monitor_scope and not self.__scope_monitor: + if self.__scope_monitor is None: + self.__scope_monitor = FutureMonitor(logger=self.__logger) + self.__scope_monitor.start() + if not self.__monitor_scope or not self.__scope_monitor: + if not self.__monitor_scope and not self.__scope_monitor: + logging.error("This logger was not set to monitor scopes and it does not have a monitoring thread") + elif not self.__monitor_scope: + logging.error("This logger was told not to monitor scope") + else: + logging.error("There is no thread available to monitor scopes") + if isinstance(scope, ObjectManagerScope): scope_name = scope.name elif isinstance(scope, bytes): @@ -339,6 +361,9 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes], @property def logger(self) -> LoggerProtocol: + """ + The logger made specifically for this manager + """ return self.__logger @logger.setter @@ -360,3 +385,15 @@ def logger(self, logger: LoggerProtocol): # the logger on the monitor is kept up to speed. if self.__scope_monitor: self.__scope_monitor.logger = logger + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.__scope_monitor: + self.__scope_monitor.stop() + + if self.__monitor_scope: + self.__scope_monitor.kill() + + for scope_name, scope in self.__scopes.items(): + scope.end_scope() + + super().__exit__(exc_type, exc_val, exc_tb) diff --git a/python/lib/core/dmod/core/context/monitor.py b/python/lib/core/dmod/core/context/monitor.py index 4af4190b3..29aaf7550 100644 --- a/python/lib/core/dmod/core/context/monitor.py +++ b/python/lib/core/dmod/core/context/monitor.py @@ -105,6 +105,8 @@ def __init__( poll_interval: typing.Union[float, timedelta] = None, logger: LoggerProtocol = None ): + logging.info(f"Creating a '{self.__class__.__name__}' instance") + if not timeout: timeout = DEFAULT_MONITOR_TIMEOUT elif isinstance(timeout, timedelta): @@ -171,6 +173,9 @@ def logger(self, logger: LoggerProtocol) -> None: @property def size(self) -> int: + """ + The number of items being monitored + """ return self.__size def __len__(self): @@ -195,6 +200,7 @@ def _monitor(self) -> bool: """ monitoring_succeeded: bool = True + self.logger.info(f"{self}: Beginning to monitor") while self.__should_be_monitoring: try: # Block to check if the loop should be exitted based on a current kill state @@ -278,9 +284,17 @@ def _monitor(self) -> bool: # Wait a little bit before polling again to allow for work to continue sleep(self.__poll_interval) + self.logger.info(f"No longer monitoring within {self}") self.__cleanup() return monitoring_succeeded + @property + def running(self) -> bool: + """ + Whether the monitor is running + """ + return self.__thread is not None and self.__thread.is_alive() + def find_scope(self, future_result: futures.Future) -> typing.Optional[ObjectManagerScope]: """ Find the scope that belongs with the given output @@ -414,11 +428,14 @@ def __cleanup(self): with self.__lock: while not self._queue.empty(): try: - entry = self._queue.get() + entry = self._queue.get(timeout=self._timeout) if isinstance(entry, futures.Future) and entry.running(): entry.cancel() except queue.Empty: pass + except TimeoutError: + pass + self._queue = queue.Queue() self.__size = 0 self.__scopes.clear() @@ -428,3 +445,9 @@ def __str__(self): def __repr__(self): return self.__str__() + + def __bool__(self): + return self.running + + def __del__(self): + self.__cleanup() diff --git a/python/lib/core/dmod/core/context/scope.py b/python/lib/core/dmod/core/context/scope.py index bc30dffad..13d7b7d95 100644 --- a/python/lib/core/dmod/core/context/scope.py +++ b/python/lib/core/dmod/core/context/scope.py @@ -20,6 +20,7 @@ class DMODObjectManagerScope(ObjectManagerScope): """ def __init__(self, name: str, object_manager: DMODObjectManager): super().__init__(name) + self.logger.info(f"Creating scope '{self.scope_id}' for '{object_manager}'") self.__object_manager: DMODObjectManager = object_manager self._perform_on_close(partial(self.__object_manager.free, self.scope_id)) diff --git a/python/lib/evaluations/dmod/evaluations/backends/file.py b/python/lib/evaluations/dmod/evaluations/backends/file.py index c8c08d3a2..f9ca6391c 100644 --- a/python/lib/evaluations/dmod/evaluations/backends/file.py +++ b/python/lib/evaluations/dmod/evaluations/backends/file.py @@ -21,7 +21,6 @@ from .. import util from .. import specification -util.configure_logging() RE_PATTERN = re.compile(r"(\{.+\}|\[.+\]|\(.+\)|(?"` allows you to only read new messages (ignoring all previous messages), and offering an id will +let you read every message that comes _after_ that id. `xread` will give you a message and `xreadgroup` will take +the message and reserve it for a single consumer within a consumer group. Only one consumer may hold a message +within a group at any given time. Multiple groups may read and claim messages at the same time. `xclaim` allows one +consumer in a group to claim ownership of a message from another consumer within the group and `xack` tells the group +that it is done processing the message, releases ownership, and the group will not read the message again. This +allows for workstealing and prevents the overprocessing of a single message. `xdel` deletes the message from the +queue altogether. Calling `xdel` is a good way of keeping _any_ further processing of a message and is a good thing +to do when operating a work queue. Once requirements for work have been fulfilled, the record may be removed to +ensure that it is not attempted again. + +Sets of actions generally correlate to groups. If something is responsible for controlling a light switch, a group +may be responsible for the incoming messages. Consumers within these groups generally correlate to actors that may +perform the actions granted to the group. If there are 4 consumers within a single group, Consumer C might be able +to claim ownership of message X within the group and consumers A, B, and D will not. If message Y comes through, +consumers A, B, or D may be able to claim that message for themselves and perform work while consumer C consumes +message X. This helps coordinate work across computational nodes. If consumers A, B, C, and D all work on different +nodes, one consumer may claim work and consume its own resources without disrupting the others. If more work +needs to be done simultaneously, more consumers may be added for horizontal scaling. + +Messages are caught in the main thread, evaluations are run in child processes, and there is a second thread that +monitors running processes to determine when shared scope should be destroyed and when messages should be removed +from the stream. The `concurrent.futures` interface is used to track running evaluations, so the monitoring thread +is able to poll the evaluations its track to see if processing within the child process has concluded. + +#### How does this differ from PubSub? + +The runner was originally implemented by subscribing to a Redis PubSub channel. A PubSub channel is a single stream +that clients may publish data to and other clients may subscribe to. Subscribers may 'listen' to the channel and +each subscriber will get the message in real time. If a subscriber misses a message, they will miss it and cannot +read it later. Imagine a PubSub channel is a TV channel. One or more entities may broadcast video through it and the +audience may do with what they receive as they like. The audience does not quite have the ability to respond to the +broadcaster unless they themselves become a broadcaster and the original broadcasters become audience members. + +PubSub works great for real time dissemination of data, but if used to coordinate work, like the runner previously, +_all_ subscribers will attempt to perform the same work. If there are four runners on four machines, each will +attempt to run the same evaluation on their own machine because they have no way of knowing that the others are +performing the same work. + +#### The Worker + +The worker, found in `worker.py`, is the entity that _actually_ calls `dmod.evaluations` in order to perform +evaluation duties, _not the runner._ The runner will collect information from messages and call the runner in a +child process with the received parameters. + +The worker may be called via the command line to perform evaluations manually. If there is a need to script out +evaluations directly to `dmod.evaluations` and bypass the service entirely, the worker contains an excellent example +of how to do so. + +#### How does the runner listen? + +`runner:listen` is called from `runner:main` with instructions on how to connect to redis and the limit of how many +jobs may be run at the same time. + +`runner:listen`, called the listener from now on, will create a multiprocessed event to use a signal to stop +listening and spawn a thread to poll a queue of actively running `Future`s that are evaluating data. Then listener +creates a consumer for the group performing listening duties, but create the group if it is not already present. +A counter used to track errors will be created. This tracker will identify faults and count the times that they occur. +Errors are identified by where in the code base that they are thrown. If the same error from the same locations are +encountered too many times in a short period the listener will exit since this indicates a core problem of the +application. The listener continues to listen for the same reason web servers continue to function after encountering a +500+ error. A portion causes an error but it isn't clear if it should throughly halt all operations or not. It make be +due to a user request (such as a bad configuration) or it may be a freak accident that is never encountered again. + +A while loop is then entered that will run until the stop signal is set. Within the loop, a generator is created that +will set itself up to read and deserialize messages it manages to grab from the redis stream. Other runners that may +be running in tandem may be able to claim messages first and prevent an instance of the runner from claiming its own +until another message is added to the stream. This is intended behavior. It allows multiple runners to run at once +on multiple machines without interfering with one another. + +Each message read via the generator, which will block until a message is received, is fed into a function to interpret +the collected message. As of writing (August 30th, 2024), there are three things that may be interpreted from the +messages: + +1. Launch an evaluation +2. Stop Listening +3. Ignore the message + +##### Launch an Evaluation + +If a message comes through stating that an evaluation should be run, the given parameters are stored in a `JobRecord` +with information on where the message came from, what generated it, what the evaluation is, and a reference to the +process running the evaluation. That is passed back to the main loop where it is stored in a queue the the monitoring +thread will poll. + +##### Stop Listening + +If a message comes through stating that the purpose of the message is to close, kill, or terminate the application, +the stop signal is set and nothing is returned from the interpretation function. Since the stop signal is set, +the loop will then end. The message will be acknowledged via `xack` so that the group won't read it again. + +##### Ignore the Message + +If a message comes through that the listener doesn't know what to do with, it is logged, and the message is +acknowledged via `xack`. Maybe there is another group that reads from the same stream that may interact with said +message or maybe there's a consumer attached to a process in the same group that may handle it. The configuration +that will cause such messages to pass through are not ideal, but they will not interupt the performance of _this_ +operation. \ No newline at end of file diff --git a/python/services/evaluationservice/dmod/evaluationservice/runner.py b/python/services/evaluationservice/dmod/evaluationservice/runner.py index 8452ae7bc..6b00e5f3a 100755 --- a/python/services/evaluationservice/dmod/evaluationservice/runner.py +++ b/python/services/evaluationservice/dmod/evaluationservice/runner.py @@ -16,14 +16,13 @@ import threading from argparse import ArgumentParser -from multiprocessing.pool import ApplyResult import redis from concurrent import futures -from datetime import timedelta from functools import partial +import utilities from dmod.metrics import CommunicatorGroup from dmod.core.context import DMODObjectManager @@ -31,11 +30,26 @@ import service import worker +from service.service_logging import get_logger from utilities.common import ErrorCounter from utilities import streams EXCEPTION_LIMIT: typing.Final[int] = 10 +""" +The maximum number of a specific type of error to catch before exiting. If an error occurs 11 times in rapid +succession and the limit is 10, the runner should stop. If it only occurs 9 times it could be the result of something +that this has no control over and may remain functional. +""" + +SUCCESSFUL_EXIT: typing.Final[int] = 0 +"""The exit code for a successful operation. `os.EX_OK` would be ideal, but that approach isn't portable""" + +ERROR_EXIT: typing.Final[int] = 1 +""" +The exit code when the runner halts because of an error. 1 is used since that is generally associated with the catch +all error code. +""" def get_concurrency_executor_type(**kwargs) -> typing.Callable[[], futures.Executor]: @@ -301,20 +315,34 @@ class JobRecord: """The ID of the message with the instructions for the evaluation""" evaluation_id: str """The ID of the evaluation""" - job: ApplyResult + job: futures.Future """The asynchronous results for the job""" - def mark_complete(self, connection: redis.Redis) -> bool: + def mark_complete(self, connection: redis.Redis, object_manager: DMODObjectManager) -> bool: """ Attempt to mark a job as no longer running Args: connection: A redis connection used to mark the record as complete + object_manager: A shared object manager that may hold data for this evaluation Returns: Whether the job was acknowledged as complete """ try: + object_manager.free(self.evaluation_id, fail_on_missing_scope=False) + except KeyError: + service.info(f"There is no scope for '{self.evaluation_id}' in the object manager to close") + except Exception as exception: + service.error( + f"Failed to clear the scope of shared objects in evaluation '{self.evaluation_id}'", + exception=exception + ) + + try: + service.info( + f"{self.stream_name}:{self.group_name}:{self.consumer_name} will now delete message '{self.message_id}' since it has been consumed" + ) confirmation = connection.xdel(self.stream_name, self.message_id) return bool(confirmation) except Exception as exception: @@ -324,21 +352,43 @@ def mark_complete(self, connection: redis.Redis) -> bool: def launch_evaluation( stream_message: streams.StreamMessage, - worker_pool: multiprocessing.Pool -) -> JobRecord: + worker_pool: futures.Executor, + object_manager: DMODObjectManager, +) -> typing.Optional[JobRecord]: """ Launch an evaluation based on a message received through a redis stream Args: stream_message: The message received through a redis stream worker_pool: The pool that handles the creation of other processes + object_manager: A shared object creator and tracker Returns: A record of the evaluation job that was created """ payload = stream_message.payload + evaluation_id = payload.get('evaluation_id') + scope = object_manager.establish_scope(evaluation_id) + + try: + communicators: CommunicatorGroup = utilities.get_communicators( + communicator_id=evaluation_id, + verbosity=payload.get("verbosity"), + object_manager=scope, + host=service.REDIS_HOST, + port=service.REDIS_PORT, + password=service.REDIS_PASSWORD, + include_timestamp=False + ) + service.info(f"Communicators have been created for the evaluation named '{evaluation_id}'") + except Exception as exception: + service.error( + message=f"Could not create communicators for evaluation: {evaluation_id} due to {exception}", + exception=exception + ) + return None - service.info(f"Launching an evaluation for {payload['evaluation_id']}...") + service.info(f"Launching an evaluation for {evaluation_id}...") instructions = payload.get("instructions") if not instructions: @@ -347,19 +397,27 @@ def launch_evaluation( if isinstance(instructions, dict): instructions = json.dumps(instructions, indent=4) - arguments = JobArguments( + arguments = WorkerProcessArguments( evaluation_id=payload['evaluation_id'], instructions=instructions, verbosity=payload.get("verbosity"), - start_delay=payload.get("start_delay") + start_delay=payload.get("start_delay"), + communicators=communicators ) - new_job: ApplyResult = worker_pool.apply_async( - worker.evaluate, - kwds=arguments.kwargs - ) + try: + service.info(f"Submitting the evaluation job for {evaluation_id}...") + evaluation_job: futures.Future = worker_pool.submit( + worker.evaluate, + **arguments.kwargs + ) + except Exception as exception: + service.error(f"Could not launch evaluation {evaluation_id} due to {exception}", exception=exception) + return None + + new_job: futures.Future = worker_pool.submit(worker.evaluate, **arguments.kwargs) - service.info(f"Evaluation for {payload['evaluation_id']} has been launched.") + service.info(f"Evaluation for {evaluation_id} has been launched.") job_record = JobRecord( stream_name=stream_message.stream_name, @@ -370,13 +428,22 @@ def launch_evaluation( job=new_job ) + service.info(f"Preparing to monitor objects for {evaluation_id}...") + try: + object_manager.monitor_operation(evaluation_id, evaluation_job) + service.info(f"Evaluation for {evaluation_id} has been launched.") + except BaseException as exception: + service.error(f"Could not monitor {evaluation_id} due to: {exception}") + traceback.print_exc() + return job_record def interpret_message( stream_message: streams.StreamMessage, - worker_pool: multiprocessing.Pool, - stop_signal: multiprocessing.Event + worker_pool: futures.Executor, + stop_signal: multiprocessing.Event, + object_manager: DMODObjectManager ) -> typing.Optional[JobRecord]: """ Figures out what to do based on a message received through a redis stream @@ -387,6 +454,7 @@ def interpret_message( stream_message: The message received through a redis stream worker_pool: A multiprocessing pool that can be used to start new evaluations stop_signal: The signal used to stop the reading loop + object_manager: A shared object manager Returns: The record of a job if one has been launched @@ -404,11 +472,10 @@ def interpret_message( purpose = stream_message.payload.get("purpose").lower() if purpose == 'launch': - return launch_evaluation(stream_message, worker_pool) + return launch_evaluation(stream_message, worker_pool, object_manager=object_manager) if purpose in ("close", "kill", "terminate"): stop_signal.set() service.info("Exit message received. Closing the runner.") - sys.exit(0) else: service.info( f"Runner => The purpose was not to launch or terminate. Only launching is handled through the runner." @@ -419,8 +486,9 @@ def interpret_message( def monitor_running_jobs( connection: redis.Redis, - active_job_queue: queue.Queue[ApplyResult[JobRecord]], - stop_signal: multiprocessing.Event + active_job_queue: queue.Queue[JobRecord], + stop_signal: multiprocessing.Event, + object_manager: DMODObjectManager ): """ Poll a queue of jobs and close ones that are marked as complete @@ -431,8 +499,9 @@ def monitor_running_jobs( connection: A connection to redis that will be used to acknowledge completed jobs active_job_queue: A queue of jobs to poll stop_signal: A signal used to stop the reading loop + object_manager: An object manager that may hold scope for a running job """ - potentially_complete_job: typing.Optional[ApplyResult] = None + record: typing.Optional[JobRecord] = None encountered_errors = ErrorCounter(limit=EXCEPTION_LIMIT) """ @@ -449,15 +518,13 @@ def monitor_running_jobs( while not stop_signal.is_set(): try: - potentially_complete_job = active_job_queue.get(block=True, timeout=1) + record = active_job_queue.get(block=True, timeout=1) - if not potentially_complete_job.ready(): - active_job_queue.put(potentially_complete_job) + if not record.job.done(): + active_job_queue.put(record) continue - record: JobRecord = potentially_complete_job.get() - - marked_complete = record.mark_complete(connection=connection) + marked_complete = record.mark_complete(connection=connection, object_manager=object_manager) if not marked_complete: service.error( @@ -466,10 +533,10 @@ def monitor_running_jobs( f"message '{record.message_id}', could not be marked as complete" ) - potentially_complete_job = None + record = None except TimeoutError: - if potentially_complete_job: - active_job_queue.put(potentially_complete_job) + if record and not record.job.cancelled(): + active_job_queue.put(record) except queue.Empty: # There are plenty of times when this might be empty and that's fine. In this case we just want pass @@ -477,154 +544,7 @@ def monitor_running_jobs( encountered_errors.add_error(error=exception) service.error(exception) - potentially_complete_job = None - - -def run_job( - launch_message: dict, - worker_pool: futures.Executor, - object_manager: DMODObjectManager -): - """ - Adds the evaluation to the worker pool for background processing - - Args: - launch_message: A dictionary containing data to send to the process running the job - worker_pool: The pool with processes ready to run an evaluation - object_manager: The object manager used to create shared objects - """ - if launch_message['type'] != 'message': - # We exit because this isn't a useful message - return - - launch_parameters = launch_message['data'] - if not isinstance(launch_parameters, dict): - try: - launch_parameters = json.loads(launch_parameters) - except Exception as exception: - service.error("The passed message wasn't JSON") - service.error(launch_parameters, exception) - return - - if 'purpose' not in launch_parameters: - service.info(f"A purpose was not communicated through the {service.EVALUATION_QUEUE_NAME} channel") - return - - purpose = launch_parameters.get("purpose").lower() - - if purpose == 'launch': - evaluation_id = launch_parameters.get('evaluation_id') - scope = object_manager.establish_scope(evaluation_id) - try: - communicators: CommunicatorGroup = utilities.get_communicators( - communicator_id=evaluation_id, - verbosity=launch_parameters.get("verbosity"), - object_manager=scope, - host=service.REDIS_HOST, - port=service.REDIS_PORT, - password=service.REDIS_PASSWORD, - include_timestamp=False - ) - service.debug(f"Communicators have been created for the evaluation named '{evaluation_id}'") - except Exception as exception: - service.error( - message=f"Could not create communicators for evaluation: {evaluation_id} due to {exception}", - exception=exception - ) - return - - service.info(f"Launching an evaluation for {launch_parameters['evaluation_id']}...") - instructions = launch_parameters.get("instructions") - - if isinstance(instructions, dict): - instructions = json.dumps(instructions, indent=4) - - arguments = WorkerProcessArguments( - evaluation_id=launch_parameters['evaluation_id'], - instructions=instructions, - verbosity=launch_parameters.get("verbosity"), - start_delay=launch_parameters.get("start_delay"), - communicators=communicators - ) - - try: - service.debug(f"Submitting the evaluation job for {evaluation_id}...") - evaluation_job: futures.Future = worker_pool.submit( - worker.evaluate, - **arguments.kwargs - ) - except Exception as exception: - service.error(f"Could not launch evaluation {evaluation_id} due to {exception}", exception=exception) - return - - service.debug(f"Preparing to monitor {evaluation_id}...") - try: - object_manager.monitor_operation(evaluation_id, evaluation_job) - service.debug(f"Evaluation for {launch_parameters['evaluation_id']} has been launched.") - except BaseException as exception: - service.error(f"Could not monitor {evaluation_id} due to: {exception}") - traceback.print_exc() - elif purpose in ("close", "kill", "terminate"): - service.info("Exit message received. Closing the runner.") - sys.exit(0) - else: - service.info( - f"Runner => The purpose was not to launch or terminate. Only launching is handled through the runner." - f"{os.linesep}Message: {json.dumps(payload)}" - ) - return None - - -def monitor_running_jobs( - connection: redis.Redis, - active_job_queue: queue.Queue[ApplyResult[JobRecord]], - stop_signal: multiprocessing.Event -): - """ - Poll a queue of jobs and close ones that are marked as complete - - Meant to be run in a separate thread - - Args: - connection: A connection to redis that will be used to acknowledge completed jobs - active_job_queue: A queue of jobs to poll - stop_signal: A signal used to stop the reading loop - """ - potentially_complete_job: typing.Optional[ApplyResult] = None - - monitor_errors = ErrorCounter(limit=EXCEPTION_LIMIT) - - while not stop_signal.is_set(): - try: - potentially_complete_job = active_job_queue.get(block=True, timeout=1) - - if not potentially_complete_job.ready(): - active_job_queue.put(potentially_complete_job) - continue - - record: JobRecord = potentially_complete_job.get() - - marked_complete = record.mark_complete(connection=connection) - - if not marked_complete: - service.error( - f"Evaluation '{record.evaluation_id}', recognized by the '{record.consumer_name}' consumer " - f"within the '{record.group_name}' group on the '{record.stream_name}' stream as coming from " - f"message '{record.message_id}', could not be marked as complete" - ) - - potentially_complete_job = None - except TimeoutError: - if potentially_complete_job: - active_job_queue.put(potentially_complete_job) - except queue.Empty: - # There are plenty of times when this might be empty and that's fine. In this case we just want - pass - except Exception as exception: - monitor_errors.add_error(error=exception) - service.error(exception) - - potentially_complete_job = None + record = None def get_consumer_name() -> str: @@ -645,13 +565,15 @@ def listen( stream_parameters: The means to connect to redis job_limit: The maximum number of jobs that may be run at once """ - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGQUIT, signal_handler) job_limit = job_limit or int(float(os.environ.get("MAXIMUM_RUNNING_JOBS", os.cpu_count()))) + """The maximum number of jobs that may be actively running from this listener""" + stop_signal: multiprocessing.Event = multiprocessing.Event() + """Tells this function and the associated monitoring thread to stop polling""" + active_jobs: queue.Queue[JobRecord] = queue.Queue(maxsize=job_limit) + """A queue of active jobs to poll""" service.info( f"Listening for evaluation jobs on '{stream_parameters.stream_name}' through the " @@ -668,22 +590,31 @@ def listen( redis_connection=connection ) - monitoring_thread = threading.Thread( - target=monitor_running_jobs, - name=f"{service.application_values.APPLICATION_NAME}: Monitor for {consumer_name}", - kwargs={ - "connection": connection, - "active_job_queue": active_jobs, - "stop_signal": stop_signal, - }, - daemon=True - ) - monitoring_thread.start() + monitoring_thread: typing.Optional[threading.Thread] = None error_counter = ErrorCounter(limit=EXCEPTION_LIMIT) + executor_type: typing.Callable[[], futures.Executor] = get_concurrency_executor_type( + max_workers=job_limit + ) + try: - with multiprocessing.Pool(processes=job_limit) as worker_pool: # type: multiprocessing.pool.Pool + with get_object_manager(monitor_scope=True) as object_manager, executor_type() as worker_pool: + object_manager.logger = get_logger() + + monitoring_thread = threading.Thread( + target=monitor_running_jobs, + name=f"{service.application_values.APPLICATION_NAME}: Monitor for {consumer_name}", + kwargs={ + "connection": connection, + "active_job_queue": active_jobs, + "stop_signal": stop_signal, + "object_manager": object_manager + }, + daemon=True + ) + monitoring_thread.start() + while not stop_signal.is_set(): if already_listening: service.info(f"{consumer_name}: Starting to listen for evaluation jobs again") @@ -701,7 +632,16 @@ def listen( ) for message in message_stream: - possible_record = interpret_message(message, worker_pool, stop_signal) + service.info( + f"{message.stream_name}:{message.group_name}:{consumer_name}: Received message " + f"'{message.message_id}'" + ) + possible_record = interpret_message( + message, + worker_pool, + stop_signal=stop_signal, + object_manager=object_manager + ) if possible_record: # This will block until another entry may be put into the queue - this will prevent one @@ -711,11 +651,13 @@ def listen( else: # Since this message isn't considered one for the runner, acknowledge that it's been seen # and move on so something else may view the message + service.info(f"{message.stream_name}:{message.group_name}:{consumer_name}: Acknowledging message '{message.message_id}'") connection.xack( stream_parameters.stream_name, stream_parameters.group_name, message.message_id ) + service.info(f"{message.stream_name}:{message.group_name} will no longer use message '{message.message_id}'") if stop_signal.is_set(): break @@ -730,85 +672,6 @@ def listen( monitoring_thread.join(timeout=5) -# TODO: Switch from pubsub to a redis stream -def listen( - channel: str, - host: str = None, - port: typing.Union[str, int] = None, - username: str = None, - password: str = None, - db: int = 0, - job_limit: int = None -): - """ - Listen for requested evaluations - - Args: - channel: The channel to listen to - host: The address of the redis server - port: The port of the host that is serving the redis server - username: The username used for credentials into the redis server - password: A password that might be needed to access redis - db: The database number of the redis server to interact with - job_limit: The number of jobs that may be run at once - """ - # Trap signals that stop the application to correctly inform what exactly shut the runner down - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGQUIT, signal_handler) - - job_limit = job_limit or int(float(os.environ.get("MAXIMUM_RUNNING_JOBS", os.cpu_count()))) - - service.info(f"Listening for evaluation jobs on '{channel}'...") - already_listening = False - - error_tracker: TimedOccurrenceWatcher = TimedOccurrenceWatcher( - duration=timedelta(seconds=10), - threshold=10, - on_filled=too_many_exceptions_hit - ) - - try: - with get_object_manager(monitor_scope=True) as object_manager: - object_manager.logger = get_logger() - while True: - if already_listening: - service.info("Starting to listen for evaluation jobs again") - else: - service.info("Listening out for evaluation jobs") - already_listening = True - - try: - connection = utilities.get_redis_connection( - host=host, - port=port, - password=password, - username=username, - db=db - ) - - listener = connection.pubsub() - listener.subscribe(channel) - - executor_type: typing.Callable[[], futures.Executor] = get_concurrency_executor_type( - max_workers=job_limit - ) - - with executor_type() as worker_pool: - for message in listener.listen(): - run_job(launch_message=message, worker_pool=worker_pool, object_manager=object_manager) - except Exception as exception: - service.error(message="An error occured while listening for evaluation jobs", exception=exception) - except Exception as exception: - service.error( - message="A critical error caused the evaluation listener to fail and not recover", - exception=exception - ) - - # Inform the error tracker that the exception was hit. An exception will be hit and the loop will halt if - # the type of exception has been hit too many times in a short period - error_tracker.value_encountered(value=exception) - def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, redis_connection: redis.Redis) -> None: """ @@ -844,7 +707,7 @@ def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, r redis_connection.xgroup_createconsumer(name=stream_name, groupname=group_name, consumername=consumer_name) -def cleanup(redis_parameters: streams.StreamParameters): +def cleanup(redis_parameters: streams.StreamParameters) -> None: """ Clean up any leftover artifacts that might be considered no longer needed @@ -863,11 +726,14 @@ def cleanup(redis_parameters: streams.StreamParameters): service.error(exception) - def main(*args): """ Listen to a redis stream and launch evaluations based on received content """ + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGQUIT, signal_handler) + arguments = Arguments(*args) redis_parameters = streams.StreamParameters( @@ -889,19 +755,19 @@ def main(*args): try: listen(stream_parameters=redis_parameters, job_limit=arguments.limit) - exit_code = 0 + exit_code = SUCCESSFUL_EXIT except KeyboardInterrupt: - exit_code = os.EX_OK + exit_code = SUCCESSFUL_EXIT cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS) except Exception as exception: service.error(exception) - exit_code = 1 + exit_code = ERROR_EXIT finally: try: cleanup(redis_parameters=redis_parameters) except Exception as exception: service.error(exception) - exit_code = 1 + exit_code = ERROR_EXIT sys.exit(exit_code) diff --git a/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py b/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py index 1cbb79100..70fd4ee95 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py +++ b/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py @@ -511,11 +511,13 @@ def create_handler_configuration( 'loggers': { DEFAULT_LOGGER_NAME: { 'handlers': [f'{DEFAULT_LOGGER_NAME}_Handler', 'stdout'], - 'level': get_log_level() + 'level': get_log_level(), + 'propagate': False }, DEFAULT_SOCKET_LOGGER_NAME: { 'handlers': [f"{DEFAULT_SOCKET_LOGGER_NAME}_Handler", "stdout"], - 'level': get_socket_log_level() + 'level': get_socket_log_level(), + 'propagate': False } } } diff --git a/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py b/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py index eaf887145..a1d95b239 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py +++ b/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py @@ -4,7 +4,6 @@ from dmod.metrics import Communicator from dmod.metrics import CommunicatorGroup -from dmod.core.context import DMODObjectManager from dmod.core.context.base import ObjectCreatorProtocol from .common import * diff --git a/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py b/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py index 2fea5fa66..5d29fbe11 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py +++ b/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py @@ -11,11 +11,10 @@ import redis -import dmod.metrics.communication as communication +from dmod.metrics import communication from dmod.core.common import to_json from dmod.core.context import DMODObjectManager -import service from service import application_values from . import common @@ -366,9 +365,7 @@ def error( exception.__traceback__ ) ) - service.error(formatted_exception) - else: - service.error(message) + message += f"{os.linesep}{formatted_exception}" if verbosity and self._verbosity < verbosity: return @@ -416,7 +413,6 @@ def info(self, message: str, verbosity: communication.Verbosity = None, publish: message = f"[{timestamp}] {message}" self.__connection.rpush(self.__info_key, message) - service.info(message) if publish: self.write(reason="info", data={"info": message})