Skip to content

Commit

Permalink
Merge pull request #17 from acuminous/waitForTransports
Browse files Browse the repository at this point in the history
Replace logger.drain with logger.waitForTransports
  • Loading branch information
cressie176 authored Jul 4, 2023
2 parents cfe5367 + af16775 commit ca07f24
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to Semantic Versioning](https://semver.org/spec/v2.0.0.html)

## Unreleased

- Replace logger.drain with logger.waitForTransports, which does not block subsequent messages from being logged

## 3.1.0

- Add support for asynchronous transports via logger.drain
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,12 @@ logger.info("How blissful it is, for one who has nothing", {

### Asynchronous Transports

If one or more of the transports is asynchronous and you want to ensure all messages have been written before terminating your application, you must wait for the `logger.drain` method to yield. This method takes an optional timeout specified in milliseconds. e.g.
If one or more of the transports is asynchronous and you want to ensure all messages have been written before terminating your application, you must wait for the `logger.waitForTransports` method to yield. This method takes an optional timeout specified in milliseconds. e.g.

```js
process.once("SIGTERM", () => {
logger
.drain(1000)
.waitForTransports(1000)
.then(() => {
process.exit();
})
Expand All @@ -510,4 +510,4 @@ process.once("SIGTERM", () => {
});
```

Once you have called `logger.drain` any new logged messages will be suppressed without error and calling `logger.drain` repeatedly will yield the original promise.
Once you have called `logger.waitForTransports` any subsequent messages will be still be accepted and will prevent the promise from resolving until they have been processed. If your application logs intensively `logger.waitForTransports` could therefore block indefinitely unless you specify a timeout.
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Logger {
error(context: Error): void;
enable(): void;
disable(): void;
drain(timeout?: number): Promise<void>;
waitForTransports(timeout?: number): Promise<void>;
}

export const processors: {
Expand Down
13 changes: 4 additions & 9 deletions lib/Logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ module.exports = class Logger {
this._threshold = level;
this._disabled = false;
this._pendingTransports = new Set();
this._draining = null;

Level.decorate(this);
}
Expand All @@ -29,7 +28,7 @@ module.exports = class Logger {
}

log(level, ...args) {
if (this._draining || this._disabled || !level.satisfies(this._threshold)) return;
if (this._disabled || !level.satisfies(this._threshold)) return;

let message;
let ctx = {};
Expand All @@ -45,14 +44,12 @@ module.exports = class Logger {
this._output({ level, record });
}

drain(timeout) {
if (this._draining) return this._draining;

this._draining = new Promise((resolve, reject) => {
waitForTransports(timeout) {
return new Promise((resolve, reject) => {
const timerId =
timeout &&
setTimeout(() => {
reject(new Error("Timedout waiting for logger to drain"));
reject(new Error("Timedout waiting for transports to finish"));
});
const waitUntilDrained = () => {
if (this._pendingTransports.size > 0) return setTimeout(waitUntilDrained, 10).unref();
Expand All @@ -61,8 +58,6 @@ module.exports = class Logger {
};
waitUntilDrained();
});

return this._draining;
}

_process({ level, message, ctx }) {
Expand Down
38 changes: 22 additions & 16 deletions test/Logger.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ describe("Logger", () => {
eq(streams[Level.INFO.name].lines.length, 4);
});

it("should wait for asynchronous transports to drain", async () => {
it("should wait for asynchronous transports to finish", async () => {
const testOutputStream = new TestOutputStream();
const transport = ({ record }) => {
return new Promise((resolve) => {
Expand All @@ -292,7 +292,7 @@ describe("Logger", () => {

eq(testOutputStream.lines.length, 0);

await logger.drain();
await logger.waitForTransports();

eq(testOutputStream.lines.length, 1);
});
Expand All @@ -314,7 +314,7 @@ describe("Logger", () => {
eq(testOutputStream.lines.length, 1);
});

it("should timeout if asynchronous transports take too long to drain", async () => {
it("should timeout if asynchronous transports take too long to finish", async () => {
let resolve;
const transport = () => {
return new Promise((_resolve) => {
Expand All @@ -329,16 +329,16 @@ describe("Logger", () => {
logger.info("Tripitaka rocks!");

await rejects(
() => logger.drain(100),
() => logger.waitForTransports(100),
(err) => {
eq(err.message, "Timedout waiting for logger to drain");
eq(err.message, "Timedout waiting for transports to finish");
resolve();
return true;
}
);
});

it("should not wait when messages have already drained", async () => {
it("should not wait when messages have already finished", async () => {
const testOutputStream = new TestOutputStream();
const transport = ({ record }) => {
return new Promise((resolve) => {
Expand All @@ -357,13 +357,13 @@ describe("Logger", () => {
eq(testOutputStream.lines.length, 1);

const before = Date.now();
await logger.drain();
await logger.waitForTransports();
const after = Date.now();

ok(after - before <= 20);
});

it("should not wait when there were never any messages", async () => {
it("should not wait for transports when there were never any messages", async () => {
const testOutputStream = new TestOutputStream();
const transport = ({ record }) => {
return new Promise((resolve) => {
Expand All @@ -378,13 +378,13 @@ describe("Logger", () => {
});

const before = Date.now();
await logger.drain();
await logger.waitForTransports();
const after = Date.now();

ok(after - before <= 10);
});

it("should suppress messages logged while draining", async () => {
it("should continue logging while waiting for transports to finish", async () => {
const testOutputStream = new TestOutputStream();
const transport = ({ record }) => {
return new Promise((resolve) => {
Expand All @@ -402,16 +402,16 @@ describe("Logger", () => {

logger.info("Tripitaka rocks!");

const drained = logger.drain();
const pendingTransports = logger.waitForTransports();

logger.info("Tripitaka sucks!");

await drained;
await pendingTransports;

eq(testOutputStream.lines.length, 1);
eq(testOutputStream.lines.length, 2);
});

it("should tollerate repeated requests to drain", async () => {
it("should tollerate repeated requests to wait for transports", async () => {
const testOutputStream = new TestOutputStream();
const transport = ({ record }) => {
return new Promise((resolve) => {
Expand All @@ -429,8 +429,14 @@ describe("Logger", () => {

logger.info("Tripitaka rocks!");

await Promise.all(new Array(10).fill().map(() => logger.drain()));
const pendingTransports1 = logger.waitForTransports();

eq(testOutputStream.lines.length, 1);
logger.info("Tripitaka rocks!");

const pendingTransports2 = logger.waitForTransports();

await Promise.all([pendingTransports1, pendingTransports2]);

eq(testOutputStream.lines.length, 2);
});
});

0 comments on commit ca07f24

Please sign in to comment.