A small library inspired by batchy, but using gevent greenlets instead of yield to transfer control.
For example:

    from gbatchy import spawn, pget, batched, batch_context

    @batched()
    def batch_fn(arg_list):
        print 'In batch function with args:'
        results = []
        for args, kwargs in arg_list:
            print '\t', args[0]
            results.append(args[0] + 1)
        print 'Batch function done'
        return results

    @batch_context
    def fetcher():
        results = pget(batch_fn(i, as_future=True) for i in xrange(3))
        print results

    @batch_context
    def test():
        pget(spawn(fetcher) for i in xrange(2))

    test()

This would print:

    In batch function with args:
        0
        1
        2
        0
        1
        2
    Batch function done
    [1, 2, 3]
    [1, 2, 3]

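The coalescing visible in this output (one batch call serving both fetchers) can be pictured without gevent. The following is a minimal sketch, not gbatchy's implementation: the `Coalescer` class, its `call` and `flush` methods, and `add_one_batch` are hypothetical names invented for illustration. It queues each call as an `(args, kwargs)` pair and hands the whole list to the batch function in a single invocation.

```python
class Coalescer(object):
    """Hypothetical helper: queue calls, then run the batch function once."""

    def __init__(self, batch_fn):
        self.batch_fn = batch_fn
        self.pending = []      # queued (args, kwargs) pairs
        self.batch_calls = 0   # how many times batch_fn actually ran

    def call(self, *args, **kwargs):
        # Record the call and return its slot in the eventual result list.
        self.pending.append((args, kwargs))
        return len(self.pending) - 1

    def flush(self):
        # Hand every queued call to the batch function in one invocation.
        arg_list, self.pending = self.pending, []
        self.batch_calls += 1
        return self.batch_fn(arg_list)


def add_one_batch(arg_list):
    # Same shape as batch_fn in the example: one result per (args, kwargs) pair.
    return [args[0] + 1 for args, kwargs in arg_list]


coalescer = Coalescer(add_one_batch)
# Two "fetchers" each request 0, 1, 2, mirroring the example.
slots = [[coalescer.call(i) for i in range(3)] for _ in range(2)]
results = coalescer.flush()
# Route each result back to the fetcher that asked for it.
per_fetcher = [[results[s] for s in fetcher_slots] for fetcher_slots in slots]
```

In the real library the greenlets presumably take the place of the explicit `flush()` call; this sketch only shows why six requests can end up as one batch call.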

- @batch_context: ensures that the function is running in a batch context (i.e. all concurrent calls to @batched functions will be coalesced).
- spawn(fn, *args, **kwargs): starts a new greenlet that will run fn(*args, **kwargs). This creates a batch context or uses the current one.
- spawn_proxy(fn, *args, **kwargs): same as spawn(), but returns a proxy type instead of a greenlet. This should help get rid of .get() around a lot of your code.
- @batched(accepts_kwargs=True) and @class_batched(): mark a function as a batch function. All batch functions take just one argument, args_list, of the form [(args, kwargs), ...] (or [args, ...] if accepts_kwargs=False).
- pget(iterable): a quick way to .get() all the arguments passed.
- pmap(fn, iterable): same as map(fn, iterable), except that it runs in parallel. Note: keyword arguments to pmap are passed through to fn for each element.
- pfilter(fn, iterable): same as filter(fn, iterable), except that it runs in parallel.
- Pool(size): same as gevent.pool.Pool, a way to limit the maximum amount of concurrent work.
- iwait(greenlets): same as gevent.iwait, but works with batch greenlets. Using gevent.iwait with batch greenlets is strongly discouraged and will lead to mysterious hangs.
- wait(greenlets, timeout, count): same as gevent.wait.
- immediate(v): returns an AsyncResult-like object that is immediately ready; immediate(v).get() is v evaluates to True.
- immediate_exception(exc): same as immediate, but raises exc.
- with may_block(): a low-level primitive for when you need to use a gevent-native blocking call between calls to @batched functions (e.g. gevent.queue).
- transform(pending, fn): a somewhat low-level but performant way to take an AsyncResult-like object and run immediate(fn(pending.get())). Note that fn must be pure: it cannot interact with greenlets. Any extra kwargs will be passed to fn.
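
To make the two args_list shapes concrete, the sketch below calls plain Python functions directly with hand-built lists. It does not import gbatchy, and `TABLE`, `lookup_with_kwargs` and `lookup_positional` are hypothetical names used only for illustration. It also assumes each `args` entry is a tuple of positional arguments, which the list above does not state explicitly.

```python
TABLE = {'a': 1, 'b': 2}  # stand-in for a backend that supports bulk lookups


def lookup_with_kwargs(args_list):
    # Shape described for accepts_kwargs=True: [(args, kwargs), ...]
    results = []
    for args, kwargs in args_list:
        default = kwargs.get('default')
        results.append(TABLE.get(args[0], default))
    return results


def lookup_positional(args_list):
    # Shape described for accepts_kwargs=False: [args, ...]
    return [TABLE.get(args[0]) for args in args_list]


# What two coalesced calls, lookup('a') and lookup('z', default=0),
# might look like by the time they reach the batch function.
with_kwargs = lookup_with_kwargs([(('a',), {}), (('z',), {'default': 0})])

# The same idea when keyword arguments are not accepted.
positional = lookup_positional([('a',), ('b',)])
```

Both functions return one result per entry, in the same order as the incoming list, which matches how batch_fn in the example builds its results.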