From 612b2209674c4285385973132170b37f275df710 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 11:06:59 -0800 Subject: [PATCH 1/8] HTTPX client --- log10/load.py | 403 ++++++++++++++++++++------------------------------ 1 file changed, 159 insertions(+), 244 deletions(-) diff --git a/log10/load.py b/log10/load.py index 3ff8c3fd..cb14a95c 100644 --- a/log10/load.py +++ b/log10/load.py @@ -1,15 +1,14 @@ -import asyncio import functools import json import logging import os -import queue -import threading import time import traceback from contextlib import contextmanager from importlib.metadata import version +import httpx + import backoff import requests from dotenv import load_dotenv @@ -24,21 +23,17 @@ ) logger: logging.Logger = logging.getLogger("LOG10") -url = os.environ.get("LOG10_URL") +url = os.environ.get("LOG10_URL", "https://log10.io") token = os.environ.get("LOG10_TOKEN") org_id = os.environ.get("LOG10_ORG_ID") + # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") - if target_service == "bigquery": - from log10.bigquery import initialize_bigquery - - bigquery_client, bigquery_table = initialize_bigquery() - import uuid - from datetime import datetime, timezone -elif target_service is None: - target_service = "log10" # default to log10 + raise NotImplementedError( + "For big query support, please get in touch with us at support@log10.io" + ) def is_openai_v1() -> bool: @@ -48,6 +43,9 @@ def is_openai_v1() -> bool: def func_with_backoff(func, *args, **kwargs): + """ + openai retries for V0. V1 has built-in retries, so we don't need to do anything in that case. + """ if func.__module__ != "openai" or is_openai_v1(): return func(*args, **kwargs) @@ -68,67 +66,57 @@ def _func_with_backoff(func, *args, **kwargs): return _func_with_backoff(func, *args, **kwargs) -# todo: should we do backoff as well? 
-def post_request(url: str, json_payload: dict = {}) -> requests.Response: - headers = {"x-log10-token": token, "Content-Type": "application/json"} - json_payload["organization_id"] = org_id - try: - # todo: set timeout - res = requests.post(url, headers=headers, json=json_payload) - # raise_for_status() will raise an exception if the status is 4xx, 5xxx - res.raise_for_status() - logger.debug(f"HTTP request: POST {url} {res.status_code}\n{json.dumps(json_payload, indent=4)}") - - return res - except requests.Timeout: - logger.error("HTTP request: POST Timeout") - raise - except requests.ConnectionError: - logger.error("HTTP request: POST Connection Error") - raise - except requests.HTTPError as e: - logger.error(f"HTTP request: POST HTTP Error - {e}") - raise - except requests.RequestException as e: - logger.error(f"HTTP request: POST Request Exception - {e}") - raise - - -post_session_request = functools.partial(post_request, url + "/api/sessions", {}) +# TODO: Retries on 5xx errors has to be handled in user code, so recommendation is to use tenacity. +transport = httpx.HTTPTransport(retries=5) +httpx_client = httpx.Client(transport=transport) -def get_session_id(): - if target_service == "bigquery": - return str(uuid.uuid4()) - +def post_request(url: str, json: dict = {}) -> requests.Response: + """ + Authenticated POST request to log10. + """ + json["organization_id"] = org_id + r = None try: - res = post_session_request() - - return res.json()["sessionID"] - except requests.HTTPError as http_err: + # todo: set timeout + r = httpx_client.post( + url, + headers={"x-log10-token": token, "Content-Type": "application/json"}, + json=json, + ) + r.raise_for_status() + except httpx.HTTPError as http_err: if "401" in str(http_err): - raise Exception( + logging.error( "Failed authorization. Please verify that LOG10_TOKEN and LOG10_ORG_ID are set correctly and try again." 
+ "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" ) else: - raise Exception(f"Failed to create LOG10 session. Error: {http_err}") - except requests.ConnectionError: - raise Exception( - "Invalid LOG10_URL. Please verify that LOG10_URL is set correctly and try again." - + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" - ) + logging.error(f"Failed to create LOG10 session. Error: {http_err}") except Exception as e: - raise Exception( - "Failed to create LOG10 session: " - + str(e) - + "\nLikely cause: LOG10 env vars missing or not picked up correctly!" - + "\nSee https://github.com/log10-io/log10#%EF%B8%8F-setup for details" + logger.error(f"LOG10: failed to insert in log10: {json} with error {e}") + + return r + + +def get_session_id(): + """ + Get session ID from log10. + """ + res = post_request(url + "/api/sessions", {}) + sessionID = None + try: + sessionID = res.json().get("sessionID") + except Exception as e: + logger.warning( + f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." ) + return sessionID + # Global variable to store the current sessionID. 
-sessionID = get_session_id() +sessionID = None last_completion_response = None global_tags = [] @@ -167,19 +155,6 @@ def __exit__(self, exc_type, exc_value, traceback): return -@contextmanager -def timed_block(block_name): - if DEBUG: - start_time = time.perf_counter() - try: - yield - finally: - elapsed_time = time.perf_counter() - start_time - logger.debug(f"TIMED BLOCK - {block_name} took {elapsed_time:.6f} seconds to execute.") - else: - yield - - def log_url(res, completionID): output = res.json() organizationSlug = output["organizationSlug"] @@ -187,195 +162,135 @@ def log_url(res, completionID): logger.debug(f"Completion URL: {full_url}") -async def log_async(completion_url, func, **kwargs): - global last_completion_response - - res = post_request(completion_url) - # todo: handle session id for bigquery scenario - last_completion_response = res.json() - completionID = res.json()["completionID"] - - if DEBUG: - log_url(res, completionID) - - # in case the usage of load(openai) and langchain.ChatOpenAI - if "api_key" in kwargs: - kwargs.pop("api_key") - - log_row = { - # do we want to also store args? - "status": "started", - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "request": json.dumps(kwargs), - "session_id": sessionID, - "tags": global_tags, - } - if target_service == "log10": - res = post_request(completion_url + "/" + completionID, log_row) - elif target_service == "bigquery": - pass - # NOTE: We only save on request finalization. 
- - return completionID - - -def run_async_in_thread(completion_url, func, result_queue, **kwargs): - result = asyncio.run(log_async(completion_url=completion_url, func=func, **kwargs)) - result_queue.put(result) - - -def log_sync(completion_url, func, **kwargs): - global last_completion_response - res = post_request(completion_url) - - last_completion_response = res.json() - completionID = res.json()["completionID"] - if DEBUG: - log_url(res, completionID) - # in case the usage of load(openai) and langchain.ChatOpenAI - if "api_key" in kwargs: - kwargs.pop("api_key") - log_row = { - # do we want to also store args? - "status": "started", - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "request": json.dumps(kwargs), - "session_id": sessionID, - "tags": global_tags, - } - res = post_request(completion_url + "/" + completionID, log_row) - return completionID - - def intercepting_decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): - completion_url = url + "/api/completions" - output = None - result_queue = queue.Queue() + global last_completion_response + global sessionID + # + # If session ID is not set, create a new session. + # If session ID isn't returned, continue with degraded functionality (lost session scope). + # + if sessionID is None: + sessionID = get_session_id() + + # + # Get completion ID. + # If we cannot get a completion id, continue with degraded functionality (no logging). 
+ # + completion_url = url + "/api/completions" + r = post_request(completion_url, json={}) + completionID = None + organizationSlug = None try: - with timed_block(sync_log_text + " call duration"): - if USE_ASYNC: - threading.Thread( - target=run_async_in_thread, - kwargs={ - "completion_url": completion_url, - "func": func, - "result_queue": result_queue, - **kwargs, - }, - ).start() - else: - completionID = log_sync(completion_url=completion_url, func=func, **kwargs) - - current_stack_frame = traceback.extract_stack() - stacktrace = [ - { - "file": frame.filename, - "line": frame.line, - "lineno": frame.lineno, - "name": frame.name, - } - for frame in current_stack_frame - ] + completionID = r.json().get("completionID") + organizationSlug = r.json().get("organizationSlug") + except Exception as e: + logger.warning( + f"LOG10: failed to get completion ID. Error: {e}. Skipping logging." + ) + return func_with_backoff(func, *args, **kwargs) + + url = completion_url + "/" + completionID + + full_url = url + "/app/" + organizationSlug + "/completions/" + completionID + if DEBUG: + logger.debug(f"Completion URL: {full_url}") + + # + # Create base log row (resending request in case of failure) + # + current_stack_frame = traceback.extract_stack() + stacktrace = [ + { + "file": frame.filename, + "line": frame.line, + "lineno": frame.lineno, + "name": frame.name, + } + for frame in current_stack_frame + ] + log_row = { + "status": "started", + "orig_module": func.__module__, + "orig_qualname": func.__qualname__, + "stacktrace": json.dumps(stacktrace), + "request": json.dumps(kwargs), + "session_id": sessionID, + "tags": global_tags, + } + output = None + start_time = None + try: + # + # Store request + # + post_request(url, json=log_row) + + # + # Call LLM + # start_time = time.perf_counter() output = func_with_backoff(func, *args, **kwargs) - duration = time.perf_counter() - start_time - logger.debug(f"TIMED BLOCK - LLM call duration: {duration}") + except Exception as 
e: - if USE_ASYNC: - with timed_block("extra time spent waiting for log10 call"): - while result_queue.empty(): - pass - completionID = result_queue.get() + duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): + if type( + e + ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( + e + ): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ + failure_reason = str(e) - log_row = { - "status": "failed", - "failure_kind": failure_kind, - "failure_reason": failure_reason, - "stacktrace": json.dumps(stacktrace), - "kind": "completion", - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "session_id": sessionID, - "tags": global_tags, - } - res = post_request(completion_url + "/" + completionID, log_row) + + log_row["status"] = "failed" + log_row["duration"] = int(duration * 1000) + log_row["failure_kind"] = failure_kind + log_row["failure_reason"] = failure_reason + + post_request(url, log_row) + + # We forward non-logger errors raise e else: - # finished with no exceptions - if USE_ASYNC: - with timed_block("extra time spent waiting for log10 call"): - while result_queue.empty(): - pass - completionID = result_queue.get() - - with timed_block("result call duration (sync)"): + # + # Store both request and response, in case of failure of first call. 
+ # + duration = time.perf_counter() - start_time + response = output + + # Adjust the Anthropic output to match OAI completion output + if "anthropic" in type(output).__module__: + from log10.anthropic import Anthropic + + response = Anthropic.prepare_response(kwargs["prompt"], output, "text") + kind = "completion" + else: response = output - # Adjust the Anthropic output to match OAI completion output - if "anthropic" in type(output).__module__: - from log10.anthropic import Anthropic - - response = Anthropic.prepare_response(kwargs["prompt"], output, "text") - kind = "completion" - else: - response = output - kind = "chat" if output.object == "chat.completion" else "completion" - - # in case the usage of load(openai) and langchain.ChatOpenAI - if "api_key" in kwargs: - kwargs.pop("api_key") - - if hasattr(response, "model_dump_json"): - response = response.model_dump_json() - else: - response = json.dumps(response) - log_row = { - "response": response, - "status": "finished", - "duration": int(duration * 1000), - "stacktrace": json.dumps(stacktrace), - "kind": kind, - "orig_module": func.__module__, - "orig_qualname": func.__qualname__, - "request": json.dumps(kwargs), - "session_id": sessionID, - "tags": global_tags, - } - - if target_service == "log10": - res = post_request(completion_url + "/" + completionID, log_row) - if res.status_code != 200: - logger.error(f"LOG10: failed to insert in log10: {log_row} with error {res.text}") - elif target_service == "bigquery": - try: - log_row["id"] = str(uuid.uuid4()) - log_row["created_at"] = datetime.now(timezone.utc).isoformat() - log_row["request"] = json.dumps(kwargs) - - if func.__qualname__ == "Completion.create": - log_row["kind"] = "completion" - elif func.__qualname__ == "ChatCompletion.create": - log_row["kind"] = "chat" - - log_row["orig_module"] = func.__module__ - log_row["orig_qualname"] = func.__qualname__ - log_row["session_id"] = sessionID - - bigquery_client.insert_rows_json(bigquery_table, 
[log_row]) - - except Exception as e: - logging.error(f"LOG10: failed to insert in Bigquery: {log_row} with error {e}") + kind = "chat" if output.object == "chat.completion" else "completion" + + # in case the usage of load(openai) and langchain.ChatOpenAI + if "api_key" in kwargs: + kwargs.pop("api_key") + + if hasattr(response, "model_dump_json"): + response = response.model_dump_json() + else: + response = json.dumps(response) + + log_row["status"] = "finished" + log_row["response"] = response + log_row["duration"] = int(duration * 1000) + log_row["kind"] = kind + + post_request(url, log_row) return output From c682467caff5886b1fb5ad6a60c3fbc9f5a67266 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 12:15:52 -0800 Subject: [PATCH 2/8] Formatting --- log10/load.py | 71 +++++++++++++-------------------------------------- 1 file changed, 18 insertions(+), 53 deletions(-) diff --git a/log10/load.py b/log10/load.py index cb14a95c..4c9d7cef 100644 --- a/log10/load.py +++ b/log10/load.py @@ -4,13 +4,11 @@ import os import time import traceback -from contextlib import contextmanager +import uuid from importlib.metadata import version -import httpx - import backoff -import requests +import httpx from dotenv import load_dotenv from packaging.version import parse @@ -31,9 +29,7 @@ # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") if target_service == "bigquery": - raise NotImplementedError( - "For big query support, please get in touch with us at support@log10.io" - ) + raise NotImplementedError("For big query support, please get in touch with us at support@log10.io") def is_openai_v1() -> bool: @@ -71,7 +67,7 @@ def _func_with_backoff(func, *args, **kwargs): httpx_client = httpx.Client(transport=transport) -def post_request(url: str, json: dict = {}) -> requests.Response: +def try_post_request(url: str, json: dict = {}) -> httpx.Response: """ Authenticated POST request to log10. 
""" @@ -103,14 +99,12 @@ def get_session_id(): """ Get session ID from log10. """ - res = post_request(url + "/api/sessions", {}) + res = try_post_request(url + "/api/sessions", {}) sessionID = None try: sessionID = res.json().get("sessionID") except Exception as e: - logger.warning( - f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." - ) + logger.warning(f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording.") return sessionID @@ -141,11 +135,7 @@ def last_completion_url(self): return None return ( - url - + "/app/" - + last_completion_response["organizationSlug"] - + "/completions/" - + last_completion_response["completionID"] + f"{url}/app/{last_completion_response["organizationSlug"]}/completions/{last_completion_response["completionID"]}" ) def __exit__(self, exc_type, exc_value, traceback): @@ -155,13 +145,6 @@ def __exit__(self, exc_type, exc_value, traceback): return -def log_url(res, completionID): - output = res.json() - organizationSlug = output["organizationSlug"] - full_url = url + "/app/" + organizationSlug + "/completions/" + completionID - logger.debug(f"Completion URL: {full_url}") - - def intercepting_decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -175,24 +158,10 @@ def wrapper(*args, **kwargs): if sessionID is None: sessionID = get_session_id() - # - # Get completion ID. - # If we cannot get a completion id, continue with degraded functionality (no logging). - # - completion_url = url + "/api/completions" - r = post_request(completion_url, json={}) - completionID = None + # Generate completion ID (uuid v4) + completionID = uuid.uuid4().hex + url = f"{url}/api/completions/{completionID}" organizationSlug = None - try: - completionID = r.json().get("completionID") - organizationSlug = r.json().get("organizationSlug") - except Exception as e: - logger.warning( - f"LOG10: failed to get completion ID. Error: {e}. Skipping logging." 
- ) - return func_with_backoff(func, *args, **kwargs) - - url = completion_url + "/" + completionID full_url = url + "/app/" + organizationSlug + "/completions/" + completionID if DEBUG: @@ -223,12 +192,12 @@ def wrapper(*args, **kwargs): output = None start_time = None - try: - # - # Store request - # - post_request(url, json=log_row) + # + # Store request + # + try_post_request(url, json=log_row) + try: # # Call LLM # @@ -239,11 +208,7 @@ def wrapper(*args, **kwargs): duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type( - e - ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( - e - ): + if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -255,7 +220,7 @@ def wrapper(*args, **kwargs): log_row["failure_kind"] = failure_kind log_row["failure_reason"] = failure_reason - post_request(url, log_row) + try_post_request(url, log_row) # We forward non-logger errors raise e @@ -290,7 +255,7 @@ def wrapper(*args, **kwargs): log_row["duration"] = int(duration * 1000) log_row["kind"] = kind - post_request(url, log_row) + try_post_request(url, log_row) return output From 6a47ee54407f496b6ff2aea4cb2c4f5ce9d930f3 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 12:20:22 -0800 Subject: [PATCH 3/8] Formatting --- log10/load.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/log10/load.py b/log10/load.py index 4c9d7cef..bd721c8e 100644 --- a/log10/load.py +++ b/log10/load.py @@ -134,9 +134,7 @@ def last_completion_url(self): if last_completion_response is None: return None - return ( - f"{url}/app/{last_completion_response["organizationSlug"]}/completions/{last_completion_response["completionID"]}" - ) + return 
f"{url}/app/{last_completion_response["organizationSlug"]}/completions/{last_completion_response["completionID"]}" def __exit__(self, exc_type, exc_value, traceback): if self.tags is not None: @@ -190,13 +188,13 @@ def wrapper(*args, **kwargs): "tags": global_tags, } - output = None - start_time = None - # # Store request # try_post_request(url, json=log_row) + + output = None + start_time = None try: # # Call LLM From a9526ae2542cac7948f20be871c431fdbdb3bed2 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 13:23:28 -0800 Subject: [PATCH 4/8] Update to use /api/completions endpoint again --- log10/load.py | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/log10/load.py b/log10/load.py index bd721c8e..fa84d89a 100644 --- a/log10/load.py +++ b/log10/load.py @@ -29,7 +29,9 @@ # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") if target_service == "bigquery": - raise NotImplementedError("For big query support, please get in touch with us at support@log10.io") + raise NotImplementedError( + "For big query support, please get in touch with us at support@log10.io" + ) def is_openai_v1() -> bool: @@ -104,7 +106,9 @@ def get_session_id(): try: sessionID = res.json().get("sessionID") except Exception as e: - logger.warning(f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording.") + logger.warning( + f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." 
+ ) return sessionID @@ -134,7 +138,7 @@ def last_completion_url(self): if last_completion_response is None: return None - return f"{url}/app/{last_completion_response["organizationSlug"]}/completions/{last_completion_response["completionID"]}" + return f"{url}/app/{last_completion_response.get('organizationSlug', 'unknown')}/completions/{last_completion_response.get('completionID')}" def __exit__(self, exc_type, exc_value, traceback): if self.tags is not None: @@ -156,12 +160,25 @@ def wrapper(*args, **kwargs): if sessionID is None: sessionID = get_session_id() - # Generate completion ID (uuid v4) - completionID = uuid.uuid4().hex - url = f"{url}/api/completions/{completionID}" - organizationSlug = None + # + # Get completion ID. In case of failure, continue with degraded functionality (lost completion scope). + # + organizationSlug = "unknown" + completionID = None + r = try_post_request(f"{url}/api/completions", json={}) + try: + completionID = r.json().get("completionID") + organizationSlug = r.json().get("organizationSlug") + last_completion_response = r.json() + except Exception as e: + logger.warning( + f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording." 
+ ) + return func_with_backoff(func, *args, **kwargs) + + completion_url = f"{url}/api/completions/{completionID}" - full_url = url + "/app/" + organizationSlug + "/completions/" + completionID + full_url = f"{url}/app/{organizationSlug}/completions/{completionID}" if DEBUG: logger.debug(f"Completion URL: {full_url}") @@ -191,7 +208,7 @@ def wrapper(*args, **kwargs): # # Store request # - try_post_request(url, json=log_row) + try_post_request(completion_url, json=log_row) output = None start_time = None @@ -206,7 +223,11 @@ def wrapper(*args, **kwargs): duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): + if type( + e + ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( + e + ): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -218,7 +239,7 @@ def wrapper(*args, **kwargs): log_row["failure_kind"] = failure_kind log_row["failure_reason"] = failure_reason - try_post_request(url, log_row) + try_post_request(completion_url, log_row) # We forward non-logger errors raise e @@ -253,7 +274,7 @@ def wrapper(*args, **kwargs): log_row["duration"] = int(duration * 1000) log_row["kind"] = kind - try_post_request(url, log_row) + try_post_request(completion_url, log_row) return output From 920fc59092445b5d8e4e74804001c599f0fa0925 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 13:23:41 -0800 Subject: [PATCH 5/8] Syntax --- log10/load.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/log10/load.py b/log10/load.py index fa84d89a..41db4cd5 100644 --- a/log10/load.py +++ b/log10/load.py @@ -29,9 +29,7 @@ # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") if target_service == "bigquery": - raise NotImplementedError( - "For big query support, please get in touch with 
us at support@log10.io" - ) + raise NotImplementedError("For big query support, please get in touch with us at support@log10.io") def is_openai_v1() -> bool: @@ -106,9 +104,7 @@ def get_session_id(): try: sessionID = res.json().get("sessionID") except Exception as e: - logger.warning( - f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." - ) + logger.warning(f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording.") return sessionID @@ -171,9 +167,7 @@ def wrapper(*args, **kwargs): organizationSlug = r.json().get("organizationSlug") last_completion_response = r.json() except Exception as e: - logger.warning( - f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording." - ) + logger.warning(f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording.") return func_with_backoff(func, *args, **kwargs) completion_url = f"{url}/api/completions/{completionID}" @@ -223,11 +217,7 @@ def wrapper(*args, **kwargs): duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type( - e - ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( - e - ): + if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ From babdda872dce1766f5b856d1288d7157819e24c6 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 13:23:58 -0800 Subject: [PATCH 6/8] More ruff fixing --- log10/load.py | 1 - 1 file changed, 1 deletion(-) diff --git a/log10/load.py b/log10/load.py index 41db4cd5..0c762d09 100644 --- a/log10/load.py +++ b/log10/load.py @@ -4,7 +4,6 @@ import os import time import traceback -import uuid from importlib.metadata import version import backoff From 24bfb1250e39629680f5ba536c631226e8f3cb2c Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 
13:27:46 -0800 Subject: [PATCH 7/8] Warn users about async being disabled in this version of the library --- log10/load.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/log10/load.py b/log10/load.py index 0c762d09..0395cab5 100644 --- a/log10/load.py +++ b/log10/load.py @@ -28,7 +28,9 @@ # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") if target_service == "bigquery": - raise NotImplementedError("For big query support, please get in touch with us at support@log10.io") + raise NotImplementedError( + "For big query support, please get in touch with us at support@log10.io" + ) def is_openai_v1() -> bool: @@ -103,7 +105,9 @@ def get_session_id(): try: sessionID = res.json().get("sessionID") except Exception as e: - logger.warning(f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording.") + logger.warning( + f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." + ) return sessionID @@ -166,7 +170,9 @@ def wrapper(*args, **kwargs): organizationSlug = r.json().get("organizationSlug") last_completion_response = r.json() except Exception as e: - logger.warning(f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording.") + logger.warning( + f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording." 
+ ) return func_with_backoff(func, *args, **kwargs) completion_url = f"{url}/api/completions/{completionID}" @@ -216,7 +222,11 @@ def wrapper(*args, **kwargs): duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): + if type( + e + ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( + e + ): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -281,7 +291,7 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): Keyword arguments: module -- the module to be intercepted (e.g. openai) DEBUG_ -- whether to show log10 related debug statements via python logging (default False) - USE_ASYNC_ -- whether to run in async mode (default True) + USE_ASYNC_ -- Disabled in 0.5.3. Calls are synchronous. Openai V0 example: Example: @@ -353,8 +363,11 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): global DEBUG, USE_ASYNC, sync_log_text DEBUG = DEBUG_ or os.environ.get("LOG10_DEBUG", False) logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING) - USE_ASYNC = USE_ASYNC_ - sync_log_text = set_sync_log_text(USE_ASYNC=USE_ASYNC) + + if USE_ASYNC_: + logger.debug( + "LOG10: USE_ASYNC is deprecated. Calls are synchronous by default." 
+ ) # def intercept_nested_functions(obj): # for name, attr in vars(obj).items(): From cc893307dffa48ed251ba302ff71a7819ba27a34 Mon Sep 17 00:00:00 2001 From: Niklas Nielsen Date: Tue, 30 Jan 2024 13:29:03 -0800 Subject: [PATCH 8/8] Formatting --- log10/load.py | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/log10/load.py b/log10/load.py index 0395cab5..230dedad 100644 --- a/log10/load.py +++ b/log10/load.py @@ -28,9 +28,7 @@ # log10, bigquery target_service = os.environ.get("LOG10_DATA_STORE", "log10") if target_service == "bigquery": - raise NotImplementedError( - "For big query support, please get in touch with us at support@log10.io" - ) + raise NotImplementedError("For big query support, please get in touch with us at support@log10.io") def is_openai_v1() -> bool: @@ -105,9 +103,7 @@ def get_session_id(): try: sessionID = res.json().get("sessionID") except Exception as e: - logger.warning( - f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording." - ) + logger.warning(f"LOG10: failed to get session ID. Error: {e}. Skipping session scope recording.") return sessionID @@ -170,9 +166,7 @@ def wrapper(*args, **kwargs): organizationSlug = r.json().get("organizationSlug") last_completion_response = r.json() except Exception as e: - logger.warning( - f"LOG10: failed to get completion ID. Error: {e}. Skipping completion recording." - ) + logger.warning(f"LOG10: failed to get completion ID. Error: {e}. 
Skipping completion recording.") return func_with_backoff(func, *args, **kwargs) completion_url = f"{url}/api/completions/{completionID}" @@ -222,11 +216,7 @@ def wrapper(*args, **kwargs): duration = time.perf_counter() - start_time logger.debug(f"LOG10: failed - {e}") # todo: change with openai v1 update - if type( - e - ).__name__ == "InvalidRequestError" and "This model's maximum context length" in str( - e - ): + if type(e).__name__ == "InvalidRequestError" and "This model's maximum context length" in str(e): failure_kind = "ContextWindowExceedError" else: failure_kind = type(e).__name__ @@ -365,9 +355,7 @@ def log10(module, DEBUG_=False, USE_ASYNC_=True): logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING) if USE_ASYNC_: - logger.debug( - "LOG10: USE_ASYNC is deprecated. Calls are synchronous by default." - ) + logger.debug("LOG10: USE_ASYNC is deprecated. Calls are synchronous by default.") # def intercept_nested_functions(obj): # for name, attr in vars(obj).items():