Skip to content

Commit

Permalink
Init commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
xmival00 committed Nov 22, 2024
1 parent f9ff70d commit 8c07fee
Show file tree
Hide file tree
Showing 6 changed files with 615 additions and 0 deletions.
110 changes: 110 additions & 0 deletions .github/workflows/test_publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
name: Test brainmaze

on: [push, pull_request]

jobs:
test:
runs-on: ${{ matrix.os }}

strategy:
matrix:
include:
- os: ubuntu-latest
python-version: '3.9'
architecture: 'x64'
- os: ubuntu-latest
python-version: '3.10'
architecture: 'x64'
- os: ubuntu-latest
python-version: '3.11'
architecture: 'x64'
- os: ubuntu-latest
python-version: '3.12'
architecture: 'x64'

- os: macos-latest
python-version: '3.11'
- os: macos-latest
python-version: '3.12'

- os: windows-latest
python-version: '3.11'
- os: windows-latest
python-version: '3.12'

steps:
- uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Checkout submodules
shell: bash
run: |
auth_header="$(git config --local --get http.https://github.com/.extraheader)"
git submodule sync --recursive
git -c "http.extraheader=$auth_header" -c protocol.version=2 submodule update --init --force --recursive --depth=1
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools
pip install numpy
pip install wheel
- name: Setup package
run:
python setup.py bdist_wheel sdist

- name: Install package
run:
pip install .

- name: Test package
run:
python -m unittest

release:
if: contains(github.ref, 'refs/tags/')
needs: test

runs-on: ${{ matrix.os }}

strategy:
matrix:
include:
- os: ubuntu-latest
python-version: '3.11'
docker_python_version: 'cp36-cp36m'
steps:

- uses: actions/checkout@v2

- name: Checkout submodules
shell: bash
run: |
auth_header="$(git config --local --get http.https://github.com/.extraheader)"
git submodule sync --recursive
git -c "http.extraheader=$auth_header" -c protocol.version=2 submodule update --init --force --recursive --depth=1
# Set up python after manylinux build otherwise it interferes with auditwheel in the image
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools
pip install twine
pip install wheel
- name: Setup package
run:
python setup.py bdist_wheel sdist

