Skip to content

Commit

Permalink
chore: updated task logging
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanjassal committed Nov 19, 2024
1 parent a6bb024 commit f771659
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
6 changes: 6 additions & 0 deletions src/discovery/Discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,9 @@ class Discovery {
}
// Only create if it doesn't exist
if (taskExisting != null) return;
this.logger.info(
`Scheduling new discovery for vertex with gestaltId ${gestaltIdEncoded}`,
);
await this.taskManager.scheduleTask(
{
handlerId: this.discoverVertexHandlerId,
Expand Down Expand Up @@ -1097,6 +1100,9 @@ class Discovery {
}
if (taskExisting != null) continue;
// Schedule a new task
this.logger.info(
`Scheduling new discovery for vertex with gestaltId ${gestaltIdEncoded}`,
);
await this.taskManager.scheduleTask(
{
handlerId: this.discoverVertexHandlerId,
Expand Down
43 changes: 33 additions & 10 deletions src/tasks/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -916,8 +916,10 @@ class TaskManager {
tran.queueSuccess(() => {
this.triggerQueuing();
});
this.schedulerLogger.debug(
`Queued Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
});
this.schedulerLogger.debug(`Queued Task ${taskIdEncoded}`);
}

/**
Expand Down Expand Up @@ -1015,7 +1017,9 @@ class TaskManager {
try {
await this.requeueTask(taskId);
} catch (e) {
this.logger.error(`Failed Requeuing Task ${taskIdEncoded}`);
this.logger.error(
`Failed Requeuing Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
// This is an unrecoverable error
throw new tasksErrors.ErrorTaskRequeue(taskIdEncoded, {
cause: e,
Expand All @@ -1024,15 +1028,24 @@ class TaskManager {
} else {
if (succeeded) {
taskLogger.debug('Succeeded');
taskLogger.info(
`Task ${tasksUtils.encodeTaskId(
taskId,
)} succeeded with handler ${taskData.handlerId}`,
);
} else {
taskLogger.warn(`Failed - Reason: ${String(taskReason)}`);
taskLogger.warn(
`Failed - Reason: ${String(taskReason)}, Handler: ${
taskData.handlerId
}`,
);
}
// GC the task before dispatching events
try {
await this.gcTask(taskId);
} catch (e) {
this.logger.error(
`Failed Garbage Collecting Task ${taskIdEncoded}`,
`Failed Garbage Collecting Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
// This is an unrecoverable error
throw new tasksErrors.ErrorTaskGarbageCollection(
Expand Down Expand Up @@ -1075,7 +1088,9 @@ class TaskManager {
abortController,
);
this.activePromises.set(taskIdEncoded, activePromiseCancellable);
this.queueLogger.debug(`Started Task ${taskIdEncoded}`);
this.queueLogger.debug(
`Started Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
});
});
}
Expand All @@ -1096,7 +1111,9 @@ class TaskManager {
taskId.toBuffer(),
]);
if (taskData == null) return;
this.logger.debug(`Garbage Collecting Task ${taskIdEncoded}`);
this.logger.debug(
`Garbage Collecting Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
const taskScheduleTime = taskData.timestamp + taskData.delay;
await tran.del([
...this.tasksPathDbPath,
Expand All @@ -1116,7 +1133,9 @@ class TaskManager {
taskIdBuffer,
]);
await tran.del([...this.tasksTaskDbPath, taskId.toBuffer()]);
this.logger.debug(`Garbage Collected Task ${taskIdEncoded}`);
this.logger.debug(
`Garbage Collected Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
}

protected async requeueTask(
Expand All @@ -1128,7 +1147,6 @@ class TaskManager {
}
const taskIdBuffer = taskId.toBuffer();
const taskIdEncoded = tasksUtils.encodeTaskId(taskId);
this.logger.debug(`Requeuing Task ${taskIdEncoded}`);
await this.lockTask(tran, taskId);
const taskData = await tran.get<TaskData>([
...this.tasksTaskDbPath,
Expand All @@ -1137,6 +1155,9 @@ class TaskManager {
if (taskData == null) {
throw new tasksErrors.ErrorTaskMissing(taskIdEncoded);
}
this.logger.debug(
`Requeuing Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
// Put task into the active index
// this index will be used to retry tasks if they don't finish
await tran.del([...this.tasksActiveDbPath, taskIdBuffer]);
Expand All @@ -1150,7 +1171,9 @@ class TaskManager {
],
null,
);
this.logger.debug(`Requeued Task ${taskIdEncoded}`);
this.logger.debug(
`Requeued Task ${taskIdEncoded} with handler ${taskData.handlerId}`,
);
}

protected async cancelTask(taskId: TaskId, cancelReason: any): Promise<void> {
Expand Down Expand Up @@ -1244,7 +1267,7 @@ class TaskManager {
// Removing task from active index
await tran.del([...this.tasksActiveDbPath, ...kP]);
this.logger.warn(
`Moving Task ${taskIdEncoded} from Active to Queued`,
`Moving Task ${taskIdEncoded} with handler ${taskData.handlerId} from Active to Queued`,
);
}
}
Expand Down

0 comments on commit f771659

Please sign in to comment.