Skip to content

Commit

Permalink
refactor: Only one bus instance per named cache. All subsequent names…
Browse files Browse the repository at this point in the history
…paces under it use the same bus.
  • Loading branch information
gkachru authored and Julien-R44 committed Oct 9, 2024
1 parent c772f83 commit 3813247
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 37 deletions.
36 changes: 20 additions & 16 deletions packages/bentocache/src/bus/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Bus as RlanzBus } from '@boringnode/bus'
import type { Transport } from '@boringnode/bus/types/main'

import { CacheBusMessageType } from '../types/bus.js'
import { BaseDriver } from '../drivers/base_driver.js'
import type { LocalCache } from '../cache/facades/local_cache.js'
import { BusMessageReceived } from '../events/bus/bus_message_received.js'
import { BusMessagePublished } from '../events/bus/bus_message_published.js'
Expand All @@ -17,22 +16,20 @@ import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main
* the same channel and will receive the message and update their
* local cache accordingly.
*/
export class Bus extends BaseDriver {
export class Bus {
#bus: RlanzBus
#logger: Logger
#emitter: Emitter
#cache?: LocalCache
#localCaches: Map<string, LocalCache> = new Map()
#channelName = 'bentocache.notifications'

constructor(
name: string,
driver: Transport,
cache: LocalCache,
logger: Logger,
emitter: Emitter,
options: BusOptions = {},
) {
super(options)
this.#cache = cache
this.#emitter = emitter
this.#logger = logger.child({ context: 'bentocache.bus' })

Expand All @@ -44,37 +41,44 @@ export class Bus extends BaseDriver {
},
})

if (this.prefix) this.#channelName += `:${this.prefix}`
if (name) this.#channelName += `:${name}`

this.#bus.subscribe<CacheBusMessage>(this.#channelName, this.#onMessage.bind(this))
this.#logger.trace({ channel: this.#channelName }, 'bus subscribed to channel')
}

