Skip to content

Commit

Permalink
Refresh StoredEvent object and confirm still unpublished
Browse files Browse the repository at this point in the history
In extremely high concurrency situations, it was possible for the
entities loaded in `PublishDomainEventSubscriber::publishEvents()` to be
published by another process, before the subsequent lock was acquired.

This commit enables a Doctrine "refresh" of the entity, inside the lock,
which then allows us to perform a reliable lookup of the entity's
published status, and then only publish if it is still unpublished.
  • Loading branch information
benr77 committed Jun 2, 2024
1 parent fd802fe commit 9f2eee9
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 42 deletions.
28 changes: 5 additions & 23 deletions src/Doctrine/DoctrineEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ public function publish(StoredEvent $storedEvent): void
*/
public function allUnpublished(): array
{
try
{
try {
if (false === $this->em->getConnection()->createSchemaManager()->tablesExist([$this->tableName])) {
return []; // Connection does exist, but the events table does not exist.
}
}
catch (ConnectionException $connectionException)
{
} catch (ConnectionException $connectionException) {
return []; // Connection itself does not exist
}

Expand All @@ -131,23 +128,8 @@ public function allUnpublished(): array
return $qb->getQuery()->getResult();
}

/*
* @return StoredEvent[]|ArrayCollection
*/
/*public function allStoredEventsSince($eventId): ArrayCollection
public function refresh(StoredEvent $storedEvent): void
{
$qb = $this->em->createQueryBuilder()
->select('e')
->from(StoredEvent::class, 'e')
->orderBy('e.eventId');
if ($eventId)
{
$qb
->where('se.eventId > :event_id')
->setParameter('event_id', $eventId);
}
return $qb->getQuery()->getResult();
}*/
$this->em->refresh($storedEvent);
}
}
10 changes: 2 additions & 8 deletions src/Domain/Model/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

namespace Headsnet\DomainEventsBundle\Domain\Model;

use Doctrine\Common\Collections\ArrayCollection;

interface EventStore
{
public function nextIdentity(): EventId;
Expand All @@ -22,16 +20,12 @@ public function append(DomainEvent $domainEvent): void;

public function replace(DomainEvent $domainEvent): void;

public function publish(StoredEvent $domainEvent): void;
public function publish(StoredEvent $storedEvent): void;

/**
* @return StoredEvent[]
*/
public function allUnpublished(): array;

/*
* @param $eventId
* @return StoredEvent[]|ArrayCollection
*/
//public function allStoredEventsSince($eventId): ArrayCollection;
public function refresh(StoredEvent $storedEvent): void;
}
4 changes: 2 additions & 2 deletions src/Domain/Model/StoredEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class StoredEvent
private $occurredOn;

/**
* @var \DateTimeImmutable
* @var \DateTimeImmutable|null
*/
private $publishedOn;

Expand Down Expand Up @@ -68,7 +68,7 @@ public function getOccurredOn(): \DateTimeImmutable
return $this->occurredOn;
}

public function getPublishedOn(): \DateTimeImmutable
public function getPublishedOn(): ?\DateTimeImmutable
{
return $this->publishedOn;
}
Expand Down
27 changes: 18 additions & 9 deletions src/EventSubscriber/PublishDomainEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,24 @@ private function publishEvent(StoredEvent $storedEvent): void
);

if ($lock->acquire()) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
// Refresh the entity from the database, in case another process has
// updated it between reading it above and then acquiring the lock here.
// THIS IS CRITICAL TO AVOID DUPLICATE PUBLISHING OF EVENTS
$this->eventStore->refresh($storedEvent);

// If the event is definitely still unpublished, whilst we are here holding
// the lock, then we can proceed to publish it.
if ($storedEvent->getPublishedOn() === null) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
}

$lock->release();
}
Expand Down

0 comments on commit 9f2eee9

Please sign in to comment.