Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(trace): add retry after header #19

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ PORT=3000
AUTH_KEY=very-strong-and-long-api-key-with-special-chars
# 3. Request body limit
FASTIFY_BODY_LIMIT=10485760 ## default is 10MB
# 4. value for `Retry-After` header in seconds. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
# RETRY_AFTER_SECONDS=2 ## the default value is 2

###########################################
################# Redis ###################
Expand Down
19 changes: 18 additions & 1 deletion src/testing/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ export const buildUrl = (route: string): string => {
return new URL(route, `http://127.0.0.1:${port}`).toString();
};

function parseRetryAfterHeader(retryAfter: string | null) {
if (!retryAfter) {
throw new Error('The "Retry-After" header is missing in the response');
}
if (!/^-?\d+$/.test(retryAfter)) {
throw new Error('The "Retry-After" has an invalid format. It must be an integer');
}
return parseInt(retryAfter);
}

const MLFLOW_MAX_RETREIVE_ATTEMPTS = 5;
export async function waitForMlflowTrace({
attempt = 1,
Expand All @@ -48,8 +58,15 @@ export async function waitForMlflowTrace({
});

const { result } = await traceResponse.json();

if (traceResponse.status === 404) {
const retryAfter = parseRetryAfterHeader(traceResponse.headers.get('Retry-After'));
await setTimeoutPromise(retryAfter * 1000);
return waitForMlflowTrace({ traceId, attempt: ++attempt });
}

if (result?.mlflow?.step !== 'CLOSE_TRACE' && attempt <= MLFLOW_MAX_RETREIVE_ATTEMPTS) {
await setTimeoutPromise(500 * attempt);
await setTimeoutPromise(2000);
return waitForMlflowTrace({ traceId, attempt: ++attempt });
}

Expand Down
3 changes: 2 additions & 1 deletion src/trace/trace.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ export async function getTrace({
throw new ErrorWithProps(
`The trace entity with id ${id} does not exist`,
{ code: ErrorWithPropsCodes.NOT_FOUND },
StatusCodes.NOT_FOUND
StatusCodes.NOT_FOUND,
{ addRetryAfterHeader: true }
);
}

Expand Down
11 changes: 11 additions & 0 deletions src/trace/trace.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ describe('trace module', () => {
expect(res.status).toBe(200);
});

it('should use "Retry-After" header and wait until the trace is ready', async () => {
let retryAfterTraceId: string | undefined = undefined;
await agent.run({ prompt }).middleware((ctx) => (retryAfterTraceId = ctx.emitter.trace?.id));
if (retryAfterTraceId) await waitForMlflowTrace({ traceId: retryAfterTraceId });

const traceResponse = await makeRequest({ route: `v1/traces/${retryAfterTraceId}` });

// assert it was successful response
expect(traceResponse.status).toBe(200);
});

it('should return the `bad request` response when the invalid result is sent', async () => {
const { status, statusText } = await sendCustomProtobuf({
invalidSpanKey: 4200,
Expand Down
1 change: 1 addition & 0 deletions src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export const constants = Object.freeze({
FASTIFY_BODY_LIMIT: parseInt(process.env.FASTIFY_BODY_LIMIT || '10485760'), // default is 10 MB
REDIS_URL,
DATA_EXPIRATION_IN_DAYS: parseInt(process.env.DATA_EXPIRATION_IN_DAYS || '7'),
RETRY_AFTER_SECONDS: process.env.RETRY_AFTER_SECONDS || 2,
MLFLOW: Object.freeze({
API_URL: MLFLOW_API_URL,
AUTHORIZATION: Object.freeze({
Expand Down
28 changes: 25 additions & 3 deletions src/utils/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { FromSchema, JSONSchema } from 'json-schema-to-ts';
import fp from 'fastify-plugin';
import { StatusCodes } from 'http-status-codes';

import { constants } from './constants.js';

export enum ErrorWithPropsCodes {
AUTH_ERROR = 'AUTH_ERROR',
INTERNAL_SERVER_ERROR = 'INTERNAL_SERVER_ERROR',
Expand All @@ -29,20 +31,27 @@ export enum ErrorWithPropsCodes {
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'
}

interface ErrorWithPropsConfig {
addRetryAfterHeader?: boolean;
}

export class ErrorWithProps<CODE extends ErrorWithPropsCodes, REASON extends string> extends Error {
public readonly code: CODE;
public readonly reason?: REASON;
public readonly statusCode: StatusCodes;
public readonly config?: ErrorWithPropsConfig;

constructor(
message: string,
{ code, reason }: { code: CODE; reason?: REASON },
statusCode: StatusCodes
statusCode: StatusCodes,
config?: ErrorWithPropsConfig
) {
super(message);
this.code = code;
this.reason = reason;
this.statusCode = statusCode;
this.config = config;
}

toDto(): BaseErrorResponse {
Expand All @@ -64,9 +73,10 @@ export class MlflowError<
constructor(
message: string,
{ code, reason, response }: { code: CODE; reason?: REASON; response: RESPONSE },
statusCode: StatusCodes
statusCode: StatusCodes,
config?: ErrorWithPropsConfig
) {
super(message, { code, reason }, statusCode);
super(message, { code, reason }, statusCode, config);
this.response = response;
}

Expand All @@ -90,6 +100,18 @@ export const errorPlugin: FastifyPluginAsync = fp.default(async (app) => {

app.setErrorHandler(function (error, request, reply) {
if (error instanceof ErrorWithProps) {
if (
[
StatusCodes.SERVICE_UNAVAILABLE,
StatusCodes.TOO_MANY_REQUESTS,
StatusCodes.MOVED_PERMANENTLY
].includes(error.statusCode)
) {
reply.header('Retry-After', constants.RETRY_AFTER_SECONDS);
}
if (error.config?.addRetryAfterHeader) {
reply.header('Retry-After', constants.RETRY_AFTER_SECONDS);
}
reply.status(error.statusCode).send(error.toDto());
} else if (error instanceof SyntaxError) {
// When request body cannot by parsed by ContentTypeParser
Expand Down
Loading