
Commit

Fixed whitespace issues
christophertubbs authored and robertbartel committed Oct 7, 2024
1 parent a0f3d05 commit 7905fe8
Showing 4 changed files with 52 additions and 52 deletions.
78 changes: 39 additions & 39 deletions python/services/evaluationservice/dmod/evaluationservice/runner.md
@@ -13,68 +13,68 @@ The runner listens to a redis stream for messages pertaining to it and pulls per

A Redis Stream is just a simple message queue. Each message within the queue is just a set of key value pairs. Calling
`xadd` allows you or a program to add a new set of key value pairs and an id may be associated with it. If an id is not given,
the id will correspond with the numeric datetime of when the message was stored. The commands `xread` and `xreadgroup`
may be used to read new messages from a given message id. Starting at `"0"` allows you to read from the earliest
message onwards, `">"` allows you to read only new messages (ignoring all previous messages), and offering an id will
let you read every message that comes _after_ that id. `xread` will give you a message and `xreadgroup` will take
the message and reserve it for a single consumer within a consumer group. Only one consumer may hold a message
within a group at any given time. Multiple groups may read and claim messages at the same time. `xclaim` allows one
consumer in a group to claim ownership of a message from another consumer within the group, and `xack` tells the group
that it is done processing the message and releases ownership so that the group will not read the message again. This
allows for work stealing and prevents the over-processing of a single message. `xdel` deletes the message from the
queue altogether. Calling `xdel` is a good way of preventing _any_ further processing of a message and is a good thing
to do when operating a work queue. Once the requirements for a piece of work have been fulfilled, the record may be
removed to ensure that it is not attempted again.
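
A rough sketch of that lifecycle with `redis-py` (the stream, group, and consumer names below are placeholders rather
than the names the runner actually uses):

```python
import redis

connection = redis.Redis()

# A producer adds a message; the returned id encodes when the message was stored
message_id = connection.xadd("evaluation_stream", {"purpose": "launch", "evaluation_id": "example"})

# Create the consumer group once, reading from the very beginning of the stream
try:
    connection.xgroup_create("evaluation_stream", "runners", id="0", mkstream=True)
except redis.ResponseError:
    pass  # the group already exists

# Reserve the next unread message (">") for this consumer alone
entries = connection.xreadgroup("runners", "runner-1", {"evaluation_stream": ">"}, count=1, block=1000)

# ... perform the work described by the message ...

# Release ownership so the group never re-reads it, then remove it from the queue entirely
connection.xack("evaluation_stream", "runners", message_id)
connection.xdel("evaluation_stream", message_id)
```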

Sets of actions generally correlate to groups. If something is responsible for controlling a light switch, a group
may be responsible for the incoming messages. Consumers within these groups generally correlate to actors that may
perform the actions granted to the group. If there are 4 consumers within a single group, Consumer C might be able
to claim ownership of message X within the group and consumers A, B, and D will not. If message Y comes through,
consumers A, B, or D may be able to claim that message for themselves and perform work while consumer C consumes
message X. This helps coordinate work across computational nodes. If consumers A, B, C, and D all work on different
nodes, one consumer may claim work and consume its own resources without disrupting the others. If more work
needs to be done simultaneously, more consumers may be added for horizontal scaling.
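
As an illustration of that coordination (again with placeholder names), two consumers in one group never receive the
same pending message, and one may steal a stalled message from the other:

```python
import redis

connection = redis.Redis()

# Each call reserves the next unread message for that consumer alone,
# so "consumer-a" and "consumer-c" never duplicate each other's work
for consumer in ("consumer-a", "consumer-c"):
    print(consumer, connection.xreadgroup("light-switch", consumer, {"commands": ">"}, count=1))

# If "consumer-c" stalls, "consumer-a" may take over one of its idle messages
connection.xclaim(
    "commands",              # placeholder stream name
    "light-switch",          # placeholder group name
    "consumer-a",            # the consumer taking ownership
    min_idle_time=60_000,    # only steal messages that have sat idle for at least a minute
    message_ids=["1-0"],     # placeholder message id
)
```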

Messages are caught in the main thread, evaluations are run in child processes, and there is a second thread that
monitors running processes to determine when shared scope should be destroyed and when messages should be removed
from the stream. The `concurrent.futures` interface is used to track running evaluations, so the monitoring thread
is able to poll the evaluations it tracks to see if processing within each child process has concluded.
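
A minimal sketch of that arrangement, greatly simplified from the runner's actual code:

```python
import threading
import time
from concurrent.futures import Future, ProcessPoolExecutor


def evaluate(message: dict) -> str:
    """Stand-in for the work that the worker performs inside a child process."""
    return f"finished {message['evaluation_id']}"


def monitor(active: list) -> None:
    """Poll running evaluations and clean up after the ones that have finished."""
    while active:
        for message_id, future in list(active):
            if future.done():
                active.remove((message_id, future))
                # here the real runner would tear down shared scope and ack/delete the stream message
                print(message_id, future.result())
        time.sleep(1)


if __name__ == "__main__":
    active: list = []
    with ProcessPoolExecutor(max_workers=2) as pool:
        # the main thread would normally keep reading messages while this runs
        active.append(("message-1", pool.submit(evaluate, {"evaluation_id": "example"})))
        watcher = threading.Thread(target=monitor, args=(active,), daemon=True)
        watcher.start()
        watcher.join()
```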

#### How does this differ from PubSub?

The runner was originally implemented by subscribing to a Redis PubSub channel. A PubSub channel is a single stream
that clients may publish data to and other clients may subscribe to. Subscribers may 'listen' to the channel and
each subscriber will get the message in real time. If a subscriber misses a message, it cannot go back and
read it later. Imagine that a PubSub channel is a TV channel. One or more entities may broadcast video through it and the
audience may do what they like with what they receive. The audience does not really have the ability to respond to the
broadcaster unless they themselves become a broadcaster and the original broadcasters become audience members.

PubSub works great for real-time dissemination of data, but when used to coordinate work, as the runner previously did,
_all_ subscribers will attempt to perform the same work. If there are four runners on four machines, each will
attempt to run the same evaluation on their own machine because they have no way of knowing that the others are
performing the same work.
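
A small sketch of the problem (the channel name is a placeholder): every subscriber receives every published message,
so each runner would try to start the same evaluation:

```python
import redis

connection = redis.Redis()

subscription = connection.pubsub()
subscription.subscribe("evaluation_requests")  # placeholder channel name

# Every runner subscribed to the channel reaches this loop with the same payload,
# so every one of them would launch the same evaluation
for message in subscription.listen():
    if message["type"] == "message":
        print("launching evaluation for", message["data"])
```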

#### The Worker

The worker, found in `worker.py`, is the entity that _actually_ calls `dmod.evaluations` in order to perform
evaluation duties, _not the runner._ The runner will collect information from messages and call the worker in a
child process with the received parameters.

The worker may be called via the command line to perform evaluations manually. If there is a need to script out
evaluations directly to `dmod.evaluations` and bypass the service entirely, the worker contains an excellent example
of how to do so.

#### How does the runner listen?

`runner:listen` is called from `runner:main` with instructions on how to connect to redis and the limit of how many
jobs may be run at the same time.

`runner:listen`, referred to as the listener from now on, will create a multiprocessing event to use as a signal to stop
listening and spawn a thread to poll a queue of actively running `Future`s that are evaluating data. The listener then
creates a consumer for the group performing listening duties, creating the group first if it is not already present.
A counter used to track errors will also be created. This tracker will identify faults and count the times that they occur.
Errors are identified by where in the code base they are thrown. If the same error from the same location is
encountered too many times in a short period, the listener will exit, since this indicates a core problem with the
application. The listener continues to listen for the same reason web servers continue to function after encountering a
500+ error: a portion of the code causes an error, but it isn't clear whether that should halt all operations outright. It may be
due to a user request (such as a bad configuration) or it may be a freak accident that is never encountered again.
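
The real `ErrorCounter` lives elsewhere in the code base; a minimal sketch of the idea it implements, keying each
error on where it was raised, could look like:

```python
import traceback
from collections import Counter

EXCEPTION_LIMIT = 10
"""The most times a single location in the code may fail before the listener gives up"""

error_counts = Counter()


def record_error(error: BaseException) -> None:
    """Count an error by where it was raised and re-raise it once one location fails too often."""
    frames = traceback.extract_tb(error.__traceback__)
    location = f"{frames[-1].filename}:{frames[-1].lineno}" if frames else type(error).__name__
    error_counts[location] += 1

    if error_counts[location] > EXCEPTION_LIMIT:
        # The same place keeps failing, which points at a core problem rather than a one-off fluke
        raise error
```
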
@@ -112,4 +112,4 @@ If a message comes through that the listener doesn't know what to do with, it is
acknowledged via `xack`. Maybe there is another group that reads from the same stream that may interact with said
message or maybe there's a consumer attached to a process in the same group that may handle it. The configuration
that will cause such messages to pass through is not ideal, but those messages will not interrupt the performance of
_this_ operation.
@@ -40,8 +40,8 @@

EXCEPTION_LIMIT: typing.Final[int] = 10
"""
The maximum number of a specific type of error to catch before exiting. If an error occurs 11 times in rapid
succession and the limit is 10, the runner should stop. If it only occurs 9 times it could be the result of something
that this has no control over and may remain functional.
"""

@@ -50,7 +50,7 @@

ERROR_EXIT: typing.Final[int] = 1
"""
The exit code when the runner halts because of an error. 1 is used since that is generally associated with the catch
all error code.
"""

@@ -437,7 +437,7 @@ def launch_evaluation(

if isinstance(instructions, dict):
instructions = json.dumps(instructions, indent=4)

try:
# Build communicators that will communicate evaluation updates outside of the evaluation process
communicators: CommunicatorGroup = utilities.get_communicators(
@@ -564,10 +564,10 @@ def monitor_running_jobs(

encountered_errors = ErrorCounter(limit=EXCEPTION_LIMIT)
"""
A collection of errors that may bear repeats of individual types of errors.
Collected errors are only finally raised if and when they have occurred over a given amount of times.
This ensures that the polling loop is not interrupted on rare remote errors yet still throws errors when stuck
in an infinite loop of failing code
"""

@@ -94,4 +94,4 @@ def get_runner_connection(
username=username or application_values.RUNNER_USERNAME,
password=password or application_values.RUNNER_PASSWORD,
db=db or application_values.RUNNER_DB,
)
@@ -28,26 +28,26 @@
RawRedisMessageStream = typing.List[typing.Union[bytes, typing.List[RawRedisMessage]]]
"""
How each collection of messages is represented when reading from a stream.
This is implemented via a list, but a tuple would be more appropriate. The first index is the stream name and the
second index is the collection of messages read for it
"""

LATEST_MESSAGE: typing.Final[str] = ">"
"""
The `xreadgroup` function uses a dictionary of '{<stream name>: <previously read message id>}' to determine what next
to read. Using '{<stream name>: ">"}' will tell it to get the next unread message for <stream name>
"""

BACKUP_CONSUMER_NAME: typing.Final[str] = "backup-consumer"
"""
The name of a stream group consumer that will take ownership of messages when a consumer closes before being able to
acknowledge their work
"""


IDLE_TIMEOUT: typing.Final[int] = int(timedelta(hours=6, minutes=30).total_seconds()) * 1000
"""
The maximum amount of milliseconds a message is allowed to be idle in a consumer before it will be claimed by a
backup consumer
"""

@@ -402,4 +402,4 @@ def release_messages(stream_parameters: StreamParameters, consumer_name: str):
messages_to_claim = [
message['message_id'].decode()
for message in message_retriever()
]
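
# A minimal, hypothetical sketch (not part of the file above) of how ids gathered by
# `release_messages` might then be handed to the backup consumer; the connection object
# and the stream/group names here are placeholders rather than the service's real values.
import redis

connection = redis.Redis()
messages_to_claim = ["0-0"]  # placeholder ids collected as shown above

if messages_to_claim:
    connection.xclaim(
        "evaluation_stream",   # placeholder stream name
        "evaluation_group",    # placeholder group name
        "backup-consumer",     # i.e. BACKUP_CONSUMER_NAME
        min_idle_time=0,       # the real code may instead gate on IDLE_TIMEOUT
        message_ids=messages_to_claim,
    )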
