From a590f1080cb299633435bb346b3b0bc5703a0aec Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 23 Sep 2022 13:23:38 -0500 Subject: [PATCH] Remove gevent dep. (#72) * Remove gevent dep. * Add scheduling test. * Typo * One more typo. * Global scope for test var. * Fix tests. --- Dockerfile | 2 - README.md | 3 -- lcservice/__init__.py | 11 +---- lcservice/service.py | 103 +++++++++++++++++++++------------------- setup.py | 2 +- tests/test_lifecycle.py | 28 ++++++++++- 6 files changed, 82 insertions(+), 67 deletions(-) diff --git a/Dockerfile b/Dockerfile index d88629d..b8e0963 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,5 @@ FROM python:3.8-slim -RUN pip install gevent - # Install base library. ADD . /lc-service WORKDIR /lc-service diff --git a/README.md b/README.md index cdc4972..62fed67 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,6 @@ the CherryPy project for an HTTP server, and a Google Cloud Function compatible server to deploy without containers or infrastructure. Writing transports is easy so feel free to suggest new ones. -***Cloud Function Transport*** is currently having an issue with the initialization -and monkey patching of gevent in the Cloud Function environment. - ## Using The RI is structure so that all you have to do is inherit from the main Service class: diff --git a/lcservice/__init__.py b/lcservice/__init__.py index 7de62b8..ded3de3 100644 --- a/lcservice/__init__.py +++ b/lcservice/__init__.py @@ -1,6 +1,6 @@ """Reference implementation for LimaCharlie.io services.""" -__version__ = "1.8.7" +__version__ = "1.9.0" __author__ = "Maxime Lamothe-Brassard ( Refraction Point, Inc )" __author_email__ = "maxime@refractionpoint.com" __license__ = "Apache v2" @@ -9,12 +9,3 @@ from .service import Service # noqa: F401 from .service import InteractiveService # noqa: F401 from . 
import servers # noqa: F401 - -def enableGRPC(): - '''Helper function to call to enable safe use of gRPC given the use of gevent - in the limacharlie SDK. See: https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/experimental/gevent.py - ''' - import os - os.environ[ 'GRPC_DNS_RESOLVER' ] = 'native' - import grpc.experimental.gevent as grpc_gevent - grpc_gevent.init_gevent() \ No newline at end of file diff --git a/lcservice/service.py b/lcservice/service.py index c55ca1c..83c9aae 100644 --- a/lcservice/service.py +++ b/lcservice/service.py @@ -1,10 +1,8 @@ import limacharlie from . import __version__ as lcservice_version from .jobs import Job -import gevent -from gevent.lock import BoundedSemaphore -import gevent.pool -import gevent.util +import threading +from concurrent.futures import ThreadPoolExecutor, TimeoutError import time import hmac @@ -42,10 +40,10 @@ def __init__( self, serviceName, originSecret, isTraceComms = False ): self._serviceName = serviceName self._originSecret = originSecret self._startedAt = int( time.time() ) - self._lock = BoundedSemaphore() - self._backgroundStopEvent = gevent.event.Event() + self._lock = threading.Lock() + self._backgroundStopEvent = threading.Event() self._nCallsInProgress = 0 - self._threads = gevent.pool.Group() + self._threads = [] self._detectSubscribed = set() self._internalResources = {} self._supportedRequestParameters = {} @@ -393,12 +391,10 @@ def _managedThread( self, func, *args, **kw_args ): # yet executed. This way we can schedule calls a long time in # the future and only wait for it if it actually started executing. 
try: - self._threads.add( gevent.util.getcurrent() ) + self._threads.append( threading.current_thread() ) if self._backgroundStopEvent.wait( 0 ): return func( *args, **kw_args ) - except gevent.GreenletExit: - raise except: self.logCritical( traceback.format_exc() ) @@ -423,17 +419,21 @@ def schedule( self, delay, func, *args, **kw_args ): raise finally: if not self._backgroundStopEvent.wait( 0 ): - gevent.spawn_later( delay, self._schedule, delay, func, *args, **kw_args ) + t = threading.Thread( target = self._schedule, args = ( delay, func, *args ), kwargs = kw_args ) + self._threads.append( t ) + t.start() def _schedule( self, delay, func, *args, **kw_args ): - if not self._backgroundStopEvent.wait( 0 ): + if not self._backgroundStopEvent.wait( delay ): try: self._managedThread( func, *args, **kw_args ) except: raise finally: if not self._backgroundStopEvent.wait( 0 ): - gevent.spawn_later( delay, self._schedule, delay, func, *args, **kw_args ) + t = threading.Thread( target = self._schedule, args = ( delay, func, *args ), kwargs = kw_args ) + self._threads.append( t ) + t.start() def delay( self, inDelay, func, *args, **kw_args ): '''Delay the execution of a function. @@ -449,22 +449,43 @@ def delay( self, inDelay, func, *args, **kw_args ): :param args: positional arguments to the function :param kw_args: keyword arguments to the function ''' - gevent.spawn_later( inDelay, self._managedThread, func, *args, **kw_args ) + def _delayThread(): + if self._backgroundStopEvent.wait( inDelay ): + return + self._managedThread( func, *args, **kw_args ) + t = threading.Thread( target = _delayThread ) + self._threads.append( t ) + t.start() def parallelExec( self, f, objects, timeout = None, maxConcurrent = None ): - '''Applies a function to N objects in parallel in up to maxConcurrent threads and waits to return the list results.
- - :param f: the function to apply - :param objects: the collection of objects to apply using f - :param timeouts: number of seconds to wait for results, or None for indefinitely - :param maxConcurrent: maximum number of concurrent tasks + '''Execute a function on a list of objects in parallel. + Args: + f (callable): function to apply to each object. + objects (iterable): list of objects to apply the function on. + timeout (int): maximum number of seconds to wait for collection of calls. + maxConcurrent (int): maximum number of function application to do concurrently. + Returns: + list of return values (or Exception if an exception occurred). + ''' + + results = [] + with ThreadPoolExecutor( max_workers=maxConcurrent ) as executor: + future = executor.map( lambda o: self._retExecOrExc( f, o, timeout ), objects, timeout = timeout ) + results = list( future ) + return list( results ) - - :returns: a list of return values from f(object), or Exception if one occured. - ''' + def _retExecOrExc( self, f, o, timeout ): + try: + return f( o ) + except ( Exception, TimeoutError ) as e: + return e - g = gevent.pool.Pool( size = maxConcurrent ) - results = g.imap_unordered( lambda o: _retExecOrExc( f, o, timeout ), tuple( objects ) ) - return list( results ) + def _retExecOrExcWithKey( self, f, o, timeout ): + k, o = o + try: + return ( k, f( o ) ) + except ( Exception, TimeoutError ) as e: + return ( k, e ) def parallelExecEx( self, f, objects, timeout = None, maxConcurrent = None ): '''Applies a function to N objects in parallel in up to maxConcurrent threads and waits to return the generated results. @@ -477,8 +498,11 @@ def parallelExecEx( self, f, objects, timeout = None, maxConcurrent = None ): :returns: a generator of tuples( key name, f(object) ), or Exception if one occured.
''' - g = gevent.pool.Pool( size = maxConcurrent ) - return g.imap_unordered( lambda o: _retExecOrExcWithKey( f, o, timeout ), objects.items() ) + results = [] + with ThreadPoolExecutor( max_workers=maxConcurrent ) as executor: + future = executor.map( lambda o: self._retExecOrExcWithKey( f, o, timeout ), objects.items(), timeout = timeout ) + results = list( future ) + return list( results ) # LC Service Lifecycle Functions def _onStartup( self ): @@ -494,7 +518,8 @@ def onStartup( self ): def _onShutdown( self ): self._backgroundStopEvent.set() - self._threads.join( timeout = 30 ) + for t in self._threads: + t.join( timeout = 30 ) self.onShutdown() def onShutdown( self ): @@ -659,28 +684,6 @@ def every30DayPerSensor( self, lc, oid, request ): ''' return self.responseNotImplemented() -# Simple wrappers to enable clean parallel executions. -def _retExecOrExc( f, o, timeout ): - try: - if timeout is None: - return f( o ) - else: - with gevent.Timeout( timeout ): - return f( o ) - except ( Exception, gevent.Timeout ) as e: - return e - -def _retExecOrExcWithKey( f, o, timeout ): - k, o = o - try: - if timeout is None: - return ( k, f( o ) ) - else: - with gevent.Timeout( timeout ): - return ( k, f( o ) ) - except ( Exception, gevent.Timeout ) as e: - return ( k, e ) - class InteractiveService( Service ): '''InteractiveService provide for asynchronous tasking of sensors.
diff --git a/setup.py b/setup.py index 26837c0..7ab5022 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup -__version__ = "1.8.7" +__version__ = "1.9.0" __author__ = "Maxime Lamothe-Brassard ( Refraction Point, Inc )" __author_email__ = "maxime@refractionpoint.com" __license__ = "Apache v2" diff --git a/tests/test_lifecycle.py b/tests/test_lifecycle.py index 917018e..22e4796 100644 --- a/tests/test_lifecycle.py +++ b/tests/test_lifecycle.py @@ -1,5 +1,6 @@ import lcservice import uuid +import time TEST_SECRET = 'test-secret' @@ -152,4 +153,29 @@ def onRequest( self, lc, oid, request ): assert( resp[ 'success' ] ) if __name__ == '__main__': - test_create_service() \ No newline at end of file + test_create_service() + + +n = 0 +def test_schedules(): + global n + svc = lcservice.Service( 'test-service', None ) + + def _inc(): + global n + n += 1 + + svc.delay( 5, _inc ) + assert( 0 == n ) + time.sleep( 6 ) + + n = 0 + + svc.schedule( 2, _inc ) + assert( 1 == n ) + time.sleep( 2.1 ) + assert( 2 == n ) + time.sleep( 2.1 ) + assert( 3 == n ) + + svc._onShutdown() \ No newline at end of file