ref(hc): Use session tied (not transactional) serialization for outboxes #57877
Conversation
Codecov Report
@@ Coverage Diff @@
## master #57877 +/- ##
==========================================
+ Coverage 79.05% 79.15% +0.10%
==========================================
Files 5131 5131
Lines 223055 224983 +1928
Branches 37574 38126 +552
==========================================
+ Hits 176330 178085 +1755
- Misses 41083 41221 +138
- Partials 5642 5677 +35
                cursor.execute("SELECT pg_advisory_unlock(%s)", [shard_lock_id])
        except Exception:
            # If something strange is going on with our connection, force it closed to prevent holding the lock.
            connections[using].close()
This is safe due to the behavior of django "connection wrappers": when you close a connection explicitly, the next usage will reopen the connection. https://github.com/django/django/blob/fc62e17778dad9eab9e507d90d85a33d415f64a7/django/db/backends/base/base.py#L271
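That reconnect-on-next-use behavior can be illustrated with a minimal stand-in for Django's wrapper (a hypothetical `LazyConnection` class for illustration only, not Sentry or Django code):

```python
import sqlite3

class LazyConnection:
    """Minimal stand-in for Django's BaseDatabaseWrapper behavior:
    closing explicitly is safe because the next cursor() call reopens."""

    def __init__(self, connect):
        self._connect = connect   # factory producing real DB connections
        self.connection = None    # underlying handle; None when closed

    def close(self):
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def cursor(self):
        # Mirrors the ensure-connection step: transparently reopen after a close.
        if self.connection is None:
            self.connection = self._connect()
        return self.connection.cursor()

conn = LazyConnection(lambda: sqlite3.connect(":memory:"))
conn.cursor().execute("SELECT 1")
conn.close()  # e.g. forced closed after a failed unlock
row = conn.cursor().execute("SELECT 1").fetchone()  # reopened on next use
```

Closing also drops any session-scoped advisory locks on the Postgres side, which is exactly why forcing the connection closed is the right failure response here.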
@@ -524,28 +521,45 @@ def save(self, **kwds: Any) -> None:  # type: ignore[override]
        metrics.incr("outbox.saved", 1, tags=tags)
        super().save(**kwds)

    def lock_id(self, attrs: Iterable[str]) -> int:
        # 64 bit integer that roughly encodes a unique, serializable lock identifier
        return mmh3.hash64(".".join(str(getattr(self, attr)) for attr in attrs))[0]
Postgres advisory locks are keyed by 64-bit integers... unfortunately they aren't scoped in any other way. We hash the sharding values down to 64 bits to produce one. That means collisions are possible, but that's not awful: a collision only means that two distinct shards end up blocking and competing against each other, and it is highly unlikely in practice. It's the price to pay for using session-backed rather than transaction-backed locks.
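The shape of that hashing can be sketched with the stdlib (a hedged illustration: the actual code uses `mmh3.hash64`, and `blake2b` here is only a stand-in producing the same signed 64-bit shape):

```python
import hashlib

def lock_id(*shard_values) -> int:
    """Collapse the shard-identifying values into one signed 64-bit key,
    the only keyspace Postgres advisory locks offer. The real code uses
    mmh3.hash64; blake2b is a stdlib stand-in for illustration."""
    joined = ".".join(str(v) for v in shard_values)
    digest = hashlib.blake2b(joined.encode(), digest_size=8).digest()
    unsigned = int.from_bytes(digest, "big")
    # Fold into the signed 64-bit range Postgres expects.
    return unsigned - 2**64 if unsigned >= 2**63 else unsigned
```

Any well-mixed 64-bit hash works here, since the only cost of a collision is two shards contending on the same lock.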
        try:
            with connections[using].cursor() as cursor:
                if flush_all:
In the flush_all case, we do not want to block -- we try to obtain the shard lock, but will happily skip the shard if it is already being processed elsewhere.
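The block-versus-skip decision maps onto the two Postgres calls; a `threading.Lock` analogy (an illustration of the control flow, not the actual Postgres path) makes the difference concrete:

```python
import threading

# Stands in for Postgres' shared advisory lock space.
shard_locks: dict[int, threading.Lock] = {}

def acquire_shard(shard_lock_id: int, flush_all: bool) -> bool:
    """flush_all=True mirrors pg_try_advisory_lock: skip a busy shard.
    flush_all=False mirrors pg_advisory_lock: wait for the shard."""
    lock = shard_locks.setdefault(shard_lock_id, threading.Lock())
    if flush_all:
        return lock.acquire(blocking=False)  # happily skip if held elsewhere
    lock.acquire()  # block until the shard frees up
    return True
```

In the real code the return value drives `obtained_lock`, so a skipped shard simply falls through without producing anything.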
                if obtained_lock:
                    next_shard_row: OutboxBase | None
                    next_shard_row = self.selected_messages_in_shard(
The same query as above, but now behind the separate lock acquisition.
                    if not cursor.fetchone()[0]:
                        obtained_lock = False
                else:
                    cursor.execute("SELECT pg_advisory_lock(%s)", [shard_lock_id])
We won't wait here indefinitely: in production we'll hit statement/query timeouts. That shouldn't matter much, as we'll still end up in the finally block, which releases the lock and closes the connection if we failed to acquire the lock due to a statement timeout.
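That cleanup path can be sketched with a fake cursor simulating the statement timeout (all names here are hypothetical stand-ins, not Sentry code):

```python
class TimeoutCursor:
    """Stand-in cursor whose blocking lock acquisition hits a statement timeout."""
    def execute(self, sql, params=None):
        if "pg_advisory_lock(" in sql:
            raise RuntimeError("canceling statement due to statement timeout")

events = []

def flush_shard(cursor, shard_lock_id, close_connection):
    try:
        cursor.execute("SELECT pg_advisory_lock(%s)", [shard_lock_id])
        events.append("processed")
    finally:
        # Runs even when the lock wait timed out: try to release, and if even
        # that fails, close the connection so the session lock cannot leak.
        try:
            cursor.execute("SELECT pg_advisory_unlock(%s)", [shard_lock_id])
            events.append("unlocked")
        except Exception:
            close_connection()
```

The key property is that every exit path either unlocks explicitly or drops the connection, which releases session-scoped locks on the Postgres side.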
                if flush_all:
                    cursor.execute("SELECT pg_try_advisory_lock(%s)", [shard_lock_id])
                    if not cursor.fetchone()[0]:
                        obtained_lock = False
Do we want a metric so we have visibility into any spin waits we have on these locks?
Sure, can add some metrics around this work
            finally:
                try:
                    with connections[using].cursor() as cursor:
                        cursor.execute("SELECT pg_advisory_unlock(%s)", [shard_lock_id])
Would this fail when a lock cannot be acquired because of a query timeout?
Yes. It is treated as a query that does not terminate until the lock is acquired, meaning the standard query timeout applies.
            try:
                next_shard_row = (
                    self.selected_messages_in_shard(latest_shard_row=latest_shard_row)
                    .select_for_update(nowait=flush_all)
Previously this would have prevented multiple workers from attempting to operate on the same rows. If we end up with multiple workers processing outboxes I don't think session level locking will prevent multiple workers from handling the same outbox messages.
While outbox deliveries are scheduled every minute reducing the chances of overlapping workers, couldn't we have a backlog form (due to an outage elsewhere), and have competitive consumers race each other?
Kind of. To be honest, since all rows belong to the same shard here, any lock on any of these rows effectively locks the shard (because rows are only deleted in the same transaction as the locking).
Now, none of that is necessary, because the locking is explicit on the hash, and the deletion of rows need not be in a transaction since the lock is known to be held at that point anyway.
In practice, the serialization here is strong and there won't be any case of multiple workers acting on the same rows.
To be clear, "session based" does not mean the lock isn't shared across workers -- it only means that the lock is released automatically if the connection is dropped. That's it.
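A toy model of that distinction (an analogy only, not how Postgres is implemented): the lock registry is global and visible to every worker, while ownership is tied to a session's lifetime:

```python
class AdvisoryLockRegistry:
    """Toy model of session-scoped advisory locks: one shared registry seen
    by every worker, each lock owned by a session and auto-released when
    that session (connection) goes away."""

    def __init__(self):
        self.holders: dict[int, str] = {}  # lock key -> owning session

    def try_lock(self, session: str, key: int) -> bool:
        if key in self.holders:
            return self.holders[key] == session
        self.holders[key] = session
        return True

    def close_session(self, session: str) -> None:
        # Dropping the connection releases every lock the session held.
        for key in [k for k, s in self.holders.items() if s == session]:
            del self.holders[key]
```

So two workers on separate connections still serialize against each other; session scope only changes when the lock is released, not who can see it.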
Also, this is already tested in test_outbox, where I used a separate thread to force a separate connection and validate the serialization when both access the same shard.
> To be clear, "session based" does not mean it isn't shared across workers -- it only means that the lock is released automatically if the connection is dropped. that's it.
👍 I had confused "session based" with transaction-scoped locks, but they are different lock scopes.
Force-pushed from 25e1d69 to 22d4ada
Thanks for the walkthrough!
Good to go, once we have the metrics.
Suspect Issues: This pull request was deployed and Sentry observed the following issues:
PR reverted: ecea429
Pass 2 on #57877. Added better handling for an operational error on lock timeout (something lost in the original PR), and also fixed the outbox contention issues around auth identity.
We want to avoid using an actual transaction around the outbox signal processing so that nested signal invocations can complete transactions with securely stored (committed) results. The previous implementation used watermarks under the assumption that cross-silo interactions (which generate nested outboxes) would run in a separate process -- however, in self-hosted and monolith deployments this is not the case. This approach still allows serialization (locking of entries by shard) but does not create a transaction that might prevent commits by inner signal handlers or nested invocations. The benefit of using Postgres here instead of Redis is the session-bound behavior: if pods roll over during a deploy, for instance, the graceful connection release (performed by the OS when the process's open files are cleaned up) tells Postgres to release the lock nearly instantly.
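Under those constraints, the drain loop has roughly this shape (a hedged sketch of the pattern, not the Sentry implementation; `fetch_next_row` and `deliver` are illustrative stand-ins):

```python
def drain_shard(cursor, shard_lock_id, fetch_next_row, deliver):
    """Serialize on the session-scoped advisory lock, process rows with no
    enclosing transaction, and always release the lock on the way out."""
    cursor.execute("SELECT pg_advisory_lock(%s)", [shard_lock_id])
    try:
        while (row := fetch_next_row()) is not None:
            # No wrapping transaction: nested signal handlers can commit
            # their own transactions while we hold only the advisory lock.
            deliver(row)
    finally:
        cursor.execute("SELECT pg_advisory_unlock(%s)", [shard_lock_id])

class RecordingCursor:
    """Fake cursor that records the SQL issued, for demonstration."""
    def __init__(self):
        self.sql = []
    def execute(self, query, params=None):
        self.sql.append(query)

rows = iter([1, 2, 3])
delivered = []
cur = RecordingCursor()
drain_shard(cur, 5, lambda: next(rows, None), delivered.append)
```

If the process dies mid-loop, no explicit unlock ever runs, but the connection teardown releases the session lock anyway, which is the whole point of preferring session scope here.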
under the assumption that cross silo interactions (which generate nested outboxes) would be in a separate process -- however, self hosted and monolith this is not the case.This approach still allows serialization (locking of entries by shard) but does not create a transaction which might prevent commits by inner signal handlers or nested invocations. The benefit of using postgres here instead of redis is the inclusion of session-bound behavior -- if pods rollover during a deploy, for instance, the graceful connection release (which happens by the OS when the process clears the files opened) will inform postgres to nearly instantly release said lock.