implement QuantumKVCache

This commit is contained in:
Hazelnoot 2025-06-05 10:49:16 -04:00
parent aa7cadbb6c
commit f446d77cb5
3 changed files with 665 additions and 0 deletions

View file

@ -5,6 +5,8 @@
import * as Redis from 'ioredis';
import { bindThis } from '@/decorators.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { InternalEventTypes } from '@/core/GlobalEventService.js';
export class RedisKVCache<T> {
private readonly lifetime: number;
@ -322,6 +324,10 @@ export class MemoryKVCache<T> {
clearInterval(this.gcIntervalHandle);
}
public get size() {
return this.cache.size;
}
public get entries() {
return this.cache.entries();
}
@ -410,3 +416,230 @@ export class MemorySingleCache<T> {
return value;
}
}
export interface QuantumKVOpts<T> {
/**
* Memory cache lifetime in milliseconds.
*/
lifetime: number;
/**
* Callback to fetch the value for a key that wasn't found in the cache.
* May be synchronous or async.
*/
fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>;
/**
* Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster.
* This is called *after* the cache state is updated.
* May be synchronous or async.
*/
onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
/**
* Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster.
* This is called *after* the cache state is updated.
* May be synchronous or async.
*/
onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
}
/**
* QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis.
* All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
* This ensures that a call to get() will never return stale data.
*/
export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
private readonly memoryCache: MemoryKVCache<T>;
private readonly fetcher: QuantumKVOpts<T>['fetcher'];
private readonly onSet: QuantumKVOpts<T>['onSet'];
private readonly onDelete: QuantumKVOpts<T>['onDelete'];
/**
* @param internalEventService Service bus to synchronize events.
* @param name Unique name of the cache - must be the same in all processes.
* @param opts Cache options
*/
constructor(
private readonly internalEventService: InternalEventService,
private readonly name: string,
opts: QuantumKVOpts<T>,
) {
this.memoryCache = new MemoryKVCache(opts.lifetime);
this.fetcher = opts.fetcher;
this.onSet = opts.onSet;
this.onDelete = opts.onDelete;
this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
// Ignore our own events, otherwise we'll immediately erase any set value.
ignoreLocal: true,
});
}
/**
* The number of items currently in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
public get size() {
return this.memoryCache.size;
}
/**
* Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
@bindThis
public *entries(): Generator<[key: string, value: T]> {
for (const entry of this.memoryCache.entries) {
yield [entry[0], entry[1].value];
}
}
/**
* Iterates all keys in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
@bindThis
public *keys() {
for (const entry of this.memoryCache.entries) {
yield entry[0];
}
}
/**
* Iterates all values pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
@bindThis
public *values() {
for (const entry of this.memoryCache.entries) {
yield entry[1].value;
}
}
/**
* Creates or updates a value in the cache, and erases any stale caches across the cluster.
* Fires an onSet event after the cache has been updated in all processes.
* Skips if the value is unchanged.
*/
@bindThis
public async set(key: string, value: T): Promise<void> {
if (this.memoryCache.get(key) === value) {
return;
}
this.memoryCache.set(key, value);
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', key });
if (this.onSet) {
await this.onSet(key, this);
}
}
/**
* Gets or fetches a value from the cache.
* Fires an onSet event, but does not emit an update event to other processes.
*/
@bindThis
public async get(key: string): Promise<T> {
let value = this.memoryCache.get(key);
if (value === undefined) {
value = await this.fetcher(key, this);
this.memoryCache.set(key, value);
if (this.onSet) {
await this.onSet(key, this);
}
}
return value;
}
/**
* Alias to get(), included for backwards-compatibility with RedisKVCache.
* @deprecated use get() instead
*/
@bindThis
public async fetch(key: string): Promise<T> {
return await this.get(key);
}
/**
* Returns true is a key exists in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
@bindThis
public has(key: string): boolean {
return this.memoryCache.get(key) !== undefined;
}
/**
* Deletes a value from the cache, and erases any stale caches across the cluster.
* Fires an onDelete event after the cache has been updated in all processes.
*/
@bindThis
public async delete(key: string): Promise<void> {
this.memoryCache.delete(key);
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', key });
if (this.onDelete) {
await this.onDelete(key, this);
}
}
/**
* Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster.
* Fires an onSet event after the cache has been updated in all processes.
*/
@bindThis
public async refresh(key: string): Promise<T> {
const value = await this.fetcher(key, this);
await this.set(key, value);
return value;
}
/**
* Erases all entries from the local memory cache.
* Does not send any events or update other processes.
*/
@bindThis
public gc() {
this.memoryCache.gc();
}
/**
* Erases all data and disconnects from the cluster.
* This *must* be called when shutting down to prevent memory leaks!
*/
@bindThis
public dispose() {
this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
this.memoryCache.dispose();
}
@bindThis
private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
if (data.name === this.name) {
this.memoryCache.delete(data.key);
if (data.op === 's' && this.onSet) {
await this.onSet(data.key, this);
}
if (data.op === 'd' && this.onDelete) {
await this.onDelete(data.key, this);
}
}
}
/**
* Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
[Symbol.iterator](): Iterator<[key: string, value: T]> {
return this.entries();
}
}