Skip to content

Commit

Permalink
add concurrency check, timeout, extra index deletion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
vicilliar committed Sep 18, 2024
1 parent efeb7e9 commit d4be16e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/cloud-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ on:
# allows other workflows to reuse these integration tests:
workflow_call:

concurrency:
group: cloud-integration-tests
cancel-in-progress: false

permissions:
contents: read

jobs:
integration_tests:
name: Cloud Integration Tests
timeout-minutes: 60 # Hard cap of 1 hour for this job
runs-on: ubuntu-latest
if: ${{ github.event.inputs.job_to_run == 'run_integration_tests' || github.event_name != 'workflow_dispatch' }}
environment: cloud-tests
Expand Down
31 changes: 29 additions & 2 deletions tests/cloud_test_logic/delete_all_cloud_test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ def delete_all_test_indices(wait_for_readiness=False):

print("Indices to delete: ", indices_to_delete)
print("Marqo Cloud deletion responses:")

# First pass will either
# 1. If the index is READY, send it into DELETING
# 2. If the index is DELETED, do nothing
# 3. If the index is FAILED, send it into DELETING
# 4. If the index is CREATING, MODIFYING, DELETING, do nothing

for index_name in indices_to_delete:
index = fetch_marqo_index(client, index_name)
if index.get_status()["indexStatus"] == IndexStatus.READY:
Expand All @@ -92,22 +99,42 @@ def delete_all_test_indices(wait_for_readiness=False):
print(f"Index {index_name} has failed status, deleting anyway")
index.delete(wait_for_readiness=False)
else:
# Either CREATING, MODIFYING, DELETING.
print(f"Index {index_name} is not ready for deletion, status: {index.get_status()['indexStatus']}")

# All indexes now are either DELETING, CREATING, MODIFYING (might need future deletion)
if wait_for_readiness:
max_retries = 100
attempt = 0

while indices_to_delete:
print(f"Attempt #{attempt} at trying to delete indices: {indices_to_delete}", flush=True)
resp = fetch_marqo_indexes(client)
resp_json = resp.json()
all_index_names = [index["indexName"] for index in resp_json['results']]
for index_for_deletion_name in indices_to_delete:
# Index has successfully been DELETED
if index_for_deletion_name not in all_index_names:
print(f"Index {index_for_deletion_name} has been successfully deleted.")
indices_to_delete.remove(index_for_deletion_name)
else:
# Check if index has finally become READY or FAILED
# Kick off deletion again if so
index = fetch_marqo_index(client, index_for_deletion_name)
if index.get_status()["indexStatus"] == IndexStatus.READY or \
index.get_status()["indexStatus"] == IndexStatus.FAILED:
print(f"Index {index_for_deletion_name} has {index.get_status()['indexStatus']} status, "
f"sending a delete request.")
index.delete(wait_for_readiness=False)

if attempt > max_retries:
raise RuntimeError("Timed out waiting for indices to be deleted, still remaining: "
f"{indices_to_delete}. Please delete manually")
print("All test indices deleted successfully")
attempt += 1
time.sleep(30)

print("All test indices deleted successfully", flush=True)


if __name__ == '__main__':
delete_all_test_indices()
delete_all_test_indices(wait_for_readiness=True)
34 changes: 29 additions & 5 deletions tests/cloud_test_logic/run_cloud_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import signal
import sys

import concurrent.futures
import threading
import time

import pytest

from create_and_set_cloud_unique_run_identifier import set_unique_run_identifier
Expand All @@ -34,6 +38,27 @@ def convert_string_to_boolean(string_value):
if string_value.lower() in valid_representations_of_true:
return True

def run_pytest(pytest_args):
    """Run pytest in-process with ``pytest_args`` and return its exit code."""
    print("running pytest integration tests with args:", pytest_args)
    exit_code = pytest.main(pytest_args)
    return exit_code

def run_pytest_with_timeout(timeout_seconds=45 * 60):
    """Run the cloud pytest suite, giving up after ``timeout_seconds``.

    The suite is executed through ``run_pytest`` on a daemon thread while the
    calling thread waits for at most ``timeout_seconds``. A daemon thread is
    used instead of ``ThreadPoolExecutor`` because leaving the executor's
    ``with`` block (and interpreter shutdown) joins the worker thread, which
    would block until pytest finished and make the timeout ineffective.

    Args:
        timeout_seconds: Maximum number of seconds to wait for pytest.
            Defaults to 45 minutes.

    Returns:
        The value returned by ``run_pytest``, or ``1`` if the wait timed out.

    Raises:
        BaseException: Any exception raised by ``run_pytest`` is re-raised
            in the calling thread.
    """
    pytest_args = ['tests/', '-m', 'not ignore_during_cloud_tests'] + sys.argv[1:]

    # Filled in by the worker thread: either "exit_code" or "error".
    outcome = {}

    def _worker():
        try:
            outcome["exit_code"] = run_pytest(pytest_args)
        except BaseException as exc:  # hand every failure back to the caller
            outcome["error"] = exc

    # daemon=True so an overrunning test thread cannot keep the process alive
    # once the caller has finished its cleanup and exits.
    worker = threading.Thread(target=_worker, name="pytest-runner", daemon=True)
    worker.start()
    worker.join(timeout=timeout_seconds)

    if worker.is_alive():
        # Python cannot forcibly stop a thread; the tests are abandoned here
        # and the thread ends when the process exits.
        print(f"Tests exceeded the {timeout_seconds // 60} minute timeout and were terminated.")
        return 1  # Exit code indicating failure due to timeout

    if "error" in outcome:
        raise outcome["error"]
    return outcome["exit_code"]

if __name__ == '__main__':
# Set up the signal handler for KeyboardInterrupt (Ctrl+C)
Expand All @@ -57,14 +82,13 @@ def convert_string_to_boolean(string_value):
print("Detected an error while creating indices, deleting all indices and exiting the workflow.")
delete_all_test_indices(wait_for_readiness=True)
sys.exit(1)
print(f"All indices has been created, proceeding to run tests with pytest. Arguments: {sys.argv[1:]}")
print(f"All indices have been created, proceeding to run tests with pytest. Arguments: {sys.argv[1:]}")

pytest_exit_code = run_pytest_with_timeout()

pytest_args = ['tests/', '-m', 'not ignore_during_cloud_tests'] + sys.argv[1:]
print("running integration tests with args:", pytest_args)
pytest_exit_code = pytest.main(pytest_args)
if pytest_exit_code != 0:
raise RuntimeError(f"Pytest failed with exit code: {pytest_exit_code}")
print("All tests has been executed successfully")
print("All tests have been executed successfully")
if tests_specific_kwargs['delete-indexes']:
delete_all_test_indices(wait_for_readiness=True)
except Exception as e:
Expand Down

0 comments on commit d4be16e

Please sign in to comment.