Skip to content

Commit

Permalink
Merge pull request #100 from castore-dev/create-dynamodb-event-storag…
Browse files Browse the repository at this point in the history
…e-adapter-2
  • Loading branch information
ThomasAribart authored Apr 27, 2023
2 parents f09d148 + 8fd11a9 commit a774af0
Show file tree
Hide file tree
Showing 11 changed files with 1,175 additions and 226 deletions.
2 changes: 1 addition & 1 deletion packages/dynamodb-event-storage-adapter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"watch": "rm -rf dist && concurrently 'yarn:package-* --watch'"
},
"dependencies": {
"@aws/dynamodb-auto-marshaller": "^0.7.1"
"@aws-sdk/util-dynamodb": "^3.319.0"
},
"devDependencies": {
"@aws-sdk/client-dynamodb": "^3.2.0",
Expand Down
97 changes: 44 additions & 53 deletions packages/dynamodb-event-storage-adapter/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
DynamoDBClient,
TransactWriteItemsCommand,
} from '@aws-sdk/client-dynamodb';
import { Marshaller } from '@aws/dynamodb-auto-marshaller';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';

import {
Aggregate,
Expand All @@ -17,37 +17,21 @@ import {
StorageAdapter,
} from '@castore/core';

import {
EVENT_TABLE_INITIAL_EVENT_INDEX_NAME,
EVENT_TABLE_IS_INITIAL_EVENT_KEY,
EVENT_TABLE_PK,
EVENT_TABLE_SK,
EVENT_TABLE_TIMESTAMP_KEY,
MARSHALL_OPTIONS,
} from './constants';
import { DynamoDBEventAlreadyExistsError } from './error';
import { isConditionalCheckFailedException } from './utils/isConditionalCheckFailed';
import {
parseAppliedListAggregateIdsOptions,
ParsedPageToken,
} from './utils/parseAppliedListAggregateIdsOptions';

const marshaller = new Marshaller() as {
marshallItem: (
item: Record<string, unknown>,
) => Record<string, AttributeValue>;
unmarshallItem: (
item: Record<string, AttributeValue>,
) => Record<string, unknown>;
};

export const EVENT_TABLE_PK = 'aggregateId';
export const EVENT_TABLE_SK = 'version';
export const EVENT_TABLE_TIMESTAMP_KEY = 'timestamp';
export const EVENT_TABLE_EVENT_TYPE_KEY = 'type';
export const EVENT_TABLE_PAYLOAD_KEY = 'payload';
export const EVENT_TABLE_METADATA_KEY = 'metadata';
export const EVENT_TABLE_IS_INITIAL_EVENT_KEY = 'isInitialEvent';
export const EVENT_TABLE_INITIAL_EVENT_INDEX_NAME = 'initialEvents';

const isConditionalCheckFailedException = (error: Error): boolean =>
typeof error === 'object' &&
((error as { code?: unknown }).code === 'ConditionalCheckFailedException' ||
(error as { errorType?: unknown }).errorType ===
'ConditionalCheckFailedException' ||
(error as { name?: unknown }).name === 'ConditionalCheckFailedException');

type DynamoDbGroupedEvent<
EVENT_DETAILS extends EventDetail = EventDetail,
AGGREGATE extends Aggregate = Aggregate,
Expand Down Expand Up @@ -182,11 +166,14 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
? { '#version': EVENT_TABLE_SK }
: {}),
},
ExpressionAttributeValues: marshaller.marshallItem({
':aggregateId': aggregateId,
...(maxVersion !== undefined ? { ':maxVersion': maxVersion } : {}),
...(minVersion !== undefined ? { ':minVersion': minVersion } : {}),
}),
ExpressionAttributeValues: marshall(
{
':aggregateId': aggregateId,
...(maxVersion !== undefined ? { ':maxVersion': maxVersion } : {}),
...(minVersion !== undefined ? { ':minVersion': minVersion } : {}),
},
MARSHALL_OPTIONS,
),
ConsistentRead: true,
...(reverse !== undefined ? { ScanIndexForward: !reverse } : {}),
...(limit !== undefined ? { Limit: limit } : {}),
Expand All @@ -207,7 +194,7 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {

return {
events: marshalledEvents
.map(item => marshaller.unmarshallItem(item))
.map(item => unmarshall(item))
.map((item): EventDetail => {
const {
aggregateId: evtAggregateId,
Expand Down Expand Up @@ -236,15 +223,18 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {

return {
TableName: this.getTableName(),
Item: marshaller.marshallItem({
aggregateId,
version,
type,
timestamp,
...(payload !== undefined ? { payload } : {}),
...(metadata !== undefined ? { metadata } : {}),
...(version === 1 ? { isInitialEvent: 1 } : {}),
}),
Item: marshall(
{
aggregateId,
version,
type,
timestamp,
...(payload !== undefined ? { payload } : {}),
...(metadata !== undefined ? { metadata } : {}),
...(version === 1 ? { isInitialEvent: 1 } : {}),
},
MARSHALL_OPTIONS,
),
ExpressionAttributeNames: { '#version': EVENT_TABLE_SK },
ConditionExpression: 'attribute_not_exists(#version)',
};
Expand Down Expand Up @@ -324,9 +314,7 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
ExpressionAttributeNames: {
'#isInitialEvent': EVENT_TABLE_IS_INITIAL_EVENT_KEY,
},
ExpressionAttributeValues: marshaller.marshallItem({
':true': 1,
}),
ExpressionAttributeValues: marshall({ ':true': 1 }, MARSHALL_OPTIONS),
IndexName: EVENT_TABLE_INITIAL_EVENT_INDEX_NAME,
};

Expand Down Expand Up @@ -359,14 +347,17 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {

aggregateIdsQueryCommandInput.ExpressionAttributeValues = {
...aggregateIdsQueryCommandInput.ExpressionAttributeValues,
...marshaller.marshallItem({
...(initialEventBefore !== undefined
? { ':initialEventBefore': initialEventBefore }
: {}),
...(initialEventAfter !== undefined
? { ':initialEventAfter': initialEventAfter }
: {}),
}),
...marshall(
{
...(initialEventBefore !== undefined
? { ':initialEventBefore': initialEventBefore }
: {}),
...(initialEventAfter !== undefined
? { ':initialEventAfter': initialEventAfter }
: {}),
},
MARSHALL_OPTIONS,
),
};
}

Expand Down Expand Up @@ -395,7 +386,7 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {

return {
aggregateIds: unmarshalledInitialEvents
.map(item => marshaller.unmarshallItem(item))
.map(item => unmarshall(item))
.map(item => {
const { aggregateId } = item as Pick<EventDetail, 'aggregateId'>;

Expand Down
Loading

0 comments on commit a774af0

Please sign in to comment.