Skip to content

Commit

Permalink
feat: customizable bufferSize, Request changes persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
barjin committed Jul 3, 2024
1 parent 53bee4b commit 7965373
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 40 deletions.
4 changes: 2 additions & 2 deletions packages/core/src/storages/request_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export interface IRequestList {
*
* Unlike `fetchNextRequest()`, this function returns an `AsyncGenerator` that can be used in a `for await...of` loop.
*/
waitForNextRequest(): AsyncGenerator<Request | null>;
requestIterator(): AsyncGenerator<Request | null>;

/**
* Reclaims request to the list if its processing failed.
Expand Down Expand Up @@ -686,7 +686,7 @@ export class RequestList implements IRequestList {
/**
* @inheritDoc
*/
async *waitForNextRequest() {
async *requestIterator() {
while (true) {
yield await this.fetchNextRequest();
}
Expand Down
51 changes: 35 additions & 16 deletions packages/core/src/storages/sitemap_request_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ export interface SitemapRequestListOptions {
* Timeout for sitemap loading in milliseconds. If both `signal` and `timeoutMillis` are provided, either of them can abort the loading.
*/
timeoutMillis?: number;
/**
* Maximum number of buffered URLs for the sitemap loading stream.
* If the buffer is full, the stream will pause until the buffer is drained.
*
* @default 200
*/
maxBufferSize?: number;
}

interface SitemapParsingProgress {
Expand All @@ -43,6 +50,7 @@ interface SitemapRequestListState {
reclaimed: string[];
sitemapParsingProgress: Record<keyof SitemapParsingProgress, any>;
abortLoading: boolean;
requestData: [string, Request][];
}

/**
Expand All @@ -60,6 +68,13 @@ export class SitemapRequestList implements IRequestList {
/** Set of URLs for which `reclaimRequest()` was called. */
private reclaimed = new Set<string>();

/**
* Map of returned Request objects that have not been marked as handled yet.
*
* We use this to persist custom user fields on the in-progress (or reclaimed) requests.
*/
private requestData = new Map<string, Request>();

/**
* Object for keeping track of the sitemap parsing progress.
*/
Expand Down Expand Up @@ -123,6 +138,7 @@ export class SitemapRequestList implements IRequestList {
persistStateKey: ow.optional.string,
signal: ow.optional.any(),
timeoutMillis: ow.optional.number,
maxBufferSize: ow.optional.number,
}),
);

Expand All @@ -131,7 +147,7 @@ export class SitemapRequestList implements IRequestList {

this.urlQueueStream = new Transform({
objectMode: true,
highWaterMark: 100,
highWaterMark: options.maxBufferSize ?? 200,
});
this.urlQueueStream.pause();

Expand Down Expand Up @@ -316,6 +332,7 @@ export class SitemapRequestList implements IRequestList {
},
urlQueue,
reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will be both retried if state is restored
requestData: Array.from(this.requestData.entries()),
abortLoading: this.abortLoading,
} satisfies SitemapRequestListState);
}
Expand All @@ -341,6 +358,8 @@ export class SitemapRequestList implements IRequestList {
inProgressEntries: new Set(state.sitemapParsingProgress.inProgressEntries),
};

this.requestData = new Map(state.requestData ?? []);

