QuantumKVCache upgrades:

* Implement optionalFetcher callback
* Allow rate limits for all fetcher types
* De-duplicate concurrent fetch operations
* Stricter type checks for cache value types
* Add abort-on-dispose support for all fetcher types
* Dispose terminates and awaits all in-progress fetches
* Throw on attempts to use a disposed cache
* Unit test cleanup
* Reduced overhead for async operations
* More normalized exception handling
This commit is contained in:
Hazelnoot 2025-11-11 23:59:03 -05:00
parent 815bb2234e
commit ece03bef61
3 changed files with 2173 additions and 1094 deletions

View file

@ -4,53 +4,152 @@
*/
import { EntityNotFoundError } from 'typeorm';
import promiseLimit from 'promise-limit';
import { bindThis } from '@/decorators.js';
import { InternalEventService } from '@/global/InternalEventService.js';
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
import type { InternalEventService, EventTypes } from '@/global/InternalEventService.js';
import { MemoryKVCache, type MemoryCacheServices } from '@/misc/cache.js';
import { makeKVPArray, type KVPArray } from '@/misc/kvp-array.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { FetchFailedError } from '@/misc/errors/FetchFailedError.js';
import { KeyNotFoundError } from '@/misc/errors/KeyNotFoundError.js';
import { QuantumCacheError } from '@/misc/errors/QuantumCacheError.js';
import { DisposedError, DisposingError } from '@/misc/errors/DisposeError.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { withCleanup, withSignal } from '@/misc/promiseUtils.js';
import { promiseTry } from '@/misc/promise-try.js';
export interface QuantumKVOpts<T> {
export interface QuantumKVOpts<TIn, T extends Value<TIn> = Value<TIn>> {
/**
* Memory cache lifetime in milliseconds.
*/
lifetime: number;
/**
* Callback to fetch the value for a key that wasn't found in the cache.
* Return null/undefined or throw an error if no value exists for the given key.
* May be synchronous or async.
* Callback to fetch required values by key.
*/
fetcher: (key: string, cache: QuantumKVCache<T>) => T | null | undefined | Promise<T | null | undefined>;
fetcher: Fetcher<T>;
/**
* Optional callback to fetch the value for multiple keys that weren't found in the cache.
* Don't throw or return null if a key has no value; just omit it from the response.
* May be synchronous or async.
* If not provided, then the implementation will fall back on repeated calls to fetcher().
* Callback to fetch optional values by key.
*/
bulkFetcher?: (keys: string[], cache: QuantumKVCache<T>) => Iterable<[key: string, value: T]> | Promise<Iterable<[key: string, value: T]>>;
optionalFetcher?: OptionalFetcher<T>;
/**
* Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster.
* This is called *after* the cache state is updated.
* Implementations may be synchronous or async.
* Callback to fetch multiple optional values by key.
*/
onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>;
bulkFetcher?: BulkFetcher<T>;
/**
* Optional callback when all values are removed from the cache, either locally or elsewhere in the cluster.
* This is called *after* the cache state is updated.
* Implementations may be synchronous or async.
* Callback to handle changes to the cross-cluster state (create, update, or delete values).
*/
onReset?: (cache: QuantumKVCache<T>) => void | Promise<void>;
onChanged?: OnChanged<T>;
// TODO equality comparer
/**
* Callback to handle a whole-state reset (all values deleted).
*/
onReset?: OnReset<T>;
/**
* Optional limit on the number of calls to fetcher to allow at once.
* If more than this many fetches are attempted, the excess will be queued until the earlier operations complete.
* The total number of calls will never exceed maxConcurrency.
* Min: 1
* Default: 4
*/
fetcherConcurrency?: number;
/**
* Optional limit on the number of calls to optionalFetcher to allow at once.
* If more than this many fetches are attempted, the excess will be queued until the earlier operations complete.
* The total number of calls will never exceed maxConcurrency.
* Min: 1
* Default: 4
*/
optionalFetcherConcurrency?: number;
/**
* Optional limit on the number of calls to bulkFetcher to allow at once.
* If more than this many fetches are attempted, the excess will be queued until the earlier operations complete.
* The total number of calls will never exceed maxConcurrency.
* Min: 1
* Default: 2
*/
bulkFetcherConcurrency?: number;
/**
* Optional limit on the total number of calls to fetcher, optionalFetcher, or bulkFetcher to allow at once.
* If more than this many fetches are attempted, the excess will be queued until the earlier operations complete.
* Min: 1
* Default: fetcherConcurrency, optionalFetcherConcurrency, or bulkFetcherConcurrency - whichever is highest.
*/
maxConcurrency?: number;
}
/**
 * Context object passed to every user-supplied callback (fetchers and change/reset handlers).
 */
export interface CallbackMeta<T> {
	/**
	 * The cache instance that triggered this callback.
	 */
	readonly cache: QuantumKVCache<T>;

