Skip to content

Commit

Permalink
Merge pull request #73 from Doist/kjh/refine-message-batching
Browse files Browse the repository at this point in the history
feat: attempt to send the largest batch that fits in the message size limit
  • Loading branch information
tartansandal authored Feb 19, 2024
2 parents 4f5fd94 + 88bc019 commit fe9d2e3
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sqs-workers"
version = "0.5.14"
version = "0.5.15"
description = "An opinionated queue processor for Amazon SQS"
authors = ["Doist Developers <dev@doist.com>"]
license = "MIT"
Expand Down
47 changes: 35 additions & 12 deletions sqs_workers/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

DEFAULT_MESSAGE_GROUP_ID = "default"
SEND_BATCH_SIZE = 10
MAX_SQS_MESSAGE_SIZE = 262144 # in bytes
MAX_MESSAGE_LENGTH = 262144 # 256 KiB

if TYPE_CHECKING:
from sqs_workers import SQSEnv
Expand Down Expand Up @@ -448,36 +448,59 @@ def _should_flush_batch(self) -> bool:
Check if a message batch should be flushed, i.e. all messages should be sent
and removed from the internal cache.
Right now this only checks the number of messages. In the future we may
want to improve that by also ensuring the batch size is small enough.
This not only checks the number of messages, but also that the total batch size
has not grown too large.
"""
max_size = SEND_BATCH_SIZE if self._batch_level > 0 else 1

# Attempt to flush if we have gotten close to the message size limit
if self._est_message_size() > MAX_SQS_MESSAGE_SIZE:
if self._estimated_message_length(self._batched_messages) > MAX_MESSAGE_LENGTH:
list_length = len(self._batched_messages)
logger.info(
f"Flushing SQS batch on oversized message list with only {list_length} items"
)
return True

return len(self._batched_messages) >= max_size

def _est_message_size(self) -> int:
def _estimated_message_length(self, messages: list) -> int:
"""
Return an estimate of the size (in bytes) of the combined message we want to send.
Return an estimate of the length of the combined message we want to send.
We add an extra 1K to account for any extra headers.
The length is in bytes and we add a 1K buffer to account for any extra headers.
"""
return len(json.dumps(self._batched_messages).encode("utf-8")) + 1024
return len(json.dumps(messages).encode("utf-8")) + 1024

def _flush_batch_if_needed(self) -> None:
queue = self.get_queue()

# There should be at most 1 batch to send. But just in case, prepare to
# send more than that.
while self._should_flush_batch():
send_batch_size = SEND_BATCH_SIZE
send_batch_size = len(self._batched_messages) # will be <= SEND_BATCH_SIZE

if self._est_message_size() > MAX_SQS_MESSAGE_SIZE:
# If we have batch of large messages, try to send half of it at a time
send_batch_size = SEND_BATCH_SIZE // 2
while (
send_batch_size > 1
and self._estimated_message_length(
self._batched_messages[:send_batch_size]
)
> MAX_MESSAGE_LENGTH
):
# Progressively reduce the batch size until it fits.
send_batch_size -= 1

# XXX: temporary logging while we check the length estimates against what
# the SQS service reports. Remove if still here after 2024-02-25.
message_length = self._estimated_message_length(
self._batched_messages[:send_batch_size]
)
if message_length > MAX_MESSAGE_LENGTH:
# We log here so we can match up with the corresponding error message
# from SQS. Note, SQS errors are dynamic and difficult to catch
explicitly and it's not worth the trouble here.
logger.warning(
f"SQS Message is probably too long: {message_length} bytes",
)

msgs = self._batched_messages[:send_batch_size]
self._batched_messages = self._batched_messages[send_batch_size:]
Expand Down
34 changes: 34 additions & 0 deletions tests/test_sqs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from typing import Dict, List, Optional

import botocore
import pytest

from sqs_workers import (
Expand Down Expand Up @@ -154,6 +155,39 @@ def test_batch_should_keep_messages_until_overflow(sqs, queue_name):
assert len(queue.get_raw_messages(0)) == 1


def test_batch_flush_on_large_messages(sqs, queue_name):
    # A batch of oversized payloads must trigger an early flush before the
    # usual 10-message batch limit is reached (SQS caps requests at 256 KiB).
    queue = sqs.queue(queue_name)
    say_hello_task = queue.connect_processor("say_hello", say_hello)

    with say_hello_task.batch():
        for n in range(9):
            # each payload is large (roughly 32427 bytes per message estimate)
            say_hello_task.delay(username=f"Homer {n} 🍩" * 1_000_000)
        # the accumulated batch (~283651 bytes estimated) exceeds the limit,
        # so a flush fires and one item is walked back to fit under the cap
        assert len(queue.get_raw_messages(0)) == 8

    # the walked-back message is sent once the batch context closes
    assert len(queue.get_raw_messages(0)) == 1


def test_batch_fails_on_a_giant_message(sqs_session, sqs, queue_name):
    # The in-memory backend does not enforce the 262144-byte SQS limit,
    # so this check is only meaningful against a real(istic) SQS session.
    if isinstance(sqs_session, MemorySession):
        pytest.skip("MessageTooLong not implemented with MemorySession")

    queue = sqs.queue(queue_name)
    say_hello_task = queue.connect_processor("say_hello", say_hello)

    with say_hello_task.batch():
        # a single message too large to ever fit under the limit must
        # surface SQS's MessageTooLong error to the caller
        with pytest.raises(botocore.exceptions.ClientError) as excinfo:
            say_hello_task.delay(username="Homer 🍩" * 10_150_000)
        assert "MessageTooLong" in str(excinfo.value)


def test_call_raises_exception(sqs, queue_name):
queue = sqs.queue(queue_name)
say_hello_task = queue.connect_processor("say_hello", say_hello)
Expand Down

0 comments on commit fe9d2e3

Please sign in to comment.