Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/development' into parse_hatom_le…
Browse files Browse the repository at this point in the history
…nding_events
  • Loading branch information
GuticaStefan committed Sep 19, 2024
2 parents e43b6cf + 386c9fb commit 10f308d
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
148 changes: 148 additions & 0 deletions libs/services/src/event-processor/event.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import axios from 'axios';

export class EventProcessor {
private options: EventProcessorOptions = new EventProcessorOptions();

async start(options: EventProcessorOptions) {
this.options = options;

if (!options.emitterAddresses && !options.eventIdentifiers) {
throw new Error(`No emitter address or identifier set. Could not resolve all the events without filters.`);
}

await this.startProcess(options);
}

private async startProcess(options: EventProcessorOptions) {
const lastTimestampFunc = options.getLastProcessedTimestamp;
if (!lastTimestampFunc) {
throw new Error(`Undefined function for getting the last processed timestamp`);
}

const maxHeight = await lastTimestampFunc();
if (!maxHeight) {
throw new Error(`Cannot get the last processed timestamp via the provided getLastProcessedTimestamp()`);
}

await this.callElasticsearchEvents();
}

async callElasticsearchEvents() {
const url = `${this.options.elasticUrl}/events/_search?scroll=${this.options.scrollTimeout}`;
const timestampFunc = this.options.getLastProcessedTimestamp;
if (!timestampFunc) {
return;
}

const elasticQuery = this.generateElasticsearchQuery(await timestampFunc() ?? -1);
const result = await axios.post(url, elasticQuery);

if (!result.data || !result.data.hits || !result.data.hits || !result.data.hits.hits) {
return;
}

const elasticEvents = result.data.hits.hits;
const events = elasticEvents.map((e: { _source: any; }) => e._source);
await this.handleElasticEvents(events);

const scrollId = result.data._scroll_id;
while (true) {
const scrollResult = await axios.post(`${this.options.elasticUrl}/_search/scroll`,
{
scroll_id: scrollId,
});

const scrollDocuments = scrollResult.data.hits.hits;
if (scrollDocuments.length === 0) {
break;
}

const scrollEvents = scrollDocuments.map((e: { _source: any; }) => e._source);
await this.handleElasticEvents(scrollEvents);
}
}

async handleElasticEvents(events: EventSource[]) {
if (events.length === 0) {
return;
}

const lastTimestamp = events[events.length - 1].timestamp ?? 0;

const onEventsFunc = this.options.onEventsReceived;
if (onEventsFunc) {
await onEventsFunc(lastTimestamp, events);
}

const setLatestProcessedTimestampFunc = this.options.setLastProcessedTimestamp;
if (setLatestProcessedTimestampFunc) {
await setLatestProcessedTimestampFunc(lastTimestamp);
}
}

generateElasticsearchQuery(timestamp: number) {
return {
size: this.options.pageSize,
query: {
bool: {
must: [
{
terms: {
identifier: this.options.eventIdentifiers, // Query by identifiers
},
},
{
terms: {
address: this.options.emitterAddresses, // Query by addresses
},
},
{
range: {
timestamp: {
gt: `${timestamp}`,
},
},
},
],
},
},
sort: [
{
timestamp: {
order: 'asc', // Sorting by timestamp in ascending order
},
},
],
};
}
}

export class EventProcessorOptions {
elasticUrl?: string;
emitterAddresses?: string[];
eventIdentifiers?: string[];
pageSize: number = 10000;
scrollTimeout: string = "1m";
onEventsReceived?: (highestTimestamp: number, events: EventSource[]) => Promise<void>;
getLastProcessedTimestamp?: () => Promise<number | undefined>;
setLastProcessedTimestamp?: (nonce: number) => Promise<void>;
timeout?: number | undefined;

constructor(options: Partial<EventProcessorOptions> = {}) {
Object.assign(this, options);
}
}

export class EventSource {
originalTxHash?: string;
logAddress?: string;
identifier?: string;
address?: string;
topics?: string[];
shardID?: number;
additionalData?: string[];
txOrder?: number;
txHash?: string;
order?: number;
timestamp?: number;
}
19 changes: 19 additions & 0 deletions libs/services/src/event-processor/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { EventProcessor, EventProcessorOptions } from './event.processor';

const eventsProcessor = new EventProcessor();

void eventsProcessor.start(new EventProcessorOptions({
elasticUrl: 'https://index.multiversx.com',
eventIdentifiers: ['ESDTTransfer'],
emitterAddresses: ['erd1z08z0svvqs84dnrh5hrm47agcxmch79fslmupzgqcgfdtpp3slwqlna25a'],
pageSize: 3,
getLastProcessedTimestamp: async () => {
await Promise.resolve();
return 1726700632;
},
onEventsReceived: async (highestTimestamp, events) => {
await Promise.resolve();
console.log(`onEventsReceived -> highestTimestamp: ${highestTimestamp}`);
console.log(`onEventsReceived -> events: ${JSON.stringify(events)}`);
},
}));

0 comments on commit 10f308d

Please sign in to comment.