diff --git a/python/lib/core/dmod/core/common/collection.py b/python/lib/core/dmod/core/common/collection.py index 4c990a460..e841315b3 100644 --- a/python/lib/core/dmod/core/common/collection.py +++ b/python/lib/core/dmod/core/common/collection.py @@ -238,171 +238,6 @@ def __contains__(self, obj: object) -> bool: return obj in self.__data -class _OccurrenceTracker(typing.Generic[_T]): - """ - Keeps track of occurrences of a type of value that have been encountered within a duration - """ - def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]): - self.__key = key - self.__duration = duration - self.__threshold = threshold - self.__occurences: typing.List[datetime] = [] - self.__on_filled = on_filled - - def value_encountered(self): - """ - Inform the tracker that the value has been encountered again - """ - self.update_occurrences() - self.__occurences.append(datetime.now()) - if len(self.__occurences) >= self.__threshold: - self.__on_filled(self.__key) - - def update_occurrences(self) -> int: - """ - Update the list of occurrences to include only those within the current duration - - Returns: - The number of occurrences still being tracked - """ - cutoff: datetime = datetime.now() - self.__duration - self.__occurences = [ - occurrence - for occurrence in self.__occurences - if occurrence > cutoff - ] - return len(self.__occurences) - - @property - def key(self): - """ - The identifier that is being tracked - """ - return self.__key - - def __len__(self): - return len(self.__occurences) - - def __str__(self): - if len(self.__occurences) == 0: - occurrences_details = f"No Occurences within the last {self.__duration.total_seconds()} seconds." - else: - occurrences_details = (f"{len(self.__occurences)} occurrences since " - f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}") - return f"{self.key}: {occurrences_details}" - - -class TimedOccurrenceWatcher: - """ - Keeps track of the amount of occurrences of items within a range of time - """ - MINIMUM_TRACKING_SECONDS: typing.Final[float] = 0.1 - """ - The lowest number of seconds to watch for multiple occurrences. Only acting when multiple occurrences are tracked - in under 100ms would create a scenario where the watcher will most likely never trigger an action, rendering - this the wrong tool for the job. 
- """ - - @staticmethod - def default_key_function(obj: object) -> type: - """ - The function used to find a common identifier for an object if one is not provided - """ - return type(obj) - - def __init__( - self, - duration: timedelta, - threshold: int, - on_filled: typing.Callable[[_T], typing.Any], - key_function: typing.Callable[[_VT], _KT] = None - ): - if not isinstance(duration, timedelta): - raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object") - - if duration.total_seconds() < self.MINIMUM_TRACKING_SECONDS: - raise ValueError( - f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)" - ) - - self.__duration = duration - - if not isinstance(key_function, typing.Callable): - key_function = self.default_key_function - - self.__key_function = key_function - self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {} - self.__threshold = threshold - self.__on_filled = on_filled - - def value_encountered(self, value: _T): - """ - Add an occurrence of an object to track - - Args: - value: The item to track - """ - self.__update_trackers() - self._get_tracker(value).value_encountered() - - def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]: - """ - Get an occurrence tracker for the given value - - Args: - value: The value to track - - Returns: - A tracker for the value - """ - key = self.__key_function(value) - - for tracker in self.__entries.values(): - if tracker.key == key: - return tracker - - new_tracker = _OccurrenceTracker( - key=key, - duration=self.__duration, - threshold=self.__threshold, - on_filled=self.__on_filled - ) - self.__entries[uuid.uuid1()] = new_tracker - return new_tracker - - def __update_trackers(self): - """ - Update the amount of items in each tracker - - If a tracker becomes empty it will be removed - """ - for tracker_id, tracker in self.__entries.items(): - amount_left = tracker.update_occurrences() - if amount_left == 0: - del self.__entries[tracker_id] - - @property - def size(self) -> int: - """ - The number of items encountered within the duration - """ - self.__update_trackers() - return sum(len(tracker) for tracker in self.__entries.values()) - - @property - def duration(self) -> timedelta: - """ - The amount of time to track items for - """ - return self.__duration - - def __str__(self): - return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} Seconds" - - def __repr__(self): - return self.__str__() - - class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]): @abc.abstractmethod def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]: diff --git a/python/lib/core/dmod/core/context/base.py b/python/lib/core/dmod/core/context/base.py index e8c86379a..96b6df877 100644 --- a/python/lib/core/dmod/core/context/base.py +++ b/python/lib/core/dmod/core/context/base.py @@ -169,6 +169,7 @@ def end_scope(self): """ Override to add extra logic for when this scope is supposed to reach its end """ + self.logger.warning(f"Ending scope '{self.__scope_id}' for: {self}") self.drop_references() self.__scope_closed() diff --git a/python/lib/core/dmod/core/context/manager.py b/python/lib/core/dmod/core/context/manager.py index 52a4d4818..8945bc295 100644 --- a/python/lib/core/dmod/core/context/manager.py +++ b/python/lib/core/dmod/core/context/manager.py @@ -11,7 +11,6 @@ from multiprocessing import managers from multiprocessing import context -from 
multiprocessing import RLock from .base import ObjectManagerScope from .base import T @@ -23,8 +22,6 @@ TypeOfRemoteObject = typing.Union[typing.Type[managers.BaseProxy], type] """A wrapper object that is used to communicate to objects created by Managers""" -_PREPARATION_LOCK: RLock = RLock() - class DMODObjectManager(managers.BaseManager): """ @@ -132,45 +129,46 @@ def prepare( additional_proxy_types: A mapping between class types and the type of proxies used to operate upon them remotely """ - with _PREPARATION_LOCK: - if not cls.__initialized: - if not isinstance(additional_proxy_types, typing.Mapping): - additional_proxy_types = {} - - already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys()) - - for real_class, proxy_class in additional_proxy_types.items(): - name = real_class.__name__ if hasattr(real_class, "__name__") else None - - if name is None: - raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type") - - if name in already_registered_items: - print(f"'{name}' is already registered to {cls.__name__}") - continue - - cls.register_class(class_type=real_class, type_of_proxy=proxy_class) - already_registered_items.append(name) - - # Now find all proxies attached to the SyncManager and attach those - # This will ensure that this manager has proxies for objects and structures like dictionaries - registry_initialization_arguments = ( - { - "typeid": typeid, - "callable": attributes[0], - "exposed": attributes[1], - "method_to_typeid": attributes[2], - "proxytype": attributes[3] - } - for typeid, attributes in getattr(managers.SyncManager, "_registry").items() - if typeid not in already_registered_items - ) + if not cls.__initialized: + if not isinstance(additional_proxy_types, typing.Mapping): + additional_proxy_types = {} + + already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys()) + + for real_class, proxy_class in additional_proxy_types.items(): + name = real_class.__name__ if hasattr(real_class, "__name__") else None + + if name is None: + raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type") + + if name in already_registered_items: + print(f"'{name}' is already registered to {cls.__name__}") + continue + + cls.register_class(class_type=real_class, type_of_proxy=proxy_class) + already_registered_items.append(name) + + # Now find all proxies attached to the SyncManager and attach those + # This will ensure that this manager has proxies for objects and structures like dictionaries + registry_initialization_arguments = ( + { + "typeid": typeid, + "callable": attributes[0], + "exposed": attributes[1], + "method_to_typeid": attributes[2], + "proxytype": attributes[3] + } + for typeid, attributes in getattr(managers.SyncManager, "_registry").items() + if typeid not in already_registered_items + ) - for arguments in registry_initialization_arguments: - cls.register(**arguments) + for arguments in registry_initialization_arguments: + cls.register(**arguments) + else: + logging.warning(f"The '{cls.__name__}' cannot be prepared - it has already been done so.") - cls.__initialized = True - return cls + cls.__initialized = True + return cls def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args, **kwargs) -> T: """ @@ -197,6 +195,10 @@ def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args ) if __scope_name not in self.__scopes: + self.logger.warning( + f"Cannot track a '{__class_name}' object in the 
'{__scope_name}' " + f"scope if it has not been established. Establishing it now." + ) self.establish_scope(__scope_name) new_instance = self.create_object(__class_name, *args, **kwargs) @@ -228,12 +230,13 @@ def create_object(self, __class_name, /, *args, **kwargs) -> T: return value - def free(self, scope_name: str): + def free(self, scope_name: str, fail_on_missing_scope: bool = True): """ Remove all items associated with a given tracking key from the object manager Args: scope_name: The key used to keep track of like items + fail_on_missing_scope: Throw an exception if the scope doesn't exist Returns: The number of items that were deleted @@ -247,9 +250,13 @@ def free(self, scope_name: str): f"Received '{scope_name}' ({type(scope_name)}" ) - if scope_name not in self.__scopes: + if scope_name not in self.__scopes and fail_on_missing_scope: raise KeyError(f"Cannot free objects from {self} - no items are tracked by the key {scope_name}") + if scope_name not in self.__scopes: + self.logger.warning(f"Cannot remove scope by the name '{scope_name}' - it is not currently stored") + return + del self.__scopes[scope_name] def __inject_scope(self, scope: ObjectManagerScope): @@ -259,6 +266,9 @@ def __inject_scope(self, scope: ObjectManagerScope): Args: scope: The scope object to add """ + if scope is None: + raise ValueError(f"A scope for {self.__class__.__name__} cannot be added - it is None") + if scope.name in self.__scopes: raise KeyError( f"Cannot add a scope object '{scope.name}' to {self} - there is already a scope by that name. " @@ -297,7 +307,19 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes], scope: A scope object containing references to shared objects that need to be kept alive operation: The operation using the shared objects """ + if self.__monitor_scope and not self.__scope_monitor: + if self.__scope_monitor is None: + self.__scope_monitor = FutureMonitor(logger=self.__logger) + self.__scope_monitor.start() + if not self.__monitor_scope or not self.__scope_monitor: + if not self.__monitor_scope and not self.__scope_monitor: + logging.error("This logger was not set to monitor scopes and it does not have a monitoring thread") + elif not self.__monitor_scope: + logging.error("This logger was told not to monitor scope") + else: + logging.error("There is no thread available to monitor scopes") + if isinstance(scope, ObjectManagerScope): scope_name = scope.name elif isinstance(scope, bytes): @@ -339,6 +361,9 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes], @property def logger(self) -> LoggerProtocol: + """ + The logger made specifically for this manager + """ return self.__logger @logger.setter @@ -360,3 +385,15 @@ def logger(self, logger: LoggerProtocol): # the logger on the monitor is kept up to speed. 
if self.__scope_monitor: self.__scope_monitor.logger = logger + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.__scope_monitor: + self.__scope_monitor.stop() + + if self.__monitor_scope: + self.__scope_monitor.kill() + + for scope_name, scope in self.__scopes.items(): + scope.end_scope() + + super().__exit__(exc_type, exc_val, exc_tb) diff --git a/python/lib/core/dmod/core/context/monitor.py b/python/lib/core/dmod/core/context/monitor.py index 4af4190b3..29aaf7550 100644 --- a/python/lib/core/dmod/core/context/monitor.py +++ b/python/lib/core/dmod/core/context/monitor.py @@ -105,6 +105,8 @@ def __init__( poll_interval: typing.Union[float, timedelta] = None, logger: LoggerProtocol = None ): + logging.info(f"Creating a '{self.__class__.__name__}' instance") + if not timeout: timeout = DEFAULT_MONITOR_TIMEOUT elif isinstance(timeout, timedelta): @@ -171,6 +173,9 @@ def logger(self, logger: LoggerProtocol) -> None: @property def size(self) -> int: + """ + The number of items being monitored + """ return self.__size def __len__(self): @@ -195,6 +200,7 @@ def _monitor(self) -> bool: """ monitoring_succeeded: bool = True + self.logger.info(f"{self}: Beginning to monitor") while self.__should_be_monitoring: try: # Block to check if the loop should be exitted based on a current kill state @@ -278,9 +284,17 @@ def _monitor(self) -> bool: # Wait a little bit before polling again to allow for work to continue sleep(self.__poll_interval) + self.logger.info(f"No longer monitoring within {self}") self.__cleanup() return monitoring_succeeded + @property + def running(self) -> bool: + """ + Whether the monitor is running + """ + return self.__thread is not None and self.__thread.is_alive() + def find_scope(self, future_result: futures.Future) -> typing.Optional[ObjectManagerScope]: """ Find the scope that belongs with the given output @@ -414,11 +428,14 @@ def __cleanup(self): with self.__lock: while not self._queue.empty(): try: - entry = self._queue.get() + entry = self._queue.get(timeout=self._timeout) if isinstance(entry, futures.Future) and entry.running(): entry.cancel() except queue.Empty: pass + except TimeoutError: + pass + self._queue = queue.Queue() self.__size = 0 self.__scopes.clear() @@ -428,3 +445,9 @@ def __str__(self): def __repr__(self): return self.__str__() + + def __bool__(self): + return self.running + + def __del__(self): + self.__cleanup() diff --git a/python/lib/core/dmod/core/context/scope.py b/python/lib/core/dmod/core/context/scope.py index bc30dffad..13d7b7d95 100644 --- a/python/lib/core/dmod/core/context/scope.py +++ b/python/lib/core/dmod/core/context/scope.py @@ -20,6 +20,7 @@ class DMODObjectManagerScope(ObjectManagerScope): """ def __init__(self, name: str, object_manager: DMODObjectManager): super().__init__(name) + self.logger.info(f"Creating scope '{self.scope_id}' for '{object_manager}'") self.__object_manager: DMODObjectManager = object_manager self._perform_on_close(partial(self.__object_manager.free, self.scope_id)) diff --git a/python/lib/evaluations/dmod/evaluations/backends/file.py b/python/lib/evaluations/dmod/evaluations/backends/file.py index c8c08d3a2..f9ca6391c 100644 --- a/python/lib/evaluations/dmod/evaluations/backends/file.py +++ b/python/lib/evaluations/dmod/evaluations/backends/file.py @@ -21,7 +21,6 @@ from .. import util from .. 
import specification

-util.configure_logging()

RE_PATTERN = re.compile(r"(\{.+\}|\[.+\]|\(.+\)|(?"` allows you to only read new messages (ignoring all previous messages), and offering an id will
+let you read every message that comes _after_ that id. `xread` will give you a message and `xreadgroup` will take
+the message and reserve it for a single consumer within a consumer group. Only one consumer may hold a message
+within a group at any given time. Multiple groups may read and claim messages at the same time. `xclaim` allows one
+consumer in a group to claim ownership of a message from another consumer within the group, and `xack` tells the
+group that the consumer is done processing the message and releases ownership, so the group will not read the
+message again. This allows for work stealing and prevents a single message from being processed more than once.
+`xdel` deletes the message from the queue altogether. Calling `xdel` is a good way of preventing _any_ further
+processing of a message and is a good practice when operating a work queue. Once the requirements for a piece of
+work have been fulfilled, the record may be removed to ensure that it is not attempted again.
+
+Sets of actions generally correlate to groups. If something is responsible for controlling a light switch, a group
+may be responsible for the incoming messages. Consumers within these groups generally correlate to actors that may
+perform the actions granted to the group. If there are 4 consumers within a single group, Consumer C might be able
+to claim ownership of message X within the group and consumers A, B, and D will not. If message Y comes through,
+consumers A, B, or D may be able to claim that message for themselves and perform work while consumer C consumes
+message X. This helps coordinate work across computational nodes. If consumers A, B, C, and D all work on different
+nodes, one consumer may claim work and consume its own resources without disrupting the others. If more work
+needs to be done simultaneously, more consumers may be added for horizontal scaling.
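+
+As a concrete illustration, the following is a minimal sketch of the commands described above using the `redis`
+Python client. It is not the runner's helper code, and the stream, group, and consumer names are hypothetical:
+
+```python
+import redis
+
+connection = redis.Redis()
+stream, group, consumer = "evaluation_jobs", "runners", "runner-1"  # hypothetical names
+
+# Create the consumer group if it does not already exist; id="$" means "only consider new messages"
+try:
+    connection.xgroup_create(name=stream, groupname=group, id="$", mkstream=True)
+except redis.exceptions.ResponseError:
+    pass  # the group already exists
+
+# Claim the next unread message for this consumer; ">" asks for messages never delivered to the group
+response = connection.xreadgroup(group, consumer, {stream: ">"}, count=1, block=1000) or []
+
+for _stream, messages in response:
+    for message_id, payload in messages:
+        try:
+            ...  # perform the work described by the payload
+        finally:
+            connection.xack(stream, group, message_id)  # release ownership; the group will not re-read it
+            connection.xdel(stream, message_id)         # remove the message so nothing else processes it
+```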
+Messages are caught in the main thread, evaluations are run in child processes, and there is a second thread that
+monitors running processes to determine when shared scope should be destroyed and when messages should be removed
+from the stream. The `concurrent.futures` interface is used to track running evaluations, so the monitoring thread
+is able to poll the evaluations it tracks to see if processing within the child process has concluded.
+
+#### How does this differ from PubSub?
+
+The runner was originally implemented by subscribing to a Redis PubSub channel. A PubSub channel is a single stream
+that clients may publish data to and other clients may subscribe to. Subscribers 'listen' to the channel and each
+subscriber gets every message in real time. If a subscriber misses a message, it cannot go back and read it later.
+Imagine that a PubSub channel is a TV channel. One or more entities may broadcast video through it and the audience
+may do what they like with what they receive. The audience does not have the ability to respond to the broadcaster
+unless they themselves become a broadcaster and the original broadcasters become audience members.
+
+PubSub works great for real time dissemination of data, but when it is used to coordinate work, as the runner
+previously did, _all_ subscribers will attempt to perform the same work. If there are four runners on four machines,
+each will attempt to run the same evaluation on their own machine because they have no way of knowing that the
+others are performing the same work.
+
+#### The Worker
+
+The worker, found in `worker.py`, is the entity that _actually_ calls `dmod.evaluations` in order to perform
+evaluation duties, _not the runner._ The runner will collect information from messages and call the worker in a
+child process with the received parameters.
+
+The worker may be called via the command line to perform evaluations manually. If there is a need to script out
+evaluations directly to `dmod.evaluations` and bypass the service entirely, the worker contains an excellent example
+of how to do so.
+
+#### How does the runner listen?
+
+`runner:listen` is called from `runner:main` with instructions on how to connect to redis and the limit of how many
+jobs may be run at the same time.
+
+`runner:listen`, called the listener from now on, will create a multiprocessed event to use as a signal to stop
+listening and spawn a thread to poll a queue of actively running `Future`s that are evaluating data. The listener
+then creates a consumer for the group performing listening duties, creating the group first if it is not already
+present. A counter used to track errors is also created. This tracker identifies faults and counts the times that
+they occur. Errors are identified by where in the code base they are thrown. If the same error from the same
+location is encountered too many times in a short period, the listener will exit, since this indicates a core
+problem with the application. Otherwise the listener continues to listen for the same reason web servers continue
+to function after encountering a 500+ error: a portion of the code causes an error, but it isn't clear whether it
+should halt all operations. It may be due to a user request (such as a bad configuration) or it may be a freak
+accident that is never encountered again.
+
+A while loop is then entered that will run until the stop signal is set. Within the loop, a generator is created
+that will set itself up to read and deserialize messages it manages to grab from the redis stream. Other runners
+that may be running in tandem may be able to claim messages first and prevent an instance of the runner from
+claiming its own until another message is added to the stream. This is intended behavior. It allows multiple runners
+to run at once on multiple machines without interfering with one another.
+
+Each message read via the generator, which will block until a message is received, is fed into a function to
+interpret the collected message. As of writing (August 30th, 2024), there are three things that may be interpreted
+from the messages:
+
+1. Launch an evaluation
+2. Stop Listening
+3. Ignore the message
+
+##### Launch an Evaluation
+
+If a message comes through stating that an evaluation should be run, the given parameters are stored in a `JobRecord`
+with information on where the message came from, what generated it, what the evaluation is, and a reference to the
+process running the evaluation. That record is passed back to the main loop, where it is stored in a queue that the
+monitoring thread will poll, as sketched below.
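+
+The following is a simplified, hypothetical sketch of that hand-off between the listening loop and the monitoring
+thread using `concurrent.futures`; it is not the runner's actual code, and the real `JobRecord` also carries the
+stream and message metadata:
+
+```python
+import queue
+import threading
+from concurrent import futures
+
+def evaluate(evaluation_id: str) -> str:
+    """Stand-in for the real worker function that runs in a child process"""
+    return evaluation_id
+
+active_jobs: "queue.Queue[futures.Future]" = queue.Queue()
+stop_signal = threading.Event()
+
+def monitor() -> None:
+    """Poll submitted jobs; unfinished jobs are put back on the queue for the next pass"""
+    while not stop_signal.is_set():
+        try:
+            job = active_jobs.get(timeout=1)
+        except queue.Empty:
+            continue
+        if job.done():
+            ...  # acknowledge and delete the originating stream message here
+        else:
+            active_jobs.put(job)
+
+if __name__ == "__main__":
+    threading.Thread(target=monitor, daemon=True).start()
+    with futures.ProcessPoolExecutor() as pool:
+        active_jobs.put(pool.submit(evaluate, "example-evaluation"))  # hypothetical evaluation id
+    stop_signal.set()
+```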
+
+##### Stop Listening
+
+If a message comes through stating that the purpose of the message is to close, kill, or terminate the application,
+the stop signal is set and nothing is returned from the interpretation function. Since the stop signal is set,
+the loop will then end. The message will be acknowledged via `xack` so that the group won't read it again.
+
+##### Ignore the Message
+
+If a message comes through that the listener doesn't know what to do with, it is logged, and the message is
+acknowledged via `xack`. Maybe there is another group that reads from the same stream that may interact with said
+message, or maybe there's a consumer attached to a process in the same group that may handle it. The configurations
+that cause such messages to pass through are not ideal, but they will not interrupt the performance of _this_
+operation.
\ No newline at end of file
diff --git a/python/services/evaluationservice/dmod/evaluationservice/runner.py b/python/services/evaluationservice/dmod/evaluationservice/runner.py
index 8452ae7bc..6b00e5f3a 100755
--- a/python/services/evaluationservice/dmod/evaluationservice/runner.py
+++ b/python/services/evaluationservice/dmod/evaluationservice/runner.py
@@ -16,14 +16,13 @@
 import threading
 
 from argparse import ArgumentParser
-from multiprocessing.pool import ApplyResult
 
 import redis
 
 from concurrent import futures
-from datetime import timedelta
 from functools import partial
 
+import utilities
 from dmod.metrics import CommunicatorGroup
 
 from dmod.core.context import DMODObjectManager
@@ -31,11 +30,26 @@
 import service
 import worker
 
+from service.service_logging import get_logger
 from utilities.common import ErrorCounter
 from utilities import streams
 
 EXCEPTION_LIMIT: typing.Final[int] = 10
+"""
+The maximum number of a specific type of error to catch before exiting. If an error occurs 11 times in rapid
+succession and the limit is 10, the runner should stop. If it only occurs 9 times it could be the result of
+something the runner has no control over, and the runner may remain functional.
+"""
+
+SUCCESSFUL_EXIT: typing.Final[int] = 0
+"""The exit code for a successful operation. `os.EX_OK` would be ideal, but that approach isn't portable"""
+
+ERROR_EXIT: typing.Final[int] = 1
+"""
+The exit code when the runner halts because of an error. 1 is used since that is generally associated with the
+catch-all error code.
+""" def get_concurrency_executor_type(**kwargs) -> typing.Callable[[], futures.Executor]: @@ -301,20 +315,34 @@ class JobRecord: """The ID of the message with the instructions for the evaluation""" evaluation_id: str """The ID of the evaluation""" - job: ApplyResult + job: futures.Future """The asynchronous results for the job""" - def mark_complete(self, connection: redis.Redis) -> bool: + def mark_complete(self, connection: redis.Redis, object_manager: DMODObjectManager) -> bool: """ Attempt to mark a job as no longer running Args: connection: A redis connection used to mark the record as complete + object_manager: A shared object manager that may hold data for this evaluation Returns: Whether the job was acknowledged as complete """ try: + object_manager.free(self.evaluation_id, fail_on_missing_scope=False) + except KeyError: + service.info(f"There is no scope for '{self.evaluation_id}' in the object manager to close") + except Exception as exception: + service.error( + f"Failed to clear the scope of shared objects in evaluation '{self.evaluation_id}'", + exception=exception + ) + + try: + service.info( + f"{self.stream_name}:{self.group_name}:{self.consumer_name} will now delete message '{self.message_id}' since it has been consumed" + ) confirmation = connection.xdel(self.stream_name, self.message_id) return bool(confirmation) except Exception as exception: @@ -324,21 +352,43 @@ def mark_complete(self, connection: redis.Redis) -> bool: def launch_evaluation( stream_message: streams.StreamMessage, - worker_pool: multiprocessing.Pool -) -> JobRecord: + worker_pool: futures.Executor, + object_manager: DMODObjectManager, +) -> typing.Optional[JobRecord]: """ Launch an evaluation based on a message received through a redis stream Args: stream_message: The message received through a redis stream worker_pool: The pool that handles the creation of other processes + object_manager: A shared object creator and tracker Returns: A record of the evaluation job that was created """ payload = stream_message.payload + evaluation_id = payload.get('evaluation_id') + scope = object_manager.establish_scope(evaluation_id) + + try: + communicators: CommunicatorGroup = utilities.get_communicators( + communicator_id=evaluation_id, + verbosity=payload.get("verbosity"), + object_manager=scope, + host=service.REDIS_HOST, + port=service.REDIS_PORT, + password=service.REDIS_PASSWORD, + include_timestamp=False + ) + service.info(f"Communicators have been created for the evaluation named '{evaluation_id}'") + except Exception as exception: + service.error( + message=f"Could not create communicators for evaluation: {evaluation_id} due to {exception}", + exception=exception + ) + return None - service.info(f"Launching an evaluation for {payload['evaluation_id']}...") + service.info(f"Launching an evaluation for {evaluation_id}...") instructions = payload.get("instructions") if not instructions: @@ -347,19 +397,27 @@ def launch_evaluation( if isinstance(instructions, dict): instructions = json.dumps(instructions, indent=4) - arguments = JobArguments( + arguments = WorkerProcessArguments( evaluation_id=payload['evaluation_id'], instructions=instructions, verbosity=payload.get("verbosity"), - start_delay=payload.get("start_delay") + start_delay=payload.get("start_delay"), + communicators=communicators ) - new_job: ApplyResult = worker_pool.apply_async( - worker.evaluate, - kwds=arguments.kwargs - ) + try: + service.info(f"Submitting the evaluation job for {evaluation_id}...") + evaluation_job: futures.Future = 
worker_pool.submit(
+            worker.evaluate,
+            **arguments.kwargs
+        )
+    except Exception as exception:
+        service.error(f"Could not launch evaluation {evaluation_id} due to {exception}", exception=exception)
+        return None
+
+    new_job: futures.Future = evaluation_job
 
-    service.info(f"Evaluation for {payload['evaluation_id']} has been launched.")
+    service.info(f"Evaluation for {evaluation_id} has been launched.")
 
     job_record = JobRecord(
         stream_name=stream_message.stream_name,
@@ -370,13 +428,22 @@
         job=new_job
     )
 
+    service.info(f"Preparing to monitor objects for {evaluation_id}...")
+    try:
+        object_manager.monitor_operation(evaluation_id, evaluation_job)
+        service.info(f"Evaluation for {evaluation_id} has been launched.")
+    except BaseException as exception:
+        service.error(f"Could not monitor {evaluation_id} due to: {exception}")
+        traceback.print_exc()
+
     return job_record
 
 
 def interpret_message(
     stream_message: streams.StreamMessage,
-    worker_pool: multiprocessing.Pool,
-    stop_signal: multiprocessing.Event
+    worker_pool: futures.Executor,
+    stop_signal: multiprocessing.Event,
+    object_manager: DMODObjectManager
 ) -> typing.Optional[JobRecord]:
     """
     Figures out what to do based on a message received through a redis stream
@@ -387,6 +454,7 @@
         stream_message: The message received through a redis stream
         worker_pool: A multiprocessing pool that can be used to start new evaluations
         stop_signal: The signal used to stop the reading loop
+        object_manager: A shared object manager
 
     Returns:
         The record of a job if one has been launched
@@ -404,11 +472,10 @@
     purpose = stream_message.payload.get("purpose").lower()
 
     if purpose == 'launch':
-        return launch_evaluation(stream_message, worker_pool)
+        return launch_evaluation(stream_message, worker_pool, object_manager=object_manager)
 
     if purpose in ("close", "kill", "terminate"):
         stop_signal.set()
         service.info("Exit message received. Closing the runner.")
-        sys.exit(0)
     else:
         service.info(
             f"Runner => The purpose was not to launch or terminate. Only launching is handled through the runner."
@@ -419,8 +486,9 @@ def interpret_message( def monitor_running_jobs( connection: redis.Redis, - active_job_queue: queue.Queue[ApplyResult[JobRecord]], - stop_signal: multiprocessing.Event + active_job_queue: queue.Queue[JobRecord], + stop_signal: multiprocessing.Event, + object_manager: DMODObjectManager ): """ Poll a queue of jobs and close ones that are marked as complete @@ -431,8 +499,9 @@ def monitor_running_jobs( connection: A connection to redis that will be used to acknowledge completed jobs active_job_queue: A queue of jobs to poll stop_signal: A signal used to stop the reading loop + object_manager: An object manager that may hold scope for a running job """ - potentially_complete_job: typing.Optional[ApplyResult] = None + record: typing.Optional[JobRecord] = None encountered_errors = ErrorCounter(limit=EXCEPTION_LIMIT) """ @@ -449,15 +518,13 @@ def monitor_running_jobs( while not stop_signal.is_set(): try: - potentially_complete_job = active_job_queue.get(block=True, timeout=1) + record = active_job_queue.get(block=True, timeout=1) - if not potentially_complete_job.ready(): - active_job_queue.put(potentially_complete_job) + if not record.job.done(): + active_job_queue.put(record) continue - record: JobRecord = potentially_complete_job.get() - - marked_complete = record.mark_complete(connection=connection) + marked_complete = record.mark_complete(connection=connection, object_manager=object_manager) if not marked_complete: service.error( @@ -466,10 +533,10 @@ def monitor_running_jobs( f"message '{record.message_id}', could not be marked as complete" ) - potentially_complete_job = None + record = None except TimeoutError: - if potentially_complete_job: - active_job_queue.put(potentially_complete_job) + if record and not record.job.cancelled(): + active_job_queue.put(record) except queue.Empty: # There are plenty of times when this might be empty and that's fine. 
In this case we just want pass @@ -477,154 +544,7 @@ def monitor_running_jobs( encountered_errors.add_error(error=exception) service.error(exception) - potentially_complete_job = None - - -def run_job( - launch_message: dict, - worker_pool: futures.Executor, - object_manager: DMODObjectManager -): - """ - Adds the evaluation to the worker pool for background processing - - Args: - launch_message: A dictionary containing data to send to the process running the job - worker_pool: The pool with processes ready to run an evaluation - object_manager: The object manager used to create shared objects - """ - if launch_message['type'] != 'message': - # We exit because this isn't a useful message - return - - launch_parameters = launch_message['data'] - if not isinstance(launch_parameters, dict): - try: - launch_parameters = json.loads(launch_parameters) - except Exception as exception: - service.error("The passed message wasn't JSON") - service.error(launch_parameters, exception) - return - - if 'purpose' not in launch_parameters: - service.info(f"A purpose was not communicated through the {service.EVALUATION_QUEUE_NAME} channel") - return - - purpose = launch_parameters.get("purpose").lower() - - if purpose == 'launch': - evaluation_id = launch_parameters.get('evaluation_id') - scope = object_manager.establish_scope(evaluation_id) - try: - communicators: CommunicatorGroup = utilities.get_communicators( - communicator_id=evaluation_id, - verbosity=launch_parameters.get("verbosity"), - object_manager=scope, - host=service.REDIS_HOST, - port=service.REDIS_PORT, - password=service.REDIS_PASSWORD, - include_timestamp=False - ) - service.debug(f"Communicators have been created for the evaluation named '{evaluation_id}'") - except Exception as exception: - service.error( - message=f"Could not create communicators for evaluation: {evaluation_id} due to {exception}", - exception=exception - ) - return - - service.info(f"Launching an evaluation for {launch_parameters['evaluation_id']}...") - instructions = launch_parameters.get("instructions") - - if isinstance(instructions, dict): - instructions = json.dumps(instructions, indent=4) - - arguments = WorkerProcessArguments( - evaluation_id=launch_parameters['evaluation_id'], - instructions=instructions, - verbosity=launch_parameters.get("verbosity"), - start_delay=launch_parameters.get("start_delay"), - communicators=communicators - ) - - try: - service.debug(f"Submitting the evaluation job for {evaluation_id}...") - evaluation_job: futures.Future = worker_pool.submit( - worker.evaluate, - **arguments.kwargs - ) - except Exception as exception: - service.error(f"Could not launch evaluation {evaluation_id} due to {exception}", exception=exception) - return - - service.debug(f"Preparing to monitor {evaluation_id}...") - try: - object_manager.monitor_operation(evaluation_id, evaluation_job) - service.debug(f"Evaluation for {launch_parameters['evaluation_id']} has been launched.") - except BaseException as exception: - service.error(f"Could not monitor {evaluation_id} due to: {exception}") - traceback.print_exc() - elif purpose in ("close", "kill", "terminate"): - service.info("Exit message received. Closing the runner.") - sys.exit(0) - else: - service.info( - f"Runner => The purpose was not to launch or terminate. Only launching is handled through the runner." 
- f"{os.linesep}Message: {json.dumps(payload)}" - ) - return None - - -def monitor_running_jobs( - connection: redis.Redis, - active_job_queue: queue.Queue[ApplyResult[JobRecord]], - stop_signal: multiprocessing.Event -): - """ - Poll a queue of jobs and close ones that are marked as complete - - Meant to be run in a separate thread - - Args: - connection: A connection to redis that will be used to acknowledge completed jobs - active_job_queue: A queue of jobs to poll - stop_signal: A signal used to stop the reading loop - """ - potentially_complete_job: typing.Optional[ApplyResult] = None - - monitor_errors = ErrorCounter(limit=EXCEPTION_LIMIT) - - while not stop_signal.is_set(): - try: - potentially_complete_job = active_job_queue.get(block=True, timeout=1) - - if not potentially_complete_job.ready(): - active_job_queue.put(potentially_complete_job) - continue - - record: JobRecord = potentially_complete_job.get() - - marked_complete = record.mark_complete(connection=connection) - - if not marked_complete: - service.error( - f"Evaluation '{record.evaluation_id}', recognized by the '{record.consumer_name}' consumer " - f"within the '{record.group_name}' group on the '{record.stream_name}' stream as coming from " - f"message '{record.message_id}', could not be marked as complete" - ) - - potentially_complete_job = None - except TimeoutError: - if potentially_complete_job: - active_job_queue.put(potentially_complete_job) - except queue.Empty: - # There are plenty of times when this might be empty and that's fine. In this case we just want - pass - except Exception as exception: - monitor_errors.add_error(error=exception) - service.error(exception) - - potentially_complete_job = None + record = None def get_consumer_name() -> str: @@ -645,13 +565,15 @@ def listen( stream_parameters: The means to connect to redis job_limit: The maximum number of jobs that may be run at once """ - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGQUIT, signal_handler) job_limit = job_limit or int(float(os.environ.get("MAXIMUM_RUNNING_JOBS", os.cpu_count()))) + """The maximum number of jobs that may be actively running from this listener""" + stop_signal: multiprocessing.Event = multiprocessing.Event() + """Tells this function and the associated monitoring thread to stop polling""" + active_jobs: queue.Queue[JobRecord] = queue.Queue(maxsize=job_limit) + """A queue of active jobs to poll""" service.info( f"Listening for evaluation jobs on '{stream_parameters.stream_name}' through the " @@ -668,22 +590,31 @@ def listen( redis_connection=connection ) - monitoring_thread = threading.Thread( - target=monitor_running_jobs, - name=f"{service.application_values.APPLICATION_NAME}: Monitor for {consumer_name}", - kwargs={ - "connection": connection, - "active_job_queue": active_jobs, - "stop_signal": stop_signal, - }, - daemon=True - ) - monitoring_thread.start() + monitoring_thread: typing.Optional[threading.Thread] = None error_counter = ErrorCounter(limit=EXCEPTION_LIMIT) + executor_type: typing.Callable[[], futures.Executor] = get_concurrency_executor_type( + max_workers=job_limit + ) + try: - with multiprocessing.Pool(processes=job_limit) as worker_pool: # type: multiprocessing.pool.Pool + with get_object_manager(monitor_scope=True) as object_manager, executor_type() as worker_pool: + object_manager.logger = get_logger() + + monitoring_thread = threading.Thread( + target=monitor_running_jobs, + name=f"{service.application_values.APPLICATION_NAME}: 
Monitor for {consumer_name}", + kwargs={ + "connection": connection, + "active_job_queue": active_jobs, + "stop_signal": stop_signal, + "object_manager": object_manager + }, + daemon=True + ) + monitoring_thread.start() + while not stop_signal.is_set(): if already_listening: service.info(f"{consumer_name}: Starting to listen for evaluation jobs again") @@ -701,7 +632,16 @@ def listen( ) for message in message_stream: - possible_record = interpret_message(message, worker_pool, stop_signal) + service.info( + f"{message.stream_name}:{message.group_name}:{consumer_name}: Received message " + f"'{message.message_id}'" + ) + possible_record = interpret_message( + message, + worker_pool, + stop_signal=stop_signal, + object_manager=object_manager + ) if possible_record: # This will block until another entry may be put into the queue - this will prevent one @@ -711,11 +651,13 @@ def listen( else: # Since this message isn't considered one for the runner, acknowledge that it's been seen # and move on so something else may view the message + service.info(f"{message.stream_name}:{message.group_name}:{consumer_name}: Acknowledging message '{message.message_id}'") connection.xack( stream_parameters.stream_name, stream_parameters.group_name, message.message_id ) + service.info(f"{message.stream_name}:{message.group_name} will no longer use message '{message.message_id}'") if stop_signal.is_set(): break @@ -730,85 +672,6 @@ def listen( monitoring_thread.join(timeout=5) -# TODO: Switch from pubsub to a redis stream -def listen( - channel: str, - host: str = None, - port: typing.Union[str, int] = None, - username: str = None, - password: str = None, - db: int = 0, - job_limit: int = None -): - """ - Listen for requested evaluations - - Args: - channel: The channel to listen to - host: The address of the redis server - port: The port of the host that is serving the redis server - username: The username used for credentials into the redis server - password: A password that might be needed to access redis - db: The database number of the redis server to interact with - job_limit: The number of jobs that may be run at once - """ - # Trap signals that stop the application to correctly inform what exactly shut the runner down - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGQUIT, signal_handler) - - job_limit = job_limit or int(float(os.environ.get("MAXIMUM_RUNNING_JOBS", os.cpu_count()))) - - service.info(f"Listening for evaluation jobs on '{channel}'...") - already_listening = False - - error_tracker: TimedOccurrenceWatcher = TimedOccurrenceWatcher( - duration=timedelta(seconds=10), - threshold=10, - on_filled=too_many_exceptions_hit - ) - - try: - with get_object_manager(monitor_scope=True) as object_manager: - object_manager.logger = get_logger() - while True: - if already_listening: - service.info("Starting to listen for evaluation jobs again") - else: - service.info("Listening out for evaluation jobs") - already_listening = True - - try: - connection = utilities.get_redis_connection( - host=host, - port=port, - password=password, - username=username, - db=db - ) - - listener = connection.pubsub() - listener.subscribe(channel) - - executor_type: typing.Callable[[], futures.Executor] = get_concurrency_executor_type( - max_workers=job_limit - ) - - with executor_type() as worker_pool: - for message in listener.listen(): - run_job(launch_message=message, worker_pool=worker_pool, object_manager=object_manager) - except Exception as exception: 
- service.error(message="An error occured while listening for evaluation jobs", exception=exception) - except Exception as exception: - service.error( - message="A critical error caused the evaluation listener to fail and not recover", - exception=exception - ) - - # Inform the error tracker that the exception was hit. An exception will be hit and the loop will halt if - # the type of exception has been hit too many times in a short period - error_tracker.value_encountered(value=exception) - def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, redis_connection: redis.Redis) -> None: """ @@ -844,7 +707,7 @@ def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, r redis_connection.xgroup_createconsumer(name=stream_name, groupname=group_name, consumername=consumer_name) -def cleanup(redis_parameters: streams.StreamParameters): +def cleanup(redis_parameters: streams.StreamParameters) -> None: """ Clean up any leftover artifacts that might be considered no longer needed @@ -863,11 +726,14 @@ def cleanup(redis_parameters: streams.StreamParameters): service.error(exception) - def main(*args): """ Listen to a redis stream and launch evaluations based on received content """ + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGQUIT, signal_handler) + arguments = Arguments(*args) redis_parameters = streams.StreamParameters( @@ -889,19 +755,19 @@ def main(*args): try: listen(stream_parameters=redis_parameters, job_limit=arguments.limit) - exit_code = 0 + exit_code = SUCCESSFUL_EXIT except KeyboardInterrupt: - exit_code = os.EX_OK + exit_code = SUCCESSFUL_EXIT cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS) except Exception as exception: service.error(exception) - exit_code = 1 + exit_code = ERROR_EXIT finally: try: cleanup(redis_parameters=redis_parameters) except Exception as exception: service.error(exception) - exit_code = 1 + exit_code = ERROR_EXIT sys.exit(exit_code) diff --git a/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py b/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py index 1cbb79100..70fd4ee95 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py +++ b/python/services/evaluationservice/dmod/evaluationservice/service/service_logging.py @@ -511,11 +511,13 @@ def create_handler_configuration( 'loggers': { DEFAULT_LOGGER_NAME: { 'handlers': [f'{DEFAULT_LOGGER_NAME}_Handler', 'stdout'], - 'level': get_log_level() + 'level': get_log_level(), + 'propagate': False }, DEFAULT_SOCKET_LOGGER_NAME: { 'handlers': [f"{DEFAULT_SOCKET_LOGGER_NAME}_Handler", "stdout"], - 'level': get_socket_log_level() + 'level': get_socket_log_level(), + 'propagate': False } } } diff --git a/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py b/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py index eaf887145..a1d95b239 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py +++ b/python/services/evaluationservice/dmod/evaluationservice/utilities/__init__.py @@ -4,7 +4,6 @@ from dmod.metrics import Communicator from dmod.metrics import CommunicatorGroup -from dmod.core.context import DMODObjectManager from dmod.core.context.base import ObjectCreatorProtocol from .common import * diff --git a/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py 
b/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py index 2fea5fa66..5d29fbe11 100644 --- a/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py +++ b/python/services/evaluationservice/dmod/evaluationservice/utilities/communication.py @@ -11,11 +11,10 @@ import redis -import dmod.metrics.communication as communication +from dmod.metrics import communication from dmod.core.common import to_json from dmod.core.context import DMODObjectManager -import service from service import application_values from . import common @@ -366,9 +365,7 @@ def error( exception.__traceback__ ) ) - service.error(formatted_exception) - else: - service.error(message) + message += f"{os.linesep}{formatted_exception}" if verbosity and self._verbosity < verbosity: return @@ -416,7 +413,6 @@ def info(self, message: str, verbosity: communication.Verbosity = None, publish: message = f"[{timestamp}] {message}" self.__connection.rpush(self.__info_key, message) - service.info(message) if publish: self.write(reason="info", data={"info": message})