Skip to content

Commit

Permalink
Merge pull request #94 from log10-io/nqn/patch-post-request-sites
Browse files Browse the repository at this point in the history
Patch post request sites
  • Loading branch information
nqn committed Jan 30, 2024
2 parents eb6e1b4 + da53cb2 commit 0a76811
Showing 1 changed file with 89 additions and 54 deletions.
143 changes: 89 additions & 54 deletions log10/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def post_request(url: str, json_payload: dict = {}) -> requests.Response:
res = requests.post(url, headers=headers, json=json_payload)
# raise_for_status() will raise an exception if the status is 4xx, 5xxx
res.raise_for_status()
logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}")

logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}")
return res
except requests.Timeout:
logger.error("HTTP request: POST Timeout")
Expand All @@ -101,31 +101,33 @@ def get_session_id():
if target_service == "bigquery":
return str(uuid.uuid4())

session_id = None
try:
res = post_session_request()

return res.json()["sessionID"]
session_id = res.json()["sessionID"]
except requests.HTTPError as http_err:
if "401" in str(http_err):
raise Exception(
logging.warn(
"Failed anthorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again."
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details"
)
else:
raise Exception(f"Failed to create LOG10 session. Error: {http_err}")
logging.warn(f"Failed to create LOG10 session. Error: {http_err}")
except requests.ConnectionError:
raise Exception(
logging.warn(
"Invalid LOG10_URL. Please verify that LOG10_URL is set correctly and try again."
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details"
)
except Exception as e:
raise Exception(
logging.warn(
"Failed to create LOG10 session: "
+ str(e)
+ "\nLikely cause: LOG10 env vars missing or not picked up correctly!"
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details"
)

return session_id


# Global variable to store the current sessionID.
sessionID = get_session_id()
Expand Down Expand Up @@ -190,32 +192,42 @@ def log_url(res, completionID):
async def log_async(completion_url, func, **kwargs):
global last_completion_response

res = post_request(completion_url)
# todo: handle session id for bigquery scenario
last_completion_response = res.json()
completionID = res.json()["completionID"]
res = None
try:
res = post_request(completion_url)
last_completion_response = res.json()
completionID = res.json()["completionID"]

if DEBUG:
log_url(res, completionID)

# in case the usage of load(openai) and langchain.ChatOpenAI
if "api_key" in kwargs:
kwargs.pop("api_key")

log_row = {
# do we want to also store args?
"status": "started",
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
}
if target_service == "log10":
try:
res = post_request(completion_url + "/" + completionID, log_row)
except Exception as e:
logging.warn(f"LOG10: failed to log: {e}. Skipping")
return None

elif target_service == "bigquery":
pass
# NOTE: We only save on request finalization.

if DEBUG:
log_url(res, completionID)

# in case the usage of load(openai) and langchain.ChatOpenAI
if "api_key" in kwargs:
kwargs.pop("api_key")

log_row = {
# do we want to also store args?
"status": "started",
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
}
if target_service == "log10":
res = post_request(completion_url + "/" + completionID, log_row)
elif target_service == "bigquery":
pass
# NOTE: We only save on request finalization.
except Exception as e:
logging.warn(f"LOG10: failed to log: {e}. SKipping")
return None

return completionID

Expand All @@ -227,25 +239,31 @@ def run_async_in_thread(completion_url, func, result_queue, **kwargs):

def log_sync(completion_url, func, **kwargs):
global last_completion_response
res = post_request(completion_url)
completionID = None

try:
res = post_request(completion_url)
last_completion_response = res.json()
completionID = res.json()["completionID"]
if DEBUG:
log_url(res, completionID)
# in case the usage of load(openai) and langchain.ChatOpenAI
if "api_key" in kwargs:
kwargs.pop("api_key")
log_row = {
# do we want to also store args?
"status": "started",
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
}
res = post_request(completion_url + "/" + completionID, log_row)
except Exception as e:
logging.warn(f"LOG10: failed to get completionID from log10: {e}")
return None

last_completion_response = res.json()
completionID = res.json()["completionID"]
if DEBUG:
log_url(res, completionID)
# in case the usage of load(openai) and langchain.ChatOpenAI
if "api_key" in kwargs:
kwargs.pop("api_key")
log_row = {
# do we want to also store args?
"status": "started",
"orig_module": func.__module__,
"orig_qualname": func.__qualname__,
"request": json.dumps(kwargs),
"session_id": sessionID,
"tags": global_tags,
}
res = post_request(completion_url + "/" + completionID, log_row)
return completionID


Expand All @@ -271,6 +289,11 @@ def wrapper(*args, **kwargs):
else:
completionID = log_sync(completion_url=completion_url, func=func, **kwargs)

if completionID is None:
logging.warn("LOG10: failed to get completionID from log10. Skipping log.")
func_with_backoff(func, *args, **kwargs)
return

current_stack_frame = traceback.extract_stack()
stacktrace = [
{
Expand All @@ -292,6 +315,11 @@ def wrapper(*args, **kwargs):
while result_queue.empty():
pass
completionID = result_queue.get()

if completionID is None:
logging.warn(f"LOG10: failed to get completionID from log10: {e}. Skipping log.")
return

logger.debug(f"LOG10: failed - {e}")
# todo: change with openai v1 update
if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e):
Expand All @@ -310,7 +338,10 @@ def wrapper(*args, **kwargs):
"session_id": sessionID,
"tags": global_tags,
}
res = post_request(completion_url + "/" + completionID, log_row)
try:
res = post_request(completion_url + "/" + completionID, log_row)
except Exception as le:
logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.")
raise e
else:
# finished with no exceptions
Expand Down Expand Up @@ -354,9 +385,13 @@ def wrapper(*args, **kwargs):
}

if target_service == "log10":
res = post_request(completion_url + "/" + completionID, log_row)
if res.status_code != 200:
logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}")
try:
res = post_request(completion_url + "/" + completionID, log_row)
if res.status_code != 200:
logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}")
except Exception as e:
logging.warn(f"LOG10: failed to log: {e}. Skipping")

elif target_service == "bigquery":
try:
log_row["id"] = str(uuid.uuid4())
Expand Down

0 comments on commit 0a76811

Please sign in to comment.