From 0ed426046f47df082ac83d52a5d00cf75109e90c Mon Sep 17 00:00:00 2001 From: Braelyn Boynton Date: Tue, 17 Sep 2024 17:48:59 -0700 Subject: [PATCH] persistent dead letter queue --- .gitignore | 3 ++- agentops/helpers.py | 18 ++++++++++++- agentops/http_client.py | 25 +++++++++++++++--- tests/test_http_client.py | 55 ++++++++++++++++++++++----------------- 4 files changed, 72 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 4db649aab..558e776f8 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ cython_debug/ .DS_Store agentops_time_travel.json -.agentops_time_travel.yaml \ No newline at end of file +.agentops_time_travel.yaml +.agentops/ \ No newline at end of file diff --git a/agentops/helpers.py b/agentops/helpers.py index 2ec132640..e4581655b 100644 --- a/agentops/helpers.py +++ b/agentops/helpers.py @@ -6,10 +6,10 @@ import http.client import json from importlib.metadata import version, PackageNotFoundError - from .log_config import logger from uuid import UUID from importlib.metadata import version +import os def get_ISO_time(): @@ -179,3 +179,19 @@ def wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) return wrapper + + +def ensure_dead_letter_queue(): + # Define file path + file_path = os.path.join(".agentops", "dead_letter_queue.json") + + # Check if directory exists + if not os.path.exists(".agentops"): + os.makedirs(".agentops") + + # Check if file exists + if not os.path.isfile(file_path): + with open(file_path, "w") as f: + json.dump({"messages": []}, f) + + return file_path diff --git a/agentops/http_client.py b/agentops/http_client.py index 4ed6fff09..e047b4d0e 100644 --- a/agentops/http_client.py +++ b/agentops/http_client.py @@ -8,7 +8,8 @@ from dotenv import load_dotenv import os -from .singleton import singleton +from .helpers import ensure_dead_letter_queue +import json load_dotenv() @@ -28,25 +29,43 @@ class HttpStatus(Enum): UNKNOWN = -1 -# @singleton class DeadLetterQueue: def __init__(self): self.queue: List[dict] = [] self.is_testing = os.environ.get("ENVIRONMENT") == "test" + # if not self.is_testing: + self.file_path = ensure_dead_letter_queue() + + def read_queue(self): + if not self.is_testing: + with open(self.file_path, "r") as f: + return json.load(f)["messages"] + else: + return [] + + def write_queue(self): + if not self.is_testing: + with open(self.file_path, "w") as f: + json.dump({"messages": self.queue}, f) + def add(self, request_data: dict): if not self.is_testing: self.queue.append(request_data) + self.write_queue() def get_all(self) -> List[dict]: return self.queue def remove(self, request_data: dict): if not self.is_testing: - self.queue.remove(request_data) + if request_data in self.queue: + self.queue.remove(request_data) + self.write_queue() def clear(self): self.queue.clear() + self.write_queue() dead_letter_queue = DeadLetterQueue() diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 0a3540706..1072c3fb1 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -1,6 +1,6 @@ import os import unittest -from unittest.mock import patch, Mock +from unittest.mock import patch, Mock, mock_open import requests from agentops.http_client import ( @@ -11,6 +11,10 @@ ) +@patch("builtins.open", new_callable=mock_open, read_data='{"messages": []}') +# @patch("os.path.exists", return_value=False) +# @patch("os.path.isfile", return_value=False) +# @patch("os.makedirs") class TestHttpClient(unittest.TestCase): MAX_RETRIES = 3 RETRY_DELAY = 1 @@ -19,13 +23,14 @@ def setUp(self): # Clear DLQ before each test dead_letter_queue.is_testing = False dead_letter_queue.clear() + self.addCleanup(patch.stopall) def tearDown(self): dead_letter_queue.is_testing = True dead_letter_queue.clear() @patch("requests.Session") - def test_post_success(self, mock_session): + def test_post_success(self, mock_session, mock_open_file): # Mock a successful response mock_response = Mock() mock_response.status_code = 200 @@ -35,7 +40,7 @@ def test_post_success(self, mock_session): mock_session_instance.post.return_value = mock_response url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} response = HttpClient.post(url, payload) @@ -44,13 +49,13 @@ def test_post_success(self, mock_session): self.assertEqual(response.body, {"message": "Success"}) @patch("requests.Session") - def test_post_timeout(self, mock_session): + def test_post_timeout(self, mock_session, mock_open_file): # Mock a timeout exception mock_session_instance = mock_session.return_value mock_session_instance.post.side_effect = requests.exceptions.Timeout url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} with self.assertRaises(ApiServerException) as context: HttpClient.post(url, payload) @@ -59,7 +64,7 @@ def test_post_timeout(self, mock_session): self.assertEqual(len(dead_letter_queue.get_all()), 1) @patch("requests.Session") - def test_post_http_error(self, mock_session): + def test_post_http_error(self, mock_session, mock_open_file): # Mock an HTTPError mock_response = Mock() mock_response.status_code = 500 @@ -70,7 +75,7 @@ def test_post_http_error(self, mock_session): ) url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} with self.assertRaises(ApiServerException) as context: HttpClient.post(url, payload) @@ -82,7 +87,7 @@ def test_post_http_error(self, mock_session): self.assertEqual(failed_request["error_type"], "HTTPError") @patch("requests.Session") - def test_post_invalid_api_key(self, mock_session): + def test_post_invalid_api_key(self, mock_session, mock_open_file): # Mock a response with invalid API key mock_response = Mock() mock_response.status_code = 401 @@ -92,7 +97,7 @@ def test_post_invalid_api_key(self, mock_session): mock_session_instance.post.return_value = mock_response url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} with self.assertRaises(ApiServerException) as context: HttpClient.post(url, payload, api_key="INVALID_KEY") @@ -101,7 +106,7 @@ def test_post_invalid_api_key(self, mock_session): self.assertEqual(len(dead_letter_queue.get_all()), 0) @patch("requests.Session") - def test_get_success(self, mock_session): + def test_get_success(self, mock_session, mock_open_file): # Mock a successful response mock_response = Mock() mock_response.status_code = 200 @@ -119,7 +124,7 @@ def test_get_success(self, mock_session): self.assertEqual(response.body, {"message": "Success"}) @patch("requests.Session") - def test_get_timeout(self, mock_session): + def test_get_timeout(self, mock_session, mock_open_file): # Mock a timeout exception mock_session_instance = mock_session.return_value mock_session_instance.get.side_effect = requests.exceptions.Timeout @@ -133,7 +138,7 @@ def test_get_timeout(self, mock_session): self.assertEqual(len(dead_letter_queue.get_all()), 1) @patch("requests.Session") - def test_get_http_error(self, mock_session): + def test_get_http_error(self, mock_session, mock_open_file): # Mock an HTTPError mock_response = Mock() mock_response.status_code = 500 @@ -154,7 +159,7 @@ def test_get_http_error(self, mock_session): self.assertEqual(failed_request["url"], url) self.assertEqual(failed_request["error_type"], "HTTPError") - def test_clear_dead_letter_queue(self): + def test_clear_dead_letter_queue(self, mock_open_file): # Add a dummy request to DLQ and clear it dead_letter_queue.add( {"url": "https://api.agentops.ai/health", "error_type": "DummyError"} @@ -165,7 +170,7 @@ def test_clear_dead_letter_queue(self): self.assertEqual(len(dead_letter_queue.get_all()), 0) @patch("requests.Session") - def test_post_success_triggers_dlq_retry(self, mock_session): + def test_post_success_triggers_dlq_retry(self, mock_session, mock_open_file): # Mock successful POST response for the initial request mock_response_success = Mock() mock_response_success.status_code = 200 @@ -181,7 +186,7 @@ def test_post_success_triggers_dlq_retry(self, mock_session): # Manually add failed requests to the DLQ failed_request_1 = { "url": "https://api.agentops.ai/health", - "payload": b'{"key": "value1"}', + "payload": {"key": "value1"}, "api_key": "API_KEY_1", "parent_key": None, "jwt": None, @@ -189,7 +194,7 @@ def test_post_success_triggers_dlq_retry(self, mock_session): } failed_request_2 = { "url": "https://api.agentops.ai/health", - "payload": b'{"key": "value2"}', + "payload": {"key": "value2"}, "api_key": "API_KEY_2", "parent_key": None, "jwt": None, @@ -200,14 +205,14 @@ def test_post_success_triggers_dlq_retry(self, mock_session): # Perform an initial successful POST request url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} HttpClient.post(url, payload) # Check that both failed requests in the DLQ were retried and removed self.assertEqual(0, len(dead_letter_queue.get_all())) @patch("requests.Session") - def test_dlq_retry_fails_and_stays_in_queue(self, mock_session): + def test_dlq_retry_fails_and_stays_in_queue(self, mock_session, mock_open_file): # Mock successful POST response for the initial request mock_response_success = Mock() mock_response_success.status_code = 200 @@ -229,7 +234,7 @@ def test_dlq_retry_fails_and_stays_in_queue(self, mock_session): # Manually add a failed request to the DLQ failed_request = { "url": "https://api.agentops.ai/health", - "payload": b'{"key": "value1"}', + "payload": {"key": "value1"}, "api_key": "API_KEY_1", "parent_key": None, "jwt": None, @@ -246,7 +251,9 @@ def test_dlq_retry_fails_and_stays_in_queue(self, mock_session): self.assertEqual(len(dead_letter_queue.get_all()), 1) @patch("requests.Session") - def test_dlq_retry_successfully_retries_post_and_get(self, mock_session): + def test_dlq_retry_successfully_retries_post_and_get( + self, mock_session, mock_open_file + ): # Mock successful POST and GET responses for DLQ retries mock_response_success = Mock() mock_response_success.status_code = 200 @@ -264,7 +271,7 @@ def test_dlq_retry_successfully_retries_post_and_get(self, mock_session): # Manually add failed POST and GET requests to the DLQ failed_post_request = { "url": "https://api.agentops.ai/health", - "payload": b'{"key": "value1"}', + "payload": {"key": "value1"}, "api_key": "API_KEY_1", "parent_key": None, "jwt": None, @@ -283,17 +290,17 @@ def test_dlq_retry_successfully_retries_post_and_get(self, mock_session): # Perform an initial successful POST request url = "https://api.agentops.ai/health" - payload = b'{"key": "value"}' + payload = {"key": "value"} HttpClient.post(url, payload) # Check that both failed requests (POST and GET) in the DLQ were retried and removed self.assertEqual(len(dead_letter_queue.get_all()), 0) - def test_clear_dlq_after_success(self): + def test_clear_dlq_after_success(self, mock_open_file): # Add requests to DLQ and ensure they are removed after retry success failed_request = { "url": "https://api.agentops.ai/health", - "payload": b'{"key": "value1"}', + "payload": {"key": "value1"}, "api_key": "API_KEY_1", "parent_key": None, "jwt": None,