Skip to content

Commit

Permalink
feat: minimizing the memory footprint with buffering streams
Browse files Browse the repository at this point in the history
  • Loading branch information
barjin committed Jun 27, 2024
1 parent 99a12a6 commit 53bee4b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 25 deletions.
100 changes: 82 additions & 18 deletions packages/core/src/storages/sitemap_request_list.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Transform } from 'node:stream';

import defaultLog from '@apify/log';
import { parseSitemap } from '@crawlee/utils';
import ow from 'ow';
Expand Down Expand Up @@ -77,11 +79,12 @@ export class SitemapRequestList implements IRequestList {
};

/**
* Queue of URLs parsed from the sitemaps.
* Object stream of URLs parsed from the sitemaps.
* Using `highWaterMark`, this can manage the speed of the sitemap loading.
*
* Fetch the next URL to be processed using `fetchNextRequest()`.
*/
private urlQueue: string[] = [];
private urlQueueStream: Transform;

/**
* Indicates whether the request list sitemap loading was aborted.
Expand Down Expand Up @@ -126,9 +129,55 @@ export class SitemapRequestList implements IRequestList {
this.persistStateKey = options.persistStateKey;
this.proxyUrl = options.proxyUrl;

this.urlQueueStream = new Transform({
objectMode: true,
highWaterMark: 100,
});
this.urlQueueStream.pause();

this.sitemapParsingProgress.pendingSitemapUrls = new Set(options.sitemapUrls);
}

/**
 * Adds a URL to the stream of parsed URLs, or ends the stream when `url` is `null`.
 *
 * Applies backpressure: if the stream reports that its buffer is full, the returned promise
 * stays pending until a consumer emits the custom `'readdata'` event (emitted by
 * `readNextUrl()` on every read attempt).
 *
 * @param url The URL to enqueue, or `null` to signal that no more URLs will follow.
 * @returns A promise that resolves once the caller may push another URL.
 */
private async pushNextUrl(url: string | null): Promise<void> {
    // Runs synchronously on call (before the first `await`), same as the promise executor did.
    const canPushMore = this.urlQueueStream.push(url);

    // Pushing `null` ends the readable side, after which `push()` always returns `false`.
    // There is nothing left to throttle at that point, so waiting for a reader would only
    // leave this promise pending until some later read - or forever if none follows.
    if (canPushMore || url === null) {
        return;
    }

    // `'drain'` cannot be used here: it is emitted by the writable side after `write()`
    // reported a full buffer, whereas `push()` feeds the readable buffer directly.
    // The consumer therefore announces read attempts with the custom `'readdata'` event.
    await new Promise<void>((resolve) => {
        this.urlQueueStream.once('readdata', () => resolve());
    });
}

/**
 * Takes the next URL out of the stream of parsed URLs.
 *
 * When nothing is buffered and the sitemaps are not fully loaded yet, the returned promise
 * waits for the stream's next `'readable'` event and resolves with whatever `read()` returns then.
 * Every call emits the custom `'readdata'` event so that a producer blocked in `pushNextUrl()`
 * can continue.
 *
 * @returns The next URL, or `null` if there is nothing (more) to read.
 */
private async readNextUrl(): Promise<string | null> {
    return new Promise((resolve) => {
        const buffered = this.urlQueueStream.read();
        // Short-circuits: the loading state is only consulted when nothing was buffered.
        const shouldWait = !buffered && !this.isSitemapFullyLoaded();

        if (shouldWait) {
            this.urlQueueStream.once('readable', () => resolve(this.urlQueueStream.read()));
        } else {
            resolve(buffered);
        }

        // Signal the producer that a read was attempted, regardless of its outcome.
        this.urlQueueStream.emit('readdata');
    });
}

