From b01d1383100afae2e8bf70c24c7bb299b3c6eed6 Mon Sep 17 00:00:00 2001 From: hh-space-invader Date: Tue, 12 Nov 2024 13:40:08 +0200 Subject: [PATCH] fix: Fix grpc timeout --- qdrant_client/connection.py | 32 ++++++++++++------------- qdrant_client/uploader/grpc_uploader.py | 3 +-- qdrant_client/uploader/rest_uploader.py | 2 +- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/qdrant_client/connection.py b/qdrant_client/connection.py index c153f51f..51d03eda 100644 --- a/qdrant_client/connection.py +++ b/qdrant_client/connection.py @@ -1,3 +1,4 @@ +import json import asyncio import collections from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union @@ -200,26 +201,23 @@ async def intercept_call( def parse_channel_options(options: Optional[Dict[str, Any]] = None) -> List[Tuple[str, Any]]: - """ - Client Server - | | - |------ RPC Call -------->| (Initial attempt) - | | (Server not responding) - |<-- 20s timeout reached --| (keepalive_timeout_ms) - | | - |---- wait 10s ------> | (min/max_reconnect_backoff_ms) - | | - |------ RPC Call -------->| (Retry attempt) - | | - |-- Keepalive ping (10s) ->| (keepalive_time_ms) - """ default_options: List[Tuple[str, Any]] = [ ("grpc.max_send_message_length", -1), ("grpc.max_receive_message_length", -1), - ("grpc.keepalive_timeout_ms", 20000), - ("grpc.keepalive_time_ms", 10000), - ("grpc.min_reconnect_backoff_ms", 10000), - ("grpc.max_reconnect_backoff_ms", 10000), + ("grpc.enable_retries", 1), + ( + "grpc.service_config", + json.dumps( + { + "methodConfig": [ + { + "name": [{}], + "timeout": "10s", # TODO: propagate timeout + }, + ] + } + ), + ), ] if options is None: return default_options diff --git a/qdrant_client/uploader/grpc_uploader.py b/qdrant_client/uploader/grpc_uploader.py index a2fde3dd..6386b4f8 100644 --- a/qdrant_client/uploader/grpc_uploader.py +++ b/qdrant_client/uploader/grpc_uploader.py @@ -1,4 +1,3 @@ -import time import logging from itertools import count from typing import Any, Generator, Iterable, Optional, Tuple, Union @@ -49,7 +48,7 @@ def upload_batch_grpc( break except Exception as e: logging.warning(f"Batch upload failed {attempt + 1} times. Retrying...") - time.sleep(5) + if attempt == max_retries - 1: raise e return True diff --git a/qdrant_client/uploader/rest_uploader.py b/qdrant_client/uploader/rest_uploader.py index 49af545e..a34ea4ca 100644 --- a/qdrant_client/uploader/rest_uploader.py +++ b/qdrant_client/uploader/rest_uploader.py @@ -69,7 +69,7 @@ def start( cls, collection_name: Optional[str] = None, uri: str = "http://localhost:6333", - max_retries: int = 3, + max_retries: int = 5, **kwargs: Any, ) -> "RestBatchUploader": if not collection_name: