Remove gevent dep. (#72)
* Remove gevent dep.

* Add scheduling test.

* Typo

* One more typo.

* Global scope for test var.

* Fix tests.
maximelb authored Sep 23, 2022
1 parent 3350172 commit a590f10
Showing 6 changed files with 82 additions and 67 deletions.
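
Taken together, the change is a mechanical swap of each gevent primitive for its standard-library counterpart. A rough sketch of the mapping as it plays out in lcservice/service.py below (the spawn_later stand-in here is illustrative, not part of the library):

# gevent.lock.BoundedSemaphore()  ->  threading.Lock()
# gevent.event.Event()            ->  threading.Event()
# gevent.pool.Group()/Pool()      ->  a plain list of threads / ThreadPoolExecutor
# gevent.spawn_later(d, fn, ...)  ->  a Thread that sleeps on Event.wait(d)
import threading

_stop = threading.Event()

def spawn_later(delay, fn, *args, **kwargs):
    # Wait up to `delay` seconds, but wake immediately if _stop is set.
    def _runner():
        if not _stop.wait(delay):
            fn(*args, **kwargs)
    t = threading.Thread(target=_runner)
    t.start()
    return t
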
2 changes: 0 additions & 2 deletions Dockerfile
@@ -1,7 +1,5 @@
FROM python:3.8-slim

RUN pip install gevent

# Install base library.
ADD . /lc-service
WORKDIR /lc-service
3 changes: 0 additions & 3 deletions README.md
@@ -21,9 +21,6 @@ the CherryPy project for an HTTP server, and a Google Cloud Function compatible
server to deploy without containers or infrastructure. Writing transports is easy
so feel free to suggest new ones.

***Cloud Function Transport*** is currently having an issue with the initialization
and monkey patching of gevent in the Cloud Function environment.

## Using

The RI is structured so that all you have to do is inherit from the main Service class:
11 changes: 1 addition & 10 deletions lcservice/__init__.py
@@ -1,6 +1,6 @@
"""Reference implementation for LimaCharlie.io services."""

__version__ = "1.8.7"
__version__ = "1.9.0"
__author__ = "Maxime Lamothe-Brassard ( Refraction Point, Inc )"
__author_email__ = "maxime@refractionpoint.com"
__license__ = "Apache v2"
@@ -9,12 +9,3 @@
from .service import Service # noqa: F401
from .service import InteractiveService # noqa: F401
from . import servers # noqa: F401

def enableGRPC():
'''Helper function to call to enable safe use of gRPC given the use of gevent
in the limacharlie SDK. See: https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/experimental/gevent.py
'''
import os
os.environ[ 'GRPC_DNS_RESOLVER' ] = 'native'
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
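
Since enableGRPC() was a public helper, dropping it is also a small API break for any service that called it before creating gRPC channels; with gevent gone there is nothing left to shim. A sketch of a hypothetical consumer before and after this change (the service name and secret are invented):

import lcservice

# Pre-1.9.0 (hypothetical caller): the shim forced the native DNS resolver and
# patched gRPC for gevent before any channels were created.
# lcservice.enableGRPC()

# From 1.9.0 on the call is simply removed; services start with no extra setup.
svc = lcservice.Service('example-service', 'example-origin-secret')
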
103 changes: 53 additions & 50 deletions lcservice/service.py
@@ -1,10 +1,8 @@
import limacharlie
from . import __version__ as lcservice_version
from .jobs import Job
import gevent
from gevent.lock import BoundedSemaphore
import gevent.pool
import gevent.util
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

import time
import hmac
@@ -42,10 +40,10 @@ def __init__( self, serviceName, originSecret, isTraceComms = False ):
self._serviceName = serviceName
self._originSecret = originSecret
self._startedAt = int( time.time() )
self._lock = BoundedSemaphore()
self._backgroundStopEvent = gevent.event.Event()
self._lock = threading.Lock()
self._backgroundStopEvent = threading.Event()
self._nCallsInProgress = 0
self._threads = gevent.pool.Group()
self._threads = []
self._detectSubscribed = set()
self._internalResources = {}
self._supportedRequestParameters = {}
@@ -393,12 +391,10 @@ def _managedThread( self, func, *args, **kw_args ):
# yet executed. This way we can schedule calls a long time in
# the future and only wait for it if it actually started executing.
try:
self._threads.add( gevent.util.getcurrent() )
self._threads.append( threading.current_thread() )
if self._backgroundStopEvent.wait( 0 ):
return
func( *args, **kw_args )
except gevent.GreenletExit:
raise
except:
self.logCritical( traceback.format_exc() )
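
Two small idioms above do the work gevent used to: Event.wait( 0 ) is a non-blocking "is the stop flag set?" check, and each background function records its own thread so shutdown can join it later. The same pattern in isolation (names here are illustrative, not the library's):

import threading
import traceback

_threads = []
_stop = threading.Event()

def managed(func, *args, **kwargs):
    # Record the current thread so a later shutdown can join it.
    _threads.append(threading.current_thread())
    # wait(0) returns immediately: True if the event is set, False otherwise.
    if _stop.wait(0):
        return
    try:
        func(*args, **kwargs)
    except Exception:
        # Mirror the library's behaviour of logging rather than propagating.
        traceback.print_exc()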

@@ -423,17 +419,21 @@ def schedule( self, delay, func, *args, **kw_args ):
raise
finally:
if not self._backgroundStopEvent.wait( 0 ):
gevent.spawn_later( delay, self._schedule, delay, func, *args, **kw_args )
t = threading.Thread( target = self._schedule, args = ( delay, func, *args ), kwargs = kw_args )
self._threads.append( t )
t.start()

def _schedule( self, delay, func, *args, **kw_args ):
if not self._backgroundStopEvent.wait( 0 ):
if not self._backgroundStopEvent.wait( delay ):
try:
self._managedThread( func, *args, **kw_args )
except:
raise
finally:
if not self._backgroundStopEvent.wait( 0 ):
gevent.spawn_later( delay, self._schedule, delay, func, *args, **kw_args )
t = threading.Thread( target = self._schedule, args = ( delay, func, *args ), kwargs = kw_args )
self._threads.append( t )
t.start()
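
The schedule/_schedule pair re-arms itself by starting a fresh, short-lived thread for every tick, and leans on Event.wait( delay ) as an interruptible sleep so shutdown never has to sit out a full period. A self-contained sketch of that re-arming loop, assuming one thread per tick is acceptable for low-frequency jobs:

import threading

stop = threading.Event()

def every(delay, func, *args, **kwargs):
    # Run once right away, then hand the next tick to a new thread.
    func(*args, **kwargs)
    if not stop.is_set():
        threading.Thread(target=_tick, args=(delay, func) + args, kwargs=kwargs).start()

def _tick(delay, func, *args, **kwargs):
    # wait() returns True early if stop was set, which ends the chain.
    if not stop.wait(delay):
        every(delay, func, *args, **kwargs)

# every(300, refresh_cache)  # hypothetical: refresh_cache would run every 5 minutes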

def delay( self, inDelay, func, *args, **kw_args ):
'''Delay the execution of a function.
@@ -449,22 +449,43 @@ def delay( self, inDelay, func, *args, **kw_args ):
:param args: positional arguments to the function
:param kw_args: keyword arguments to the function
'''
gevent.spawn_later( inDelay, self._managedThread, func, *args, **kw_args )
def _delayThread():
if self._backgroundStopEvent.wait( inDelay ):
return
self._managedThread( func, *args, **kw_args )
t = threading.Thread( target = _delayThread )
self._threads.append( t )
t.start()
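
From a service author's point of view the scheduling API is unchanged: delay() and schedule() keep their signatures, only the backing threads differ. A hypothetical subclass using both (the callback names and bodies are invented for illustration):

import lcservice

class MyService(lcservice.Service):
    def onStartup(self):
        # Run _warm_cache once, 10 seconds from now.
        self.delay(10, self._warm_cache)
        # Run _poll_upstream immediately and then every 5 minutes until shutdown.
        self.schedule(300, self._poll_upstream)

    def _warm_cache(self):
        print('cache warmed')

    def _poll_upstream(self):
        print('polling upstream')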

def parallelExec( self, f, objects, timeout = None, maxConcurrent = None ):
'''Applies a function to N objects in parallel in up to maxConcurrent threads and waits to return the list results.
:param f: the function to apply
:param objects: the collection of objects to apply using f
:param timeouts: number of seconds to wait for results, or None for indefinitely
:param maxConcurrent: maximum number of concurrent tasks
'''Execute a function on a list of objects in parallel.
Args:
f (callable): function to apply to each object.
objects (iterable): list of objects to apply the function on.
timeout (int): maximum number of seconds to wait for collection of calls.
maxConcurrent (int): maximum number of function applications to run concurrently.
Returns:
list of return values (or Exception if an exception occurred).
'''

results = []
with ThreadPoolExecutor( max_workers=maxConcurrent ) as executor:
# executor.map() returns an iterator of per-object results (not a Future).
results = executor.map( lambda o: self._retExecOrExc( f, o, timeout ), objects, timeout = timeout )
return list( results )

:returns: a list of return values from f(object), or Exception if one occured.
'''
def _retExecOrExc( self, f, o, timeout ):
try:
return f( o )
except ( Exception, TimeoutError ) as e:
return e

g = gevent.pool.Pool( size = maxConcurrent )
results = g.imap_unordered( lambda o: _retExecOrExc( f, o, timeout ), tuple( objects ) )
return list( results )
def _retExecOrExcWithKey( self, f, o, timeout ):
k, o = o
try:
return ( k, f( o ) )
except ( Exception, TimeoutError ) as e:
return e
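
parallelExec now funnels every call through ThreadPoolExecutor.map and wraps each one in _retExecOrExc, so a failing object shows up as an Exception in the result list instead of aborting the whole batch. A usage sketch with an invented workload (svc is any Service instance, e.g. the subclass above):

def probe(host):
    if host == 'bad-host':
        raise RuntimeError('unreachable')
    return host + ': ok'

results = svc.parallelExec(probe, ['host-a', 'host-b', 'bad-host'], maxConcurrent=4)
# results is a list such as ['host-a: ok', 'host-b: ok', RuntimeError('unreachable')],
# so callers typically check isinstance(r, Exception) per entry.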

def parallelExecEx( self, f, objects, timeout = None, maxConcurrent = None ):
'''Applies a function to N objects in parallel in up to maxConcurrent threads and waits to return the generated results.
@@ -477,8 +498,11 @@ def parallelExecEx( self, f, objects, timeout = None, maxConcurrent = None ):
:returns: a list of tuples( key name, f(object) ), or Exception if one occurred.
'''

g = gevent.pool.Pool( size = maxConcurrent )
return g.imap_unordered( lambda o: _retExecOrExcWithKey( f, o, timeout ), objects.items() )
results = []
with ThreadPoolExecutor( max_workers=maxConcurrent ) as executor:
results = executor.map( lambda o: self._retExecOrExcWithKey( f, o, timeout ), objects.items(), timeout = timeout )
return list( results )
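
parallelExecEx is the keyed variant: it walks objects.items() and hands each ( key, value ) pair to _retExecOrExcWithKey, so successful calls come back as ( key, result ) tuples. A usage sketch over an invented dict (reusing svc from above):

sensors = { 'sid-1': { 'plat': 'windows' }, 'sid-2': { 'plat': 'linux' } }

def tag(sensor):
    return 'tagged:' + sensor['plat']

for item in svc.parallelExecEx(tag, sensors, maxConcurrent=2):
    # item is ( key, f(value) ) on success; as written above, a bare Exception
    # (without its key) is returned when the call raises.
    print(item)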

# LC Service Lifecycle Functions
def _onStartup( self ):
@@ -494,7 +518,8 @@ def onStartup( self ):

def _onShutdown( self ):
self._backgroundStopEvent.set()
self._threads.join( timeout = 30 )
for t in self._threads:
t.join( timeout = 30 )
self.onShutdown()
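
Shutdown is now two steps: set the shared Event so every wait( delay ) sleeper wakes up early, then join each recorded thread with a bound on how long to wait (30 seconds per thread in the diff above). The same pattern in isolation:

import threading

stop = threading.Event()
workers = []  # populated with threading.Thread objects as they are started

def shutdown():
    # Wake any thread blocked in stop.wait(delay)...
    stop.set()
    # ...then give each worker up to 30 seconds to finish its current call.
    for t in workers:
        t.join(timeout=30)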

def onShutdown( self ):
@@ -659,28 +684,6 @@ def every30DayPerSensor( self, lc, oid, request ):
'''
return self.responseNotImplemented()

# Simple wrappers to enable clean parallel executions.
def _retExecOrExc( f, o, timeout ):
try:
if timeout is None:
return f( o )
else:
with gevent.Timeout( timeout ):
return f( o )
except ( Exception, gevent.Timeout ) as e:
return e

def _retExecOrExcWithKey( f, o, timeout ):
k, o = o
try:
if timeout is None:
return ( k, f( o ) )
else:
with gevent.Timeout( timeout ):
return ( k, f( o ) )
except ( Exception, gevent.Timeout ) as e:
return ( k, e )

class InteractiveService( Service ):
'''InteractiveService provides for asynchronous tasking of sensors.
2 changes: 1 addition & 1 deletion setup.py
@@ -1,6 +1,6 @@
from setuptools import setup

__version__ = "1.8.7"
__version__ = "1.9.0"
__author__ = "Maxime Lamothe-Brassard ( Refraction Point, Inc )"
__author_email__ = "maxime@refractionpoint.com"
__license__ = "Apache v2"
28 changes: 27 additions & 1 deletion tests/test_lifecycle.py
@@ -1,5 +1,6 @@
import lcservice
import uuid
import time

TEST_SECRET = 'test-secret'

@@ -152,4 +153,29 @@ def onRequest( self, lc, oid, request ):
assert( resp[ 'success' ] )

if __name__ == '__main__':
test_create_service()
test_create_service()


n = 0
def test_schedules():
global n
svc = lcservice.Service( 'test-service', None )

def _inc():
global n
n += 1

svc.delay( 5, _inc )
assert( 0 == n )
time.sleep( 6 )

n = 0

svc.schedule( 2, _inc )
assert( 1 == n )
time.sleep( 2.1 )
assert( 2 == n )
time.sleep( 2.1 )
assert( 3 == n )

svc._onShutdown()
