-
Notifications
You must be signed in to change notification settings - Fork 65
/
run_async.py
50 lines (37 loc) · 1.46 KB
/
run_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python -u
"""
This example will execute 10 queries in total, 2 concurrent queries at a time.
Each query will sleep for 2 seconds before returning.
Here's a sample output that shows that the queries are executed concurrently in batches of 2:
```
Completed query 1, elapsed ms since start: 2002
Completed query 0, elapsed ms since start: 2002
Completed query 3, elapsed ms since start: 4004
Completed query 2, elapsed ms since start: 4005
Completed query 4, elapsed ms since start: 6006
Completed query 5, elapsed ms since start: 6007
Completed query 6, elapsed ms since start: 8009
Completed query 7, elapsed ms since start: 8009
Completed query 9, elapsed ms since start: 10011
Completed query 8, elapsed ms since start: 10011
```
"""
import asyncio
from datetime import datetime
import clickhouse_connect
QUERIES = 10
SEMAPHORE = 2
async def concurrent_queries():
test_query = "SELECT sleep(2)"
client = await clickhouse_connect.get_async_client()
start = datetime.now()
async def semaphore_wrapper(sm: asyncio.Semaphore, num: int):
async with sm:
await client.query(query=test_query)
print(f"Completed query {num}, "
f"elapsed ms since start: {int((datetime.now() - start).total_seconds() * 1000)}")
semaphore = asyncio.Semaphore(SEMAPHORE)
await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)])
async def main():
await concurrent_queries()
asyncio.run(main())