Skip to content

Commit

Permalink
enhance: support printing iterator info (#2261) (#2262)
Browse files Browse the repository at this point in the history
related: #2261

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
  • Loading branch information
MrPresent-Han and MrPresent-Han authored Sep 23, 2024
1 parent 11d2fc0 commit da51ba1
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 21 deletions.
37 changes: 30 additions & 7 deletions examples/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
FieldSchema, CollectionSchema, DataType,
Collection,
)
import logging

HOST = "localhost"
PORT = "19530"
Expand All @@ -21,6 +22,28 @@
DIM = 8
CLEAR_EXIST = False

# Create a logger for the main script
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)  # Set the log level to INFO

# Create a console handler and set its level to INFO
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# Create a formatter for the console output
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Add the formatter to the handler
console_handler.setFormatter(formatter)

# Attach the handler ONLY to the root logger. `log` propagates to the root
# by default, so attaching the same handler to both (as the original code did)
# would emit every record from `log` twice.
logging.getLogger().setLevel(logging.INFO)  # Root level INFO applies app-wide
logging.getLogger().addHandler(console_handler)



def re_create_collection(skip_data_period: bool):
if not skip_data_period:
Expand Down Expand Up @@ -95,7 +118,7 @@ def query_iterate_collection_no_offset(collection):

query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL,
reduce_stop_for_best="false")
reduce_stop_for_best="false", print_iterator_cursor=True)
no_best_ids: set = set({})
page_idx = 0
while True:
Expand All @@ -113,7 +136,7 @@ def query_iterate_collection_no_offset(collection):
print("best---------------------------")
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
offset=0, batch_size=5, consistency_level=CONSISTENCY_LEVEL,
reduce_stop_for_best="true")
reduce_stop_for_best="true", print_iterator_cursor=True)
best_ids: set = set({})
page_idx = 0
while True:
Expand All @@ -136,7 +159,7 @@ def query_iterate_collection_no_offset(collection):
def query_iterate_collection_with_offset(collection):
expr = f"10 <= {AGE} <= 14"
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
offset=10, batch_size=50, consistency_level=CONSISTENCY_LEVEL)
offset=10, batch_size=50, consistency_level=CONSISTENCY_LEVEL, print_iterator_cursor=True)
page_idx = 0
while True:
res = query_iterator.next()
Expand All @@ -153,7 +176,7 @@ def query_iterate_collection_with_offset(collection):
def query_iterate_collection_with_limit(collection):
expr = f"10 <= {AGE} <= 44"
query_iterator = collection.query_iterator(expr=expr, output_fields=[USER_ID, AGE],
batch_size=80, limit=530, consistency_level=CONSISTENCY_LEVEL)
batch_size=80, limit=530, consistency_level=CONSISTENCY_LEVEL, print_iterator_cursor=True)
page_idx = 0
while True:
res = query_iterator.next()
Expand All @@ -177,7 +200,7 @@ def search_iterator_collection(collection):
"params": {"nprobe": 10, "radius": 1.0},
}
search_iterator = collection.search_iterator(vectors_to_search, PICTURE, search_params, batch_size=500,
output_fields=[USER_ID])
output_fields=[USER_ID], print_iterator_cursor=True)
page_idx = 0
while True:
res = search_iterator.next()
Expand All @@ -201,7 +224,7 @@ def search_iterator_collection_with_limit(collection):
"params": {"nprobe": 10, "radius": 1.0},
}
search_iterator = collection.search_iterator(vectors_to_search, PICTURE, search_params, batch_size=200, limit=755,
output_fields=[USER_ID])
output_fields=[USER_ID], print_iterator_cursor=True)
page_idx = 0
while True:
res = search_iterator.next()
Expand All @@ -216,7 +239,7 @@ def search_iterator_collection_with_limit(collection):


