forked from SciPhi-AI/R2R
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_client.py
103 lines (85 loc) · 3.23 KB
/
test_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import uuid
from r2r.client import R2RClient
# Initialize the client with the base URL of your API
base_url = "http://localhost:8000" # Change this to your actual API base URL
client = R2RClient(base_url)
print("Upserting entry to remote db...")
# Upsert a single entry
entry_response = client.add_entry(
str(uuid.uuid5(uuid.NAMESPACE_DNS, "doc 1")), # document_id
{"txt": "This is a test entry"},
{"tags": ["example", "test"]},
do_upsert=True,
)
print(f"Upsert entry response:\n{entry_response}\n\n")
entry_response = client.add_entry(
str(uuid.uuid5(uuid.NAMESPACE_DNS, "doc 1")), # document_id
{"txt": "This is a test entry"},
{"tags": ["example", "test"]},
do_upsert=False,
)
print(f"Copy same entry response:\n{entry_response}\n\n")
print("Upserting entries to remote db...")
# Upsert multiple entries
entries = [
{
"document_id": str(uuid.uuid5(uuid.NAMESPACE_DNS, "doc 2")),
"blobs": {"txt": "Second test entry"},
"metadata": {"tags": "bulk"},
},
{
"document_id": str(uuid.uuid5(uuid.NAMESPACE_DNS, "doc 3")),
"blobs": {"txt": "Third test entry"},
"metadata": {"tags": "example"},
},
]
bulk_upsert_response = client.add_entries(entries, do_upsert=True)
print(f"Upsert entries response:\n{bulk_upsert_response}\n\n")
# Perform a search
print("Searching remote db...")
search_response = client.search("test", 5)
print(f"Search response:\n{search_response}\n\n")
print("Searching remote db with filter...")
# Perform a search w/ filter
filtered_search_response = client.search("test", 5, filters={"tags": "bulk"})
print(f"Search response w/ filter:\n{filtered_search_response}\n\n")
print("Deleting sample document in remote db...")
# Delete a document
response = client.filtered_deletion(
"document_id", str(uuid.uuid5(uuid.NAMESPACE_DNS, "doc 2"))
)
print(f"Deletion response:\n{response}\n\n")
print("Searching remote db with filter after deletion...")
# Perform a search w/ filter after deletion
post_deletion_filtered_search_response = client.search(
"test", 5, filters={"tags": "bulk"}
)
print(
f"Search response w/ filter+deletion:\n{post_deletion_filtered_search_response}\n\n"
)
# Example file path for upload
file_path = "examples/client/test.pdf" # Ensure this file exists in your script's directory
print(f"Uploading and processing file: {file_path}...")
# # Upload and process a file
pdf_document_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, "pdf 1")) # document_id
metadata = {"tags": ["example", "test"]}
upload_pdf_response = client.upload_and_process_file(
pdf_document_id, file_path, metadata, None
)
print(f"Upload test pdf response:\n{upload_pdf_response}\n\n")
print("Searching remote db after upload...")
# Perform a search on this file
pdf_filtered_search_response = client.search(
"what is a cool physics equation?",
5,
filters={"document_id": pdf_document_id},
)
print(
f"Search response w/ uploaded pdf filter:\n{pdf_filtered_search_response}\n"
)
print("Fetching logs after all steps...")
logs_response = client.get_logs()
print(f"Logs response:\n{logs_response}\n")
print("Fetching logs summary after all steps...")
logs_summary_response = client.get_logs_summary()
print(f"Logs summary response:\n{logs_summary_response}\n")