Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added streaming capabilities #76

Merged
merged 21 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9c10b80
Added client shopping cart summary projection
oskardudycz Aug 25, 2024
2547824
Added streaming to event stores
oskardudycz Jun 11, 2024
5b302d9
Split readers tracking from the catching up detection
oskardudycz Jun 13, 2024
9b6cf9b
Made retry stream a separate, focused helper
oskardudycz Jun 19, 2024
ae6ec32
Improved retry handler by handling controller termination when readin…
oskardudycz Jun 19, 2024
88356ed
Added tests for notifying about no active readers
oskardudycz Jun 19, 2024
27b79ec
Added reduce stream transformation
oskardudycz Jun 20, 2024
4f0c998
Added collectors like first, single etc.
oskardudycz Jun 20, 2024
cbf9f18
Added skip and take streams transforms
oskardudycz Jun 20, 2024
d88fe29
Added last and lastOrDefault collectors
oskardudycz Jun 20, 2024
1686756
Added and used in tests "from" stream generator
oskardudycz Jun 25, 2024
572ac71
Added map stream transformation
oskardudycz Jun 25, 2024
dbe9af9
Updated and fixed CaughtUpTransformStream implementation
oskardudycz Jun 26, 2024
fcd9f33
Added tests for stream transformations
oskardudycz Jul 2, 2024
48afeb4
Added waitAtMost transformations and tests for streaming coordinator
oskardudycz Jul 2, 2024
4b1aac2
Updated event store subscriptions implementation around catching up
oskardudycz Jul 2, 2024
7be4081
Updated tsup config
oskardudycz Jul 2, 2024
b529a03
Configured shims package
oskardudycz Jul 2, 2024
469590d
Fixed types import from shim
oskardudycz Jul 2, 2024
e172275
Disabled ESDB event store e2e tests for now
oskardudycz Jul 4, 2024
62081f2
Removed stream events method from the API to be able to merge changes…
oskardudycz Oct 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/webApi/expressjs-with-esdb/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export default defineConfig({
watch: env === 'development',
target: 'esnext',
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
entry: ['src/**/*.ts', '!src/**/*.spec.ts', '!src/**/*.internal.ts'], //include all files under src but not specs
entry: ['src/index.ts'],
sourcemap: true,
tsconfig: 'tsconfig.json', // workaround for https://github.com/egoist/tsup/issues/571#issuecomment-1760052931
});
2 changes: 1 addition & 1 deletion src/docs/snippets/gettingStarted/webApi/apiBDD.e2e.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void describe('ShoppingCart E2E', () => {
)
.then([expectResponse(204)]));

