Skip to content

Commit

Permalink
feat: Add compact, get_server_version and flush api (#2326)
Browse files Browse the repository at this point in the history
issue: #2325

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
  • Loading branch information
czs007 authored Nov 6, 2024
1 parent 55800a6 commit 23ca4e3
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 0 deletions.
83 changes: 83 additions & 0 deletions examples/milvus_client/compact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import time
import numpy as np
from pymilvus import (
MilvusClient,
)

# Example script: insert two batches into a fresh collection, flushing after
# each batch, then trigger a compaction job and poll until it completes.
fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"
milvus_client = MilvusClient("http://localhost:19530")


def _banner(text):
    """Print *text* inside the script's section-header template."""
    print(fmt.format(text))


def _insert_and_report(batch):
    """Insert *batch* into the collection and print the server's response."""
    _banner("Start inserting entities")
    outcome = milvus_client.insert(collection_name, batch)
    _banner("Inserting entities done")
    print(outcome)


def _flush_collection():
    """Flush the collection, announcing the start and the end of the call."""
    _banner("Start flush")
    milvus_client.flush(collection_name)
    _banner("flush done")


def _report_entity_count():
    """Query ``count(*)`` on the collection and print the result."""
    counted = milvus_client.query(collection_name, "", output_fields=["count(*)"])
    print(f"final entities in {collection_name} is {counted[0]['count(*)']}")


# Recreate the collection so every run starts from the same state.
if milvus_client.has_collection(collection_name, timeout=5):
    milvus_client.drop_collection(collection_name)
milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2")

# Seeded generator: the vectors are the same on every run.
rng = np.random.default_rng(seed=19530)

# First batch: ids 1-6, each row carrying one differently named extra field.
first_batch = [
    {"id": pk, "vector": rng.random((1, dim))[0], extra_key: extra_value}
    for pk, extra_key, extra_value in zip(range(1, 7), "abcdef", range(100, 700, 100))
]
_insert_and_report(first_batch)

# Upsert a row whose id (2) was already inserted above.
replacement = {"id": 2, "vector": rng.random((1, dim))[0], "g": 100}
print(milvus_client.upsert(collection_name, replacement))

_flush_collection()
_report_entity_count()

# Second batch: ids 7-12, inserted after the first flush.
second_batch = [
    {"id": pk, "vector": rng.random((1, dim))[0], extra_key: extra_value}
    for pk, extra_key, extra_value in zip(range(7, 13), "ghijkl", range(700, 1300, 100))
]
_insert_and_report(second_batch)

_flush_collection()
_report_entity_count()

_banner("Start compact")
job_id = milvus_client.compact(collection_name)
print(f"job_id:{job_id}")

# Poll once per second, at most ten times, stopping as soon as the job
# reports "Completed".
state = milvus_client.get_compaction_state(job_id)
for _ in range(10):
    if state == "Completed":
        break
    time.sleep(1.0)
    state = milvus_client.get_compaction_state(job_id)
    print(f"compaction state: {state}")

_banner("compact done" if state == "Completed" else "compact timeout")

_report_entity_count()

milvus_client.drop_collection(collection_name)
57 changes: 57 additions & 0 deletions examples/milvus_client/flush.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time
import numpy as np
from pymilvus import (
MilvusClient,
)

# Example script: insert and upsert rows, flush, delete one row by primary
# key, flush again, and print the entity count after each flush.
fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"
milvus_client = MilvusClient("http://localhost:19530")

# Recreate the collection so every run starts from the same state.
has_collection = milvus_client.has_collection(collection_name, timeout=5)
if has_collection:
    milvus_client.drop_collection(collection_name)
milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2")

# Seeded generator: the vectors are the same on every run.
rng = np.random.default_rng(seed=19530)
rows = [
    {"id": 1, "vector": rng.random((1, dim))[0], "a": 100},
    {"id": 2, "vector": rng.random((1, dim))[0], "b": 200},
    {"id": 3, "vector": rng.random((1, dim))[0], "c": 300},
    {"id": 4, "vector": rng.random((1, dim))[0], "d": 400},
    {"id": 5, "vector": rng.random((1, dim))[0], "e": 500},
    {"id": 6, "vector": rng.random((1, dim))[0], "f": 600},
]

print(fmt.format("Start inserting entities"))
insert_result = milvus_client.insert(collection_name, rows)
print(fmt.format("Inserting entities done"))
print(insert_result)

# Upsert a row whose id (2) was already inserted above.
upsert_ret = milvus_client.upsert(collection_name, {"id": 2, "vector": rng.random((1, dim))[0], "g": 100})
print(upsert_ret)

print(fmt.format("Start flush"))
milvus_client.flush(collection_name)
print(fmt.format("flush done"))

result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
print(f"final entities in {collection_name} is {result[0]['count(*)']}")

# The delete below selects the row by primary key (ids=[6]), not by a filter
# expression, so the progress message says "ids".
print(f"start to delete by specifying ids in collection {collection_name}")
delete_result = milvus_client.delete(collection_name, ids=[6])
print(delete_result)

print(fmt.format("Start flush"))
milvus_client.flush(collection_name)
print(fmt.format("flush done"))

result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
print(f"final entities in {collection_name} is {result[0]['count(*)']}")

milvus_client.drop_collection(collection_name)
8 changes: 8 additions & 0 deletions examples/milvus_client/get_server_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from pymilvus import (
MilvusClient,
)

# Example script: connect to a local server and print the version it reports.
client = MilvusClient("http://localhost:19530")
print(f"server version: {client.get_server_version()}")
4 changes: 4 additions & 0 deletions pymilvus/client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ def __init__(
self.in_timeout = in_timeout
self.completed = completed

@property
def state_name(self):
    """Return the ``name`` attribute of this object's ``state``.

    NOTE(review): ``state`` is presumably an enum member; the class
    definition is outside this view, so confirm there.
    """
    current_state = self.state
    return current_state.name

def __repr__(self) -> str:
return f"""
CompactionState
Expand Down
92 changes: 92 additions & 0 deletions pymilvus/milvus_client/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,95 @@ def drop_database(self, db_name: str, **kwargs):
def list_databases(self, **kwargs) -> List[str]:
    """List the databases reported by the current connection.

    Args:
        **kwargs: Extra keyword arguments forwarded unchanged to the
            connection's ``list_database`` call.

    Returns:
        List[str]: The value returned by the connection's ``list_database``.
    """
    return self._get_connection().list_database(**kwargs)

def flush(
    self,
    collection_name: str,
    timeout: Optional[float] = None,
    **kwargs,
):
    """Flush a collection, sealing all of its segments.

    Data inserted after the flush is written into new segments.

    Args:
        collection_name (``str``): Name of the collection to flush.
        timeout (``float``, optional): Duration in seconds to allow for the
            RPCs. When left as None, the client keeps waiting until the
            server responds or an error occurs.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            connection's ``flush`` call.

    Raises:
        MilvusException: If anything goes wrong.
    """
    # The connection-level call takes a list of names; wrap the single name.
    targets = [collection_name]
    self._get_connection().flush(targets, timeout=timeout, **kwargs)

def compact(
    self,
    collection_name: str,
    is_clustering: Optional[bool] = False,
    timeout: Optional[float] = None,
    **kwargs,
) -> int:
    """Start a compaction job that merges the small segments of a collection.

    Args:
        collection_name (``str``): Name of the collection to compact.
        is_clustering (``bool``, optional): Option to trigger clustering
            compaction. Defaults to False.
        timeout (``float``, optional): Duration in seconds to allow for the
            RPC. When left as None, the client waits until the server
            responds or an error occurs.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            connection's ``compact`` call.

    Raises:
        MilvusException: If anything goes wrong.

    Returns:
        int: An integer identifying the server's compaction job. Use this
            job ID for subsequent state inquiries.
    """
    job_id = self._get_connection().compact(
        collection_name,
        is_clustering=is_clustering,
        timeout=timeout,
        **kwargs,
    )
    return job_id

def get_compaction_state(
    self,
    job_id: int,
    timeout: Optional[float] = None,
    **kwargs,
) -> str:
    """Look up the state of a compaction job.

    Args:
        job_id (``int``): The compaction job to inquire about, as returned
            by ``compact``.
        timeout (``float``, optional): Duration in seconds to allow for the
            RPC. When left as None, the client waits until the server
            responds or an error occurs.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            connection's ``get_compaction_state`` call.

    Raises:
        MilvusException: If anything goes wrong.

    Returns:
        str: The state of this compaction job. Possible values are
            "UndefiedState", "Executing" and "Completed".
    """
    connection = self._get_connection()
    # The connection returns a state object; callers only get its name.
    return connection.get_compaction_state(job_id, timeout=timeout, **kwargs).state_name

def get_server_version(
    self,
    timeout: Optional[float] = None,
    **kwargs,
) -> str:
    """Return the version of the running server.

    Args:
        timeout (``float``, optional): Duration in seconds to allow for the
            RPC. When left as None, the client keeps waiting until the
            server responds or an error occurs.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            connection's ``get_server_version`` call.

    Returns:
        str: A string representing the server's version.

    Raises:
        MilvusException: If anything goes wrong.
    """
    server_version = self._get_connection().get_server_version(timeout=timeout, **kwargs)
    return server_version

0 comments on commit 23ca4e3

Please sign in to comment.