Skip to content

Commit

Permalink
Get sizes for queues and states (#352)
Browse files Browse the repository at this point in the history
Lets us query specific queue sizes in a single Redis round trip.
  • Loading branch information
thomasst authored Jul 17, 2024
1 parent ede958a commit 6fb98a1
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Version 0.20

* Add `tiger.get_sizes_for_queues_and_states` ([352](https://github.com/closeio/tasktiger/pull/352))

## Version 0.19.5

* First version using the automated-release process
Expand Down
2 changes: 1 addition & 1 deletion tasktiger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .tasktiger import TaskTiger, run_worker
from .worker import Worker

__version__ = "0.19.5"
__version__ = "0.20"
__all__ = [
"TaskTiger",
"Worker",
Expand Down
17 changes: 17 additions & 0 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,23 @@ def get_queue_sizes(self, queue: str) -> Dict[str, int]:
results = pipeline.execute()
return dict(zip(states, results))

def get_sizes_for_queues_and_states(
self, queues_and_states: List[Tuple[str, str]]
) -> List[int]:
"""
Get the sizes for the specific queues and states.
queues_and_states: List of tuples (queue_name, state).
Returns a list of queue sizes in the order of the passed
queues_and_states.
"""
pipeline = self.connection.pipeline()
for queue, state in queues_and_states:
pipeline.zcard(self._key(state, queue))
results = pipeline.execute()
return results

def get_total_queue_size(self, queue: str) -> int:
"""Get total queue size for QUEUED, SCHEDULED, and ACTIVE states."""

Expand Down
35 changes: 35 additions & 0 deletions tests/test_queue_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,38 @@ def test_task_all_states(self):
max_queue_size=3,
when=datetime.timedelta(seconds=10),
)


class TestQueueSizes:
@pytest.fixture
def queue_sample_tasks(self, tiger):
tiger.delay(simple_task)
tiger.delay(simple_task)
tiger.delay(simple_task, queue="other")
tiger.delay(simple_task, when=datetime.timedelta(seconds=60))

def test_get_total_queue_size(self, tiger, queue_sample_tasks):
assert tiger.get_total_queue_size("other") == 1
assert tiger.get_total_queue_size("default") == 3

def test_get_queue_sizes(self, tiger, queue_sample_tasks):
assert tiger.get_queue_sizes("default") == {
"active": 0,
"queued": 2,
"scheduled": 1,
}
assert tiger.get_queue_sizes("other") == {
"active": 0,
"queued": 1,
"scheduled": 0,
}

def test_get_sizes_for_queues_and_states(self, tiger, queue_sample_tasks):
assert tiger.get_sizes_for_queues_and_states(
[
("default", "queued"),
("default", "scheduled"),
("other", "queued"),
("other", "scheduled"),
]
) == [2, 1, 1, 0]

0 comments on commit 6fb98a1

Please sign in to comment.