diff --git a/.github/workflows/test_publish.yml b/.github/workflows/test_publish.yml new file mode 100644 index 0000000..d7222b8 --- /dev/null +++ b/.github/workflows/test_publish.yml @@ -0,0 +1,110 @@ +name: Test brainmaze + +on: [push, pull_request] + +jobs: + test: + runs-on: ${{ matrix.os }} + + strategy: + matrix: + include: + - os: ubuntu-latest + python-version: '3.9' + architecture: 'x64' + - os: ubuntu-latest + python-version: '3.10' + architecture: 'x64' + - os: ubuntu-latest + python-version: '3.11' + architecture: 'x64' + - os: ubuntu-latest + python-version: '3.12' + architecture: 'x64' + + - os: macos-latest + python-version: '3.11' + - os: macos-latest + python-version: '3.12' + + - os: windows-latest + python-version: '3.11' + - os: windows-latest + python-version: '3.12' + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Checkout submodules + shell: bash + run: | + auth_header="$(git config --local --get http.https://github.com/.extraheader)" + git submodule sync --recursive + git -c "http.extraheader=$auth_header" -c protocol.version=2 submodule update --init --force --recursive --depth=1 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools + pip install numpy + pip install wheel + - name: Setup package + run: + python setup.py bdist_wheel sdist + + - name: Install package + run: + pip install . + + - name: Test package + run: + pytest + + release: + if: contains(github.ref, 'refs/tags/') + needs: test + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + include: + - os: ubuntu-latest + python-version: '3.11' + docker_python_version: 'cp36-cp36m' + steps: + + - uses: actions/checkout@v2 + + - name: Checkout submodules + shell: bash + run: | + auth_header="$(git config --local --get http.https://github.com/.extraheader)" + git submodule sync --recursive + git -c "http.extraheader=$auth_header" -c protocol.version=2 submodule update --init --force --recursive --depth=1 + # Set up python after manylinux build otherwise it interferes with auditwheel in the image + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools + pip install twine + pip install wheel + - name: Setup package + run: + python setup.py bdist_wheel sdist + + - name: Publish package + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_Token_General }} + run: | + twine upload --skip-existing dist/* \ No newline at end of file diff --git a/brainmaze_zmq/__init__.py b/brainmaze_zmq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/brainmaze_zmq/abstract.py b/brainmaze_zmq/abstract.py new file mode 100644 index 0000000..a6f6ea1 --- /dev/null +++ b/brainmaze_zmq/abstract.py @@ -0,0 +1,243 @@ + +""" +Abstract base classes for handling ZeroMQ socket operations. +""" + +import abc +import threading +import zmq +from zmq.eventloop import ioloop, zmqstream + +from brainmaze_zmq.utils import ( + setup_request_socket, setup_reply_socket, setup_publisher_socket, + setup_subscriber_socket, setup_pull_socket, is_socket_alive +) + +class ABExitHandler(abc.ABC): + """ + Abstract base class for handling an exit signal via a ZeroMQ PULL socket. + """ + def __init__(self): + """ + Initialize the ABExitHandler with context and socket attributes. + """ + self.exit_socket: zmq.Socket = None + self.context: zmq.Context = None + + def setup_exit_handler(self, port: int): + """ + Set up a PULL socket to listen for an exit signal. + + Args: + port (int): The port to bind the PULL socket to. + """ + if self.context is None: + self.context = zmq.Context() + + self.exit_socket = setup_pull_socket(port, self.context) + stream_pull = zmqstream.ZMQStream(self.exit_socket) + stream_pull.on_recv(self.on_recv_exit) + + def on_recv_exit(self, msg): + """ + Handle receiving an exit signal. + + Args: + msg (list): The message received via the PULL socket. + """ + if msg[0] == b'Exit': + ioloop.IOLoop.instance().add_callback(self.stop) + + def kill_exit_handler(self): + """ + Close the exit socket and clean up resources. + """ + if self.exit_socket is not None: + self.exit_socket.close() + self.exit_socket = None + + @abc.abstractmethod + def stop(self): + """ + Abstract method to be implemented by subclasses to handle stopping operations. + """ + + +class ABReplyHandler(abc.ABC): + """ + Abstract base class for handling a REPLY mechanism via a ZeroMQ REP socket. + """ + def __init__(self): + """ + Initialize the ABReplyHandler with context and socket attributes + """ + self.reply_socket: zmq.Socket = None + self.context: zmq.Context = None + + def setup_reply_handler(self, port: int): + """ + Set up a REP socket to handle incoming requests. + + Args: + port (int): The port to bind the REP socket to. + """ + if self.context is None: + self.context = zmq.Context() + + self.reply_socket = setup_reply_socket(port, self.context) + stream_rep = zmqstream.ZMQStream(self.reply_socket) + stream_rep.on_recv(self.reply) + + def kill_reply_handler(self): + """ + Close the reply socket and clean up resources. + """ + if self.reply_socket is not None: + self.reply_socket.close() + self.reply_socket = None + + @abc.abstractmethod + def reply(self, msg): + """ + Abstract method to handle incoming messages and provide a reply. + + Args: + msg (list): The message received via the REP socket. + """ + + +class ABRequestHandler(abc.ABC): + + """ + Abstract base class for handling requests via a ZeroMQ REQ socket. + """ + def __init__(self): + """ + Initialize the ABRequestHandler with context and socket attributes. + """ + self.request_socket: zmq.Socket = None + self.request_port: int = None + self.context: zmq.Context = None + self.pending_requests: list = None + self.lock: threading.Lock = None + + def setup_request_handler(self, port: int): + """ + Set up a REQ socket to send requests. + + Args: + port (int): The port to connect the REQ socket to. + """ + self.request_port = port + if self.context is None: + self.context = zmq.Context() + + self.request_socket = setup_request_socket(self.request_port, self.context) + + def kill_request_handler(self): + """ + Close the request socket and clean up resources. + """ + if self.request_socket is not None: + if is_socket_alive(self.request_socket): + self.request_socket.close() + self.request_socket = None + + def request_reply(self, msg): + """ + Send a request and wait for a reply. + + Args: + msg (str): The request message to send. + + Returns: + Any: The reply received from the server. + """ + self.request_socket.send_string(msg) + return self.request_socket.recv_pyobj() # zmq.NOBLOCK + + +class ABPublisherHandler(abc.ABC): # done as send_multipart + """ + Abstract base class for publishing messages via a ZeroMQ PUB socket. + """ + def __init__(self): + """ + Initialize the ABPublisherHandler with context and socket attributes. + """ + self.publisher_socket: zmq.Socket = None + self.context: zmq.Context = None + + def setup_publisher_handler(self, port: int): + """ + Set up a PUB socket to publish messages. + + Args: + port (int): The port to bind the PUB socket to. + """ + if self.context is None: + self.context = zmq.Context() + + self.publisher_socket = setup_publisher_socket(port, self.context) + + def kill_publisher_handler(self): + """ + Close the publisher socket and clean up resources. + """ + if self.publisher_socket is not None: + self.publisher_socket.close() + self.publisher_socket = None + + def publish(self, topic: str, msg: tuple): + """ + Publish a message with a topic. + + Args: + topic (str): The topic for the message. + msg (tuple): The message to publish. + """ + self.publisher_socket.send_multipart((topic.encode(),) + msg) + + +class ABSubscriberHandler(abc.ABC): + """ + Abstract base class for subscribing to messages via a ZeroMQ SUB socket. + """ + def __init__(self): + """ + Initialize the ABSubscriberHandler with context and socket attributes. + """ + self.subscriber_socket: zmq.Socket = None + self.context: zmq.Context = None + + def setup_subscriber_handler(self, port: int, topic: str): + """ + Set up a SUB socket to subscribe to messages on a topic. + + Args: + port (int): The port to connect the SUB socket to. + topic (str): The topic to subscribe to. + """ + if self.context is None: + self.context = zmq.Context() + + self.subscriber_socket = setup_subscriber_socket(port, topic, self.context) + stream_sub = zmqstream.ZMQStream(self.subscriber_socket) + stream_sub.on_recv(self.on_recv_data) + + def kill_subscriber_handler(self): + """ + Close the subscriber socket and clean up resources. + """ + if self.subscriber_socket is not None: + self.subscriber_socket.close() + self.subscriber_socket = None + + @abc.abstractmethod + def on_recv_data(self, msg): + """ + Abstract method to handle incoming data. + + Args: + msg (list): The message received via the SUB socket. + """ diff --git a/brainmaze_zmq/utils.py b/brainmaze_zmq/utils.py new file mode 100644 index 0000000..87528bb --- /dev/null +++ b/brainmaze_zmq/utils.py @@ -0,0 +1,214 @@ + +""" +Socket utility functions for ZeroMQ and general networking operations. +""" + +import socket +import time +import zmq +from pythonping import ping + +def send_exit_signal(port, context=None): + """ + Sends an exit signal to a specific ZeroMQ port. + + Args: + port (int): Port to send the exit signal. + context (zmq.Context, optional): ZeroMQ context to use. + If not provided, a new one is created. + + Returns: + bool: Always returns False (placeholder for success indication). + """ + context_provided = context is not None + if context_provided is False: + context = zmq.Context() + + success = False + zmq_exit_socket = context.socket(zmq.PUSH) + zmq_exit_socket.setsockopt(zmq.LINGER, 0) + zmq_exit_socket.setsockopt(zmq.SNDTIMEO, 20) + zmq_exit_socket.connect( + f"tcp://localhost:{port}") # Connect to the process / thread + zmq_exit_socket.send(b'Exit') + + time.sleep(.1) + zmq_exit_socket.close() + time.sleep(.1) + + if context_provided is False: + context.term() + + time.sleep(.1) + return success + +def setup_pull_socket(port, context): + """ + Sets up a ZeroMQ PULL socket. + + Args: + port (int): Port to bind the socket to. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured PULL socket. + """ + zmq_exit_socket = context.socket(zmq.PULL) + zmq_exit_socket.setsockopt(zmq.LINGER, 0) + zmq_exit_socket.bind(f"tcp://*:{port}") + time.sleep(.1) + return zmq_exit_socket + +def setup_push_socket(port, context): + """ + Sets up a ZeroMQ PUSH socket. + + Args: + port (int): Port to connect the socket to. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured PUSH socket. + """ + zmq_exit_socket = context.socket(zmq.PUSH) + zmq_exit_socket.setsockopt(zmq.LINGER, 0) + zmq_exit_socket.setsockopt(zmq.SNDTIMEO, 100) + zmq_exit_socket.connect(f"tcp://localhost:{port}") + time.sleep(.1) + return zmq_exit_socket + +def setup_request_socket(port, context): + """ + Sets up a ZeroMQ REQ socket. + + Args: + port (int): Port to connect the socket to. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured REQ socket. + """ + zmq_request_socket = context.socket(zmq.REQ) + zmq_request_socket.setsockopt(zmq.LINGER, 0) + zmq_request_socket.setsockopt(zmq.REQ_RELAXED, 1) + zmq_request_socket.setsockopt(zmq.REQ_CORRELATE, 1) + zmq_request_socket.setsockopt(zmq.RCVTIMEO, 200) + zmq_request_socket.setsockopt(zmq.SNDTIMEO, 200) + zmq_request_socket.connect(f"tcp://localhost:{port}") + time.sleep(.1) + return zmq_request_socket + +def setup_reply_socket(port, context): + """ + Sets up a ZeroMQ REP socket. + + Args: + port (int): Port to bind the socket to. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured REP socket. + """ + zmq_reply_socket = context.socket(zmq.REP) + zmq_reply_socket.setsockopt(zmq.LINGER, 0) + zmq_reply_socket.setsockopt(zmq.RCVTIMEO, 200) + zmq_reply_socket.setsockopt(zmq.SNDTIMEO, 200) + zmq_reply_socket.bind(f"tcp://*:{port}") + time.sleep(.1) + return zmq_reply_socket + +def setup_publisher_socket(port, context): + """ + Sets up a ZeroMQ PUB socket. + + Args: + port (int): Port to bind the socket to. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured PUB socket. + """ + zmq_publisher_socket = context.socket(zmq.PUB) + zmq_publisher_socket.setsockopt(zmq.LINGER, 0) + zmq_publisher_socket.bind(f"tcp://*:{port}") + time.sleep(.1) + return zmq_publisher_socket + +def setup_subscriber_socket(port, sub_topic, context): + """ + Sets up a ZeroMQ SUB socket. + + Args: + port (int): Port to connect the socket to. + sub_topic (str): Subscription topic filter. + context (zmq.Context): ZeroMQ context to use. + + Returns: + zmq.Socket: Configured SUB socket. + """ + zmq_subscriber_socket = context.socket(zmq.SUB) + zmq_subscriber_socket.setsockopt(zmq.LINGER, 0) + zmq_subscriber_socket.setsockopt_string(zmq.SUBSCRIBE, sub_topic) + zmq_subscriber_socket.connect(f"tcp://localhost:{port}") + time.sleep(.1) + return zmq_subscriber_socket + +def is_socket_alive(socket_instance: zmq.Socket): + """ + Checks if a ZeroMQ socket is alive and operational. + + Args: + zmq_socket (zmq.Socket): ZeroMQ socket to check. + + Returns: + bool: True if the socket is alive, False otherwise. + """ + if socket_instance is None: + return False + try: + # Get the value of the ZMQ_EVENTS socket option + events = socket_instance.getsockopt(zmq.EVENTS) + # Check if the socket is readable or writable + return (events & zmq.POLLIN) or (events & zmq.POLLOUT) + except zmq.ZMQError: + return False + +def ping_ip(ip_address: str): + """ + Pings an IP address to check connectivity. + + Args: + ip_address (str): IP address to ping. + + Returns: + bool: True if the ping is successful, False otherwise. + """ + response = ping(ip_address, count=1, verbose=True, timeout=0.5) + + if response.success(): + return True + + return False + +def check_port(ip_address: str, port: int): + """ + Checks if a port on a given IP address is open. + + Args: + ip_address (str): IP address to check. + port (int): Port number to check. + + Returns: + bool: True if the port is open, False otherwise. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) # Timeout in seconds + success = False + try: + result = sock.connect_ex((ip_address, port)) + if result == 0: + success = True + finally: + sock.close() + + return success diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dc7af3e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +zmq +pythonping +pytest \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e318e18 --- /dev/null +++ b/setup.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"Setup for brainmaze-zmq " + +from os import path + +from setuptools import find_packages, setup + +NAME='brainmaze-zmq' +DESCRIPTION='Utils for around zmq supporting multiprocess communication' +EMAIL='mivalt.filip@mayo.edu' +AUTHOR='Filip Mivalt' +VERSION='0.0.1' +REQUIRES_PYTHON = '>=3.9.0' +URL='' +PACKAGES = find_packages() +REQUIRED = [] + +# if requirements.txt exists, use it to populate the REQUIRED list +if path.exists('./requirements.txt'): + with open('./requirements.txt') as f: + REQUIRED = f.read().splitlines() + + +here = path.abspath(path.dirname(__file__)) + +print(f'Installing {NAME}') + +setup( + name=NAME, + version=VERSION, + description=DESCRIPTION, + author=AUTHOR, + author_email=EMAIL, + url=URL, + install_requires=REQUIRED, + packages=PACKAGES, + python_requires=REQUIRES_PYTHON, + include_package_data=True, + classifiers=[ + # https://pypi.python.org/pypi?%3Aaction=list_classifiers + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: Implementation :: CPython', + 'Operating System :: Microsoft :: Windows' + ], + +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..cbcd4f2 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ + +import pytest + +from .constants import dummy_constant + +@pytest.fixture() +def constant_dummy_fixture(): + print(f"Running dummy_fixture with {dummy_constant}") + yield dummy_constant + print("Tearing down dummy_fixture") + +@pytest.fixture() +def string_test_dummy_fixture(): + print(f"Running string_test_dummy_fixture") + yield "This is a constant" + print("Tearing down string_test_dummy_fixture") \ No newline at end of file diff --git a/tests/constants.py b/tests/constants.py new file mode 100644 index 0000000..77ced27 --- /dev/null +++ b/tests/constants.py @@ -0,0 +1,5 @@ + +from typing import Final + +dummy_constant: Final = "This is a constant" + diff --git a/tests/test_dummy.py b/tests/test_dummy.py new file mode 100644 index 0000000..fc93ae1 --- /dev/null +++ b/tests/test_dummy.py @@ -0,0 +1,15 @@ + +from .conftest import constant_dummy_fixture, string_test_dummy_fixture + +from .constants import dummy_constant + + +def test_dummy_constant(constant_dummy_fixture: int): + print(f"Running test_dummy with {constant_dummy_fixture}") + assert constant_dummy_fixture == dummy_constant + +def test_dummy_string(string_test_dummy_fixture: str): + test_str = string_test_dummy_fixture + print(f"Running test_dummy with {test_str}") + assert test_str == "This is a constant" +