/**
* Indicates whether the background processing of sitemap contents has successfully finished.
*
Expand Down Expand Up @@ -164,7 +213,7 @@ export class SitemapRequestList implements IRequestList {
}

if (!this.sitemapParsingProgress.inProgressEntries.has(item.loc)) {
this.urlQueue.push(item.loc);
await this.pushNextUrl(item.loc);
this.sitemapParsingProgress.inProgressEntries.add(item.loc);
}
}
Expand All @@ -176,6 +225,8 @@ export class SitemapRequestList implements IRequestList {
this.sitemapParsingProgress.inProgressEntries.clear();
this.sitemapParsingProgress.inProgressSitemapUrl = null;
}

await this.pushNextUrl(null);
}

/**
Expand Down Expand Up @@ -207,26 +258,23 @@ export class SitemapRequestList implements IRequestList {
* @inheritDoc
*/
length(): number {
return this.urlQueue.length + this.handledUrlCount - this.inProgress.size - this.reclaimed.size;
return this.urlQueueStream.readableLength + this.handledUrlCount - this.inProgress.size - this.reclaimed.size;
}

/**
* @inheritDoc
*/
async isFinished(): Promise<boolean> {
return (
this.urlQueue.length === 0 &&
this.inProgress.size === 0 &&
this.reclaimed.size === 0 &&
(this.isSitemapFullyLoaded() || this.abortLoading)
(await this.isEmpty()) && this.inProgress.size === 0 && (this.isSitemapFullyLoaded() || this.abortLoading)
);
}

/**
* @inheritDoc
*/
async isEmpty(): Promise<boolean> {
return this.reclaimed.size === 0 && this.urlQueue.length === 0;
return this.reclaimed.size === 0 && this.urlQueueStream.readableLength === 0;
}

/**
Expand All @@ -246,13 +294,27 @@ export class SitemapRequestList implements IRequestList {

this.store ??= await KeyValueStore.open();

const urlQueue = [];

while (this.urlQueueStream.readableLength > 0) {
const url = this.urlQueueStream.read();
if (url === null) {
break;
}
urlQueue.push(url);
}

for (const url of urlQueue) {
this.urlQueueStream.push(url);
}

await this.store.setValue(this.persistStateKey, {
sitemapParsingProgress: {
pendingSitemapUrls: Array.from(this.sitemapParsingProgress.pendingSitemapUrls),
inProgressSitemapUrl: this.sitemapParsingProgress.inProgressSitemapUrl,
inProgressEntries: Array.from(this.sitemapParsingProgress.inProgressEntries),
},
urlQueue: this.urlQueue,
urlQueue,
reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will be both retried if state is restored
abortLoading: this.abortLoading,
} satisfies SitemapRequestListState);
Expand All @@ -278,7 +340,11 @@ export class SitemapRequestList implements IRequestList {
inProgressSitemapUrl: state.sitemapParsingProgress.inProgressSitemapUrl,
inProgressEntries: new Set(state.sitemapParsingProgress.inProgressEntries),
};
this.urlQueue = state.urlQueue;

for (const url of state.urlQueue) {
this.urlQueueStream.push(url);
}

this.abortLoading = state.abortLoading;
}

Expand All @@ -294,7 +360,7 @@ export class SitemapRequestList implements IRequestList {
}

// Otherwise return next request.
const nextUrl = this.urlQueue.shift();
const nextUrl = await this.readNextUrl();
if (!nextUrl) {
return null;
}
Expand All @@ -309,13 +375,11 @@ export class SitemapRequestList implements IRequestList {
* @inheritDoc
*/
async *waitForNextRequest() {
while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || this.urlQueue.length > 0) {
while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) {
const request = await this.fetchNextRequest();
if (request) {
yield request;
} else {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
if (!request) break;

yield request;
}
}

Expand Down
7 changes: 0 additions & 7 deletions test/core/sitemap_request_list.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ describe('SitemapRequestList', () => {
const secondRequest = await list.fetchNextRequest();
expect(secondRequest).not.toBe(null);

await expect(list.fetchNextRequest()).resolves.toBe(null);

await sleep(100);

await expect(list.isFinished(), 'list should not be finished').resolves.toBe(false);
await expect(list.isEmpty(), 'list should not be empty').resolves.toBe(false);

const thirdRequest = await list.fetchNextRequest();
expect(thirdRequest).not.toBe(null);
});
Expand Down

0 comments on commit 53bee4b

Please sign in to comment.