void it('should return details', () =>
void it('returns details', () =>
given(openedShoppingCartWithProduct)
.when((request) =>
request.get(`/clients/${clientId}/shopping-carts/current`).send(),
Expand Down
31 changes: 28 additions & 3 deletions src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@
"@types/express": "4.17.21",
"@types/node": "^22.4.1",
"@types/supertest": "6.0.2",
"web-streams-polyfill": "^4.0.0",
"supertest": "7.0.0"
},
"workspaces": [
"packages/emmett-shims",
"packages/emmett",
"packages/emmett-postgresql",
"packages/emmett-esdb",
Expand Down
121 changes: 92 additions & 29 deletions src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ import {
NO_CONCURRENCY_CHECK,
STREAM_DOES_NOT_EXIST,
STREAM_EXISTS,
globalStreamCaughtUp,
streamTransformations,
type AggregateStreamOptions,
type AggregateStreamResult,
type AppendToStreamOptions,
type AppendToStreamResult,
type DefaultStreamVersionType,
type Event,
type EventMetaDataOf,
type EventStore,
type ExpectedStreamVersion,
type GlobalStreamCaughtUp,
type ReadEvent,
type ReadEventMetadataWithGlobalPosition,
type ReadStreamOptions,
Expand All @@ -25,9 +27,16 @@ import {
StreamNotFoundError,
WrongExpectedVersionError,
jsonEvent,
type AllStreamResolvedEvent,
type AllStreamSubscription,
type AppendExpectedRevision,
type ReadStreamOptions as ESDBReadStreamOptions,
type JSONRecordedEvent,
} from '@eventstore/db-client';
import { WritableStream, type ReadableStream } from 'node:stream/web';
import { Readable } from 'stream';

const { map } = streamTransformations;

const toEventStoreDBReadOptions = (
options: ReadStreamOptions | undefined,
Expand Down Expand Up @@ -64,26 +73,13 @@ export const getEventStoreDBEventStore = (
let state = initialState();
let currentStreamVersion: bigint | undefined = undefined;

for await (const { event } of eventStore.readStream(
for await (const { event } of eventStore.readStream<EventType>(
streamName,
toEventStoreDBReadOptions(options.read),
)) {
if (!event) continue;

state = evolve(state, <
ReadEvent<EventType, ReadEventMetadataWithGlobalPosition>
>{
type: event.type,
data: event.data,
metadata: {
...((event.metadata as EventMetaDataOf<EventType> | undefined) ??
{}),
eventId: event.id,
streamName: event.streamId,
streamPosition: event.revision,
globalPosition: event.position!.commit,
},
});
state = evolve(state, mapFromESDBEvent<EventType>(event));
currentStreamVersion = event.revision;
}

Expand Down Expand Up @@ -123,23 +119,12 @@ export const getEventStoreDBEventStore = (
let currentStreamVersion: bigint | undefined = undefined;

try {
for await (const { event } of eventStore.readStream(
for await (const { event } of eventStore.readStream<EventType>(
streamName,
toEventStoreDBReadOptions(options),
)) {
if (!event) continue;
events.push(<
ReadEvent<EventType, ReadEventMetadataWithGlobalPosition>
>{
type: event.type,
data: event.data,
metadata: {
eventId: event.id,
streamName: event.streamId,
streamPosition: event.revision,
globalPosition: event.position!.commit,
},
});
events.push(mapFromESDBEvent(event));
currentStreamVersion = event.revision;
}
return currentStreamVersion
Expand Down Expand Up @@ -189,6 +174,25 @@ export const getEventStoreDBEventStore = (
throw error;
}
},

//streamEvents: streamEvents(eventStore),
};
};

const mapFromESDBEvent = <EventType extends Event = Event>(
event: JSONRecordedEvent<EventType>,
): ReadEvent<EventType, ReadEventMetadataWithGlobalPosition> => {
return <ReadEvent<EventType, ReadEventMetadataWithGlobalPosition>>{
type: event.type,
data: event.data,
metadata: {
...((event.metadata as ReadEventMetadataWithGlobalPosition) ??
({} as ReadEventMetadataWithGlobalPosition)),
eventId: event.id,
streamName: event.streamId,
streamPosition: event.revision,
globalPosition: event.position!.commit,
},
};
};

Expand Down Expand Up @@ -242,3 +246,62 @@ const assertExpectedVersionMatchesCurrent = (
if (!matchesExpectedVersion(current, expected))
throw new ExpectedVersionConflictError(current, expected);
};

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const convertToWebReadableStream = (
allStreamSubscription: AllStreamSubscription,
): ReadableStream<AllStreamResolvedEvent | GlobalStreamCaughtUp> => {
// Validate the input type
if (!(allStreamSubscription instanceof Readable)) {
throw new Error('Provided stream is not a Node.js Readable stream.');
}

let globalPosition = 0n;

const stream = Readable.toWeb(
allStreamSubscription,
) as ReadableStream<AllStreamResolvedEvent>;

const writable = new WritableStream<
AllStreamResolvedEvent | GlobalStreamCaughtUp
>();

allStreamSubscription.on('caughtUp', async () => {
console.log(globalPosition);
await writable.getWriter().write(globalStreamCaughtUp({ globalPosition }));
});

const transform = map<
AllStreamResolvedEvent,
AllStreamResolvedEvent | GlobalStreamCaughtUp
>((event) => {
if (event?.event?.position.commit)
globalPosition = event.event?.position.commit;

return event;
});

return stream.pipeThrough<AllStreamResolvedEvent | GlobalStreamCaughtUp>(
transform,
);
};

// const streamEvents = (eventStore: EventStoreDBClient) => () => {
// return restream<
// AllStreamResolvedEvent | GlobalSubscriptionEvent,
// | ReadEvent<Event, ReadEventMetadataWithGlobalPosition>
// | GlobalSubscriptionEvent
// >(
// (): ReadableStream<AllStreamResolvedEvent | GlobalSubscriptionEvent> =>
// convertToWebReadableStream(
// eventStore.subscribeToAll({
// fromPosition: START,
// filter: excludeSystemEvents(),
// }),
// ),
// (
// resolvedEvent: AllStreamResolvedEvent | GlobalSubscriptionEvent,
// ): ReadEvent<Event, ReadEventMetadataWithGlobalPosition> =>
// mapFromESDBEvent(resolvedEvent.event as JSONRecordedEvent<Event>),
// );
// };
1 change: 0 additions & 1 deletion src/packages/emmett-esdb/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export default defineConfig({
minify: true, //env === 'production',
bundle: true, //env === 'production',
skipNodeModulesBundle: true,
entryPoints: ['src/index.ts'],
watch: env === 'development',
target: 'esnext',
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
Expand Down
1 change: 0 additions & 1 deletion src/packages/emmett-expressjs/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export default defineConfig({
minify: true, //env === 'production',
bundle: true, //env === 'production',
skipNodeModulesBundle: true,
entryPoints: ['src/index.ts'],
watch: env === 'development',
target: 'esnext',
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
Expand Down
1 change: 0 additions & 1 deletion src/packages/emmett-fastify/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export default defineConfig({
minify: true, //env === 'production',
bundle: true, //env === 'production',
skipNodeModulesBundle: true,
entryPoints: ['src/index.ts'],
watch: env === 'development',
target: 'esnext',
outDir: 'dist', //env === 'production' ? 'dist' : 'lib',
Expand Down
55 changes: 55 additions & 0 deletions src/packages/emmett-shims/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"name": "@event-driven-io/emmett-shims",
"version": "0.19.1",
"type": "module",
"description": "Emmett - Event Sourcing development made simple",
"scripts": {
"build": "tsup",
"build:ts": "tsc",
"build:ts:watch": "tsc --watch",
"test": "run-s test:unit test:int test:e2e",
"test:unit": "glob -c \"node --import tsx --test\" **/*.unit.spec.ts",
"test:int": "glob -c \"node --import tsx --test\" **/*.int.spec.ts",
"test:e2e": "glob -c \"node --import tsx --test\" **/*.e2e.spec.ts",
"test:watch": "node --import tsx --test --watch",
"test:unit:watch": "glob -c \"node --import tsx --test --watch\" **/*.unit.spec.ts",
"test:int:watch": "glob -c \"node --import tsx --test --watch\" **/*.int.spec.ts",
"test:e2e:watch": "glob -c \"node --import tsx --test --watch\" **/*.e2e.spec.ts"
},
"repository": {
"type": "git",
"url": "git+https://github.com/event-driven-io/emmett.git"
},
"keywords": [
"Event Sourcing"
],
"author": "Oskar Dudycz",
"bugs": {
"url": "https://github.com/event-driven-io/emmett/issues"
},
"homepage": "https://event-driven-io.github.io/emmett/",
"exports": {
".": {
"import": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"require": {
"types": "./dist/index.d.cts",
"default": "./dist/index.cjs"
}
}
},
"main": "./dist/index.cjs",
"module": "./dist/index.js",
"types": "./dist/index.d.ts",
"files": [
"dist"
],
"peerDependencies": {
"web-streams-polyfill": "^4.0.0"
},
"devDependencies": {
"@types/node": "^20.11.30"
}
}
Loading
Loading