Error closing application #54

Open
2 tasks done
cmartmos opened this issue Sep 27, 2023 · 7 comments

Comments

@cmartmos

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Mode.

Expected behavior

No error expected

Actual behavior

RuntimeWarning: coroutine 'sleep' was never awaited (mode/services.py:849)

Full traceback

...
[2023-09-27 14:23:25,835] [13671] [INFO] [^Worker]: Gathering service tasks... 
[2023-09-27 14:23:25,836] [13671] [INFO] [^Worker]: Gathering all futures... 
[2023-09-27 14:23:26,848] [13671] [INFO] [^---ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7fa8f5063610> 
[2023-09-27 14:23:26,851] [13671] [INFO] [^--Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_handler at 0x7fa8f50633e0> 
[2023-09-27 14:23:26,852] [13671] [INFO] [^--Monitor]: Terminating cancelled task: <coroutine object Monitor._sampler at 0x7fa8f5062dc0> 
[2023-09-27 14:23:26,852] [13671] [INFO] [^--Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_livelock_detector at 0x7fa8f4f74350> 
[2023-09-27 14:23:27,854] [13671] [INFO] [^Worker]: Closing event loop 
[2023-09-27 14:23:27,992] [13671] [INFO] [^---AIOKafkaConsumerThread]: Cancelled task <coroutine object ServiceThread._keepalive2 at 0x7fa8f4f744a0>: Event loop is closed 
/venv/proto/lib/python3.10/site-packages/mode/services.py:849: RuntimeWarning: coroutine 'sleep' was never awaited
  self.log.info("Cancelled task %r: %s", task, exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Versions

  • Python version 3.10
  • Operating system Ubuntu 20.04
@wbarnha
Member

wbarnha commented Sep 27, 2023

Do you have any code for us so we can reproduce this?

@cmartmos
Author

cmartmos commented Sep 27, 2023

The issue arises when the application crashes unexpectedly: the event loop then emits the warning above while it is closing.

@wbarnha
Member

wbarnha commented Mar 19, 2024

Hard to say what happened here.

mode/mode/services.py

Lines 841 to 854 in 5260fa1

async def _execute_task(self, task: Awaitable) -> None:
    try:
        await task
    except asyncio.CancelledError:
        if not self.should_stop:
            self._log_mundane("Terminating cancelled task: %r", task)
    except RuntimeError as exc:
        if "Event loop is closed" in str(exc):
            self.log.info("Cancelled task %r: %s", task, exc)
        else:
            await self.crash(exc)
    except BaseException as exc:
        # the exception will be re-raised by the main thread.
        await self.crash(exc)

Based on this log statement, I presume you're using Faust:

[2023-09-27 14:23:26,848] [13671] [INFO] [^---ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7fa8f5063610> 

I wonder if this is more of a Faust bug than a Mode bug.
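For context on the warning itself: Python emits `coroutine '...' was never awaited` when a coroutine object is created and then discarded without ever being awaited or scheduled. The sketch below reproduces only that message with plain `asyncio`; it does not involve Mode or Faust and is not a diagnosis of where the `sleep` coroutine is dropped in this issue. The helper name `make_unawaited_sleep` is made up for illustration.

```python
import asyncio
import gc
import warnings


def make_unawaited_sleep() -> list:
    """Create an asyncio.sleep() coroutine, drop it unawaited, and
    return the RuntimeWarning messages Python emitted while doing so."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = asyncio.sleep(1)  # coroutine object created, never scheduled
        del coro                 # last reference dropped without awaiting it
        gc.collect()             # covers interpreters without refcounting
    return [
        str(w.message)
        for w in caught
        if issubclass(w.category, RuntimeWarning)
    ]
```

Calling `make_unawaited_sleep()` returns the captured messages, which makes it easy to check in a test that the same wording as in the traceback above is produced.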

@wbarnha wbarnha transferred this issue from faust-streaming/mode Mar 19, 2024
@wbarnha wbarnha transferred this issue from faust-streaming/faust Mar 19, 2024
@wbarnha
Member

wbarnha commented Mar 19, 2024

Oh, wait, it's not. Let me dig into it more.

@didimelli

If this happens in tests, the tests hang indefinitely. Here is the pytest output, including what is printed after pressing Ctrl+C:

===================================== 1 passed, 57 deselected, 1 warning in 5.69s =====================================
/path/to/proj/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

@didimelli

didimelli commented Aug 13, 2024

@wbarnha Very simple reproduction:

Versions:

❯ uv pip list | grep -E 'faust|mode'
faust-streaming    0.11.2
mode-streaming     0.4.1
❯ python --version
Python 3.9.2
❯ python -c "import sys; print(sys.version)"
3.9.2 (default, Mar 27 2021, 23:26:06) 
[Clang 11.1.0 ]

import faust

app = faust.App(
    "repro",
    broker="localhost:29092",
)


async def main():
    await app.start()
    print("start")
    raise ValueError("asd")


if __name__ == "__main__":
    import anyio

    anyio.run(main)

And that prints:

❯ python repro.py
start
Traceback (most recent call last):
  File "/path/to/proj/repro.py", line 22, in <module>
    anyio.run(main)
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 74, in run
    return async_backend.run(func, args, {}, backend_options)
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2034, in run
    return runner.run(wrapper())
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 194, in run
    return self._loop.run_until_complete(task)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/path/to/proj/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2022, in wrapper
    return await func(*args)
  File "/path/to/proj/repro.py", line 16, in main
    raise ValueError("asd")
ValueError: asd
/path/to/proj/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

Until the KeyboardInterrupt, the program hangs indefinitely.

EDIT: Exactly the same happens with asyncio:

import faust

app = faust.App(
    "repro",
    broker="localhost:29092",
)


async def main():
    await app.start()
    print("start")
    raise ValueError("asd")


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

which prints

❯ python repro.py
start
Traceback (most recent call last):
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 18, in <module>
    asyncio.run(main())
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 12, in main
    raise ValueError("asd")
ValueError: asd
/home/didi/do/aurora-asms-adapter-ionscv/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 
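
A possible mitigation, not tested against Faust in this thread: both reproductions raise after `await app.start()` without ever awaiting a matching `stop()`, so the event loop is closed while the app's services are still running. Wrapping the body in `try`/`finally` makes sure `stop()` is awaited even when the body raises. `StandInApp` below is a hypothetical stand-in so the sketch runs without a broker; with Faust the object would be the `faust.App` from the reproduction, on the assumption that its `stop()` coroutine also shuts down the consumer thread.

```python
import asyncio


class StandInApp:
    """Hypothetical stand-in for a service with async start()/stop()."""

    def __init__(self) -> None:
        self.started = False
        self.stopped = False

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.stopped = True


async def main(app: StandInApp) -> None:
    await app.start()
    try:
        print("start")
        raise ValueError("asd")  # same failure as in the reproduction
    finally:
        # Runs even when the body raises, so the service is asked to
        # shut down before the event loop is closed.
        await app.stop()


def run_and_report() -> tuple:
    """Run main() and return (was stop() awaited, error message or None)."""
    app = StandInApp()
    try:
        asyncio.run(main(app))
    except ValueError as exc:
        return app.stopped, str(exc)
    return app.stopped, None
```

The `ValueError` still propagates out of `asyncio.run()`, so the original failure stays visible; only the shutdown order changes.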

@didimelli

Debug logging
❯ python repro.py                            
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Using selector: EpollSelector
INFO:faust.app.base:[^App]: Starting...
INFO:faust.sensors.monitor:[^-Monitor]: Starting...
DEBUG:faust.sensors.monitor:[^-Monitor]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Starting...
INFO:faust.transport.producer:[^--ProducerBuffer]: Starting...
DEBUG:faust.transport.producer:[^--ProducerBuffer]: Started.
DEBUG:aiokafka.producer.producer:Starting the Kafka producer
DEBUG:aiokafka:Attempting to bootstrap via node at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka.producer.producer:Kafka producer started
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Started.
INFO:faust.web.cache.backends.base:[^-CacheBackend]: Starting...
DEBUG:faust.web.cache.backends.base:[^-CacheBackend]: Started.
INFO:faust.web.drivers.aiohttp:[^-Web]: Starting...
INFO:faust.web.drivers.aiohttp:[^--Server]: Starting...
DEBUG:faust.web.drivers.aiohttp:[^--Server]: Started.
DEBUG:faust.web.drivers.aiohttp:[^-Web]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Starting...
DEBUG:mode.threads:[^--MethodQueue@0x7f8108bd6520]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Started.
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Starting...
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Started.
DEBUG:mode.threads:[^--MethodQueue@0x7f8108bd6520]: Started.
INFO:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Starting...
DEBUG:aiokafka:Attempting to bootstrap via node at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='localhost', port=29092)], topics=[...])
DEBUG:aiokafka.conn:Closing connection at localhost:29092
DEBUG:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Started.
DEBUG:mode.threads:[^---MethodQueue@0x7f8108318b80]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833caf0 index=0]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833caf0 index=0]: Started.
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833cb80 index=1]: Starting...
DEBUG:mode.threads:[^----MethodQueueWorker@0x7f810833cb80 index=1]: Started.
DEBUG:mode.threads:[^---MethodQueue@0x7f8108318b80]: Started.
DEBUG:faust.transport.drivers.aiokafka:[^-Consumer]: Started.
INFO:faust.assignor.leader_assignor:[^-LeaderAssignor]: Starting...
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Creating topic 'repro-__assignor-__leader'
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Topic 'repro-__assignor-__leader' exists, skipping creation.
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=NULL) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: MetadataRequest_v1(topics=NULL)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
DEBUG:faust.assignor.leader_assignor:[^-LeaderAssignor]: Started.
INFO:faust.agents.replies:[^-ReplyConsumer]: Starting...
DEBUG:faust.agents.replies:[^-ReplyConsumer]: Started.
INFO:faust.agents.manager:[^-AgentManager]: Starting...
DEBUG:faust.agents.manager:[^-AgentManager]: Started.
INFO:faust.transport.conductor:[^--Conductor]: Starting...
DEBUG:faust.transport.conductor:[^--Conductor]: Started.
INFO:faust.tables.manager:[^-TableManager]: Starting...
INFO:faust.transport.conductor:[^--Conductor]: Waiting for agents to start...
INFO:faust.transport.conductor:[^--Conductor]: Waiting for tables to be registered...
DEBUG:mode.timers:Timer Monitor.sampler woke up - iteration=0 time_spent_sleeping=1.0019908730173483 drift=-0.001990873017348349 new_interval=0.9980091269826517 since_epoch=1.0020327200181782
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=0 time_spent_sleeping=1.0016229959437624 drift=-0.001622995943762362 new_interval=0.9983770040562376 since_epoch=1.0016567530110478
INFO:faust.tables.recovery:[^--Recovery]: Starting...
DEBUG:faust.tables.recovery:[^--Recovery]: Started.
DEBUG:faust.tables.manager:[^-TableManager]: Started.
DEBUG:faust.app.base:[^App]: Started.
INFO:faust.transport.drivers.aiokafka:[^-Producer]: Creating topic 'repro-__assignor-__leader'
DEBUG:faust.transport.drivers.aiokafka:[^-Producer]: Topic 'repro-__assignor-__leader' exists, skipping creation.
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=NULL) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: MetadataRequest_v1(topics=NULL)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[...])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 17, groups: 0)
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'repro-__assignor-__leader'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'repro-__assignor-__leader'}
DEBUG:aiokafka:Sending FindCoordinator request for key repro to broker 1001
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: FindCoordinatorRequest_v1(coordinator_key='repro', coordinator_type=0)
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=['repro-__assignor-__leader']) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: MetadataRequest_v1(topics=['repro-__assignor-__leader'])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1001, host='localhost', port=29092)
DEBUG:aiokafka:Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1001, host='localhost', port=29092)
DEBUG:aiokafka:Initiating connection to node 1001 at localhost:29092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: MetadataResponse_v1(brokers=[(node_id=1001, host='localhost', port=29092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='repro-__assignor-__leader', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[...])
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1001 for group repro
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group repro
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group repro
DEBUG:aiokafka.consumer.group_coordinator:Sending JoinGroup (JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])) to coordinator 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 2: JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 2: JoinGroupResponse_v5(throttle_time_ms=0, error_code=79, generation_id=-1, group_protocol='', leader_id='', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[])
DEBUG:aiokafka.consumer.group_coordinator:Sending JoinGroup (JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])) to coordinator 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 3: JoinGroupRequest_v5(group='repro', session_timeout=60000, rebalance_timeout=60000, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, protocol_type='consumer', group_protocols=[(protocol_name='faust', protocol_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 3: JoinGroupResponse_v5(throttle_time_ms=0, error_code=0, generation_id=1, group_protocol='faust', leader_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, member_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
DEBUG:aiokafka.consumer.group_coordinator:Join group response JoinGroupResponse_v5(throttle_time_ms=0, error_code=0, generation_id=1, group_protocol='faust', leader_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', members=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, member_metadata=b'\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x9a{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75...')])
INFO:aiokafka.consumer.group_coordinator:Joined group 'repro' (generation 1) with member_id faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using faust
DEBUG:aiokafka.consumer.group_coordinator:Performing assignment for group repro using strategy faust with subscriptions {'faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f': ConsumerProtocolMemberMetadata(version=4, subscription=['repro-__assignor-__leader'], user_data=b'{"assignment":{"actives":{},"standbys":{}},"url":"http://ditl75p9gk3:6066","changelog_distribution":...')}
DEBUG:aiokafka.consumer.group_coordinator:Finished assignment for group repro: {'faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f': ConsumerProtocolMemberAssignment(version=4, assignment=[(topic='repro-__assignor-__leader', partitions=[0])], user_data=b"x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98\xb7\xc9\x988\x9e\xb4\xde\x81\xc8uz4\x16\x19\xd4\xbd\x1bJ\x03YL\xb0\xe3\xe7\x10\x1eJ\x85\x17\xf6\xa0`\x16I\xaam-\x89\xbf^\xd2\xcd=\xce\xaa...")}
DEBUG:aiokafka.consumer.group_coordinator:Sending leader SyncGroup for group repro to coordinator 1001: SyncGroupRequest_v3(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, group_assignment=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_metadata=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 4: SyncGroupRequest_v3(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', group_instance_id=None, group_assignment=[(member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f', member_metadata=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 4: SyncGroupResponse_v3(throttle_time_ms=0, error_code=0, member_assignment=b"\x00\x04\x00\x00\x00\x01\x00\x19repro-__assignor-__leader\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x98x\x9c\x95\x8fA\n\xc20\x10E\xef2\xebJ\x0bb\xc5\\EJH\x9b!\x1d\x8cI\x98LE)\xb9\xbb)]Kq\xf7\x17\xef?x+\x98\x9c\xc9\x85'\x06\x01\xb5\x82\x99\x84^\x98...")
INFO:aiokafka.consumer.group_coordinator:Successfully synced group repro with generation 1
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='repro-__assignor-__leader', partition=0)} for group repro
DEBUG:aiokafka.consumer.fetcher:Updating fetch positions for partitions [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.consumer.group_coordinator:Fetching committed offsets for partitions: [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 5: OffsetFetchRequest_v1(consumer_group='repro', topics=[(topic='repro-__assignor-__leader', partitions=[0])])
INFO:faust.app.base:Executing _on_partitions_assigned
INFO:faust.tables.recovery:generation id 1 app consumers id 1
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 5: OffsetFetchResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=-1, metadata='', error_code=0)])])
INFO:faust.tables.recovery:[^--Recovery]: Seek stream partitions to committed offsets.
DEBUG:aiokafka.consumer.group_coordinator:No committed offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.fetcher:No committed offset found for TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.fetcher:Resetting offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) using earliest strategy.
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 4: OffsetRequest_v1(replica_id=-1, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, timestamp=-2)])])
DEBUG:aiokafka.consumer.group_coordinator:Fetching committed offsets for partitions: [TopicPartition(topic='repro-__assignor-__leader', partition=0)]
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 6: OffsetFetchRequest_v1(consumer_group='repro', topics=[(topic='repro-__assignor-__leader', partitions=[0])])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 4: OffsetResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, timestamp=-1, offset=0)])])
DEBUG:aiokafka.consumer.fetcher:Handling ListOffsetResponse response for TopicPartition(topic='repro-__assignor-__leader', partition=0). Fetched offset 0, timestamp -1
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 6: OffsetFetchResponse_v1(topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=-1, metadata='', error_code=0)])])
DEBUG:aiokafka.consumer.group_coordinator:No committed offset for partition TopicPartition(topic='repro-__assignor-__leader', partition=0)
DEBUG:aiokafka.consumer.consumer:Seeking to committed of partition TopicPartition(topic='repro-__assignor-__leader', partition=0) None
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
INFO:faust.tables.recovery:[^--Recovery]: Resuming flow...
INFO:faust.transport.consumer:[^--Fetcher]: Starting...
DEBUG:faust.transport.consumer:[^--Fetcher]: Started.
INFO:faust.tables.recovery:[^--Recovery]: Worker ready
start
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._restart_recovery at 0x7f81082dae40>
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_handler at 0x7f8108314440>
INFO:faust.transport.producer:[^--ProducerBuffer]: Terminating cancelled task: <coroutine object ProducerBuffer._handle_pending at 0x7f8108bf60c0>
INFO:faust.transport.drivers.aiokafka:[^-Consumer]: Terminating cancelled task: <coroutine object Consumer._commit_livelock_detector at 0x7f8108bbfcc0>
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318880 index=0]: Terminating cancelled task: <coroutine object MethodQueueWorker._method_queue_do_work at 0x7f8108314b40>
DEBUG:mode.threads:[^---MethodQueueWorker@0x7f8108318fa0 index=1]: Terminating cancelled task: <coroutine object MethodQueueWorker._method_queue_do_work at 0x7f8108314cc0>
INFO:faust.transport.conductor:[^--Conductor]: Terminating cancelled task: <coroutine object Conductor._subscriber at 0x7f8108314540>
INFO:faust.sensors.monitor:[^-Monitor]: Terminating cancelled task: <coroutine object Monitor._sampler at 0x7f8108bbfbc0>
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._slurp_changelogs at 0x7f81082daf40>
INFO:faust.tables.recovery:[^--Recovery]: Terminating cancelled task: <coroutine object Recovery._publish_stats at 0x7f81082dad40>
Traceback (most recent call last):
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 22, in <module>
    asyncio.run(main())
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/didi/do/aurora-asms-adapter-ionscv/repro.py", line 16, in main
    raise ValueError("asd")
ValueError: asd
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=1 time_spent_sleeping=1.0000067059881985 drift=-6.705988198518753e-06 new_interval=0.9999932940118015 since_epoch=2.001684717950411
INFO:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Cancelled task <coroutine object ServiceThread._keepalive2 at 0x7f8108314640>: Event loop is closed
/home/didi/do/aurora-asms-adapter-ionscv/.venv/lib/python3.9/site-packages/mode/services.py:881: RuntimeWarning: coroutine 'sleep' was never awaited
  await self.crash(exc)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 5: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 6: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=2 time_spent_sleeping=1.0014912839978933 drift=-0.0014912839978933334 new_interval=0.9985087160021067 since_epoch=3.0031919409520924
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=3 time_spent_sleeping=0.9997022351017222 drift=0.00029776489827781916 new_interval=1.0002977648982778 since_epoch=4.002909027971327
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 7: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 7: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 6: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=4 time_spent_sleeping=1.0018507899949327 drift=-0.0018507899949327111 new_interval=0.9981492100050673 since_epoch=5.0047739499714226
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=5 time_spent_sleeping=0.9995446259854361 drift=0.0004553740145638585 new_interval=1.0004553740145639 since_epoch=6.004337324993685
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=6 time_spent_sleeping=1.001647011958994 drift=-0.0016470119589939713 new_interval=0.998352988041006 since_epoch=7.006001248024404
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 8: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 8: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 8: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 9: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=7 time_spent_sleeping=0.9997663419926539 drift=0.00023365800734609365 new_interval=1.000233658007346 since_epoch=8.005789019051008
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 9: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 10: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=8 time_spent_sleeping=1.0010316009866074 drift=-0.0010316009866073728 new_interval=0.9989683990133926 since_epoch=9.00683572201524
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=9 time_spent_sleeping=1.000353861018084 drift=-0.0003538610180839896 new_interval=0.999646138981916 since_epoch=10.007212847005576
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 9: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 9: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 10: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 11: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=10 time_spent_sleeping=1.0006299020024016 drift=-0.00062990200240165 new_interval=0.9993700979975984 since_epoch=11.007861518999562
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 11: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 12: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=11 time_spent_sleeping=1.000315580982715 drift=-0.00031558098271489143 new_interval=0.9996844190172851 since_epoch=12.008191514993086
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=12 time_spent_sleeping=1.0013250630581751 drift=-0.0013250630581751466 new_interval=0.9986749369418249 since_epoch=13.009536664001644
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 10: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 10: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 12: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 13: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=13 time_spent_sleeping=1.0003391930367798 drift=-0.0003391930367797613 new_interval=0.9996608069632202 since_epoch=14.009901670971885
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 13: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 14: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=14 time_spent_sleeping=1.001379554043524 drift=-0.0013795540435239673 new_interval=0.998620445956476 since_epoch=15.011302259052172
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=15 time_spent_sleeping=0.9995917979395017 drift=0.0004082020604982972 new_interval=1.0004082020604983 since_epoch=16.01091179798823
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 11: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 11: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 14: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 15: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=16 time_spent_sleeping=1.0019008970120922 drift=-0.001900897012092173 new_interval=0.9980991029879078 since_epoch=17.01283338095527
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 15: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 16: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=17 time_spent_sleeping=0.9996139759896323 drift=0.0003860240103676915 new_interval=1.0003860240103677 since_epoch=18.012464510044083
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=18 time_spent_sleeping=1.0019195060012862 drift=-0.0019195060012862086 new_interval=0.9980804939987138 since_epoch=19.01440254598856
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 12: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 12: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 16: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 17: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=19 time_spent_sleeping=1.0002802079543471 drift=-0.00028020795434713364 new_interval=0.9997197920456529 since_epoch=20.014701541978866
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 17: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 18: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=20 time_spent_sleeping=1.0008563300361857 drift=-0.0008563300361856818 new_interval=0.9991436699638143 since_epoch=21.01557591301389
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=21 time_spent_sleeping=1.0008519539842382 drift=-0.0008519539842382073 new_interval=0.9991480460157618 since_epoch=22.01644936297089
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 13: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 13: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 18: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 19: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=22 time_spent_sleeping=1.000216327025555 drift=-0.00021632702555507421 new_interval=0.9997836729744449 since_epoch=23.016682844026946
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 19: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 20: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=23 time_spent_sleeping=1.0011810249416158 drift=-0.0011810249416157603 new_interval=0.9988189750583842 since_epoch=24.01789080305025
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=24 time_spent_sleeping=1.0004381309263408 drift=-0.0004381309263408184 new_interval=0.9995618690736592 since_epoch=25.018343757023104
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: repro[1] faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 14: HeartbeatRequest_v1(group='repro', generation_id=1, member_id='faust-0.11.2-2c80a6c0-19ce-419e-9b3c-50d3af087a5f')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 14: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
DEBUG:aiokafka.consumer.group_coordinator:Received successful heartbeat response for group repro
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Response 20: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='repro-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='repro-__assignor-__leader', partition=0) at offset 0
DEBUG:aiokafka.conn:<AIOKafkaConnection host=localhost port=29092> Request 21: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='repro-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
DEBUG:mode.timers:Timer _thread_keepalive-AIOKafkaConsumerThread woke up - iteration=25 time_spent_sleeping=1.0010486399987713 drift=-0.0010486399987712502 new_interval=0.9989513600012287 since_epoch=26.019406441948377
^CException ignored in: <module 'threading' from '/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py'>
Traceback (most recent call last):
  File "/home/didi/.local/share/uv/python/cpython-3.9.2-linux-x86_64-gnu/lib/python3.9/threading.py", line 1428, in _shutdown
    lock.acquire()
KeyboardInterrupt: 