namespace(namespace: string): string {
return this.createNamespacePrefix(namespace)
/**
* Add a LocalCache for this bus to manage
* @param namespace The namespace
* @param cache The LocalCache instance
*/
manageCache(namespace: string, cache: LocalCache) {
this.#logger.trace({ namespace, channel: this.#channelName }, 'added namespaced cache')
this.#localCaches?.set(namespace, cache)
}

/**
* When a message is received through the bus.
* This is where we update the local cache.
*/
async #onMessage(message: CacheBusMessage) {
this.#logger.trace(
{ keys: message.keys, type: message.type, channel: this.#channelName },
'received message from bus',
)
if (!message.namespace || !this.#localCaches.has(message.namespace)) return

this.#logger.trace({ ...message, channel: this.#channelName }, 'received message from bus')
this.#emitter.emit('bus:message:received', new BusMessageReceived(message))

const cache = this.#localCaches.get(message.namespace)

if (message.type === CacheBusMessageType.Delete) {
for (const key of message.keys) this.#cache?.delete(key)
for (const key of message.keys) cache?.delete(key)
}

if (message.type === CacheBusMessageType.Set) {
for (const key of message.keys) this.#cache?.logicallyExpire(key)
for (const key of message.keys) cache?.logicallyExpire(key)
}

if (message.type === CacheBusMessageType.Clear) {
this.#cache?.clear()
cache?.clear()
}
}

Expand Down
36 changes: 33 additions & 3 deletions packages/bentocache/src/bus/encoders/binary_encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ export class BinaryEncoder implements TransportEncoder {
0,
)

const totalLength = this.#busIdLength + 1 + totalKeysLength
const namespaceKeyLength = payload.namespace ? Buffer.byteLength(payload.namespace, 'utf8') : 0

const totalLength = this.#busIdLength + 1 + 4 + namespaceKeyLength + totalKeysLength

/**
* Allocate a single buffer for the entire message
Expand All @@ -74,9 +76,26 @@ export class BinaryEncoder implements TransportEncoder {
buffer.writeUInt8(this.busMessageTypeToNum(payload.type), this.#busIdLength)

/**
* 3. Write the keys
* 3. Write the namespace
*/
let offset = this.#busIdLength + 1
/**
* Write the length of the namespace key
*/
buffer.writeUInt32BE(namespaceKeyLength, offset)
offset += 4

/**
* Write the namespace itself, if not empty
*/
if (payload.namespace) {
buffer.write(payload.namespace, offset, namespaceKeyLength, 'utf8')
offset += namespaceKeyLength
}

/**
* 4. Write the keys
*/
for (const key of payload.keys) {
/**
* Compute the length of the key in bytes and write it as a 4-byte big-endian integer
Expand Down Expand Up @@ -114,6 +133,17 @@ export class BinaryEncoder implements TransportEncoder {
const typeValue = buffer.readUInt8(offset++)
const type = this.numToBusMessageType(typeValue)

/**
* Then the namespace
*/
const namespaceKeyLength = buffer.readUInt32BE(offset)
offset += 4

const namespace = namespaceKeyLength
? buffer.toString('utf8', offset, offset + namespaceKeyLength)
: ''
offset += namespaceKeyLength

/**
* Finally, the keys
*/
Expand All @@ -134,6 +164,6 @@ export class BinaryEncoder implements TransportEncoder {
keys.push(key)
}

return { busId, payload: { keys, type } }
return { busId, payload: { keys, type, namespace } }
}
}
6 changes: 3 additions & 3 deletions packages/bentocache/src/cache/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export class Cache implements CacheProvider {

this.#stack.emit(new events.CacheDeleted(key, this.name))

await this.#stack.bus?.publish({ type: CacheBusMessageType.Delete, keys: [key] })
await this.#stack.publish({ type: CacheBusMessageType.Delete, keys: [key] })

return true
}
Expand All @@ -240,7 +240,7 @@ export class Cache implements CacheProvider {

keys.forEach((key) => this.#stack.emit(new events.CacheDeleted(key, this.name)))

await this.#stack.bus?.publish({ type: CacheBusMessageType.Delete, keys })
await this.#stack.publish({ type: CacheBusMessageType.Delete, keys })

return true
}
Expand All @@ -254,7 +254,7 @@ export class Cache implements CacheProvider {
await Promise.all([
this.#stack.l1?.clear(),
this.#stack.l2?.clear(cacheOptions),
this.#stack.bus?.publish({ type: CacheBusMessageType.Clear, keys: [] }),
this.#stack.publish({ type: CacheBusMessageType.Clear, keys: [] }),
])

this.#stack.emit(new events.CacheCleared(this.name))
Expand Down
41 changes: 29 additions & 12 deletions packages/bentocache/src/cache/stack/cache_stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import lodash from '@poppinss/utils/lodash'
import { Bus } from '../../bus/bus.js'
import { LocalCache } from '../facades/local_cache.js'
import { RemoteCache } from '../facades/remote_cache.js'
import { BaseDriver } from '../../drivers/base_driver.js'
import { JsonSerializer } from '../../serializers/json.js'
import type { BentoCacheOptions } from '../../bento_cache_options.js'
import { CacheEntryOptions } from '../cache_entry/cache_entry_options.js'
Expand All @@ -11,10 +12,11 @@ import type {
BusOptions,
CacheEvent,
CacheStackDrivers,
CacheBusMessage,
Logger,
} from '../../types/main.js'

export class CacheStack {
export class CacheStack extends BaseDriver {
#serializer = new JsonSerializer()

l1?: LocalCache
Expand All @@ -32,29 +34,31 @@ export class CacheStack {
drivers: CacheStackDrivers,
bus?: Bus,
) {
super(options)
this.logger = options.logger.child({ cache: this.name })

if (drivers.l1Driver) this.l1 = new LocalCache(drivers.l1Driver, this.logger)
if (drivers.l2Driver) this.l2 = new RemoteCache(drivers.l2Driver, this.logger)

this.bus = this.#createBus(drivers.busDriver, bus, drivers.busOptions)
this.bus = bus ? bus : this.#createBus(drivers.busDriver, drivers.busOptions)
if (this.l1) this.bus?.manageCache(this.prefix, this.l1)

this.defaultOptions = new CacheEntryOptions(options)
}

get emitter() {
return this.options.emitter
}

#createBus(busDriver?: BusDriver, bus?: Bus, busOptions?: BusOptions) {
if (bus) return bus
if (!busDriver || !this.l1) return
#createBus(busDriver?: BusDriver, busOptions?: BusOptions) {
if (!busDriver) return

this.#busDriver = busDriver
this.#busOptions = lodash.merge(
{ retryQueue: { enabled: true, maxSize: undefined } },
busOptions,
)
const newBus = new Bus(this.#busDriver, this.l1, this.logger, this.emitter, this.#busOptions)
const newBus = new Bus(this.name, this.#busDriver, this.logger, this.emitter, this.#busOptions)

return newBus
}
Expand All @@ -63,18 +67,31 @@ export class CacheStack {
if (!this.#namespaceCache.has(namespace)) {
this.#namespaceCache.set(
namespace,
new CacheStack(this.name, this.options, {
l1Driver: this.l1?.namespace(namespace),
l2Driver: this.l2?.namespace(namespace),
busDriver: this.#busDriver,
busOptions: { ...this.#busOptions, prefix: this.bus?.namespace(namespace) },
}),
new CacheStack(
this.name,
this.options.cloneWith({ prefix: this.createNamespacePrefix(namespace) }),
{
l1Driver: this.l1?.namespace(namespace),
l2Driver: this.l2?.namespace(namespace),
},
this.bus,
),
)
}

return <CacheStack>this.#namespaceCache.get(namespace)
}

/**
* Publish a message to the bus channel
*
* @returns true if the message was published, false if not
* and undefined if a bus is not part of the stack
*/
async publish(message: CacheBusMessage): Promise<boolean | undefined> {
return this.bus?.publish({ ...message, namespace: this.prefix })
}

emit(event: CacheEvent) {
return this.emitter.emit(event.name, event.toJSON())
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bentocache/src/cache/stack/cache_stack_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class CacheStackWriter {

this.cacheStack.l1?.set(key, item, options)
await this.cacheStack.l2?.set(key, item, options)
await this.cacheStack.bus?.publish({ type: CacheBusMessageType.Set, keys: [key] })
await this.cacheStack.publish({ type: CacheBusMessageType.Set, keys: [key] })

this.cacheStack.emit(new CacheWritten(key, value, this.cacheStack.name))
return true
Expand Down
4 changes: 2 additions & 2 deletions packages/bentocache/src/types/bus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Transport } from '@boringnode/bus/types/main'

import type { Duration } from './helpers.js'
import type { DriverCommonOptions } from './main.js'

/**
* Interface for the bus driver
Expand All @@ -14,6 +13,7 @@ export type BusDriver = Transport
export type CacheBusMessage = {
keys: string[]
type: CacheBusMessageType
namespace?: string
}

export enum CacheBusMessageType {
Expand Down Expand Up @@ -54,4 +54,4 @@ export type BusOptions = {
*/
retryInterval?: Duration | false
}
} & DriverCommonOptions
}
7 changes: 7 additions & 0 deletions packages/bentocache/tests/bus/bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ test.group('Bus synchronization', () => {

await cache1NSUsersMe.set(key, 24)
await cache3NSAdmin.set(key, 42)
await cache1.set(key, 33)
await setTimeout(100)

assert.equal(await cache1NSUsersMe.get(key), 24)
Expand All @@ -87,6 +88,11 @@ test.group('Bus synchronization', () => {
await setTimeout(100)

assert.isUndefined(await cache3NSAdmin.get(key))
assert.equal(await cache2.get(key), 33)
await cache2.delete(key)
await setTimeout(100)

assert.isUndefined(await cache1.get(key))
}).disableTimeout()

test('synchronize clear across namespaces', async ({ assert }) => {
Expand Down Expand Up @@ -312,6 +318,7 @@ test.group('Bus synchronization', () => {
const data = {
keys: [],
type: CacheBusMessageType.Clear,
namespace: 'users',
}

bus1.subscribe('foo', (message: any) => {
Expand Down

0 comments on commit 3813247

Please sign in to comment.