-
I don't see an equivalent of asyncio's facilities for waiting on tasks and collecting their return values. Is the intention that tasks only return values via streams? For my use cases, streams seem very heavyweight - or am I missing something? It's quite likely that this is a documentation issue rather than a functionality issue, and it may even be that all that is needed is for beginners to be pointed at a "how to get started thinking in terms of async" document. But I'm struggling to find such a thing, and I feel like I'm ending up writing my own low-level utilities, when I expected async to let me focus more on the higher-level tasks.
-
That's reasonably simple to do. You start a task group, create an event, and start each task with a wrapper that sets the event when its task finishes. The task group's host code waits on the event, then cancels the task group.
-
But how do I get the task's return value? Also, can I set the event multiple times (once after each task completes)? The event docs say "events cannot be reused", which suggests not...
-
Why would you want to set the event multiple times? You want return-one semantics, which implies that you only need one result. Returning a result is easy enough: either use some common object to store the result in, or use a memory channel instead of an event. Memory channels aren't a good fit for one-off results, though, because they're more expensive to set up.
-
No, I wait on first completed in a loop, so I get each result in turn, as it's available. At least that's how I do it in asyncio.
That's a "producer/consumer" model, whereas I'm trying to write a "parallel map" model. See below for a bit more explanation, but I'm explicitly trying not to use producer/consumer because I've had problems making it modular enough for my requirements. I may well have another try, but that's not what I'm trying to do right now.

I feel like this has drifted away from my original question. That's my fault, and I apologise. I'm definitely guilty here of the XY problem - asking about something that I think I need to do rather than about my actual task. But in this case I genuinely do just want to know if it's possible to implement the specific function I'm thinking about; whether that function is actually useful in solving my larger problem is something I'll consider separately. So to be precise, if I have a function
can I write, in anyio, a function¹
The constraints are:
I can write a function like this using asyncio (most of it is here, and I added Ctrl-C handling to that). My question is whether I can do so in anyio. I'm also somewhat interested in whether I can do it in trio, as the trio model seems similar, but anyio is the more important question for me as it's backend-agnostic.

I can almost certainly write my code in a producer/consumer style using memory channels, rather than in a "parallel map" style. But I tried that in an earlier iteration (using a thread pool and a queue) and it ended up as a nasty tangle of tightly coupled parts. So I'm experimenting with "parallel map" to see if that helps reduce complexity for me. Right now, the honest answer to "why don't you use channels and producer/consumer" is "because I want to see how the alternative works out"...

Sorry if I wasn't clearer in my original question. It's hard to reduce my question to something sufficiently simple to ask without omitting a whole bunch of half-formed concerns from previous failed approaches...

¹ On an unrelated note, I feel like there's not much of an "ecosystem" of async utility functions like this. Sort of the async equivalent of things like
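For reference, the "parallel map" shape described in this comment can be sketched in plain asyncio along these lines (an illustrative reconstruction, not the actual code being referred to; note that it still creates one task object per argument):

```python
import asyncio


async def parallel_map(func, args, limit=10):
    """Yield results as they complete, running at most `limit` calls at once."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(arg):
        async with semaphore:
            return await func(arg)

    pending = {asyncio.ensure_future(bounded(arg)) for arg in args}
    while pending:
        # wait on first completed in a loop, yielding each result in turn
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            yield task.result()


async def main():
    async def double(x):
        await asyncio.sleep(0.01)
        return x * 2

    results = [r async for r in parallel_map(double, range(5))]
    print(sorted(results))  # [0, 2, 4, 6, 8]


asyncio.run(main())
```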
-
I think that the parallel map function would still use a memory object stream. What you would need in addition is an async context manager for managing the task group. How's this?

```python
from anyio import create_memory_object_stream, create_task_group, run, sleep


async def wait(delay):
    await sleep(delay)
    return f'task done: {delay}'


class TaskMapper:
    def __init__(self, func, args):
        self.func = func
        self.args = args
        self._completed = 0

    async def _run_task(self, arg):
        retval = await self.func(arg)
        await self._send.send(retval)
        self._completed += 1
        if self._completed == len(self.args):
            self._send.close()

    async def __aenter__(self):
        self._task_group = create_task_group()
        await self._task_group.__aenter__()
        self._send, self._receive = create_memory_object_stream()
        for arg in self.args:
            self._task_group.start_soon(self._run_task, arg)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    def __aiter__(self):
        return self._receive.__aiter__()


async def main():
    delays = [1, 3, 2]
    async with TaskMapper(wait, delays) as mapper:
        async for result in mapper:
            print(result)


run(main)
```

EDIT: Fixed the design.
-
I really shouldn't be writing code when I'm sleepy. I'll amend it to actually help with the requested task.
-
Ah, thank you. I see. That looks a lot more reasonable than the way I was trying to do it. So it looks like it's more about understanding the right way of thinking about the problem, rather than it being something that's "not supported". Thanks for pointing me in the right direction 🙂
-
AnyIO's task group model was copied from trio, so by design the return values of the task functions are ignored. However, mapping multiple arguments to concurrent tasks seems to be something that is often requested. What do you think, @njsmith and @graingert? Should AnyIO (and trio?) offer this out of the box?
-
aiostream.stream.map seems the nicest one of these. I think it would be better to try and finish the anyio port.
-
One thing I'd like to add here - it may be off-topic but it's important for me, at least. My biggest issue with the problem I'm working on is that, as I said, not creating tens of thousands of tasks is crucial to me, as is being able to interrupt cleanly. @agronholm's Both of these problems are fixable:
But if a built-in map function were available and it didn't address these two points, it wouldn't be much use to me. While I understand that my use case is extreme, I do think these are things that an "out of the box" map function should consider, even if it's just to say that it's not intended for massive workloads, but that building blocks (maybe a worker pool abstraction) are provided for people who need to roll their own.

By the way - at this point, I should say that it's not critically important to me whether any of this gets implemented in anyio. If you feel it's out of scope, I'm fine with that - my original question has been fully answered and I don't have any pressing need for a backend-agnostic solution. I may take a look at
-
Btw there's
-
Also, rather than cancelling your task group, you probably want to close your MemoryObjectStreams for a slightly more graceful shutdown.
-
Some pseudo code based on https://trio.readthedocs.io/en/stable/reference-core.html#managing-multiple-producers-and-or-multiple-consumers (`SpawnType`, `process` and `source` are assumed to be defined elsewhere):

```python
import contextlib
import signal
import typing
from typing import AsyncIterable, Awaitable, Callable, Iterable, Optional, TypeVar

import anyio
from anyio import ClosedResourceError
from anyio.streams.memory import MemoryObjectReceiveStream

T = TypeVar("T")
U = TypeVar("U")


async def task(fn, source, dest):
    with source, dest:
        async for msg in source:
            v = await fn(msg)
            try:
                await dest.send(v)
            except ClosedResourceError:
                return


@contextlib.asynccontextmanager
async def amap(
    fn: Callable[[T], Awaitable[U]],
    source: MemoryObjectReceiveStream[T],
    tasks: int = 1,
) -> typing.AsyncGenerator[MemoryObjectReceiveStream[U], None]:
    async with anyio.create_task_group() as tg:
        tx, rx = anyio.create_memory_object_stream()
        with source, tx:
            for _ in range(tasks):
                tg.start_soon(task, fn, source.clone(), tx.clone())
            with rx:
                yield rx


async def afor_then_call(aiter: AsyncIterable[object], thunk: Callable[[], object]):
    async with contextlib.aclosing(aiter):
        async for v in aiter:
            thunk()
            return


@contextlib.contextmanager
def move_on_after_interrupt(
    spawn_system_task: SpawnType,
    sigs: Iterable[int] = (signal.SIGINT,),
    delay: Optional[float] = None,
    shield: bool = False,
):
    with anyio.move_on_after(delay, shield=shield) as cs, anyio.open_signal_receiver(
        *sigs
    ) as aiter:
        spawn_system_task(afor_then_call, aiter, cs.cancel)
        yield cs


async def amain(spawn_system_task):
    async with amap(process, source.clone(), tasks=10) as results:
        with move_on_after_interrupt(spawn_system_task):
            async for result in results:
                print(result)
```
-
@pfmoore I realised I forgot about https://github.com/florimondmanca/aiometer which already has this amap context manager.