
Commit

feat: coalescing calls + feature: max cache size (#877)
* feature: coalescing calls

If options.coalesce is set, multiple calls to the circuitbreaker will
be handled as one, within the given timeframe (options.coalesceTTL).

feature: max cache size

To prevent internal cache from growing without bounds (and triggering
OOM crashes), the cache is now limited to 2^24 items. This is an insane
amount, so options.cacheSize and options.coalesceSize have been added
to control this.

* PR feedback: remove cache.delete function

---------

Co-authored-by: D. Luijten <daniel.luijten@persgroep.net>
Co-authored-by: Daan <>
daan944 and daan-nu authored Oct 23, 2024
1 parent 0a0d75e commit d50c912
Showing 7 changed files with 312 additions and 156 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -182,6 +182,8 @@ const stats = breaker.stats;
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
coalesceCacheHits: 0,
coalesceCacheMisses: 0,
semaphoreRejections: 0,
percentiles: {
'0': 0,
@@ -417,6 +419,9 @@ The code that is summing the stats samples is here:
}, bucket());
```

### Coalesce calls

The circuit breaker can coalesce calls. If `options.coalesce` is set, multiple calls to the circuit breaker within the given timeframe (`options.coalesceTTL`) are handled as one: only one action runs per cache key, and the other callers share its pending promise. This improves performance when the circuit breaker is fired rapidly with the same request, especially for slower actions, for example when multiple events can trigger the same call at the same time. Regular caching serves a similar purpose, but it only becomes effective after a call has completed and its return value has been stored. Coalescing and caching can be used together; coalescing always uses the internal cache.
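
Below is a minimal, hedged usage sketch; the remote endpoint and the `getUser` action are invented for illustration, while the options shown (`coalesce`, `coalesceTTL`, `coalesceSize`, `cache`, `cacheTTL`, `cacheSize`) are the ones documented in this change:

```js
const CircuitBreaker = require('opossum');

// Hypothetical action: fetch a user from a remote service.
const getUser = id =>
  fetch(`https://example.com/users/${id}`).then(res => res.json());

const breaker = new CircuitBreaker(getUser, {
  timeout: 3000,
  coalesce: true,     // join concurrent calls that share a cache key
  coalesceTTL: 100,   // share the pending promise for up to 100 ms
  coalesceSize: 1000, // cap the coalescing cache at 1000 entries
  cache: true,        // optionally cache resolved values as well
  cacheTTL: 30000,
  cacheSize: 1000     // cap the internal result cache at 1000 entries
});

// All three fires share a single call to getUser.
Promise.all([breaker.fire(1), breaker.fire(1), breaker.fire(1)])
  .then(([a, b, c]) => console.log(a, b, c));
```

Note that `coalesce` works without `cache`; the coalescing cache only holds the pending promise and is separate from the result cache.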

### Typings

8 changes: 7 additions & 1 deletion lib/cache.js
@@ -4,8 +4,9 @@
* @property {Map} cache Cache map
*/
class MemoryCache {
constructor () {
constructor (maxEntries) {
this.cache = new Map();
this.maxEntries = maxEntries ?? 2 ** 24 - 1; // Max size for a JS Map is 2 ** 24 entries.
}

/**
@@ -32,6 +33,11 @@ class MemoryCache {
* @return {void}
*/
set (key, value, ttl) {
// Evict oldest entry when at capacity.
if (this.cache.size === this.maxEntries) {
this.cache.delete(this.cache.keys().next().value);
}

this.cache.set(key, {
expiresAt: ttl,
value
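
As an aside on the eviction above (not part of the diff): it relies on JavaScript's `Map` iterating keys in insertion order, so `keys().next().value` is always the oldest surviving entry. A tiny sketch of the idiom:

```js
// Map preserves insertion order, so the first key is the oldest entry.
const m = new Map([['a', 1], ['b', 2], ['c', 3]]);

// Evict the oldest entry, as MemoryCache.set does when at capacity.
m.delete(m.keys().next().value);

console.log([...m.keys()]); // ['b', 'c']
```
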
77 changes: 70 additions & 7 deletions lib/circuit.js
@@ -88,13 +88,29 @@ Please use options.errorThresholdPercentage`;
* `cacheMiss` reflect cache activity.) Default: false
* @param {Number} options.cacheTTL the time to live for the cache
* in milliseconds. Set 0 for infinity cache. Default: 0 (no TTL)
* @param {Number} options.cacheSize the maximum number of entries in the
* internal cache. Only used when cacheTransport is not defined.
* Default: max size of a JS Map (2^24).
* @param {Function} options.cacheGetKey function that returns the key to use
* when caching the result of the circuit's fire.
* Better to provide a custom function, because `JSON.stringify` is not ideal
* from a performance perspective.
* Default: `(...args) => JSON.stringify(args)`
* @param {CacheTransport} options.cacheTransport custom cache transport
* should implement `get`, `set` and `flush` methods.
* @param {boolean} options.coalesce If true, requests to this breaker are
* coalesced, in other words: the pending promise is cached. Only one action
* (with the same cache key) is executed at a time, and the other pending
* calls wait for its result. Performance will improve when rapidly firing
* the circuit breaker with the same request, especially on a slower action
* (e.g. multiple end-users fetching the same data from a remote service).
* Uses the internal cache only. Can be used in combination with options.cache.
* The metrics `coalesceCacheHit` and `coalesceCacheMiss` are available.
* Default: false
* @param {Number} options.coalesceTTL the time to live for the coalescing
* in milliseconds. Set 0 for infinity cache. Default: same as options.timeout
* @param {Number} options.coalesceSize the maximum number of entries in the
* coalescing cache. Default: max size of a JS Map (2^24).
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
@@ -115,6 +131,8 @@ Please use options.errorThresholdPercentage`;
* @fires CircuitBreaker#fire
* @fires CircuitBreaker#cacheHit
* @fires CircuitBreaker#cacheMiss
* @fires CircuitBreaker#coalesceCacheHit
* @fires CircuitBreaker#coalesceCacheMiss
* @fires CircuitBreaker#reject
* @fires CircuitBreaker#timeout
* @fires CircuitBreaker#success
@@ -168,11 +186,13 @@ class CircuitBreaker extends EventEmitter {
((...args) => JSON.stringify(args));
this.options.enableSnapshots = options.enableSnapshots !== false;
this.options.rotateBucketController = options.rotateBucketController;
this.options.coalesce = !!options.coalesce;
this.options.coalesceTTL = options.coalesceTTL ?? this.options.timeout;

// Set default cache transport if not provided
if (this.options.cache) {
if (this.options.cacheTransport === undefined) {
this.options.cacheTransport = new MemoryCache();
this.options.cacheTransport = new MemoryCache(options.cacheSize);
} else if (typeof this.options.cacheTransport !== 'object' ||
!this.options.cacheTransport.get ||
!this.options.cacheTransport.set ||
@@ -184,6 +204,10 @@ class CircuitBreaker extends EventEmitter {
}
}

if (this.options.coalesce) {
this.options.coalesceCache = new MemoryCache(options.coalesceSize);
}

this.semaphore = new Semaphore(this.options.capacity);

// check if action is defined
@@ -265,6 +289,8 @@ class CircuitBreaker extends EventEmitter {
this.on('reject', increment('rejects'));
this.on('cacheHit', increment('cacheHits'));
this.on('cacheMiss', increment('cacheMisses'));
this.on('coalesceCacheHit', increment('coalesceCacheHits'));
this.on('coalesceCacheMiss', increment('coalesceCacheMisses'));
this.on('open', _ => this[STATUS].open());
this.on('close', _ => this[STATUS].close());
this.on('semaphoreLocked', increment('semaphoreRejections'));
@@ -593,6 +619,14 @@ class CircuitBreaker extends EventEmitter {

const args = rest.slice();

// Breaker is disabled: bypass protection, caching and coalescing and call the action directly.
if (!this[ENABLED]) {
const result = this.action.apply(context, args);
return (typeof result.then === 'function')
? result
: Promise.resolve(result);
}

// Need to create variable here to prevent extra calls if cache is disabled
let cacheKey = '';

@@ -624,11 +658,26 @@ class CircuitBreaker extends EventEmitter {
this.emit('cacheMiss');
}

if (!this[ENABLED]) {
const result = this.action.apply(context, args);
return (typeof result.then === 'function')
? result
: Promise.resolve(result);
/* When coalesce is enabled, check coalesce cache and return
promise, if any. */
if (this.options.coalesce) {
const cachedCall = this.options.coalesceCache.get(cacheKey);

if (cachedCall) {
/**
* Emitted when the circuit breaker is using the coalesce cache
* and finds a cached promise.
* @event CircuitBreaker#coalesceCacheHit
*/
this.emit('coalesceCacheHit');
return cachedCall;
}
/**
* Emitted when the coalesce option is enabled but the circuit
* breaker does not find a pending promise in the coalesce cache.
* @event CircuitBreaker#coalesceCacheMiss
*/
this.emit('coalesceCacheMiss');
}

if (!this.closed && !this.pendingClose) {
@@ -648,7 +697,8 @@ class CircuitBreaker extends EventEmitter {

let timeout;
let timeoutError = false;
return new Promise((resolve, reject) => {

const call = new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
if (this.semaphore.test()) {
if (this.options.timeout) {
@@ -728,6 +778,19 @@ class CircuitBreaker extends EventEmitter {
handleError(err, this, timeout, args, latency, resolve, reject);
}
});

/* When coalesce is enabled, store promise in coalesceCache */
if (this.options.coalesce) {
this.options.coalesceCache.set(
cacheKey,
call,
this.options.coalesceTTL > 0
? Date.now() + this.options.coalesceTTL
: 0
);
}

return call;
}

/**
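
To make the coalescing path above concrete, here is a hedged sketch (the `slowAction` and its argument are invented) that exercises the new `coalesceCacheHit`/`coalesceCacheMiss` events and the matching counters exposed through `breaker.stats`:

```js
const CircuitBreaker = require('opossum');

// Hypothetical slow action.
const slowAction = x =>
  new Promise(resolve => setTimeout(() => resolve(x * 2), 200));

const breaker = new CircuitBreaker(slowAction, { coalesce: true, coalesceTTL: 500 });

breaker.on('coalesceCacheMiss', () => console.log('started a new call'));
breaker.on('coalesceCacheHit', () => console.log('joined an in-flight call'));

// The second fire with the same arguments reuses the pending promise.
Promise.all([breaker.fire(21), breaker.fire(21)]).then(() => {
  const { coalesceCacheHits, coalesceCacheMisses } = breaker.stats;
  console.log({ coalesceCacheHits, coalesceCacheMisses }); // expected: 1 hit, 1 miss
});
```
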
2 changes: 2 additions & 0 deletions lib/status.js
@@ -229,6 +229,8 @@ const bucket = _ => ({
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
coalesceCacheHits: 0,
coalesceCacheMisses: 0,
semaphoreRejections: 0,
percentiles: {},
latencyTimes: []