def main():
skip_data_period = False
skip_data_period = True
connections.connect("default", host=HOST, port=PORT)
collection = re_create_collection(skip_data_period)
if not skip_data_period:
Expand Down
46 changes: 42 additions & 4 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,42 @@ def hybrid_search_request_with_ranker(
]
)

group_scorer = kwargs.get(RANK_GROUP_SCORER, "max")
request.rank_params.extend(
[common_types.KeyValuePair(key=RANK_GROUP_SCORER, value=group_scorer)]
)
if kwargs.get(RANK_GROUP_SCORER, None) is not None:
request.rank_params.extend(
[
common_types.KeyValuePair(
key=RANK_GROUP_SCORER, value=kwargs.get(RANK_GROUP_SCORER)
)
]
)

if kwargs.get(GROUP_BY_FIELD, None) is not None:
request.rank_params.extend(
[
common_types.KeyValuePair(
key=GROUP_BY_FIELD, value=utils.dumps(kwargs.get(GROUP_BY_FIELD))
)
]
)

if kwargs.get(GROUP_SIZE, None) is not None:
request.rank_params.extend(
[
common_types.KeyValuePair(
key=GROUP_SIZE, value=utils.dumps(kwargs.get(GROUP_SIZE))
)
]
)

if kwargs.get(GROUP_STRICT_SIZE, None) is not None:
request.rank_params.extend(
[
common_types.KeyValuePair(
key=GROUP_STRICT_SIZE, value=utils.dumps(kwargs.get(GROUP_STRICT_SIZE))
)
]
)

return request

@classmethod
Expand Down Expand Up @@ -1027,6 +1059,12 @@ def query_request(

ignore_growing = kwargs.get("ignore_growing", False)
stop_reduce_for_best = kwargs.get(REDUCE_STOP_FOR_BEST, False)
is_iterator = kwargs.get(ITERATOR_FIELD)
if is_iterator is not None:
req.query_params.append(
common_types.KeyValuePair(key=ITERATOR_FIELD, value=is_iterator)
)

req.query_params.append(
common_types.KeyValuePair(key="ignore_growing", value=str(ignore_growing))
)
Expand Down
1 change: 1 addition & 0 deletions pymilvus/orm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
IS_PRIMARY = "is_primary"
# Kwarg keys controlling iterator behavior in query/search requests.
REDUCE_STOP_FOR_BEST = "reduce_stop_for_best"  # serialized as string "True"/"False"
ITERATOR_FIELD = "iterator"  # marks a request as originating from an iterator
PRINT_ITERATOR_CURSOR = "print_iterator_cursor"  # when truthy, log each iterator cursor expression
# Sentinel extreme distances — presumably used as +/- infinity bounds per
# metric type (L2/IP/Hamming); confirm against the range-search logic.
DEFAULT_MAX_L2_DISTANCE = 99999999.0
DEFAULT_MIN_IP_DISTANCE = -99999999.0
DEFAULT_MAX_HAMMING_DISTANCE = 99999999.0
Expand Down
28 changes: 18 additions & 10 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
MILVUS_LIMIT,
OFFSET,
PARAMS,
PRINT_ITERATOR_CURSOR,
RADIUS,
RANGE_FILTER,
REDUCE_STOP_FOR_BEST,
Expand All @@ -40,10 +41,12 @@
from .types import DataType

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.ERROR)
LOGGER.setLevel(logging.INFO)
QueryIterator = TypeVar("QueryIterator")
SearchIterator = TypeVar("SearchIterator")

log = logging.getLogger(__name__)


def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: bool) -> int:
extend_rate = 1
Expand All @@ -56,6 +59,10 @@ def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: b
return min(MAX_BATCH_SIZE, batch_size * extend_rate)


def check_set_flag(obj: Any, flag_name: str, kwargs: Dict[str, Any], key: str):
    """Copy the boolean-ish flag `kwargs[key]` onto `obj` as attribute `flag_name`.

    Missing keys default to False, so the attribute is always set.
    """
    flag_value = kwargs[key] if key in kwargs else False
    setattr(obj, flag_name, flag_value)


