From c00dc3886e0f4217acbcff14d0ec1d7b0e1914f3 Mon Sep 17 00:00:00 2001 From: bogdan-rosianu Date: Thu, 19 Sep 2024 14:15:51 +0300 Subject: [PATCH 1/2] event processor --- apps/event-processor/event.processor.ts | 148 ++++++++++++++++++++++++ apps/event-processor/test.ts | 19 +++ 2 files changed, 167 insertions(+) create mode 100644 apps/event-processor/event.processor.ts create mode 100644 apps/event-processor/test.ts diff --git a/apps/event-processor/event.processor.ts b/apps/event-processor/event.processor.ts new file mode 100644 index 0000000..5f53f65 --- /dev/null +++ b/apps/event-processor/event.processor.ts @@ -0,0 +1,148 @@ +import axios from 'axios'; + +export class EventProcessor { + private options: EventProcessorOptions = new EventProcessorOptions(); + + async start(options: EventProcessorOptions) { + this.options = options; + + if (!options.emitterAddresses && !options.eventIdentifiers) { + throw new Error(`No emitter address or identifier set. Could not resolve all the events without filters.`); + } + + await this.startProcess(options); + } + + private async startProcess(options: EventProcessorOptions) { + const lastTimestampFunc = options.getLastProcessedTimestamp; + if (!lastTimestampFunc) { + throw new Error(`Undefined function for getting the last processed timestamp`); + } + + const maxHeight = await lastTimestampFunc(); + if (!maxHeight) { + throw new Error(`Cannot get the last processed timestamp via the provided getLastProcessedTimestamp()`); + } + + await this.callElasticsearchEvents(); + } + + async callElasticsearchEvents() { + const url = `${this.options.elasticUrl}/events/_search?scroll=${this.options.scrollTimeout}`; + const timestampFunc = this.options.getLastProcessedTimestamp; + if (!timestampFunc) { + return; + } + + const elasticQuery = this.generateElasticsearchQuery(await timestampFunc() ?? -1); + const result = await axios.post(url, elasticQuery); + + if (!result.data || !result.data.hits || !result.data.hits || !result.data.hits.hits) { + return; + } + + const elasticEvents = result.data.hits.hits; + const events = elasticEvents.map((e: { _source: any; }) => e._source); + await this.handleElasticEvents(events); + + const scrollId = result.data._scroll_id; + while (true) { + const scrollResult = await axios.post(`${this.options.elasticUrl}/_search/scroll`, + { + scroll_id: scrollId, + }); + + const scrollDocuments = scrollResult.data.hits.hits; + if (scrollDocuments.length === 0) { + break; + } + + const scrollEvents = scrollDocuments.map((e: { _source: any; }) => e._source); + await this.handleElasticEvents(scrollEvents); + } + } + + async handleElasticEvents(events: EventSource[]) { + if (events.length === 0) { + return; + } + + const lastTimestamp = events[events.length - 1].timestamp ?? 0; + + const onEventsFunc = this.options.onEventsReceived; + if (onEventsFunc) { + await onEventsFunc(lastTimestamp, events); + } + + const setLatestProcessedTimestampFunc = this.options.setLastProcessedTimestamp; + if (setLatestProcessedTimestampFunc) { + await setLatestProcessedTimestampFunc(lastTimestamp); + } + } + + generateElasticsearchQuery(timestamp: number) { + return { + size: this.options.pageSize, + query: { + bool: { + must: [ + { + terms: { + identifier: this.options.eventIdentifiers, // Query by identifiers + }, + }, + { + terms: { + address: this.options.emitterAddresses, // Query by addresses + }, + }, + { + range: { + timestamp: { + gt: `${timestamp}`, + }, + }, + }, + ], + }, + }, + sort: [ + { + timestamp: { + order: 'asc', // Sorting by timestamp in ascending order + }, + }, + ], + }; + } +} + +export class EventProcessorOptions { + elasticUrl?: string; + emitterAddresses?: string[]; + eventIdentifiers?: string[]; + pageSize: number = 10000; + scrollTimeout: string = "1m"; + onEventsReceived?: (highestTimestamp: number, events: EventSource[]) => Promise; + getLastProcessedTimestamp?: () => Promise; + setLastProcessedTimestamp?: (nonce: number) => Promise; + timeout?: number | undefined; + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} + +export class EventSource { + originalTxHash?: string; + logAddress?: string; + identifier?: string; + address?: string; + topics?: string[]; + shardID?: number; + additionalData?: string[]; + txOrder?: number; + txHash?: string; + order?: number; + timestamp?: number; +} diff --git a/apps/event-processor/test.ts b/apps/event-processor/test.ts new file mode 100644 index 0000000..57a1fd8 --- /dev/null +++ b/apps/event-processor/test.ts @@ -0,0 +1,19 @@ +import { EventProcessor, EventProcessorOptions } from './event.processor'; + +const eventsProcessor = new EventProcessor(); + +void eventsProcessor.start(new EventProcessorOptions({ + elasticUrl: 'https://index.multiversx.com', + eventIdentifiers: ['ESDTTransfer'], + emitterAddresses: ['erd1z08z0svvqs84dnrh5hrm47agcxmch79fslmupzgqcgfdtpp3slwqlna25a'], + pageSize: 3, + getLastProcessedTimestamp: async () => { + await Promise.resolve(); + return 1726700632; + }, + onEventsReceived: async (highestTimestamp, events) => { + await Promise.resolve(); + console.log(`onEventsReceived -> highestTimestamp: ${highestTimestamp}`); + console.log(`onEventsReceived -> events: ${JSON.stringify(events)}`); + }, +})); From 18397a437b98d9da8abdbe777f65ffb1d9fc2fa5 Mon Sep 17 00:00:00 2001 From: bogdan-rosianu Date: Thu, 19 Sep 2024 14:21:31 +0300 Subject: [PATCH 2/2] move event processor to libs --- {apps => libs/services/src}/event-processor/event.processor.ts | 0 {apps => libs/services/src}/event-processor/test.ts | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {apps => libs/services/src}/event-processor/event.processor.ts (100%) rename {apps => libs/services/src}/event-processor/test.ts (100%) diff --git a/apps/event-processor/event.processor.ts b/libs/services/src/event-processor/event.processor.ts similarity index 100% rename from apps/event-processor/event.processor.ts rename to libs/services/src/event-processor/event.processor.ts diff --git a/apps/event-processor/test.ts b/libs/services/src/event-processor/test.ts similarity index 100% rename from apps/event-processor/test.ts rename to libs/services/src/event-processor/test.ts