From 09819f788e9e4fde9e689d34de176c7e6378b1ce Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 14 Nov 2024 13:55:11 +0100 Subject: [PATCH] Refactored reading messages in batch implementation to handle global position from event metadata instead of count --- .../eventStore/schema/readMessagesBatch.ts | 116 +++++++++--------- .../postgreSQLEventStoreSubscription.ts | 17 ++- 2 files changed, 75 insertions(+), 58 deletions(-) diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts index fae361e..344a23b 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts @@ -1,16 +1,14 @@ import { mapRows, sql, type SQLExecutor } from '@event-driven-io/dumbo'; import { event, - type DefaultStreamVersionType, type Event, type EventDataOf, type EventMetaDataOf, type EventTypeOf, type ReadEvent, + type ReadEventMetadata, type ReadEventMetadataWithGlobalPosition, - type ReadStreamResult, } from '@event-driven-io/emmett'; -import { PostgreSQLEventStoreDefaultStreamVersion } from '../postgreSQLEventStore'; import { defaultTag, eventsTable } from './typing'; type ReadMessagesBatchSqlResult = { @@ -29,78 +27,84 @@ type ReadMessagesBatchSqlResult = { export type ReadMessagesBatchOptions = | { from: bigint; + batchSize: number; } - | { to: bigint } - | { from: bigint; batchSize?: number } - | { batchSize: number }; + | { to: bigint; batchSize: number } + | { from: bigint; to: bigint }; -export const readMessagesBatch = async ( - execute: SQLExecutor, - options?: ReadMessagesBatchOptions & { partition?: string }, -): Promise< - ReadStreamResult< - MessageType, - DefaultStreamVersionType, - ReadEventMetadataWithGlobalPosition - > -> => { - const fromCondition: string = - options && 'from' in options - ? `AND global_position >= ${options.from}` - : ''; +export type ReadMessagesBatchResult< + EventType extends Event, + ReadEventMetadataType extends ReadEventMetadata = ReadEventMetadata, +> = { + currentGlobalPosition: bigint; + events: ReadEvent[]; + areEventsLeft: boolean; +}; +export const readMessagesBatch = async < + MessageType extends Event, + ReadEventMetadataType extends + ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition, +>( + execute: SQLExecutor, + options: ReadMessagesBatchOptions & { partition?: string }, +): Promise> => { + const from = 'from' in options ? options.from : 0n; + const batchSize = + options && 'batchSize' in options + ? options.batchSize + : options.to - options.from; const to = Number( - options && 'to' in options - ? options.to - : options && 'batchSize' in options && options.batchSize - ? (options && 'from' in options ? options.from : 0n) + - BigInt(options.batchSize) - : NaN, + 'to' in options ? options.to : from + BigInt(options.batchSize), ); + const fromCondition: string = + from !== -0n ? `AND global_position >= ${from}` : ''; + const toCondition = !isNaN(to) ? `AND global_position <= ${to}` : ''; - const events: ReadEvent[] = - await mapRows( - execute.query>( - sql( - `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id + const events: ReadEvent[] = await mapRows( + execute.query>( + sql( + `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id FROM ${eventsTable.name} WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot()) ${fromCondition} ${toCondition} ORDER BY transaction_id, global_position`, - options?.partition ?? defaultTag, - ), + options?.partition ?? defaultTag, ), - (row) => { - const rawEvent = event( - row.event_type, - row.event_data, - row.event_metadata, - ) as MessageType; + ), + (row) => { + const rawEvent = event( + row.event_type, + row.event_data, + row.event_metadata, + ) as MessageType; + + const metadata: ReadEventMetadataWithGlobalPosition = { + ...rawEvent.metadata, + eventId: row.event_id, + streamName: row.stream_id, + streamPosition: BigInt(row.stream_position), + globalPosition: BigInt(row.global_position), + }; - return { - ...rawEvent, - metadata: { - ...rawEvent.metadata, - eventId: row.event_id, - streamName: row.stream_id, - streamPosition: BigInt(row.stream_position), - globalPosition: BigInt(row.global_position), - }, - }; - }, - ); + return { + ...rawEvent, + metadata: metadata as ReadEventMetadataType, + }; + }, + ); return events.length > 0 ? { - currentStreamVersion: - events[events.length - 1]!.metadata.streamPosition, + currentGlobalPosition: + events[events.length - 1]!.metadata.globalPosition, events, - streamExists: true, + areEventsLeft: events.length === batchSize, } : { - currentStreamVersion: PostgreSQLEventStoreDefaultStreamVersion, + currentGlobalPosition: from, events: [], - streamExists: false, + areEventsLeft: false, }; }; diff --git a/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreSubscription.ts b/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreSubscription.ts index c768978..96fd70d 100644 --- a/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreSubscription.ts +++ b/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreSubscription.ts @@ -5,6 +5,7 @@ import { type ReadEvent, type ReadEventMetadataWithGlobalPosition, } from '@event-driven-io/emmett'; +import { setTimeout } from 'timers'; import { readMessagesBatch, type ReadMessagesBatchOptions, @@ -71,10 +72,15 @@ const messageBatchPooler = ({ let isRunning = false; let start: Promise; + const pollMessages = async () => { const options: ReadMessagesBatchOptions = { from: 0n, batchSize }; + + let waitTime = 100; + do { - const { events } = await readMessagesBatch(executor, options); + const { events, currentGlobalPosition, areEventsLeft } = + await readMessagesBatch(executor, options); for (const message of events) { const result = await eachMessage( @@ -89,7 +95,14 @@ const messageBatchPooler = ({ } } } - options.from += BigInt(events.length); + options.from = currentGlobalPosition; + + if (!areEventsLeft) { + waitTime = Math.min(waitTime * 2, 5000); + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } else { + waitTime = 0; + } } while (isRunning); };