class QueryIterator:
def __init__(
self,
Expand All @@ -77,19 +84,17 @@ def __init__(
self._schema = schema
self._timeout = timeout
self._kwargs = kwargs
self.__set_up_iteration_states()
self._kwargs[ITERATOR_FIELD] = "True"
self.__check_set_batch_size(batch_size)
self._limit = limit
self.__check_set_reduce_stop_for_best()
check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR)
self._returned_count = 0
self.__setup__pk_prop()
self.__set_up_expr(expr)
self.__seek()
self._cache_id_in_use = NO_CACHE_ID

def __set_up_iteration_states(self):
    # Tag the request kwargs so the query path knows this call comes from an
    # iterator. NOTE(review): the flag is the string "True", not a bool —
    # presumably parsed downstream; confirm against the RPC/prepare layer.
    self._kwargs[ITERATOR_FIELD] = "True"

def __check_set_reduce_stop_for_best(self):
if self._kwargs.get(REDUCE_STOP_FOR_BEST, True):
self._kwargs[REDUCE_STOP_FOR_BEST] = "True"
Expand Down Expand Up @@ -156,6 +161,8 @@ def next(self):
else:
iterator_cache.release_cache(self._cache_id_in_use)
current_expr = self.__setup_next_expr()
if self._print_iterator_cursor:
log.info(f"query_iterator_next_expr:{current_expr}")
res = self._conn.query(
collection_name=self._collection_name,
expr=current_expr,
Expand Down Expand Up @@ -194,7 +201,7 @@ def __setup__pk_prop(self):
if self._pk_field_name is None or self._pk_field_name == "":
raise MilvusException(message="schema must contain pk field, broke")

def __setup_next_expr(self) -> None:
def __setup_next_expr(self) -> str:
current_expr = self._expr
if self._next_id is None:
return current_expr
Expand Down Expand Up @@ -320,7 +327,7 @@ def __init__(
self.__check_set_params(param)
self.__check_for_special_index_param()
self._kwargs = kwargs
self.__set_up_iteration_states()
self._kwargs[ITERATOR_FIELD] = "True"
self._filtered_ids = []
self._filtered_distance = None
self._schema = schema
Expand All @@ -330,6 +337,7 @@ def __init__(
self.__check_offset()
self.__check_rm_range_search_parameters()
self.__setup__pk_prop()
check_set_flag(self, "_print_iterator_cursor", self._kwargs, PRINT_ITERATOR_CURSOR)
self.__init_search_iterator()

def __init_search_iterator(self):
Expand All @@ -348,9 +356,6 @@ def __init_search_iterator(self):
self.__update_filtered_ids(init_page)
self._init_success = True

def __set_up_iteration_states(self):
    # Mark the search request as iterator-driven via the kwargs sent with each
    # page request. NOTE(review): string "True" rather than a bool — presumably
    # the server/prepare layer expects a string; confirm.
    self._kwargs[ITERATOR_FIELD] = "True"

def __update_width(self, page: SearchPage):
first_hit, last_hit = page[0], page[-1]
if metrics_positive_related(self._param[METRIC_TYPE]):
Expand Down Expand Up @@ -538,6 +543,8 @@ def __try_search_fill(self) -> SearchPage:
def __execute_next_search(
self, next_params: dict, next_expr: str, to_extend_batch: bool
) -> SearchPage:
if self._print_iterator_cursor:
log.info(f"search_iterator_next_expr:{next_expr}, next_params:{next_params}")
res = self._conn.search(
self._iterator_params["collection_name"],
self._iterator_params["data"],
Expand All @@ -552,6 +559,7 @@ def __execute_next_search(
schema=self._schema,
**self._kwargs,
)

return SearchPage(res[0])

# at present, the range_filter parameter means 'larger/less and equal',
Expand Down

0 comments on commit da51ba1

Please sign in to comment.