From ac741ea060029ba2ba6f867c5326a6a380335f12 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 14:26:41 -0800 Subject: [PATCH 1/2] Patch post request sites --- log10/load.py | 143 +++++++++++++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 55 deletions(-) diff --git a/log10/load.py b/log10/load.py index 3ff8c3fd..593cf600 100644 --- a/log10/load.py +++ b/log10/load.py @@ -76,9 +76,7 @@ def post_request(url: str, json_payload: dict = {}) -> requests.Response: # todo: set timeout res = requests.post(url, headers=headers, json=json_payload) # raise_for_status() will raise an exception if the status is 4xx, 5xxx - res.raise_for_status() logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}") - return res except requests.Timeout: logger.error("HTTP request: POST Timeout") @@ -101,31 +99,33 @@ def get_session_id(): if target_service == "bigquery": return str(uuid.uuid4()) + session_id = None try: res = post_session_request() - - return res.json()["sessionID"] + session_id = res.json()["sessionID"] except requests.HTTPError as http_err: if "401" in str(http_err): - raise Exception( + logging.warn( "Failed anthorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again." + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" ) else: - raise Exception(f"Failed to create LOG10 session. Error: {http_err}") + logging.warn(f"Failed to create LOG10 session. Error: {http_err}") except requests.ConnectionError: - raise Exception( + logging.warn( "Invalid LOG10_URL. Please verify that LOG10_URL is set correctly and try again." + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" ) except Exception as e: - raise Exception( + logging.warn( "Failed to create LOG10 session: " + str(e) + "\nLikely cause: LOG10 env vars missing or not picked up correctly!" 
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" ) + return session_id + # Global variable to store the current sessionID. sessionID = get_session_id() @@ -190,32 +190,42 @@ def log_url(res, completionID): async def log_async(completion_url, func, **kwargs): global last_completion_response - res = post_request(completion_url) - # todo: handle session id for bigquery scenario - last_completion_response = res.json() - completionID = res.json()["completionID"] + res = None + try: + res = post_request(completion_url) + last_completion_response = res.json() + completionID = res.json()["completionID"] + + if DEBUG: + log_url(res, completionID) + + # in case the usage of load(openai) and langchain.ChatOpenAI + if "api_key" in kwargs: + kwargs.pop("api_key") + + log_row = { + # do we want to also store args? + "status": "started", + "orig_module": func.__module__, + "orig_qualname": func.__qualname__, + "request": json.dumps(kwargs), + "session_id": sessionID, + "tags": global_tags, + } + if target_service == "log10": + try: + res = post_request(completion_url + "/" + completionID, log_row) + except Exception as e: + logging.warn(f"LOG10: failed to log: {e}. Skipping") + return None + + elif target_service == "bigquery": + pass + # NOTE: We only save on request finalization. - if DEBUG: - log_url(res, completionID) - - # in case the usage of load(openai) and langchain.ChatOpenAI - if "api_key" in kwargs: - kwargs.pop("api_key") - - log_row = { - # do we want to also store args? - "status": "started", - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "request": json.dumps(kwargs), - "session_id": sessionID, - "tags": global_tags, - } - if target_service == "log10": - res = post_request(completion_url + "/" + completionID, log_row) - elif target_service == "bigquery": - pass - # NOTE: We only save on request finalization. + except Exception as e: + logging.warn(f"LOG10: failed to log: {e}. 
SKipping") + return None return completionID @@ -227,25 +237,31 @@ def run_async_in_thread(completion_url, func, result_queue, **kwargs): def log_sync(completion_url, func, **kwargs): global last_completion_response - res = post_request(completion_url) + completionID = None + + try: + res = post_request(completion_url) + last_completion_response = res.json() + completionID = res.json()["completionID"] + if DEBUG: + log_url(res, completionID) + # in case the usage of load(openai) and langchain.ChatOpenAI + if "api_key" in kwargs: + kwargs.pop("api_key") + log_row = { + # do we want to also store args? + "status": "started", + "orig_module": func.__module__, + "orig_qualname": func.__qualname__, + "request": json.dumps(kwargs), + "session_id": sessionID, + "tags": global_tags, + } + res = post_request(completion_url + "/" + completionID, log_row) + except Exception as e: + logging.warn(f"LOG10: failed to get completionID from log10: {e}") + return None - last_completion_response = res.json() - completionID = res.json()["completionID"] - if DEBUG: - log_url(res, completionID) - # in case the usage of load(openai) and langchain.ChatOpenAI - if "api_key" in kwargs: - kwargs.pop("api_key") - log_row = { - # do we want to also store args? - "status": "started", - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "request": json.dumps(kwargs), - "session_id": sessionID, - "tags": global_tags, - } - res = post_request(completion_url + "/" + completionID, log_row) return completionID @@ -271,6 +287,11 @@ def wrapper(*args, **kwargs): else: completionID = log_sync(completion_url=completion_url, func=func, **kwargs) + if completionID is None: + logging.warn("LOG10: failed to get completionID from log10. 
Skipping log.") + ret = func_with_backoff(func, *args, **kwargs) + return ret + current_stack_frame = traceback.extract_stack() stacktrace = [ { @@ -292,6 +313,11 @@ def wrapper(*args, **kwargs): while result_queue.empty(): pass completionID = result_queue.get() + + if completionID is None: + logging.warn(f"LOG10: failed to get completionID from log10: {e}. Skipping log.") + raise e + logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): @@ -310,7 +336,10 @@ def wrapper(*args, **kwargs): "session_id": sessionID, "tags": global_tags, } - res = post_request(completion_url + "/" + completionID, log_row) + try: + res = post_request(completion_url + "/" + completionID, log_row) + except Exception as le: + logging.warn(f"LOG10: failed to log: {le}. Skipping, but raising LLM error.") raise e else: # finished with no exceptions @@ -354,9 +383,13 @@ def wrapper(*args, **kwargs): } if target_service == "log10": - res = post_request(completion_url + "/" + completionID, log_row) - if res.status_code != 200: - logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}") + try: + res = post_request(completion_url + "/" + completionID, log_row) + if res.status_code != 200: + logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}") + except Exception as e: + logging.warn(f"LOG10: failed to log: {e}. 
Skipping") + elif target_service == "bigquery": try: log_row["id"] = str(uuid.uuid4()) From da53cb26fc3cbc2f38b210b0b6c4078b90670ebe Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 14:28:04 -0800 Subject: [PATCH 2/2] Reintroduce raising in method, since we suppress call site --- log10/load.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/log10/load.py b/log10/load.py index 593cf600..74eeaa81 100644 --- a/log10/load.py +++ b/log10/load.py @@ -76,6 +76,8 @@ def post_request(url: str, json_payload: dict = {}) -> requests.Response: # todo: set timeout res = requests.post(url, headers=headers, json=json_payload) # raise_for_status() will raise an exception if the status is 4xx, 5xxx + res.raise_for_status() + logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}") return res except requests.Timeout: