diff --git a/packages/core/src/storages/request_list.ts b/packages/core/src/storages/request_list.ts index 9dfeb7c508e..0c24177318d 100644 --- a/packages/core/src/storages/request_list.ts +++ b/packages/core/src/storages/request_list.ts @@ -74,7 +74,7 @@ export interface IRequestList { * * Unlike `fetchNextRequest()`, this function returns an `AsyncGenerator` that can be used in a `for await...of` loop. */ - waitForNextRequest(): AsyncGenerator; + requestIterator(): AsyncGenerator; /** * Reclaims request to the list if its processing failed. @@ -686,7 +686,7 @@ export class RequestList implements IRequestList { /** * @inheritDoc */ - async *waitForNextRequest() { + async *requestIterator() { while (true) { yield await this.fetchNextRequest(); } diff --git a/packages/core/src/storages/sitemap_request_list.ts b/packages/core/src/storages/sitemap_request_list.ts index 75c7395b46a..3e329edf53d 100644 --- a/packages/core/src/storages/sitemap_request_list.ts +++ b/packages/core/src/storages/sitemap_request_list.ts @@ -30,6 +30,13 @@ export interface SitemapRequestListOptions { * Timeout for sitemap loading in milliseconds. If both `signal` and `timeoutMillis` are provided, either of them can abort the loading. */ timeoutMillis?: number; + /** + * Maximum number of buffered URLs for the sitemap loading stream. + * If the buffer is full, the stream will pause until the buffer is drained. + * + * @default 200 + */ + maxBufferSize?: number; } interface SitemapParsingProgress { @@ -43,6 +50,7 @@ interface SitemapRequestListState { reclaimed: string[]; sitemapParsingProgress: Record; abortLoading: boolean; + requestData: [string, Request][]; } /** @@ -60,6 +68,13 @@ export class SitemapRequestList implements IRequestList { /** Set of URLs for which `reclaimRequest()` was called. */ private reclaimed = new Set(); + /** + * Map of returned Request objects that have not been marked as handled yet. + * + * We use this to persist custom user fields on the in-progress (or reclaimed) requests. + */ + private requestData = new Map(); + /** * Object for keeping track of the sitemap parsing progress. */ @@ -123,6 +138,7 @@ export class SitemapRequestList implements IRequestList { persistStateKey: ow.optional.string, signal: ow.optional.any(), timeoutMillis: ow.optional.number, + maxBufferSize: ow.optional.number, }), ); @@ -131,7 +147,7 @@ export class SitemapRequestList implements IRequestList { this.urlQueueStream = new Transform({ objectMode: true, - highWaterMark: 100, + highWaterMark: options.maxBufferSize ?? 200, }); this.urlQueueStream.pause(); @@ -316,6 +332,7 @@ export class SitemapRequestList implements IRequestList { }, urlQueue, reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will be both retried if state is restored + requestData: Array.from(this.requestData.entries()), abortLoading: this.abortLoading, } satisfies SitemapRequestListState); } @@ -341,6 +358,8 @@ export class SitemapRequestList implements IRequestList { inProgressEntries: new Set(state.sitemapParsingProgress.inProgressEntries), }; + this.requestData = new Map(state.requestData ?? []); + for (const url of state.urlQueue) { this.urlQueueStream.push(url); } @@ -353,28 +372,26 @@ export class SitemapRequestList implements IRequestList { */ async fetchNextRequest(): Promise { // Try to return a reclaimed request first - const url = this.reclaimed.values().next().value as string | undefined; - if (url !== undefined) { - this.reclaimed.delete(url); - return new Request({ url }); - } - - // Otherwise return next request. - const nextUrl = await this.readNextUrl(); - if (!nextUrl) { - return null; + let nextUrl: string | null = this.reclaimed.values().next().value; + if (nextUrl) { + this.reclaimed.delete(nextUrl); + } else { + // Otherwise read next url from the stream + nextUrl = await this.readNextUrl(); + if (!nextUrl) { + return null; + } + this.requestData.set(nextUrl, new Request({ url: nextUrl })); } - const request = new Request({ url: nextUrl }); - this.inProgress.add(request.url); - - return request; + this.inProgress.add(nextUrl); + return this.requestData.get(nextUrl)!; } /** * @inheritDoc */ - async *waitForNextRequest() { + async *requestIterator() { while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) { const request = await this.fetchNextRequest(); if (!request) break; @@ -389,6 +406,7 @@ export class SitemapRequestList implements IRequestList { async reclaimRequest(request: Request): Promise { this.ensureInProgressAndNotReclaimed(request.url); this.reclaimed.add(request.url); + this.inProgress.delete(request.url); } /** @@ -398,6 +416,7 @@ export class SitemapRequestList implements IRequestList { this.handledUrlCount += 1; this.ensureInProgressAndNotReclaimed(request.url); this.inProgress.delete(request.url); + this.requestData.delete(request.url); } private ensureInProgressAndNotReclaimed(url: string): void { diff --git a/test/core/sitemap_request_list.test.ts b/test/core/sitemap_request_list.test.ts index cad24db7b65..b6cd36aebe2 100644 --- a/test/core/sitemap_request_list.test.ts +++ b/test/core/sitemap_request_list.test.ts @@ -192,10 +192,10 @@ describe('SitemapRequestList', () => { expect(list.handledCount()).toBe(7); }); - test('for..await syntax works with waitForNextRequest', async () => { + test('for..await syntax works with requestIterator', async () => { const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-index.xml`] }); - for await (const request of list.waitForNextRequest()) { + for await (const request of list.requestIterator()) { await list.markRequestHandled(request); } @@ -214,7 +214,7 @@ describe('SitemapRequestList', () => { await sleep(50); // Loads the first sub-sitemap, but not the second controller.abort(); - for await (const request of list.waitForNextRequest()) { + for await (const request of list.requestIterator()) { await list.markRequestHandled(request); } @@ -229,7 +229,7 @@ describe('SitemapRequestList', () => { timeoutMillis: 50, // Loads the first sub-sitemap, but not the second }); - for await (const request of list.waitForNextRequest()) { + for await (const request of list.requestIterator()) { await list.markRequestHandled(request); } @@ -255,7 +255,7 @@ describe('SitemapRequestList', () => { } const newList = await SitemapRequestList.open(options); - for await (const request of newList.waitForNextRequest()) { + for await (const request of newList.requestIterator()) { await newList.markRequestHandled(request); } @@ -266,10 +266,6 @@ describe('SitemapRequestList', () => { const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap.xml`] }); const requests: Request[] = []; - while (await list.isEmpty()) { - await sleep(20); - } - await expect(list.isFinished()).resolves.toBe(false); while (!(await list.isFinished())) { @@ -294,10 +290,6 @@ describe('SitemapRequestList', () => { const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap.xml`] }); const requests: Request[] = []; - while (await list.isEmpty()) { - await sleep(20); - } - await expect(list.isFinished()).resolves.toBe(false); let counter = 0; @@ -332,10 +324,6 @@ describe('SitemapRequestList', () => { const options = { sitemapUrls: [`${url}/sitemap-stream.xml`], persistStateKey: 'some-key' }; const list = await SitemapRequestList.open(options); - while (await list.isEmpty()) { - await sleep(20); - } - const firstRequest = await list.fetchNextRequest(); await list.markRequestHandled(firstRequest); @@ -345,11 +333,6 @@ describe('SitemapRequestList', () => { await expect(newList.isEmpty()).resolves.toBe(false); while (!(await newList.isFinished())) { - if (await newList.isEmpty()) { - await sleep(20); - continue; - } - const request = await newList.fetchNextRequest(); await newList.markRequestHandled(request); } @@ -357,4 +340,31 @@ describe('SitemapRequestList', () => { expect(list.handledCount()).toBe(1); expect(newList.handledCount()).toBe(2); }); + + test('state persistence tracks user changes', async () => { + const options = { + sitemapUrls: [`${url}/sitemap-stream.xml`], + persistStateKey: 'persist-user-changes', + }; + + const userDataPayload = { some: 'data' }; + let firstLoadedUrl; + + { + const list = await SitemapRequestList.open(options); + + const firstRequest = await list.fetchNextRequest(); + firstRequest.userData = userDataPayload; + firstLoadedUrl = firstRequest.url; + + await list.persistState(); + // simulates a migration in the middle of request processing + } + + const newList = await SitemapRequestList.open(options); + const restoredRequest = await newList.fetchNextRequest(); + + expect(restoredRequest.url).toEqual(firstLoadedUrl); + expect(restoredRequest.userData).toEqual(userDataPayload); + }); });