diff --git a/.env.example b/.env.example index 3feb920..8051c41 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,8 @@ PORT=3000 AUTH_KEY=very-strong-and-long-api-key-with-special-chars # 3. Request body limit FASTIFY_BODY_LIMIT=10485760 ## default is 10MB +# 4. value for `Retry-After` header in seconds. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After +# RETRY_AFTER_SECONDS=2 ## the default value is 2 ########################################### ################# Redis ################### diff --git a/src/testing/utils.ts b/src/testing/utils.ts index a41f136..c7fffaa 100644 --- a/src/testing/utils.ts +++ b/src/testing/utils.ts @@ -32,6 +32,16 @@ export const buildUrl = (route: string): string => { return new URL(route, `http://127.0.0.1:${port}`).toString(); }; +function parseRetryAfterHeader(retryAfter: string | null) { + if (!retryAfter) { + throw new Error('The "Retry-After" header is missing in the response'); + } + if (!/^-?\d+$/.test(retryAfter)) { + throw new Error('The "Retry-After" has an invalid format. It must be an integer'); + } + return parseInt(retryAfter); +} + const MLFLOW_MAX_RETREIVE_ATTEMPTS = 5; export async function waitForMlflowTrace({ attempt = 1, @@ -48,8 +58,15 @@ export async function waitForMlflowTrace({ }); const { result } = await traceResponse.json(); + + if (traceResponse.status === 404) { + const retryAfter = parseRetryAfterHeader(traceResponse.headers.get('Retry-After')); + await setTimeoutPromise(retryAfter * 1000); + return waitForMlflowTrace({ traceId, attempt: ++attempt }); + } + if (result?.mlflow?.step !== 'CLOSE_TRACE' && attempt <= MLFLOW_MAX_RETREIVE_ATTEMPTS) { - await setTimeoutPromise(500 * attempt); + await setTimeoutPromise(2000); return waitForMlflowTrace({ traceId, attempt: ++attempt }); } diff --git a/src/trace/trace.service.ts b/src/trace/trace.service.ts index 2ee89d0..28bad8d 100644 --- a/src/trace/trace.service.ts +++ b/src/trace/trace.service.ts @@ -70,7 +70,8 @@ export async function getTrace({ throw new ErrorWithProps( `The trace entity with id ${id} does not exist`, { code: ErrorWithPropsCodes.NOT_FOUND }, - StatusCodes.NOT_FOUND + StatusCodes.NOT_FOUND, + { addRetryAfterHeader: true } ); } diff --git a/src/trace/trace.test.ts b/src/trace/trace.test.ts index 4ad8edc..cfe7c2e 100644 --- a/src/trace/trace.test.ts +++ b/src/trace/trace.test.ts @@ -39,6 +39,17 @@ describe('trace module', () => { expect(res.status).toBe(200); }); + it('should use "Retry-After" header and wait until the trace is ready', async () => { + let retryAfterTraceId: string | undefined = undefined; + await agent.run({ prompt }).middleware((ctx) => (retryAfterTraceId = ctx.emitter.trace?.id)); + if (retryAfterTraceId) await waitForMlflowTrace({ traceId: retryAfterTraceId }); + + const traceResponse = await makeRequest({ route: `v1/traces/${retryAfterTraceId}` }); + + // assert it was successful response + expect(traceResponse.status).toBe(200); + }); + it('should return the `bad request` response when the invalid result is sent', async () => { const { status, statusText } = await sendCustomProtobuf({ invalidSpanKey: 4200, diff --git a/src/utils/constants.ts b/src/utils/constants.ts index cda7026..182e275 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -64,6 +64,7 @@ export const constants = Object.freeze({ FASTIFY_BODY_LIMIT: parseInt(process.env.FASTIFY_BODY_LIMIT || '10485760'), // default is 10 MB REDIS_URL, DATA_EXPIRATION_IN_DAYS: parseInt(process.env.DATA_EXPIRATION_IN_DAYS || '7'), + RETRY_AFTER_SECONDS: process.env.RETRY_AFTER_SECONDS || 2, MLFLOW: Object.freeze({ API_URL: MLFLOW_API_URL, AUTHORIZATION: Object.freeze({ diff --git a/src/utils/error.ts b/src/utils/error.ts index a074b42..63c2adf 100644 --- a/src/utils/error.ts +++ b/src/utils/error.ts @@ -19,6 +19,8 @@ import { FromSchema, JSONSchema } from 'json-schema-to-ts'; import fp from 'fastify-plugin'; import { StatusCodes } from 'http-status-codes'; +import { constants } from './constants.js'; + export enum ErrorWithPropsCodes { AUTH_ERROR = 'AUTH_ERROR', INTERNAL_SERVER_ERROR = 'INTERNAL_SERVER_ERROR', @@ -29,20 +31,27 @@ export enum ErrorWithPropsCodes { TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS' } +interface ErrorWithPropsConfig { + addRetryAfterHeader?: boolean; +} + export class ErrorWithProps extends Error { public readonly code: CODE; public readonly reason?: REASON; public readonly statusCode: StatusCodes; + public readonly config?: ErrorWithPropsConfig; constructor( message: string, { code, reason }: { code: CODE; reason?: REASON }, - statusCode: StatusCodes + statusCode: StatusCodes, + config?: ErrorWithPropsConfig ) { super(message); this.code = code; this.reason = reason; this.statusCode = statusCode; + this.config = config; } toDto(): BaseErrorResponse { @@ -64,9 +73,10 @@ export class MlflowError< constructor( message: string, { code, reason, response }: { code: CODE; reason?: REASON; response: RESPONSE }, - statusCode: StatusCodes + statusCode: StatusCodes, + config?: ErrorWithPropsConfig ) { - super(message, { code, reason }, statusCode); + super(message, { code, reason }, statusCode, config); this.response = response; } @@ -90,6 +100,18 @@ export const errorPlugin: FastifyPluginAsync = fp.default(async (app) => { app.setErrorHandler(function (error, request, reply) { if (error instanceof ErrorWithProps) { + if ( + [ + StatusCodes.SERVICE_UNAVAILABLE, + StatusCodes.TOO_MANY_REQUESTS, + StatusCodes.MOVED_PERMANENTLY + ].includes(error.statusCode) + ) { + reply.header('Retry-After', constants.RETRY_AFTER_SECONDS); + } + if (error.config?.addRetryAfterHeader) { + reply.header('Retry-After', constants.RETRY_AFTER_SECONDS); + } reply.status(error.statusCode).send(error.toDto()); } else if (error instanceof SyntaxError) { // When request body cannot by parsed by ContentTypeParser