- name: Publish package
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_Token_General }}
run: |
twine upload --skip-existing dist/*
Empty file added brainmaze_zmq/__init__.py
Empty file.
243 changes: 243 additions & 0 deletions brainmaze_zmq/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@

"""
Abstract base classes for handling ZeroMQ socket operations.
"""

import abc
import threading
import zmq
from zmq.eventloop import ioloop, zmqstream

from brainmaze_zmq.utils import (
setup_request_socket, setup_reply_socket, setup_publisher_socket,
setup_subscriber_socket, setup_pull_socket, is_socket_alive
)

class ABExitHandler(abc.ABC):
"""
Abstract base class for handling an exit signal via a ZeroMQ PULL socket.
"""
def __init__(self):
"""
Initialize the ABExitHandler with context and socket attributes.
"""
self.exit_socket: zmq.Socket = None
self.context: zmq.Context = None

def setup_exit_handler(self, port: int):
"""
Set up a PULL socket to listen for an exit signal.
Args:
port (int): The port to bind the PULL socket to.
"""
if self.context is None:
self.context = zmq.Context()

self.exit_socket = setup_pull_socket(port, self.context)
stream_pull = zmqstream.ZMQStream(self.exit_socket)
stream_pull.on_recv(self.on_recv_exit)

def on_recv_exit(self, msg):
"""
Handle receiving an exit signal.
Args:
msg (list): The message received via the PULL socket.
"""
if msg[0] == b'Exit':
ioloop.IOLoop.instance().add_callback(self.stop)

def kill_exit_handler(self):
"""
Close the exit socket and clean up resources.
"""
if self.exit_socket is not None:
self.exit_socket.close()
self.exit_socket = None

@abc.abstractmethod
def stop(self):
"""
Abstract method to be implemented by subclasses to handle stopping operations.
"""


class ABReplyHandler(abc.ABC):
"""
Abstract base class for handling a REPLY mechanism via a ZeroMQ REP socket.
"""
def __init__(self):
"""
Initialize the ABReplyHandler with context and socket attributes
"""
self.reply_socket: zmq.Socket = None
self.context: zmq.Context = None

def setup_reply_handler(self, port: int):
"""
Set up a REP socket to handle incoming requests.
Args:
port (int): The port to bind the REP socket to.
"""
if self.context is None:
self.context = zmq.Context()

self.reply_socket = setup_reply_socket(port, self.context)
stream_rep = zmqstream.ZMQStream(self.reply_socket)
stream_rep.on_recv(self.reply)

def kill_reply_handler(self):
"""
Close the reply socket and clean up resources.
"""
if self.reply_socket is not None:
self.reply_socket.close()
self.reply_socket = None

@abc.abstractmethod
def reply(self, msg):
"""
Abstract method to handle incoming messages and provide a reply.
Args:
msg (list): The message received via the REP socket.
"""


class ABRequestHandler(abc.ABC):

"""
Abstract base class for handling requests via a ZeroMQ REQ socket.
"""
def __init__(self):
"""
Initialize the ABRequestHandler with context and socket attributes.
"""
self.request_socket: zmq.Socket = None
self.request_port: int = None
self.context: zmq.Context = None
self.pending_requests: list = None
self.lock: threading.Lock = None

def setup_request_handler(self, port: int):
"""
Set up a REQ socket to send requests.
Args:
port (int): The port to connect the REQ socket to.
"""
self.request_port = port
if self.context is None:
self.context = zmq.Context()

self.request_socket = setup_request_socket(self.request_port, self.context)

def kill_request_handler(self):
"""
Close the request socket and clean up resources.
"""
if self.request_socket is not None:
if is_socket_alive(self.request_socket):
self.request_socket.close()
self.request_socket = None

def request_reply(self, msg):
"""
Send a request and wait for a reply.
Args:
msg (str): The request message to send.
Returns:
Any: The reply received from the server.
"""
self.request_socket.send_string(msg)
return self.request_socket.recv_pyobj() # zmq.NOBLOCK


class ABPublisherHandler(abc.ABC): # done as send_multipart
"""
Abstract base class for publishing messages via a ZeroMQ PUB socket.
"""
def __init__(self):
"""
Initialize the ABPublisherHandler with context and socket attributes.
"""
self.publisher_socket: zmq.Socket = None
self.context: zmq.Context = None

def setup_publisher_handler(self, port: int):
"""
Set up a PUB socket to publish messages.
Args:
port (int): The port to bind the PUB socket to.
"""
if self.context is None:
self.context = zmq.Context()

self.publisher_socket = setup_publisher_socket(port, self.context)

def kill_publisher_handler(self):
"""
Close the publisher socket and clean up resources.
"""
if self.publisher_socket is not None:
self.publisher_socket.close()
self.publisher_socket = None

def publish(self, topic: str, msg: tuple):
"""
Publish a message with a topic.
Args:
topic (str): The topic for the message.
msg (tuple): The message to publish.
"""
self.publisher_socket.send_multipart((topic.encode(),) + msg)


class ABSubscriberHandler(abc.ABC):
"""
Abstract base class for subscribing to messages via a ZeroMQ SUB socket.
"""
def __init__(self):
"""
Initialize the ABSubscriberHandler with context and socket attributes.
"""
self.subscriber_socket: zmq.Socket = None
self.context: zmq.Context = None

def setup_subscriber_handler(self, port: int, topic: str):
"""
Set up a SUB socket to subscribe to messages on a topic.
Args:
port (int): The port to connect the SUB socket to.
topic (str): The topic to subscribe to.
"""
if self.context is None:
self.context = zmq.Context()

self.subscriber_socket = setup_subscriber_socket(port, topic, self.context)
stream_sub = zmqstream.ZMQStream(self.subscriber_socket)
stream_sub.on_recv(self.on_recv_data)

def kill_subscriber_handler(self):
"""
Close the subscriber socket and clean up resources.
"""
if self.subscriber_socket is not None:
self.subscriber_socket.close()
self.subscriber_socket = None

@abc.abstractmethod
def on_recv_data(self, msg):
"""
Abstract method to handle incoming data.
Args:
msg (list): The message received via the SUB socket.
"""
Loading

0 comments on commit 8c07fee

Please sign in to comment.