Skip to content

Commit

Permalink
Await values in execute_stream_async_iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Aug 11, 2024
1 parent 9627bc5 commit fd9edfb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
34 changes: 30 additions & 4 deletions src/graphql/execution/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,7 @@ async def execute_stream_async_iterator(
parent_context: IncrementalDataRecord | None = None,
) -> None:
"""Execute stream iterator."""
is_awaitable = self.is_awaitable
incremental_publisher = self.incremental_publisher
index = initial_index
previous_incremental_data_record = parent_context
Expand Down Expand Up @@ -1721,10 +1722,35 @@ async def execute_stream_async_iterator(
except StopAsyncIteration:
done = True

incremental_publisher.complete_stream_items_record(
incremental_data_record,
[completed_item],
)
if is_awaitable(completed_item):

async def await_completed_item(
incremental_data_record: StreamItemsRecord, completed_item: Any
) -> None:
try:
value = await completed_item
except GraphQLError as error:
incremental_publisher.add_field_error(
incremental_data_record, error
)
incremental_publisher.filter(path, incremental_data_record)
incremental_publisher.complete_stream_items_record(
incremental_data_record, None
)
else:
incremental_publisher.complete_stream_items_record(
incremental_data_record, [value]
)

self.add_task(
await_completed_item(incremental_data_record, completed_item)
)

else:
incremental_publisher.complete_stream_items_record(
incremental_data_record,
[completed_item],
)

if done:
break
Expand Down
10 changes: 10 additions & 0 deletions tests/execution/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,9 @@ async def friend_list(_info):
],
},
],
"hasNext": True,
},
{
"hasNext": False,
},
]
Expand Down Expand Up @@ -1848,6 +1851,9 @@ async def get_friends(_info):
"path": ["friendList", 2],
}
],
"hasNext": True,
},
{
"hasNext": False,
},
]
Expand Down Expand Up @@ -1927,6 +1933,10 @@ async def get_friends(_info):
"path": ["nestedObject", "nestedFriendList", 1],
},
],
"hasNext": True,
}
result5 = await anext(iterator)
assert result5.formatted == {
"hasNext": False,
}

Expand Down

0 comments on commit fd9edfb

Please sign in to comment.