	/**
	 * AbortSignal that will fire when the cache is disposed.
	 * Should be propagated to ensure smooth cleanup and shutdown.
	 */
	readonly disposeSignal: AbortSignal;
}
/**
 * Callback to fetch the value for a key that wasn't found in the cache, and is required to continue.
 * Should return the fetched value, or null/undefined if no value exists for the given key.
 * Missing keys may also produce an EntityNotFoundError or KeyNotFoundError, which will be wrapped to gracefully abort the operation.
 * May be synchronous or async.
 */
export type Fetcher<T> = (key: string, meta: CallbackMeta<T>) => MaybePromise<Value<T> | null | undefined>;

/**
 * Optional callback to fetch the value for a key that wasn't found in the cache, and isn't required to continue.
 * Should return the fetched value, or null/undefined if no value exists for the given key.
 * Missing keys should *not* produce any exception, as it will be wrapped to gracefully abort the operation.
 * May be synchronous or async.
 * If not provided, then the implementation will fall back on fetcher().
 */
export type OptionalFetcher<T> = (key: string, meta: CallbackMeta<T>) => MaybePromise<Value<T> | null | undefined>;

/**
 * Optional callback to fetch the value for multiple keys that weren't found in the cache.
 * Should return the fetched values for each key, or null/undefined if no value exists for the given key.
 * Missing keys may also be omitted from the response entirely, but no error should be thrown.
 * May be synchronous or async.
 * If not provided, then the implementation will fall back on repeated calls to optionalFetcher() or fetcher().
 */
export type BulkFetcher<T> = (keys: string[], meta: CallbackMeta<T>) => MaybePromise<Iterable<[key: string, value: Value<T> | null | undefined]>>;

/**
 * Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster.
 * This is called *after* the cache state is updated.
 * May be synchronous or async.
 */
export type OnChanged<T> = (keys: string[], meta: CallbackMeta<T>) => MaybePromise<void>;

/**
 * Optional callback when all values are removed from the cache, either locally or elsewhere in the cluster.
 * This is called *after* the cache state is updated.
 * May be synchronous or async.
 */
export type OnReset<T> = (meta: CallbackMeta<T>) => MaybePromise<void>;

// Promise of an in-flight fetcher() call for a single key.
type ActiveFetcher<T> = Promise<T>;
// Promise of an in-flight optionalFetcher() call; resolves to undefined when the key has no value.
type ActiveOptionalFetcher<T> = Promise<T | undefined>;
// Promise of an in-flight bulkFetcher() call; keys with no value are omitted from the resolved array.
type ActiveBulkFetcher<T> = Promise<KeyValue<T>[]>;

// Make sure null / undefined cannot be a valid type
// https://stackoverflow.com/a/63045455
type Value<T> = NonNullable<T>;

// A single [key, value] cache entry.
type KeyValue<T> = [key: string, value: T];

// Shape of a promise-limit rate limiter: runs the callback when a concurrency slot is free.
type Limiter = <T>(callback: () => Promise<T>) => Promise<T>;

// A value that may or may not be wrapped in a promise.
type MaybePromise<T> = T | Promise<T>;

// A non-empty array, enforced at the type level.
type AtLeastOne<T> = [T, ...T[]];
export interface QuantumCacheServices extends MemoryCacheServices {
/**
* Event bus to attach to.
@ -64,15 +163,29 @@ export interface QuantumCacheServices extends MemoryCacheServices {
* All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
* This ensures that a call to get() will never return stale data.
*/
export class QuantumKVCache<T> implements Iterable<readonly [key: string, value: T]> {
export class QuantumKVCache<TIn, T extends Value<TIn> = Value<TIn>> implements Iterable<readonly [key: string, value: T]> {
private readonly internalEventService: InternalEventService;
private readonly memoryCache: MemoryKVCache<T>;
public readonly fetcher: QuantumKVOpts<T>['fetcher'];
public readonly bulkFetcher: QuantumKVOpts<T>['bulkFetcher'];
public readonly onChanged: QuantumKVOpts<T>['onChanged'];
public readonly onReset: QuantumKVOpts<T>['onReset'];
private readonly activeFetchers = new Map<string, ActiveFetcher<T>>();
private readonly activeOptionalFetchers = new Map<string, ActiveOptionalFetcher<T>>();
private readonly activeBulkFetchers = new Map<string, ActiveBulkFetcher<T>>();
private readonly globalLimiter: Limiter;
private readonly fetcherLimiter: Limiter;
private readonly optionalFetcherLimiter: Limiter;
private readonly bulkFetcherLimiter: Limiter;
public readonly fetcher: Fetcher<T>;
public readonly optionalFetcher: OptionalFetcher<T> | undefined;
public readonly bulkFetcher: BulkFetcher<T> | undefined;
public readonly onChanged: OnChanged<T> | undefined;
public readonly onReset: OnReset<T> | undefined;
private readonly disposeController = new AbortController();
private isDisposing = false;
private isDisposed = false;
/**
* @param name Unique name of the cache - must be the same in all processes.
@ -80,15 +193,37 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
* @param opts Cache options
*/
constructor(
// TODO validate to make sure this is unique
public readonly name: string,
services: QuantumCacheServices,
opts: QuantumKVOpts<T>,
opts: QuantumKVOpts<TIn, T>,
) {
// OK: we forward all management calls to the inner cache.
// eslint-disable-next-line no-restricted-syntax
this.memoryCache = new MemoryKVCache(name + ':mem', services, { lifetime: opts.lifetime });
// Set up rate limiters
const fetcherConcurrency = opts.fetcherConcurrency
? Math.max(opts.fetcherConcurrency, 1)
: 4;
this.fetcherLimiter = promiseLimit(fetcherConcurrency);
const optionalFetcherConcurrency = opts.optionalFetcherConcurrency
? Math.max(opts.optionalFetcherConcurrency, 1)
: 4;
this.optionalFetcherLimiter = promiseLimit(optionalFetcherConcurrency);
const bulkFetcherConcurrency = opts.bulkFetcherConcurrency
? Math.max(opts.bulkFetcherConcurrency, 1)
: 2;
this.bulkFetcherLimiter = promiseLimit(bulkFetcherConcurrency);
const globalConcurrency = opts.maxConcurrency
? Math.max(opts.maxConcurrency, 1)
: Math.max(fetcherConcurrency, optionalFetcherConcurrency, bulkFetcherConcurrency);
this.globalLimiter = promiseLimit(globalConcurrency);
this.fetcher = opts.fetcher;
this.optionalFetcher = opts.optionalFetcher;
this.bulkFetcher = opts.bulkFetcher;
this.onChanged = opts.onChanged;
this.onReset = opts.onReset;
@ -104,6 +239,17 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
});
}
private get callbackMeta(): CallbackMeta<T> {
return {
cache: this,
disposeSignal: this.disposeController.signal,
};
}
private get nameForError() {
return `QuantumCache[${this.name}]`;
}
/**
* The number of items currently in memory.
* This applies to the local subset view, not the cross-cluster cache state.
@ -112,6 +258,14 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
return this.memoryCache.size;
}
/**
* Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
*/
[Symbol.iterator](): Iterator<[key: string, value: T]> {
return this.entries();
}
/**
* Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
@ -152,6 +306,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async set(key: string, value: T): Promise<void> {
this.throwIfDisposed();
if (this.memoryCache.get(key) === value) {
return;
}
@ -161,7 +317,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] });
if (this.onChanged) {
await this.onChanged([key], this);
await this.onChanged([key], this.callbackMeta);
}
}
@ -172,6 +328,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async setMany(items: Iterable<readonly [key: string, value: T]>): Promise<void> {
this.throwIfDisposed();
const changedKeys: string[] = [];
for (const item of items) {
@ -185,7 +343,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: changedKeys });
if (this.onChanged) {
await this.onChanged(changedKeys, this);
await this.onChanged(changedKeys, this.callbackMeta);
}
}
}
@ -197,6 +355,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public add(key: string, value: T): void {
this.throwIfDisposed();
this.memoryCache.set(key, value);
}
@ -207,6 +367,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public addMany(items: Iterable<readonly [key: string, value: T]>): void {
this.throwIfDisposed();
for (const [key, value] of items) {
this.memoryCache.set(key, value);
}
@ -242,14 +404,16 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async fetch(key: string): Promise<T> {
this.throwIfDisposed();
let value = this.memoryCache.get(key);
if (value == null) {
value = await this.callFetch(key);
value = await this.doFetch(key);
this.memoryCache.set(key, value);
if (this.onChanged) {
await this.onChanged([key], this);
await this.onChanged([key], this.callbackMeta);
}
}
return value;
@ -261,12 +425,14 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async fetchMaybe(key: string): Promise<T | undefined> {
this.throwIfDisposed();
let value = this.memoryCache.get(key);
if (value != null) {
return value;
}
value = await this.callFetchMaybe(key);
value = await this.doFetchMaybe(key);
if (value == null) {
return undefined;
}
@ -274,7 +440,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
this.memoryCache.set(key, value);
if (this.onChanged) {
await this.onChanged([key], this);
await this.onChanged([key], this.callbackMeta);
}
return value;
@ -287,6 +453,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async fetchMany(keys: Iterable<string>): Promise<KVPArray<T>> {
this.throwIfDisposed();
const results: [key: string, value: T][] = [];
const toFetch: string[] = [];
@ -302,7 +470,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
// Fetch any uncached keys
if (toFetch.length > 0) {
const fetched = await this.bulkFetch(toFetch);
const fetched = await this.doFetchMany(toFetch);
// Add to cache and return set
this.addMany(fetched);
@ -310,7 +478,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
// Emit event
if (this.onChanged) {
await this.onChanged(toFetch, this);
await this.onChanged(toFetch, this.callbackMeta);
}
}
@ -332,12 +500,14 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async delete(key: string): Promise<void> {
this.throwIfDisposed();
this.memoryCache.delete(key);
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] });
if (this.onChanged) {
await this.onChanged([key], this);
await this.onChanged([key], this.callbackMeta);
}
}
/**
@ -347,6 +517,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async deleteMany(keys: Iterable<string>): Promise<void> {
this.throwIfDisposed();
const deleted: string[] = [];
for (const key of keys) {
@ -361,7 +533,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: deleted });
if (this.onChanged) {
await this.onChanged(deleted, this);
await this.onChanged(deleted, this.callbackMeta);
}
}
@ -371,15 +543,19 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public async refresh(key: string): Promise<T> {
const value = await this.callFetch(key);
this.throwIfDisposed();
const value = await this.doFetch(key);
await this.set(key, value);
return value;
}
@bindThis
public async refreshMany(keys: Iterable<string>): Promise<KVPArray<T>> {
this.throwIfDisposed();
const toFetch = Array.from(keys);
const fetched = await this.bulkFetch(toFetch);
const fetched = await this.doFetchMany(toFetch);
await this.setMany(fetched);
return makeKVPArray(fetched);
}
@ -390,6 +566,8 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
*/
@bindThis
public clear(): void {
this.throwIfDisposed();
this.memoryCache.clear();
}
@ -398,12 +576,14 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
* Fires an onReset event and updates other processes.
*/
public async reset(): Promise<void> {
this.throwIfDisposed();
this.clear();
await this.internalEventService.emit('quantumCacheReset', { name: this.name });
if (this.onReset) {
await this.onReset(this);
await this.onReset(this.callbackMeta);
}
}
@ -412,7 +592,9 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
* Does not send any events or update other processes.
*/
@bindThis
public gc() {
public gc(): void {
this.throwIfDisposed();
this.memoryCache.gc();
}
@ -421,91 +603,422 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
* This *must* be called when shutting down to prevent memory leaks!
*/
@bindThis
public dispose() {
this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
this.internalEventService.off('quantumCacheReset', this.onQuantumCacheReset);
public async dispose(): Promise<void> {
if (this.isDisposed) return;
this.isDisposing = true;
this.memoryCache.dispose();
try {
// Stop handling events *first*
this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
this.internalEventService.off('quantumCacheReset', this.onQuantumCacheReset);
// Kill active fetchers
const error = new DisposingError({ source: this.nameForError });
this.disposeController.abort(error);
// Wait for cleanup
await Promise.allSettled([
...this.activeFetchers.values().map(p => trackPromise(p)),
...this.activeOptionalFetchers.values().map(p => trackPromise(p)),
...this.activeBulkFetchers.values().map(p => trackPromise(p)),
]);
// Purge memory for faster GC
this.activeFetchers.clear();
this.activeOptionalFetchers.clear();
this.activeBulkFetchers.clear();
this.memoryCache.dispose();
} finally {
this.isDisposing = false;
this.isDisposed = true;
}
}
@bindThis
private async bulkFetch(keys: string[]): Promise<[key: string, value: T][]> {
// Use the bulk fetcher if available.
if (this.bulkFetcher) {
try {
const results = await this.bulkFetcher(keys, this);
return Array.from(results);
} catch (err) {
throw new FetchFailedError(this.name, keys, renderInlineError(err), { cause: err });
}
}
private async onQuantumCacheUpdated(data: EventTypes['quantumCacheUpdated']): Promise<void> {
this.throwIfDisposed();
// Otherwise fall back to regular fetch.
const results: [key: string, value: T][] = [];
for (const key of keys) {
const value = await this.callFetchMaybe(key);
if (value != null) {
results.push([key, value]);
}
}
return results;
}
@bindThis
private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
if (data.name === this.name) {
for (const key of data.keys) {
this.memoryCache.delete(key);
}
if (this.onChanged) {
await this.onChanged(data.keys, this);
await this.onChanged(data.keys, this.callbackMeta);
}
}
}
@bindThis
private async callFetch(key: string): Promise<T> {
const value = await this.callFetchMaybe(key);
private async onQuantumCacheReset(data: EventTypes['quantumCacheReset']): Promise<void> {
this.throwIfDisposed();
if (value == null) {
throw new KeyNotFoundError(this.name, key);
}
return value;
}
@bindThis
private async callFetchMaybe(key: string): Promise<T | undefined> {
try {
const value = await this.fetcher(key, this);
return value ?? undefined;
} catch (err) {
if (err instanceof EntityNotFoundError) {
return undefined;
}
throw new FetchFailedError(this.name, key, renderInlineError(err), { cause: err });
}
}
@bindThis
private async onQuantumCacheReset(data: InternalEventTypes['quantumCacheReset']): Promise<void> {
if (data.name === this.name) {
this.clear();
if (this.onReset) {
await this.onReset(this);
await this.onReset(this.callbackMeta);
}
}
}
/**
* Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state.
* Executes a fetch operation and translates results.
* Always uses fetcher().
* Concurrent calls for the same key are de-duplicated.
*/
[Symbol.iterator](): Iterator<[key: string, value: T]> {
return this.entries();
@bindThis
private doFetch(key: string): Promise<Value<T>> {
	// De-duplicate call: if a fetch for this key is already in flight, share its promise.
	let promise = this.activeFetchers.get(key);
	if (!promise) {
		// Start new call
		const fetchPromise = promiseTry(this.callFetcher, key)
			.catch(async err => {
				// Our own already-wrapped errors pass through untouched.
				if (err instanceof QuantumCacheError && err.cacheName === this.nameForError) {
					throw err;
				}
				// Translate TypeORM "not found" into the cache's own missing-key error.
				if (err instanceof EntityNotFoundError) {
					throw new KeyNotFoundError(this.nameForError, key, renderInlineError(err), { cause: err });
				}
				// Anything else is an unexpected fetch failure.
				throw new FetchFailedError(this.nameForError, key, renderInlineError(err), { cause: err });
			})
			.then(async result => {
				// fetcher() must produce a value here; null/undefined means the key is missing.
				if (result == null) {
					throw new KeyNotFoundError(this.nameForError, key);
				}
				return result;
			});

		// Untrack when it finalizes
		const cleanupCallback = async () => {
			if (this.activeFetchers.get(key) === promise) {
				this.activeFetchers.delete(key);
			} else {
				// Another promise replaced ours before cleanup - indicates a logic bug, not a user error.
				throw new QuantumCacheError(this.nameForError, `Internal error: fetcher race detected for key "${key}"`);
			}
		};
		promise = withCleanup(fetchPromise, cleanupCallback);

		// Track it!!
		this.activeFetchers.set(key, promise);
	}
	return promise;
}
/**
 * Executes a fetchMaybe operation and translates results.
 * Automatically uses the best available fetch implementation:
 * optionalFetcher() when provided, otherwise fetcher() via doFetch().
 * Concurrent calls for the same key are de-duplicated.
 * Resolves to undefined (never null) when the key has no value.
 */
@bindThis
private doFetchMaybe(key: string): Promise<Value<T> | undefined> {
	// De-duplicate call
	let promise = this.activeOptionalFetchers.get(key);
	if (!promise) {
		// Use optionalFetcher() if available
		if (this.optionalFetcher != null) {
			// Start new call
			const fetchPromise = promiseTry(this.callOptionalFetcher, key)
				.catch(async err => {
					// Our own already-wrapped errors pass through untouched.
					if (err instanceof QuantumCacheError && err.cacheName === this.nameForError) {
						throw err;
					}
					throw new FetchFailedError(this.nameForError, key, renderInlineError(err), { cause: err });
				})
				// Normalize null to undefined for a consistent "missing" sentinel.
				.then(result => result ?? undefined);

			// Untrack when it finalizes
			const cleanupCallback = async () => {
				if (this.activeOptionalFetchers.get(key) === promise) {
					this.activeOptionalFetchers.delete(key);
				} else {
					throw new QuantumCacheError(this.nameForError, `Internal error: optionalFetcher race detected for key "${key}"`);
				}
			};
			promise = withCleanup(fetchPromise, cleanupCallback);

			// Track it!!
			this.activeOptionalFetchers.set(key, promise);
		} else {
			// Fall back on fetcher() if optionalFetcher() is unavailable.
			// doFetch() handles its own de-duplication, so this path is not tracked in activeOptionalFetchers.
			promise = promiseTry(this.doFetch, key)
				.catch(async err => {
					// A missing key is not an error for the "maybe" variant.
					if (err instanceof KeyNotFoundError) {
						return undefined;
					}
					throw err;
				});
		}
	}

	// Await result
	return promise;
}
/**
 * Executes a fetchMany operation and translates results.
 * Automatically uses the best available fetch implementation, re-using any
 * in-flight single or bulk fetches that already cover some of the keys.
 * Concurrent calls for the same key are de-duplicated.
 * Keys with no value are omitted from the result.
 */
@bindThis
private doFetchMany(keys: string[]): Promise<[key: string, value: Value<T>][]> {
	const uniqueKeys = new Set(keys);
	// Keys covered by an in-flight fetcher() / optionalFetcher() / bulkFetcher() call, respectively.
	const fetcherPromises = new Map<string, ActiveFetcher<T>>();
	const optionalFetcherPromises = new Map<string, ActiveOptionalFetcher<T>>();
	const bulkFetcherPromises = new Set<ActiveBulkFetcher<T>>();
	// Keys not covered by any active promise - these need a fresh fetch.
	const remainingKeys: string[] = [];

	// If any keys are covered by an active promise, then re-use it to avoid duplicate fetches.
	for (const key of uniqueKeys) {
		// Re-use an optionalFetcher() call
		const optionalPromise = this.activeOptionalFetchers.get(key);
		if (optionalPromise) {
			optionalFetcherPromises.set(key, optionalPromise);
			continue;
		}

		// Re-use a fetcher() call
		const fetchPromise = this.activeFetchers.get(key);
		if (fetchPromise) {
			fetcherPromises.set(key, fetchPromise);
			continue;
		}

		// Re-use a bulkFetcher() call
		const bulkPromise = this.activeBulkFetchers.get(key);
		if (bulkPromise) {
			bulkFetcherPromises.add(bulkPromise);
			continue;
		}

		// Queue up for a new bulkFetcher() call
		remainingKeys.push(key);
	}

	// Start a new fetch for any keys that weren't already covered.
	if (hasAtLeastOne(remainingKeys)) {
		if (remainingKeys.length > 1 && this.bulkFetcher != null) {
			// Use the bulk fetcher if available
			const promise = this.callBulkFetcherWithTracking(remainingKeys);
			bulkFetcherPromises.add(promise);
		} else {
			// Otherwise fall back to single fetcher
			for (const key of remainingKeys) {
				const promise = this.doFetchMaybe(key);
				optionalFetcherPromises.set(key, promise);
			}
		}
	}

	return Promise
		// Wrap all promises into a common shape: each resolves to an array of [key, value] pairs.
		.allSettled<KeyValue<T>[]>([
			...fetcherPromises
				.entries()
				.map(([key, promise]) => promise
					.catch(async err => {
						// Required fetches may reject for a missing key; treat that as "no value" here.
						if (err instanceof KeyNotFoundError) {
							return undefined;
						}
						throw err;
					})
					.then(value => {
						if (value === undefined) {
							return [];
						}
						return [[key, value]] as KeyValue<T>[];
					})),
			...optionalFetcherPromises
				.entries()
				.map(([key, promise]) => promise.then(value => {
					if (value === undefined) {
						return [];
					}
					return [[key, value]] as KeyValue<T>[];
				})),
			...bulkFetcherPromises,
		])
		// Unpack results and handle errors
		.then(async promiseResults => {
			const results: KeyValue<T>[][] = [];
			const errors: unknown[] = [];
			for (const pr of promiseResults) {
				if (pr.status === 'fulfilled') {
					results.push(pr.value);
				} else {
					errors.push(pr.reason);
				}
			}

			// Surface failures only after every promise has settled, so partial results don't leak.
			if (errors.length === 1) {
				const innerException = errors[0];
				throw new FetchFailedError(this.nameForError, keys, renderInlineError(innerException), { cause: innerException });
			} else if (errors.length > 1) {
				const innerException = new AggregateError(errors);
				throw new FetchFailedError(this.nameForError, keys, 'Multiple exceptions thrown; see inner exception (cause) for details', { cause: innerException });
			}

			return results.flat();
		});
}
/**
 * Calls fetcher() through the concurrency limiters, bound to the dispose signal.
 * Do not call this directly - use doFetch() instead!
 */
@bindThis
private callFetcher(key: string): Promise<T | null | undefined> {
	// Safety check, in case this gets called directly by mistake
	this.throwIfDisposed();
	if (this.activeFetchers.has(key)) {
		throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call fetcher multiple times for key "${key}"`);
	}

	// Start limiter cascade: acquire the global slot first, then the fetcher-specific slot.
	return this.globalLimiter(async () => {
		// Re-check after each (possibly long) queue wait, since the cache may have been disposed meanwhile.
		this.throwIfDisposed();
		return await this.fetcherLimiter(async () => {
			this.throwIfDisposed();
			return await withSignal(
				// Execute callback and adapt results
				async () => await this.fetcher(key, this.callbackMeta),
				// Bind abort signal in case fetcher stalls out
				this.disposeController.signal,
			);
		});
	});
}
/**
 * Calls optionalFetcher() through the concurrency limiters, bound to the dispose signal.
 * Do not call this directly - use doFetchMaybe() instead!
 */
@bindThis
private callOptionalFetcher(key: string): Promise<T | null | undefined> {
	// Safety checks, in case this gets called directly by mistake
	this.throwIfDisposed();
	// Capture into a local so the narrowed non-null type survives into the async closures below.
	const optionalFetcher = this.optionalFetcher;
	if (optionalFetcher == null) {
		throw new QuantumCacheError(this.nameForError, 'Internal error: attempted to call optionalFetcher for a cache that doesn\'t support it');
	}
	if (this.activeOptionalFetchers.has(key)) {
		throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call optionalFetcher multiple times for key "${key}"`);
	}

	// Start limiter cascade: acquire the global slot first, then the optionalFetcher-specific slot.
	return this.globalLimiter(async () => {
		// Re-check after each (possibly long) queue wait, since the cache may have been disposed meanwhile.
		this.throwIfDisposed();
		return await this.optionalFetcherLimiter(async () => {
			this.throwIfDisposed();
			return await withSignal(
				// Execute callback and adapt results
				async () => await optionalFetcher(key, this.callbackMeta),
				// Bind abort signal in case fetcher stalls out
				this.disposeController.signal,
			);
		});
	});
}
/**
 * Calls bulkFetcher() and tracks the promise under every requested key.
 * Do not call this directly - use doFetchMany() instead!
 */
@bindThis
private callBulkFetcherWithTracking(keys: AtLeastOne<string>): ActiveBulkFetcher<T> {
	// Start new call; drop entries whose value is null/undefined (missing keys).
	const fetchPromise = promiseTry(this.callBulkFetcher, keys)
		.then(results => Array.from(results).filter((result): result is KeyValue<T> => {
			return result[1] != null;
		}));

	// Untrack when it finalizes
	const cleanupCallback = async () => {
		// Collect all mismatches first so one error can report every affected key.
		const racedKeys: string[] = [];
		for (const key of keys) {
			if (this.activeBulkFetchers.get(key) === promise) {
				this.activeBulkFetchers.delete(key);
			} else {
				racedKeys.push(key);
			}
		}
		if (racedKeys.length > 0) {
			const allKeys = racedKeys.map(k => `"${k}"`).join(', ');
			throw new QuantumCacheError(this.nameForError, `Internal error: bulkFetcher race detected for key(s) ${allKeys}`);
		}
	};
	const promise = withCleanup(fetchPromise, cleanupCallback);

	// Track it!!
	for (const key of keys) {
		this.activeBulkFetchers.set(key, promise);
	}
	return promise;
}
/**
 * Calls bulkFetcher() through the concurrency limiters, bound to the dispose signal.
 * Do not call this directly - use callBulkFetcherWithTracking() instead!
 */
@bindThis
private callBulkFetcher(keys: AtLeastOne<string>): Promise<Iterable<KeyValue<T | null | undefined>>> {
	// Safety checks, in case this gets called directly by mistake
	// Capture into a local so the narrowed non-null type survives into the async closures below.
	const bulkFetcher = this.bulkFetcher;
	this.throwIfDisposed();
	if (bulkFetcher == null) {
		throw new QuantumCacheError(this.nameForError, 'Internal error: attempted to call bulkFetcher for a cache that doesn\'t support it');
	}
	const duplicateKeys = keys.filter(key => this.activeBulkFetchers.has(key));
	if (duplicateKeys.length > 0) {
		const allKeys = duplicateKeys.map(k => `"${k}"`).join(', ');
		throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call bulkFetcher multiple times for key(s) ${allKeys}`);
	}

	// Start limiter cascade: acquire the global slot first, then the bulkFetcher-specific slot.
	return this.globalLimiter(async () => {
		// Re-check after each (possibly long) queue wait, since the cache may have been disposed meanwhile.
		this.throwIfDisposed();
		return await this.bulkFetcherLimiter(async () => {
			this.throwIfDisposed();
			return await withSignal(
				// Execute callback and adapt results
				async () => await bulkFetcher(keys, this.callbackMeta),
				// Bind abort signal in case fetcher stalls out
				this.disposeController.signal,
			);
		});
	});
}
/**
 * Guards against use-after-dispose.
 * @throws DisposingError while dispose() is still running
 * @throws DisposedError once dispose() has completed
 */
@bindThis
private throwIfDisposed() {
	if (this.isDisposing) {
		throw new DisposingError({ source: this.nameForError });
	}

	if (this.isDisposed) {
		throw new DisposedError({ source: this.nameForError });
	}
}
}
/**
 * Type guard that narrows a plain array to a tuple type guaranteed to contain at least one element.
 */
function hasAtLeastOne<T>(array: T[]): array is AtLeastOne<T> {
	return array.length >= 1;
}

View file

@ -1,994 +0,0 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { jest } from '@jest/globals';
import { GodOfTimeService } from '../../misc/GodOfTimeService.js';
import { MockInternalEventService } from '../../misc/MockInternalEventService.js';
import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js';
import { KeyNotFoundError } from '@/misc/errors/KeyNotFoundError.js';
import { FetchFailedError } from '@/misc/errors/FetchFailedError.js';
// Unit tests for QuantumKVCache: an in-memory KV cache whose writes/deletes are
// broadcast as 'quantumCacheUpdated' internal events so sibling processes can
// invalidate their copies. Uses a mock event service that records every call in
// `_calls` as [methodName, argsArray] tuples, and a controllable time service.
describe(QuantumKVCache, () => {
let fakeTimeService: GodOfTimeService;
let fakeInternalEventService: MockInternalEventService;
// Every cache built by makeCache is tracked here so afterEach can dispose it.
let madeCaches: { dispose: () => void }[];
// Builds a cache with sane defaults (infinite lifetime, throwing fetcher),
// merges in per-test option overrides, and registers it for disposal.
function makeCache<T>(opts?: Partial<QuantumKVOpts<T>> & { name?: string }): QuantumKVCache<T> {
const _opts = {
name: 'test',
lifetime: Infinity,
// Default fetcher throws so tests that never expect a fetch fail loudly.
fetcher: () => { throw new Error('not implemented'); },
} satisfies QuantumKVOpts<T> & { name: string };
if (opts) {
Object.assign(_opts, opts);
}
const services = {
internalEventService: fakeInternalEventService,
timeService: fakeTimeService,
};
const cache = new QuantumKVCache<T>(_opts.name, services, _opts);
madeCaches.push(cache);
return cache;
}
beforeEach(() => {
madeCaches = [];
fakeTimeService = new GodOfTimeService();
fakeInternalEventService = new MockInternalEventService();
});
afterEach(() => {
// Dispose all caches so event subscriptions don't leak between tests.
madeCaches.forEach(cache => {
cache.dispose();
});
});
// --- Event subscription lifecycle ---
it('should connect on construct', () => {
makeCache();
expect(fakeInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]);
});
it('should disconnect on dispose', () => {
const cache = makeCache();
cache.dispose();
// Recover the exact listener passed to on() so we can assert off() got the same one.
const callback = fakeInternalEventService._calls
.find(c => c[0] === 'on' && c[1][0] === 'quantumCacheUpdated')
?.[1][1];
expect(fakeInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', callback]]);
});
// --- set: storage, events, and change callbacks ---
it('should store in memory cache', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
const result1 = await cache.get('foo');
const result2 = await cache.get('alpha');
expect(result1).toBe('bar');
expect(result2).toBe('omega');
});
it('should emit event when storing', async () => {
const cache = makeCache<string>({ name: 'fake' });
await cache.set('foo', 'bar');
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]);
});
it('should call onChanged when storing', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.set('foo', 'bar');
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
// Writing the same value twice must not re-broadcast or re-notify.
it('should not emit event when storing unchanged value', async () => {
const cache = makeCache<string>({ name: 'fake' });
await cache.set('foo', 'bar');
await cache.set('foo', 'bar');
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1);
});
it('should not call onChanged when storing unchanged value', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.set('foo', 'bar');
await cache.set('foo', 'bar');
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
// --- fetch: miss path goes through the single-key fetcher ---
it('should fetch an unknown value', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
const result = await cache.fetch('foo');
expect(result).toBe('value#foo');
});
it('should store fetched value in memory cache', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
await cache.fetch('foo');
const result = cache.has('foo');
expect(result).toBe(true);
});
it('should call onChanged when fetching', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
onChanged: fakeOnChanged,
});
await cache.fetch('foo');
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
// Fetches fill the local copy only; they must not broadcast to other processes.
it('should not emit event when fetching', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
await cache.fetch('foo');
expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]);
});
// --- delete: removal, events, and change callbacks ---
it('should delete from memory cache', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
await cache.delete('foo');
const result = cache.has('foo');
expect(result).toBe(false);
});
it('should call onChanged when deleting', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.set('foo', 'bar');
await cache.delete('foo');
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
it('should emit event when deleting', async () => {
const cache = makeCache<string>({ name: 'fake' });
await cache.set('foo', 'bar');
await cache.delete('foo');
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]);
});
// --- Cross-process invalidation via incoming events ---
// NOTE(review): the 'set event' and 'delete event' cases below emit the same
// event shape ({ name, keys }), so each pair exercises identical behavior —
// presumably kept separate to document both remote-write and remote-delete
// intents. Consider merging or distinguishing the payloads.
it('should delete when receiving set event', async () => {
const cache = makeCache<string>({ name: 'fake' });
await cache.set('foo', 'bar');
await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] });
const result = cache.has('foo');
expect(result).toBe(false);
});
it('should call onChanged when receiving set event', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] });
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
it('should delete when receiving delete event', async () => {
const cache = makeCache<string>({ name: 'fake' });
await cache.set('foo', 'bar');
await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] });
const result = cache.has('foo');
expect(result).toBe(false);
});
it('should call onChanged when receiving delete event', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.set('foo', 'bar');
await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] });
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
// --- get: synchronous read of the memory cache only (no fetcher involved) ---
describe('get', () => {
it('should return value if present', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = cache.get('foo');
expect(result).toBe('bar');
});
it('should return undefined if missing', () => {
const cache = makeCache<string>();
const result = cache.get('foo');
expect(result).toBe(undefined);
});
});
// --- setMany: batch writes coalesce into one event / one onChanged call ---
describe('setMany', () => {
it('should populate all values', async () => {
const cache = makeCache<string>();
await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(cache.has('foo')).toBe(true);
expect(cache.has('alpha')).toBe(true);
});
it('should emit one event', async () => {
const cache = makeCache<string>({
name: 'fake',
});
await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1);
});
it('should call onChanged once with all items', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
// Unchanged entries in the batch are excluded from the notification set.
it('should emit events only for changed items', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.set('foo', 'bar');
fakeOnChanged.mockClear();
fakeInternalEventService.mockReset();
await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['alpha'] }]]);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1);
expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
});
// --- getMany: batch synchronous read; missing keys yield undefined values ---
describe('getMany', () => {
it('should return empty for empty input', () => {
const cache = makeCache();
const result = cache.getMany([]);
expect(result).toEqual([]);
});
it('should return the value for all keys', () => {
const cache = makeCache();
cache.add('foo', 'bar');
cache.add('alpha', 'omega');
const result = cache.getMany(['foo', 'alpha']);
expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]);
});
it('should return undefined for missing keys', () => {
const cache = makeCache();
cache.add('foo', 'bar');
const result = cache.getMany(['foo', 'alpha']);
expect(result).toEqual([['foo', 'bar'], ['alpha', undefined]]);
});
});
// --- fetch: error normalization (FetchFailedError / KeyNotFoundError) ---
describe('fetch', () => {
it('should throw FetchFailedError when fetcher throws error', async () => {
const cache = makeCache<string>({
fetcher: () => { throw new Error('test error'); },
});
await expect(cache.fetch('foo')).rejects.toThrow(FetchFailedError);
});
it('should throw KeyNotFoundError when fetcher returns null', async () => {
const cache = makeCache<string>({
fetcher: () => null,
});
await expect(cache.fetch('foo')).rejects.toThrow(KeyNotFoundError);
});
it('should throw KeyNotFoundError when fetcher undefined', async () => {
const cache = makeCache<string>({
fetcher: () => undefined,
});
await expect(cache.fetch('foo')).rejects.toThrow(KeyNotFoundError);
});
});
// --- fetchMaybe: like fetch, but null/undefined resolves to undefined instead of throwing ---
describe('fetchMaybe', () => {
it('should return value when found by fetcher', async () => {
const cache = makeCache<string>({
fetcher: () => 'bar',
});
const result = await cache.fetchMaybe('foo');
expect(result).toBe('bar');
});
it('should call onChanged when found by fetcher', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
fetcher: () => 'bar',
onChanged: fakeOnChanged,
});
await cache.fetchMaybe('foo');
expect(fakeOnChanged).toHaveBeenCalled();
});
it('should return undefined when fetcher returns undefined', async () => {
const cache = makeCache<string>({
fetcher: () => undefined,
});
const result = await cache.fetchMaybe('foo');
expect(result).toBe(undefined);
});
it('should not call onChanged when fetcher returns undefined', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
fetcher: () => undefined,
onChanged: fakeOnChanged,
});
await cache.fetchMaybe('foo');
expect(fakeOnChanged).not.toHaveBeenCalled();
});
it('should return undefined when fetcher returns null', async () => {
const cache = makeCache<string>({
fetcher: () => null,
});
const result = await cache.fetchMaybe('foo');
expect(result).toBe(undefined);
});
it('should not call onChanged when fetcher returns null', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
fetcher: () => null,
onChanged: fakeOnChanged,
});
await cache.fetchMaybe('foo');
expect(fakeOnChanged).not.toHaveBeenCalled();
});
it('should throw FetchFailedError when fetcher throws error', async () => {
const cache = makeCache<string>({
fetcher: () => { throw new Error('test error'); },
});
await expect(cache.fetchMaybe('foo')).rejects.toThrow(FetchFailedError);
});
});
// --- fetchMany: cached keys returned directly; misses go to bulkFetcher,
// falling back to per-key fetcher when no bulkFetcher is configured ---
describe('fetchMany', () => {
it('should do nothing for empty input', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
});
await cache.fetchMany([]);
expect(fakeOnChanged).not.toHaveBeenCalled();
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
it('should return existing items', async () => {
const cache = makeCache();
cache.add('foo', 'bar');
cache.add('alpha', 'omega');
const result = await cache.fetchMany(['foo', 'alpha']);
expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]);
});
it('should return existing items without events', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
});
cache.add('foo', 'bar');
cache.add('alpha', 'omega');
await cache.fetchMany(['foo', 'alpha']);
expect(fakeOnChanged).not.toHaveBeenCalled();
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
// bulkFetcher takes priority over the single-key fetcher when both exist.
it('should call bulkFetcher for missing items', async () => {
const cache = makeCache({
bulkFetcher: keys => keys.map(k => [k, `${k}#many`]),
fetcher: key => `${key}#single`,
});
const results = await cache.fetchMany(['foo', 'alpha']);
expect(results).toEqual([['foo', 'foo#many'], ['alpha', 'alpha#many']]);
});
it('should call bulkFetcher only once', async () => {
const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string]));
const cache = makeCache({
bulkFetcher: mockBulkFetcher,
});
await cache.fetchMany(['foo', 'bar']);
expect(mockBulkFetcher).toHaveBeenCalledTimes(1);
});
it('should call fetcher when fetchMany is undefined', async () => {
const cache = makeCache({
fetcher: key => `${key}#single`,
});
const results = await cache.fetchMany(['foo', 'alpha']);
expect(results).toEqual([['foo', 'foo#single'], ['alpha', 'alpha#single']]);
});
it('should call onChanged', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
fetcher: k => k,
});
await cache.fetchMany(['foo', 'alpha']);
expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
it('should call onChanged only for changed', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
fetcher: k => k,
});
cache.add('foo', 'bar');
await cache.fetchMany(['foo', 'alpha']);
expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
it('should not emit event', async () => {
const cache = makeCache({
fetcher: k => k,
});
await cache.fetchMany(['foo', 'alpha']);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
});
// --- refreshMany: forced re-fetch of all requested keys; unlike fetchMany it
// replaces cached values and broadcasts an update event ---
describe('refreshMany', () => {
it('should do nothing for empty input', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
});
const result = await cache.refreshMany([]);
expect(result).toEqual([]);
expect(fakeOnChanged).not.toHaveBeenCalled();
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
it('should call bulkFetcher for all keys', async () => {
const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string]));
const cache = makeCache({
bulkFetcher: mockBulkFetcher,
});
const result = await cache.refreshMany(['foo', 'alpha']);
expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]);
expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(mockBulkFetcher).toHaveBeenCalledTimes(1);
});
it('should replace any existing keys', async () => {
const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string]));
const cache = makeCache({
bulkFetcher: mockBulkFetcher,
});
cache.add('foo', 'bar');
const result = await cache.refreshMany(['foo', 'alpha']);
expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]);
expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(mockBulkFetcher).toHaveBeenCalledTimes(1);
});
it('should call onChanged for all keys', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
bulkFetcher: keys => keys.map(k => [k, `${k}#value`]),
onChanged: fakeOnChanged,
});
cache.add('foo', 'bar');
await cache.refreshMany(['foo', 'alpha']);
expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
it('should emit event for all keys', async () => {
const cache = makeCache({
name: 'fake',
bulkFetcher: keys => keys.map(k => [k, `${k}#value`]),
});
cache.add('foo', 'bar');
await cache.refreshMany(['foo', 'alpha']);
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1);
});
it('should throw FetchFailedError when bulk fetcher throws error', async () => {
const cache = makeCache<string>({
bulkFetcher: () => { throw new Error('test error'); },
});
await expect(cache.refreshMany(['foo'])).rejects.toThrow(FetchFailedError);
});
it('should throw FetchFailedError when fallback fetcher throws error', async () => {
const cache = makeCache<string>({
fetcher: () => { throw new Error('test error'); },
});
await expect(cache.refreshMany(['foo'])).rejects.toThrow(FetchFailedError);
});
// Missing keys are simply omitted from the result — no KeyNotFoundError here.
it('should not throw when fallback fetcher returns null', async () => {
const cache = makeCache<string>({
fetcher: () => null,
});
const result = await cache.refreshMany(['foo']);
expect(result).toHaveLength(0);
});
it('should not throw when fallback fetcher returns undefined', async () => {
const cache = makeCache<string>({
fetcher: () => undefined,
});
const result = await cache.refreshMany(['foo']);
expect(result).toHaveLength(0);
});
it('should not throw when bulk fetcher returns empty', async () => {
const cache = makeCache<string>({
bulkFetcher: () => [],
});
const result = await cache.refreshMany(['foo']);
expect(result).toHaveLength(0);
});
});
// --- deleteMany: batch deletion with coalesced event/notification ---
describe('deleteMany', () => {
it('should remove keys from memory cache', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.deleteMany(['foo', 'alpha']);
expect(cache.has('foo')).toBe(false);
expect(cache.has('alpha')).toBe(false);
});
it('should emit only one event', async () => {
const cache = makeCache<string>({
name: 'fake',
});
await cache.deleteMany(['foo', 'alpha']);
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1);
});
it('should call onChanged once with all items', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.deleteMany(['foo', 'alpha']);
expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache);
expect(fakeOnChanged).toHaveBeenCalledTimes(1);
});
it('should do nothing if no keys are provided', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
onChanged: fakeOnChanged,
});
await cache.deleteMany([]);
expect(fakeOnChanged).not.toHaveBeenCalled();
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
});
// --- refresh: single-key forced re-fetch; replaces value and broadcasts ---
describe('refresh', () => {
it('should populate the value', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
await cache.refresh('foo');
const result = cache.has('foo');
expect(result).toBe(true);
});
it('should return the value', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
const result = await cache.refresh('foo');
expect(result).toBe('value#foo');
});
it('should replace the value if it exists', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
await cache.set('foo', 'bar');
const result = await cache.refresh('foo');
expect(result).toBe('value#foo');
});
it('should call onChanged', async () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
onChanged: fakeOnChanged,
});
await cache.refresh('foo');
expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache);
});
it('should emit event', async () => {
const cache = makeCache<string>({
name: 'fake',
fetcher: key => `value#${key}`,
});
await cache.refresh('foo');
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]);
});
});
// --- reset: clears everything, fires onReset, and emits quantumCacheReset ---
describe('reset', () => {
it('should erase all items', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(cache.size).toBe(0);
});
it('should call onReset', async () => {
const fakeOnReset = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
onReset: fakeOnReset,
});
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(fakeOnReset).toHaveBeenCalled();
});
it('should emit event', async () => {
const cache = makeCache<string>({
name: 'fake',
});
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheReset', { name: 'fake' }]]);
});
});
// --- add: silent local insert (no event, no onChanged) ---
describe('add', () => {
it('should add the item', () => {
const cache = makeCache();
cache.add('foo', 'bar');
expect(cache.has('foo')).toBe(true);
});
it('should not emit event', () => {
const cache = makeCache({
name: 'fake',
});
cache.add('foo', 'bar');
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
it('should not call onChanged', () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
});
cache.add('foo', 'bar');
expect(fakeOnChanged).not.toHaveBeenCalled();
});
});
// --- addMany: silent batch insert ---
describe('addMany', () => {
it('should add all items', () => {
const cache = makeCache();
cache.addMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(cache.has('foo')).toBe(true);
expect(cache.has('alpha')).toBe(true);
});
it('should not emit event', () => {
const cache = makeCache({
name: 'fake',
});
cache.addMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0);
});
it('should not call onChanged', () => {
const fakeOnChanged = jest.fn(() => Promise.resolve());
const cache = makeCache({
onChanged: fakeOnChanged,
});
cache.addMany([['foo', 'bar'], ['alpha', 'omega']]);
expect(fakeOnChanged).not.toHaveBeenCalled();
});
});
// --- has / size / iteration surface of the memory cache ---
describe('has', () => {
it('should return false when empty', () => {
const cache = makeCache();
const result = cache.has('foo');
expect(result).toBe(false);
});
it('should return false when value is not in memory', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = cache.has('alpha');
expect(result).toBe(false);
});
it('should return true when value is in memory', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = cache.has('foo');
expect(result).toBe(true);
});
});
describe('size', () => {
it('should return 0 when empty', () => {
const cache = makeCache();
expect(cache.size).toBe(0);
});
it('should return correct size when populated', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
expect(cache.size).toBe(1);
});
});
describe('entries', () => {
it('should return empty when empty', () => {
const cache = makeCache();
const result = Array.from(cache.entries());
expect(result).toHaveLength(0);
});
it('should return all entries when populated', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = Array.from(cache.entries());
expect(result).toEqual([['foo', 'bar']]);
});
});
describe('keys', () => {
it('should return empty when empty', () => {
const cache = makeCache();
const result = Array.from(cache.keys());
expect(result).toHaveLength(0);
});
it('should return all keys when populated', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = Array.from(cache.keys());
expect(result).toEqual(['foo']);
});
});
describe('values', () => {
it('should return empty when empty', () => {
const cache = makeCache();
const result = Array.from(cache.values());
expect(result).toHaveLength(0);
});
it('should return all values when populated', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = Array.from(cache.values());
expect(result).toEqual(['bar']);
});
});
// Iterating the cache directly yields [key, value] entries, same as entries().
describe('[Symbol.iterator]', () => {
it('should return empty when empty', () => {
const cache = makeCache();
const result = Array.from(cache);
expect(result).toHaveLength(0);
});
it('should return all entries when populated', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
const result = Array.from(cache);
expect(result).toEqual([['foo', 'bar']]);
});
});
});

File diff suppressed because it is too large Load diff