for (const url of state.urlQueue) {
this.urlQueueStream.push(url);
}
Expand All @@ -353,28 +372,26 @@ export class SitemapRequestList implements IRequestList {
*/
async fetchNextRequest(): Promise<Request | null> {
    // Reclaimed requests take priority over fresh URLs from the sitemap stream.
    const reclaimedUrl = this.reclaimed.values().next().value as string | undefined;

    if (reclaimedUrl !== undefined) {
        this.reclaimed.delete(reclaimedUrl);
        this.inProgress.add(reclaimedUrl);

        // Reuse the stored Request so that user changes (e.g. `userData`) survive the retry.
        // The entry can be missing - state restoration falls back to an empty `requestData`
        // when the persisted state does not contain it - so create a fresh Request
        // instead of returning `undefined` to the caller.
        let reclaimedRequest = this.requestData.get(reclaimedUrl);
        if (reclaimedRequest === undefined) {
            reclaimedRequest = new Request({ url: reclaimedUrl });
            this.requestData.set(reclaimedUrl, reclaimedRequest);
        }

        return reclaimedRequest;
    }

    // Nothing to retry: read the next URL from the stream.
    const nextUrl = await this.readNextUrl();
    if (!nextUrl) {
        return null;
    }

    // Remember the returned Request until it is marked as handled,
    // so that it can be persisted and handed out again on reclaim.
    const request = new Request({ url: nextUrl });
    this.requestData.set(nextUrl, request);
    this.inProgress.add(nextUrl);

    return request;
}

/**
* @inheritDoc
*/
async *waitForNextRequest() {
async *requestIterator() {
while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) {
const request = await this.fetchNextRequest();
if (!request) break;
Expand All @@ -389,6 +406,7 @@ export class SitemapRequestList implements IRequestList {
async reclaimRequest(request: Request): Promise<void> {
    const { url } = request;

    // Validate first - presumably throws when the URL is not in progress
    // or was already reclaimed (helper body is not visible here; confirm).
    this.ensureInProgressAndNotReclaimed(url);

    // Move the URL from the in-progress set over to the reclaimed set.
    this.inProgress.delete(url);
    this.reclaimed.add(url);
}

/**
Expand All @@ -398,6 +416,7 @@ export class SitemapRequestList implements IRequestList {
this.handledUrlCount += 1;
this.ensureInProgressAndNotReclaimed(request.url);
this.inProgress.delete(request.url);
this.requestData.delete(request.url);
}

private ensureInProgressAndNotReclaimed(url: string): void {
Expand Down
54 changes: 32 additions & 22 deletions test/core/sitemap_request_list.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ describe('SitemapRequestList', () => {
expect(list.handledCount()).toBe(7);
});

test('for..await syntax works with waitForNextRequest', async () => {
test('for..await syntax works with requestIterator', async () => {
const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-index.xml`] });

for await (const request of list.waitForNextRequest()) {
for await (const request of list.requestIterator()) {
await list.markRequestHandled(request);
}

Expand All @@ -214,7 +214,7 @@ describe('SitemapRequestList', () => {
await sleep(50); // Loads the first sub-sitemap, but not the second
controller.abort();

for await (const request of list.waitForNextRequest()) {
for await (const request of list.requestIterator()) {
await list.markRequestHandled(request);
}

Expand All @@ -229,7 +229,7 @@ describe('SitemapRequestList', () => {
timeoutMillis: 50, // Loads the first sub-sitemap, but not the second
});

for await (const request of list.waitForNextRequest()) {
for await (const request of list.requestIterator()) {
await list.markRequestHandled(request);
}

Expand All @@ -255,7 +255,7 @@ describe('SitemapRequestList', () => {
}

const newList = await SitemapRequestList.open(options);
for await (const request of newList.waitForNextRequest()) {
for await (const request of newList.requestIterator()) {
await newList.markRequestHandled(request);
}

Expand All @@ -266,10 +266,6 @@ describe('SitemapRequestList', () => {
const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap.xml`] });
const requests: Request[] = [];

while (await list.isEmpty()) {
await sleep(20);
}

await expect(list.isFinished()).resolves.toBe(false);

while (!(await list.isFinished())) {
Expand All @@ -294,10 +290,6 @@ describe('SitemapRequestList', () => {
const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap.xml`] });
const requests: Request[] = [];

while (await list.isEmpty()) {
await sleep(20);
}

await expect(list.isFinished()).resolves.toBe(false);
let counter = 0;

Expand Down Expand Up @@ -332,10 +324,6 @@ describe('SitemapRequestList', () => {
const options = { sitemapUrls: [`${url}/sitemap-stream.xml`], persistStateKey: 'some-key' };
const list = await SitemapRequestList.open(options);

while (await list.isEmpty()) {
await sleep(20);
}

const firstRequest = await list.fetchNextRequest();
await list.markRequestHandled(firstRequest);

Expand All @@ -345,16 +333,38 @@ describe('SitemapRequestList', () => {
await expect(newList.isEmpty()).resolves.toBe(false);

while (!(await newList.isFinished())) {
if (await newList.isEmpty()) {
await sleep(20);
continue;
}

const request = await newList.fetchNextRequest();
await newList.markRequestHandled(request);
}

expect(list.handledCount()).toBe(1);
expect(newList.handledCount()).toBe(2);
});

test('state persistence tracks user changes', async () => {
    const options = {
        sitemapUrls: [`${url}/sitemap-stream.xml`],
        persistStateKey: 'persist-user-changes',
    };
    const customUserData = { some: 'data' };

    // First list: fetch a request, attach user data to it and persist the state
    // while the request is still in progress (simulates a migration mid-processing).
    const originalList = await SitemapRequestList.open(options);
    const inFlightRequest = await originalList.fetchNextRequest();
    inFlightRequest.userData = customUserData;
    const expectedUrl = inFlightRequest.url;
    await originalList.persistState();

    // Second list opened with the same persistence key: the first request it returns
    // is expected to be the unfinished one, with the user data intact.
    const migratedList = await SitemapRequestList.open(options);
    const recoveredRequest = await migratedList.fetchNextRequest();

    expect(recoveredRequest.url).toEqual(expectedUrl);
    expect(recoveredRequest.userData).toEqual(customUserData);
});
});

0 comments on commit 7965373

Please sign in to comment.