Skip to content

Commit

Permalink
fix: handle backoff and bundle requests for API timing (#81)
Browse files Browse the repository at this point in the history
Nomic's embedding API is rate limited to 2 requests per second, but these can include multiple embeddings. This PR does two things.

1. Batches together all requests in the `Embedder` class into 510 ms groups to ensure that users are automatically kept within the rate limit.
2. Respects the 429s newly sent from the API server with exponential backoff of up to 8 seconds.
<!-- ELLIPSIS_HIDDEN -->

----

> [!IMPORTANT]
> Batch requests in `Embedder` class every 510 ms and handle 429 errors with exponential backoff to comply with API rate limits.
> 
>   - **Behavior**:
>     - Batches requests in `Embedder` class every 510 ms to comply with API rate limit of 2 requests per second.
>     - Implements exponential backoff up to 8 seconds for 429 errors in `flushDeferredEmbeddings()`.
>   - **Constants**:
>     - Increases `BATCH_SIZE` from 32 to 400 in `embedding.ts`.
>   - **Error Handling**:
>     - Re-queues failed requests due to 429 errors in `flushDeferredEmbeddings()`.
>     - Throws error if `embedQueue` exceeds 100,000 items in `embed()`.
>   - **Misc**:
>     - Adjusts `setTimeout` in `periodicallyFlushCache()` to 510 ms.
> 
> <sup>This description was created by </sup>[<img alt="Ellipsis" src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=nomic-ai%2Fts-nomic&utm_source=github&utm_medium=referral)<sup> for 2dcf449. It will automatically update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->
  • Loading branch information
bmschmidt authored Oct 29, 2024
1 parent 5e5ce3e commit d5893f3
Showing 1 changed file with 44 additions and 40 deletions.
84 changes: 44 additions & 40 deletions src/embedding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type NomicEmbedResponse = {

// Uploads on the server may be batched even smaller than this, so there
// are probably not advantages to making this number larger.
const BATCH_SIZE = 32;
const BATCH_SIZE = 400;

/**
* A class that pools and runs requests to the Nomic embedding API.
Expand Down Expand Up @@ -142,45 +142,43 @@ export class Embedder extends BaseAtlasClass<{}> {
return;
}

// Batch into groups of 128 and send them into the cloud.
for (let i = 0; i < this.embedQueue.length; i += BATCH_SIZE) {
const toEmbed = this.embedQueue.slice(i, i + BATCH_SIZE);
this._embed(toEmbed.map((d) => d[0]))
.then(({ embeddings, usage }) => {
// iterate over the returned embeddings for the batch and resolve the
// associated promises.
for (let i = 0; i < embeddings.length; i++) {
// Resolve all the associated promises.
toEmbed[i][1](embeddings[i]);
// Pull the first BATCH_SIZE items off and send them into the cloud.
// Any amounts larger than this will have to wait until the next interval to be embedded.
const toEmbed = this.embedQueue.splice(0, BATCH_SIZE);
this._embed(toEmbed.map((d) => d[0]))
.then(({ embeddings, usage }) => {
// iterate over the returned embeddings for the batch and resolve the
// associated promises.
for (let i = 0; i < embeddings.length; i++) {
// Resolve all the associated promises.
toEmbed[i][1](embeddings[i]);
}
this.tokensUsed += usage.total_tokens;
})
.catch((err) => {
// TODO: -- not the right way to test the error type!
if (('' + err).match(/50[0-9]|429/)) {
this.embedQueue = [...toEmbed, ...this.embedQueue];
if (this.backoff && this.backoff > 8000) {
this.epitaph = new Error(
'Too many requests have failed, disabling embedder. Please try again later.'
);
}
this.tokensUsed += usage.total_tokens;
})
.catch((err) => {
// TODO: -- not the right way to test the error type!
if (('' + err).match(/50[0-9]/)) {
this.embedQueue = [...toEmbed, ...this.embedQueue];
if (this.backoff && this.backoff > 8000) {
this.epitaph = new Error(
'Too many requests have failed, disabling embedder. Please try again later.'
);
}
this.backoff = this.backoff ? this.backoff * 2 : 1000;
} else {
// Propagate the error to the user for each text.
for (let [text, resolve, reject] of toEmbed) {
const failure = new Error(
`Embedding call for string ${text.slice(
0,
30
)}... failed with error ${err}`
);
reject(failure);
}
this.backoff = this.backoff ? this.backoff * 2 : 1000;
} else {
// Propagate the error to the user for each text.
for (let [text, resolve, reject] of toEmbed) {
const failure = new Error(
`Embedding call for string ${text.slice(
0,
30
)}... failed with error ${err}`
);
reject(failure);
}
// Put them back onto the front of the queue.
});
}
this.embedQueue.length = 0;
}
// Put them back onto the front of the queue.
});
}

// Schedule periodic resolutions of all outstanding embedding calls.
Expand All @@ -191,13 +189,14 @@ export class Embedder extends BaseAtlasClass<{}> {
if (this.nextScheduledFlush === null) {
// There's no timeout. Immediately flush the queue.
this.flushDeferredEmbeddings();
// Now schedule the next flush for 100 ms from now.
// Now schedule the next flush for 510 ms from now.
this.nextScheduledFlush = setTimeout(() => {
// Remove the scheduled flush so the next call will do a quick flush
this.nextScheduledFlush = null;
// flush now immediately.
this.flushDeferredEmbeddings();
}, 100);
// The API rate limits you to 2 requests per second, so we can't go faster than that.
}, 510);
} else {
// There's a flush scheduled already, we don't need to do anything.
}
Expand All @@ -214,6 +213,11 @@ export class Embedder extends BaseAtlasClass<{}> {
`This embedder has permanently failed with error ${this.epitaph} `
);
}
if (this.embedQueue.length > 1e5) {
throw new Error(
`There are already ${this.embedQueue.length} texts queued up for embedding on this machine. Please slow down and try again!`
);
}
// If it's a single string, wrap it in an array for consistent processing
const values = isSingleString ? [value] : value;

Expand Down

0 comments on commit d5893f3

Please sign in to comment.