Skip to content

Commit

Permalink
Refactored reading messages in batch implementation to handle global …
Browse files Browse the repository at this point in the history
…position from event metadata instead of count
  • Loading branch information
oskardudycz committed Nov 14, 2024
1 parent 168ac71 commit 09819f7
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { mapRows, sql, type SQLExecutor } from '@event-driven-io/dumbo';
import {
event,
type DefaultStreamVersionType,
type Event,
type EventDataOf,
type EventMetaDataOf,
type EventTypeOf,
type ReadEvent,
type ReadEventMetadata,
type ReadEventMetadataWithGlobalPosition,
type ReadStreamResult,
} from '@event-driven-io/emmett';
import { PostgreSQLEventStoreDefaultStreamVersion } from '../postgreSQLEventStore';
import { defaultTag, eventsTable } from './typing';

type ReadMessagesBatchSqlResult<EventType extends Event> = {
Expand All @@ -29,78 +27,84 @@ type ReadMessagesBatchSqlResult<EventType extends Event> = {
export type ReadMessagesBatchOptions =
| {
from: bigint;
batchSize: number;
}
| { to: bigint }
| { from: bigint; batchSize?: number }
| { batchSize: number };
| { to: bigint; batchSize: number }
| { from: bigint; to: bigint };

export const readMessagesBatch = async <MessageType extends Event>(
execute: SQLExecutor,
options?: ReadMessagesBatchOptions & { partition?: string },
): Promise<
ReadStreamResult<
MessageType,
DefaultStreamVersionType,
ReadEventMetadataWithGlobalPosition
>
> => {
const fromCondition: string =
options && 'from' in options
? `AND global_position >= ${options.from}`
: '';
export type ReadMessagesBatchResult<
EventType extends Event,
ReadEventMetadataType extends ReadEventMetadata = ReadEventMetadata,
> = {
currentGlobalPosition: bigint;
events: ReadEvent<EventType, ReadEventMetadataType>[];
areEventsLeft: boolean;
};

export const readMessagesBatch = async <
MessageType extends Event,
ReadEventMetadataType extends
ReadEventMetadataWithGlobalPosition = ReadEventMetadataWithGlobalPosition,
>(
execute: SQLExecutor,
options: ReadMessagesBatchOptions & { partition?: string },
): Promise<ReadMessagesBatchResult<MessageType, ReadEventMetadataType>> => {
const from = 'from' in options ? options.from : 0n;
const batchSize =
options && 'batchSize' in options
? options.batchSize
: options.to - options.from;
const to = Number(
options && 'to' in options
? options.to
: options && 'batchSize' in options && options.batchSize
? (options && 'from' in options ? options.from : 0n) +
BigInt(options.batchSize)
: NaN,
'to' in options ? options.to : from + BigInt(options.batchSize),
);

const fromCondition: string =
from !== -0n ? `AND global_position >= ${from}` : '';

const toCondition = !isNaN(to) ? `AND global_position <= ${to}` : '';

const events: ReadEvent<MessageType, ReadEventMetadataWithGlobalPosition>[] =
await mapRows(
execute.query<ReadMessagesBatchSqlResult<MessageType>>(
sql(
`SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id
const events: ReadEvent<MessageType, ReadEventMetadataType>[] = await mapRows(
execute.query<ReadMessagesBatchSqlResult<MessageType>>(
sql(
`SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id
FROM ${eventsTable.name}
WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot()) ${fromCondition} ${toCondition}
ORDER BY transaction_id, global_position`,
options?.partition ?? defaultTag,
),
options?.partition ?? defaultTag,
),
(row) => {
const rawEvent = event<MessageType>(
row.event_type,
row.event_data,
row.event_metadata,
) as MessageType;
),
(row) => {
const rawEvent = event<MessageType>(
row.event_type,
row.event_data,
row.event_metadata,
) as MessageType;

const metadata: ReadEventMetadataWithGlobalPosition = {
...rawEvent.metadata,
eventId: row.event_id,
streamName: row.stream_id,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
...rawEvent,
metadata: {
...rawEvent.metadata,
eventId: row.event_id,
streamName: row.stream_id,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
},
};
},
);
return {
...rawEvent,
metadata: metadata as ReadEventMetadataType,
};
},
);

return events.length > 0
? {
currentStreamVersion:
events[events.length - 1]!.metadata.streamPosition,
currentGlobalPosition:
events[events.length - 1]!.metadata.globalPosition,
events,
streamExists: true,
areEventsLeft: events.length === batchSize,
}
: {
currentStreamVersion: PostgreSQLEventStoreDefaultStreamVersion,
currentGlobalPosition: from,
events: [],
streamExists: false,
areEventsLeft: false,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type ReadEvent,
type ReadEventMetadataWithGlobalPosition,
} from '@event-driven-io/emmett';
import { setTimeout } from 'timers';
import {
readMessagesBatch,
type ReadMessagesBatchOptions,
Expand Down Expand Up @@ -71,10 +72,15 @@ const messageBatchPooler = <EventType extends Event = Event>({
let isRunning = false;

let start: Promise<void>;

const pollMessages = async () => {
const options: ReadMessagesBatchOptions = { from: 0n, batchSize };

let waitTime = 100;

do {
const { events } = await readMessagesBatch(executor, options);
const { events, currentGlobalPosition, areEventsLeft } =
await readMessagesBatch(executor, options);

for (const message of events) {
const result = await eachMessage(
Expand All @@ -89,7 +95,14 @@ const messageBatchPooler = <EventType extends Event = Event>({
}
}
}
options.from += BigInt(events.length);
options.from = currentGlobalPosition;

if (!areEventsLeft) {
waitTime = Math.min(waitTime * 2, 5000);
await new Promise((resolve) => setTimeout(resolve, waitTime));
} else {
waitTime = 0;
}
} while (isRunning);
};

Expand Down

0 comments on commit 09819f7

Please sign in to comment.