diff --git a/packages/backend/src/misc/QuantumKVCache.ts b/packages/backend/src/misc/QuantumKVCache.ts index 9c87b7fab9..b16b5c8aa9 100644 --- a/packages/backend/src/misc/QuantumKVCache.ts +++ b/packages/backend/src/misc/QuantumKVCache.ts @@ -4,53 +4,152 @@ */ import { EntityNotFoundError } from 'typeorm'; +import promiseLimit from 'promise-limit'; import { bindThis } from '@/decorators.js'; -import { InternalEventService } from '@/global/InternalEventService.js'; -import type { InternalEventTypes } from '@/core/GlobalEventService.js'; +import type { InternalEventService, EventTypes } from '@/global/InternalEventService.js'; import { MemoryKVCache, type MemoryCacheServices } from '@/misc/cache.js'; import { makeKVPArray, type KVPArray } from '@/misc/kvp-array.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; import { FetchFailedError } from '@/misc/errors/FetchFailedError.js'; import { KeyNotFoundError } from '@/misc/errors/KeyNotFoundError.js'; +import { QuantumCacheError } from '@/misc/errors/QuantumCacheError.js'; +import { DisposedError, DisposingError } from '@/misc/errors/DisposeError.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; +import { withCleanup, withSignal } from '@/misc/promiseUtils.js'; +import { promiseTry } from '@/misc/promise-try.js'; -export interface QuantumKVOpts { +export interface QuantumKVOpts = Value> { /** * Memory cache lifetime in milliseconds. */ lifetime: number; /** - * Callback to fetch the value for a key that wasn't found in the cache. - * Return null/undefined or throw an error if no value exists for the given key. - * May be synchronous or async. + * Callback to fetch required values by key. */ - fetcher: (key: string, cache: QuantumKVCache) => T | null | undefined | Promise; + fetcher: Fetcher; /** - * Optional callback to fetch the value for multiple keys that weren't found in the cache. - * Don't throw or return null if a key has no value; just omit it from the response. - * May be synchronous or async. - * If not provided, then the implementation will fall back on repeated calls to fetcher(). + * Callback to fetch optional values by key. */ - bulkFetcher?: (keys: string[], cache: QuantumKVCache) => Iterable<[key: string, value: T]> | Promise>; + optionalFetcher?: OptionalFetcher; /** - * Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. - * Implementations may be synchronous or async. + * Callback to fetch multiple optional values by key. */ - onChanged?: (keys: string[], cache: QuantumKVCache) => void | Promise; + bulkFetcher?: BulkFetcher; /** - * Optional callback when all values are removed from the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. - * Implementations may be synchronous or async. + * Callback to handle changes to the cross-cluster state (create, update, or delete values). */ - onReset?: (cache: QuantumKVCache) => void | Promise; + onChanged?: OnChanged; - // TODO equality comparer + /** + * Callback to handle a whole-state reset (all values deleted). + */ + onReset?: OnReset; + + /** + * Optional limit on the number of calls to fetcher to allow at once. + * If more than this many fetches are attempted, the excess will be queued until the earlier operations complete. + * The total number of calls will never exceed maxConcurrency. + * Min: 1 + * Default: 4 + */ + fetcherConcurrency?: number; + + /** + * Optional limit on the number of calls to optionalFetcher to allow at once. + * If more than this many fetches are attempted, the excess will be queued until the earlier operations complete. + * The total number of calls will never exceed maxConcurrency. + * Min: 1 + * Default: 4 + */ + optionalFetcherConcurrency?: number; + + /** + * Optional limit on the number of calls to bulkFetcher to allow at once. + * If more than this many fetches are attempted, the excess will be queued until the earlier operations complete. + * The total number of calls will never exceed maxConcurrency. + * Min: 1 + * Default: 2 + */ + bulkFetcherConcurrency?: number; + + /** + * Optional limit on the total number of calls to fetcher, optionalFetcher, or bulkFetcher to allow at once. + * If more than this many fetches are attempted, the excess will be queued until the earlier operations complete. + * Min: 1 + * Default: fetcherConcurrency, optionalFetcherConcurrency, or bulkFetcherConcurrency - whichever is highest. + */ + maxConcurrency?: number; } +export interface CallbackMeta { + /** + * The cache instance that triggered this callback. + */ + readonly cache: QuantumKVCache; + + /** + * AbortSignal that will fire when the cache is disposed. + * Should be propagated to ensure smooth cleanup and shutdown. + */ + readonly disposeSignal: AbortSignal; +} + +/** + * Callback to fetch the value for a key that wasn't found in the cache, and is required to continue. + * Should return the fetched value, or null/undefined if no value exists for the given key. + * Missing keys may also produce an EntityNotFound or KeyNotFoundException exception, which will be wrapped to gracefully abort the operation. + * May be synchronous or async. + */ +export type Fetcher = (key: string, meta: CallbackMeta) => MaybePromise | null | undefined>; + +/** + * Optional callback to fetch the value for a key that wasn't found in the cache, and isn't required to continue. + * Should return the fetched value, or null/undefined if no value exists for the given key. + * Missing keys should *not* produce any exception, as it will be wrapped to gracefully abort the operation. + * May be synchronous or async. + * If not provided, then the implementation will fall back on fetcher(). + */ +export type OptionalFetcher = (key: string, meta: CallbackMeta) => MaybePromise | null | undefined>; + +/** + * Optional callback to fetch the value for multiple keys that weren't found in the cache. + * Should return the fetched values for each key, or null/undefined if no value exists for the given key. + * Missing keys may also be omitted from the response entirely, but no error should be thrown. + * May be synchronous or async. + * If not provided, then the implementation will fall back on repeated calls to optionalFetcher() or fetcher(). + */ +export type BulkFetcher = (keys: string[], meta: CallbackMeta) => MaybePromise | null | undefined]>>; + +/** + * Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ +export type OnChanged = (keys: string[], meta: CallbackMeta) => MaybePromise; + +/** + * Optional callback when all values are removed from the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ +export type OnReset = (meta: CallbackMeta) => MaybePromise; + +type ActiveFetcher = Promise; +type ActiveOptionalFetcher = Promise; +type ActiveBulkFetcher = Promise[]>; + +// Make sure null / undefined cannot be a valid type +// https://stackoverflow.com/a/63045455 +type Value = NonNullable; +type KeyValue = [key: string, value: T]; +type Limiter = (callback: () => Promise) => Promise; +type MaybePromise = T | Promise; +type AtLeastOne = [T, ...T[]]; + export interface QuantumCacheServices extends MemoryCacheServices { /** * Event bus to attach to. @@ -64,15 +163,29 @@ export interface QuantumCacheServices extends MemoryCacheServices { * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache. * This ensures that a call to get() will never return stale data. */ -export class QuantumKVCache implements Iterable { +export class QuantumKVCache = Value> implements Iterable { private readonly internalEventService: InternalEventService; private readonly memoryCache: MemoryKVCache; - public readonly fetcher: QuantumKVOpts['fetcher']; - public readonly bulkFetcher: QuantumKVOpts['bulkFetcher']; - public readonly onChanged: QuantumKVOpts['onChanged']; - public readonly onReset: QuantumKVOpts['onReset']; + private readonly activeFetchers = new Map>(); + private readonly activeOptionalFetchers = new Map>(); + private readonly activeBulkFetchers = new Map>(); + + private readonly globalLimiter: Limiter; + private readonly fetcherLimiter: Limiter; + private readonly optionalFetcherLimiter: Limiter; + private readonly bulkFetcherLimiter: Limiter; + + public readonly fetcher: Fetcher; + public readonly optionalFetcher: OptionalFetcher | undefined; + public readonly bulkFetcher: BulkFetcher | undefined; + public readonly onChanged: OnChanged | undefined; + public readonly onReset: OnReset | undefined; + + private readonly disposeController = new AbortController(); + private isDisposing = false; + private isDisposed = false; /** * @param name Unique name of the cache - must be the same in all processes. @@ -80,15 +193,37 @@ export class QuantumKVCache implements Iterable, + opts: QuantumKVOpts, ) { // OK: we forward all management calls to the inner cache. // eslint-disable-next-line no-restricted-syntax this.memoryCache = new MemoryKVCache(name + ':mem', services, { lifetime: opts.lifetime }); + + // Set up rate limiters + const fetcherConcurrency = opts.fetcherConcurrency + ? Math.max(opts.fetcherConcurrency, 1) + : 4; + this.fetcherLimiter = promiseLimit(fetcherConcurrency); + + const optionalFetcherConcurrency = opts.optionalFetcherConcurrency + ? Math.max(opts.optionalFetcherConcurrency, 1) + : 4; + this.optionalFetcherLimiter = promiseLimit(optionalFetcherConcurrency); + + const bulkFetcherConcurrency = opts.bulkFetcherConcurrency + ? Math.max(opts.bulkFetcherConcurrency, 1) + : 2; + this.bulkFetcherLimiter = promiseLimit(bulkFetcherConcurrency); + + const globalConcurrency = opts.maxConcurrency + ? Math.max(opts.maxConcurrency, 1) + : Math.max(fetcherConcurrency, optionalFetcherConcurrency, bulkFetcherConcurrency); + this.globalLimiter = promiseLimit(globalConcurrency); + this.fetcher = opts.fetcher; + this.optionalFetcher = opts.optionalFetcher; this.bulkFetcher = opts.bulkFetcher; this.onChanged = opts.onChanged; this.onReset = opts.onReset; @@ -104,6 +239,17 @@ export class QuantumKVCache implements Iterable { + return { + cache: this, + disposeSignal: this.disposeController.signal, + }; + } + + private get nameForError() { + return `QuantumCache[${this.name}]`; + } + /** * The number of items currently in memory. * This applies to the local subset view, not the cross-cluster cache state. @@ -112,6 +258,14 @@ export class QuantumKVCache implements Iterable { + return this.entries(); + } + /** * Iterates all [key, value] pairs in memory. * This applies to the local subset view, not the cross-cluster cache state. @@ -152,6 +306,8 @@ export class QuantumKVCache implements Iterable { + this.throwIfDisposed(); + if (this.memoryCache.get(key) === value) { return; } @@ -161,7 +317,7 @@ export class QuantumKVCache implements Iterable implements Iterable): Promise { + this.throwIfDisposed(); + const changedKeys: string[] = []; for (const item of items) { @@ -185,7 +343,7 @@ export class QuantumKVCache implements Iterable implements Iterable implements Iterable): void { + this.throwIfDisposed(); + for (const [key, value] of items) { this.memoryCache.set(key, value); } @@ -242,14 +404,16 @@ export class QuantumKVCache implements Iterable { + this.throwIfDisposed(); + let value = this.memoryCache.get(key); if (value == null) { - value = await this.callFetch(key); + value = await this.doFetch(key); this.memoryCache.set(key, value); if (this.onChanged) { - await this.onChanged([key], this); + await this.onChanged([key], this.callbackMeta); } } return value; @@ -261,12 +425,14 @@ export class QuantumKVCache implements Iterable { + this.throwIfDisposed(); + let value = this.memoryCache.get(key); if (value != null) { return value; } - value = await this.callFetchMaybe(key); + value = await this.doFetchMaybe(key); if (value == null) { return undefined; } @@ -274,7 +440,7 @@ export class QuantumKVCache implements Iterable implements Iterable): Promise> { + this.throwIfDisposed(); + const results: [key: string, value: T][] = []; const toFetch: string[] = []; @@ -302,7 +470,7 @@ export class QuantumKVCache implements Iterable 0) { - const fetched = await this.bulkFetch(toFetch); + const fetched = await this.doFetchMany(toFetch); // Add to cache and return set this.addMany(fetched); @@ -310,7 +478,7 @@ export class QuantumKVCache implements Iterable implements Iterable { + this.throwIfDisposed(); + this.memoryCache.delete(key); await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] }); if (this.onChanged) { - await this.onChanged([key], this); + await this.onChanged([key], this.callbackMeta); } } /** @@ -347,6 +517,8 @@ export class QuantumKVCache implements Iterable): Promise { + this.throwIfDisposed(); + const deleted: string[] = []; for (const key of keys) { @@ -361,7 +533,7 @@ export class QuantumKVCache implements Iterable implements Iterable { - const value = await this.callFetch(key); + this.throwIfDisposed(); + + const value = await this.doFetch(key); await this.set(key, value); return value; } @bindThis public async refreshMany(keys: Iterable): Promise> { + this.throwIfDisposed(); + const toFetch = Array.from(keys); - const fetched = await this.bulkFetch(toFetch); + const fetched = await this.doFetchMany(toFetch); await this.setMany(fetched); return makeKVPArray(fetched); } @@ -390,6 +566,8 @@ export class QuantumKVCache implements Iterable implements Iterable { + this.throwIfDisposed(); + this.clear(); await this.internalEventService.emit('quantumCacheReset', { name: this.name }); if (this.onReset) { - await this.onReset(this); + await this.onReset(this.callbackMeta); } } @@ -412,7 +592,9 @@ export class QuantumKVCache implements Iterable implements Iterable { + if (this.isDisposed) return; + this.isDisposing = true; - this.memoryCache.dispose(); + try { + // Stop handling events *first* + this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated); + this.internalEventService.off('quantumCacheReset', this.onQuantumCacheReset); + + // Kill active fetchers + const error = new DisposingError({ source: this.nameForError }); + this.disposeController.abort(error); + + // Wait for cleanup + await Promise.allSettled([ + ...this.activeFetchers.values().map(p => trackPromise(p)), + ...this.activeOptionalFetchers.values().map(p => trackPromise(p)), + ...this.activeBulkFetchers.values().map(p => trackPromise(p)), + ]); + + // Purge memory for faster GC + this.activeFetchers.clear(); + this.activeOptionalFetchers.clear(); + this.activeBulkFetchers.clear(); + this.memoryCache.dispose(); + } finally { + this.isDisposing = false; + this.isDisposed = true; + } } @bindThis - private async bulkFetch(keys: string[]): Promise<[key: string, value: T][]> { - // Use the bulk fetcher if available. - if (this.bulkFetcher) { - try { - const results = await this.bulkFetcher(keys, this); - return Array.from(results); - } catch (err) { - throw new FetchFailedError(this.name, keys, renderInlineError(err), { cause: err }); - } - } + private async onQuantumCacheUpdated(data: EventTypes['quantumCacheUpdated']): Promise { + this.throwIfDisposed(); - // Otherwise fall back to regular fetch. - const results: [key: string, value: T][] = []; - for (const key of keys) { - const value = await this.callFetchMaybe(key); - if (value != null) { - results.push([key, value]); - } - } - return results; - } - - @bindThis - private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { if (data.name === this.name) { for (const key of data.keys) { this.memoryCache.delete(key); } if (this.onChanged) { - await this.onChanged(data.keys, this); + await this.onChanged(data.keys, this.callbackMeta); } } } @bindThis - private async callFetch(key: string): Promise { - const value = await this.callFetchMaybe(key); + private async onQuantumCacheReset(data: EventTypes['quantumCacheReset']): Promise { + this.throwIfDisposed(); - if (value == null) { - throw new KeyNotFoundError(this.name, key); - } - - return value; - } - - @bindThis - private async callFetchMaybe(key: string): Promise { - try { - const value = await this.fetcher(key, this); - return value ?? undefined; - } catch (err) { - if (err instanceof EntityNotFoundError) { - return undefined; - } - - throw new FetchFailedError(this.name, key, renderInlineError(err), { cause: err }); - } - } - - @bindThis - private async onQuantumCacheReset(data: InternalEventTypes['quantumCacheReset']): Promise { if (data.name === this.name) { this.clear(); if (this.onReset) { - await this.onReset(this); + await this.onReset(this.callbackMeta); } } } /** - * Iterates all [key, value] pairs in memory. - * This applies to the local subset view, not the cross-cluster cache state. + * Executes a fetch operation and translates results. + * Always uses fetcher(). + * Concurrent calls for the same key are de-duplicated. */ - [Symbol.iterator](): Iterator<[key: string, value: T]> { - return this.entries(); + @bindThis + private doFetch(key: string): Promise> { + // De-duplicate call + let promise = this.activeFetchers.get(key); + if (!promise) { + // Start new call + const fetchPromise = promiseTry(this.callFetcher, key) + .catch(async err => { + if (err instanceof QuantumCacheError && err.cacheName === this.nameForError) { + throw err; + } + + if (err instanceof EntityNotFoundError) { + throw new KeyNotFoundError(this.nameForError, key, renderInlineError(err), { cause: err }); + } + + throw new FetchFailedError(this.nameForError, key, renderInlineError(err), { cause: err }); + }) + .then(async result => { + if (result == null) { + throw new KeyNotFoundError(this.nameForError, key); + } + return result; + }); + + // Untrack when it finalizes + const cleanupCallback = async () => { + if (this.activeFetchers.get(key) === promise) { + this.activeFetchers.delete(key); + } else { + throw new QuantumCacheError(this.nameForError, `Internal error: fetcher race detected for key "${key}"`); + } + }; + promise = withCleanup(fetchPromise, cleanupCallback); + + // Track it!! + this.activeFetchers.set(key, promise); + } + + return promise; + } + + /** + * Executes a fetchMaybe operation and translates results. + * Automatically uses the best available fetch implementation. + * Concurrent calls for the same key are de-duplicated. + */ + @bindThis + private doFetchMaybe(key: string): Promise | undefined> { + // De-duplicate call + let promise = this.activeOptionalFetchers.get(key); + if (!promise) { + // Use optionalFetcher() if available + if (this.optionalFetcher != null) { + // Start new call + const fetchPromise = promiseTry(this.callOptionalFetcher, key) + .catch(async err => { + if (err instanceof QuantumCacheError && err.cacheName === this.nameForError) { + throw err; + } + + throw new FetchFailedError(this.nameForError, key, renderInlineError(err), { cause: err }); + }) + .then(result => result ?? undefined); + + // Untrack when it finalizes + const cleanupCallback = async () => { + if (this.activeOptionalFetchers.get(key) === promise) { + this.activeOptionalFetchers.delete(key); + } else { + throw new QuantumCacheError(this.nameForError, `Internal error: optionalFetcher race detected for key "${key}"`); + } + }; + promise = withCleanup(fetchPromise, cleanupCallback); + + // Track it!! + this.activeOptionalFetchers.set(key, promise); + } else { + // Fall back on fetcher() if optionalFetcher() is unavailable + promise = promiseTry(this.doFetch, key) + .catch(async err => { + if (err instanceof KeyNotFoundError) { + return undefined; + } + + throw err; + }); + } + } + + // Await result + return promise; + } + + /** + * Executes a fetchMany operation and translates results. + * Automatically uses the best available fetch implementation. + * Concurrent calls for the same key are de-duplicated. + */ + @bindThis + private doFetchMany(keys: string[]): Promise<[key: string, value: Value][]> { + const uniqueKeys = new Set(keys); + const fetcherPromises = new Map>(); + const optionalFetcherPromises = new Map>(); + const bulkFetcherPromises = new Set>(); + const remainingKeys: string[] = []; + + // If any keys are covered by an active promise, then re-use it to avoid duplicate fetches. + for (const key of uniqueKeys) { + // Re-use an optionalFetcher() call + const optionalPromise = this.activeOptionalFetchers.get(key); + if (optionalPromise) { + optionalFetcherPromises.set(key, optionalPromise); + continue; + } + + // Re-use a fetcher() call + const fetchPromise = this.activeFetchers.get(key); + if (fetchPromise) { + fetcherPromises.set(key, fetchPromise); + continue; + } + + // Re-use a bulkFetcher() call + const bulkPromise = this.activeBulkFetchers.get(key); + if (bulkPromise) { + bulkFetcherPromises.add(bulkPromise); + continue; + } + + // Queue up for a new bulkFetcher() call + remainingKeys.push(key); + } + + // Start a new fetch for any keys that weren't already covered. + if (hasAtLeastOne(remainingKeys)) { + if (remainingKeys.length > 1 && this.bulkFetcher != null) { + // Use the bulk fetcher if available + const promise = this.callBulkFetcherWithTracking(remainingKeys); + bulkFetcherPromises.add(promise); + } else { + // Otherwise fall back to single fetcher + for (const key of remainingKeys) { + const promise = this.doFetchMaybe(key); + optionalFetcherPromises.set(key, promise); + } + } + } + + return Promise + // Wrap all promises into a common shape + .allSettled[]>([ + ...fetcherPromises + .entries() + .map(([key, promise]) => promise + .catch(async err => { + if (err instanceof KeyNotFoundError) { + return undefined; + } + throw err; + }) + .then(value => { + if (value === undefined) { + return []; + } + return [[key, value]] as KeyValue[]; + })), + ...optionalFetcherPromises + .entries() + .map(([key, promise]) => promise.then(value => { + if (value === undefined) { + return []; + } + return [[key, value]] as KeyValue[]; + })), + ...bulkFetcherPromises, + ]) + // Unpack results and handle errors + .then(async promiseResults => { + const results: KeyValue[][] = []; + const errors: unknown[] = []; + + for (const pr of promiseResults) { + if (pr.status === 'fulfilled') { + results.push(pr.value); + } else { + errors.push(pr.reason); + } + } + + if (errors.length === 1) { + const innerException = errors[0]; + throw new FetchFailedError(this.nameForError, keys, renderInlineError(innerException), { cause: innerException }); + } else if (errors.length > 1) { + const innerException = new AggregateError(errors); + throw new FetchFailedError(this.nameForError, keys, 'Multiple exceptions thrown; see inner exception (cause) for details', { cause: innerException }); + } + + return results.flat(); + }); + } + + /** + * Calls fetcher(). + * Do not call this directly - use doFetch() instead! + */ + @bindThis + private callFetcher(key: string): Promise { + // Safety check, in case this gets called directly by mistake + this.throwIfDisposed(); + if (this.activeFetchers.has(key)) { + throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call fetcher multiple times for key "${key}"`); + } + + // Start limiter cascade + return this.globalLimiter(async () => { + this.throwIfDisposed(); + + return await this.fetcherLimiter(async () => { + this.throwIfDisposed(); + + return await withSignal( + // Execute callback and adapt results + async () => await this.fetcher(key, this.callbackMeta), + + // Bind abort signal in case fetcher stalls out + this.disposeController.signal, + ); + }); + }); + } + + /** + * Calls optionalFetcher(). + * Do not call this directly - use doFetchMaybe() instead! + */ + @bindThis + private callOptionalFetcher(key: string): Promise { + // Safety checks, in case this gets called directly by mistake + this.throwIfDisposed(); + const optionalFetcher = this.optionalFetcher; + if (optionalFetcher == null) { + throw new QuantumCacheError(this.nameForError, 'Internal error: attempted to call optionalFetcher for a cache that doesn\'t support it'); + } + if (this.activeOptionalFetchers.has(key)) { + throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call optionalFetcher multiple times for key "${key}"`); + } + + // Start limiter cascade + return this.globalLimiter(async () => { + this.throwIfDisposed(); + + return await this.optionalFetcherLimiter(async () => { + this.throwIfDisposed(); + + return await withSignal( + // Execute callback and adapt results + async () => await optionalFetcher(key, this.callbackMeta), + + // Bind abort signal in case fetcher stalls out + this.disposeController.signal, + ); + }); + }); + } + + /** + * Calls bulkFetcher() and tracks the promise. + * Do not call this directly - use doBulkFetch() instead! + */ + @bindThis + private callBulkFetcherWithTracking(keys: AtLeastOne): ActiveBulkFetcher { + // Start new call + const fetchPromise = promiseTry(this.callBulkFetcher, keys) + .then(results => Array.from(results).filter((result): result is KeyValue => { + return result[1] != null; + })); + + // Untrack when it finalizes + const cleanupCallback = async () => { + const racedKeys: string[] = []; + + for (const key of keys) { + if (this.activeBulkFetchers.get(key) === promise) { + this.activeBulkFetchers.delete(key); + } else { + racedKeys.push(key); + } + } + + if (racedKeys.length > 0) { + const allKeys = racedKeys.map(k => `"${k}"`).join(', '); + throw new QuantumCacheError(this.nameForError, `Internal error: bulkFetcher race detected for key(s) ${allKeys}`); + } + }; + const promise = withCleanup(fetchPromise, cleanupCallback); + + // Track it!! + for (const key of keys) { + this.activeBulkFetchers.set(key, promise); + } + + return promise; + } + + /** + * Calls bulkFetcher(). + * Do not call this directly - use bulkFetch() instead! + */ + @bindThis + private callBulkFetcher(keys: AtLeastOne): Promise>> { + // Safety checks, in case this gets called directly by mistake + const bulkFetcher = this.bulkFetcher; + this.throwIfDisposed(); + if (bulkFetcher == null) { + throw new QuantumCacheError(this.nameForError, 'Internal error: attempted to call bulkFetcher for a cache that doesn\'t support it'); + } + const duplicateKeys = keys.filter(key => this.activeBulkFetchers.has(key)); + if (duplicateKeys.length > 0) { + const allKeys = duplicateKeys.map(k => `"${k}"`).join(', '); + throw new QuantumCacheError(this.nameForError, `Internal error: attempted to call bulkFetcher multiple times for key(s) ${allKeys}`); + } + + // Start limiter cascade + return this.globalLimiter(async () => { + this.throwIfDisposed(); + + return await this.bulkFetcherLimiter(async () => { + this.throwIfDisposed(); + + return await withSignal( + // Execute callback and adapt results + async () => await bulkFetcher(keys, this.callbackMeta), + + // Bind abort signal in case fetcher stalls out + this.disposeController.signal, + ); + }); + }); + } + + @bindThis + private throwIfDisposed() { + if (this.isDisposing) { + throw new DisposingError({ source: this.nameForError }); + } + if (this.isDisposed) { + throw new DisposedError({ source: this.nameForError }); + } } } +function hasAtLeastOne(array: T[]): array is AtLeastOne { + return array.length > 0; +} diff --git a/packages/backend/test/unit/misc/QuantumKVCache.ts b/packages/backend/test/unit/misc/QuantumKVCache.ts deleted file mode 100644 index 21498738a4..0000000000 --- a/packages/backend/test/unit/misc/QuantumKVCache.ts +++ /dev/null @@ -1,994 +0,0 @@ -/* - * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { jest } from '@jest/globals'; -import { GodOfTimeService } from '../../misc/GodOfTimeService.js'; -import { MockInternalEventService } from '../../misc/MockInternalEventService.js'; -import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; -import { KeyNotFoundError } from '@/misc/errors/KeyNotFoundError.js'; -import { FetchFailedError } from '@/misc/errors/FetchFailedError.js'; - -describe(QuantumKVCache, () => { - let fakeTimeService: GodOfTimeService; - let fakeInternalEventService: MockInternalEventService; - let madeCaches: { dispose: () => void }[]; - - function makeCache(opts?: Partial> & { name?: string }): QuantumKVCache { - const _opts = { - name: 'test', - lifetime: Infinity, - fetcher: () => { throw new Error('not implemented'); }, - } satisfies QuantumKVOpts & { name: string }; - - if (opts) { - Object.assign(_opts, opts); - } - - const services = { - internalEventService: fakeInternalEventService, - timeService: fakeTimeService, - }; - - const cache = new QuantumKVCache(_opts.name, services, _opts); - madeCaches.push(cache); - return cache; - } - - beforeEach(() => { - madeCaches = []; - fakeTimeService = new GodOfTimeService(); - fakeInternalEventService = new MockInternalEventService(); - }); - - afterEach(() => { - madeCaches.forEach(cache => { - cache.dispose(); - }); - }); - - it('should connect on construct', () => { - makeCache(); - - expect(fakeInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]); - }); - - it('should disconnect on dispose', () => { - const cache = makeCache(); - - cache.dispose(); - - const callback = fakeInternalEventService._calls - .find(c => c[0] === 'on' && c[1][0] === 'quantumCacheUpdated') - ?.[1][1]; - expect(fakeInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', callback]]); - }); - - it('should store in memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - - const result1 = await cache.get('foo'); - const result2 = await cache.get('alpha'); - - expect(result1).toBe('bar'); - expect(result2).toBe('omega'); - }); - - it('should emit event when storing', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); - }); - - it('should call onChanged when storing', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.set('foo', 'bar'); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - it('should not emit event when storing unchanged value', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - await cache.set('foo', 'bar'); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should not call onChanged when storing unchanged value', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.set('foo', 'bar'); - await cache.set('foo', 'bar'); - - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should fetch an unknown value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - const result = await cache.fetch('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should store fetched value in memory cache', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.fetch('foo'); - - const result = cache.has('foo'); - expect(result).toBe(true); - }); - - it('should call onChanged when fetching', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - onChanged: fakeOnChanged, - }); - - await cache.fetch('foo'); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - it('should not emit event when fetching', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.fetch('foo'); - - expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); - }); - - it('should delete from memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onChanged when deleting', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - it('should emit event when deleting', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); - }); - - it('should delete when receiving set event', async () => { - const cache = makeCache({ name: 'fake' }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onChanged when receiving set event', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - it('should delete when receiving delete event', async () => { - const cache = makeCache({ name: 'fake' }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onChanged when receiving delete event', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - describe('get', () => { - it('should return value if present', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.get('foo'); - - expect(result).toBe('bar'); - }); - it('should return undefined if missing', () => { - const cache = makeCache(); - - const result = cache.get('foo'); - - expect(result).toBe(undefined); - }); - }); - - describe('setMany', () => { - it('should populate all values', async () => { - const cache = makeCache(); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(cache.has('foo')).toBe(true); - expect(cache.has('alpha')).toBe(true); - }); - - it('should emit one event', async () => { - const cache = makeCache({ - name: 'fake', - }); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should call onChanged once with all items', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should emit events only for changed items', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.set('foo', 'bar'); - fakeOnChanged.mockClear(); - fakeInternalEventService.mockReset(); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - }); - - describe('getMany', () => { - it('should return empty for empty input', () => { - const cache = makeCache(); - const result = cache.getMany([]); - expect(result).toEqual([]); - }); - - it('should return the value for all keys', () => { - const cache = makeCache(); - cache.add('foo', 'bar'); - cache.add('alpha', 'omega'); - - const result = cache.getMany(['foo', 'alpha']); - - expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); - }); - - it('should return undefined for missing keys', () => { - const cache = makeCache(); - cache.add('foo', 'bar'); - - const result = cache.getMany(['foo', 'alpha']); - - expect(result).toEqual([['foo', 'bar'], ['alpha', undefined]]); - }); - }); - - describe('fetch', () => { - it('should throw FetchFailedError when fetcher throws error', async () => { - const cache = makeCache({ - fetcher: () => { throw new Error('test error'); }, - }); - - await expect(cache.fetch('foo')).rejects.toThrow(FetchFailedError); - }); - - it('should throw KeyNotFoundError when fetcher returns null', async () => { - const cache = makeCache({ - fetcher: () => null, - }); - - await expect(cache.fetch('foo')).rejects.toThrow(KeyNotFoundError); - }); - - it('should throw KeyNotFoundError when fetcher undefined', async () => { - const cache = makeCache({ - fetcher: () => undefined, - }); - - await expect(cache.fetch('foo')).rejects.toThrow(KeyNotFoundError); - }); - }); - - describe('fetchMaybe', () => { - it('should return value when found by fetcher', async () => { - const cache = makeCache({ - fetcher: () => 'bar', - }); - - const result = await cache.fetchMaybe('foo'); - - expect(result).toBe('bar'); - }); - - it('should call onChanged when found by fetcher', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - fetcher: () => 'bar', - onChanged: fakeOnChanged, - }); - - await cache.fetchMaybe('foo'); - - expect(fakeOnChanged).toHaveBeenCalled(); - }); - - it('should return undefined when fetcher returns undefined', async () => { - const cache = makeCache({ - fetcher: () => undefined, - }); - - const result = await cache.fetchMaybe('foo'); - - expect(result).toBe(undefined); - }); - - it('should not call onChanged when fetcher returns undefined', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - fetcher: () => undefined, - onChanged: fakeOnChanged, - }); - - await cache.fetchMaybe('foo'); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - }); - - it('should return undefined when fetcher returns null', async () => { - const cache = makeCache({ - fetcher: () => null, - }); - - const result = await cache.fetchMaybe('foo'); - - expect(result).toBe(undefined); - }); - - it('should not call onChanged when fetcher returns null', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - fetcher: () => null, - onChanged: fakeOnChanged, - }); - - await cache.fetchMaybe('foo'); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - }); - - it('should throw FetchFailedError when fetcher throws error', async () => { - const cache = makeCache({ - fetcher: () => { throw new Error('test error'); }, - }); - - await expect(cache.fetchMaybe('foo')).rejects.toThrow(FetchFailedError); - }); - }); - - describe('fetchMany', () => { - it('should do nothing for empty input', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - }); - - await cache.fetchMany([]); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should return existing items', async () => { - const cache = makeCache(); - cache.add('foo', 'bar'); - cache.add('alpha', 'omega'); - - const result = await cache.fetchMany(['foo', 'alpha']); - - expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); - }); - - it('should return existing items without events', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - }); - cache.add('foo', 'bar'); - cache.add('alpha', 'omega'); - - await cache.fetchMany(['foo', 'alpha']); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should call bulkFetcher for missing items', async () => { - const cache = makeCache({ - bulkFetcher: keys => keys.map(k => [k, `${k}#many`]), - fetcher: key => `${key}#single`, - }); - - const results = await cache.fetchMany(['foo', 'alpha']); - - expect(results).toEqual([['foo', 'foo#many'], ['alpha', 'alpha#many']]); - }); - - it('should call bulkFetcher only once', async () => { - const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); - const cache = makeCache({ - bulkFetcher: mockBulkFetcher, - }); - - await cache.fetchMany(['foo', 'bar']); - - expect(mockBulkFetcher).toHaveBeenCalledTimes(1); - }); - - it('should call fetcher when fetchMany is undefined', async () => { - const cache = makeCache({ - fetcher: key => `${key}#single`, - }); - - const results = await cache.fetchMany(['foo', 'alpha']); - - expect(results).toEqual([['foo', 'foo#single'], ['alpha', 'alpha#single']]); - }); - - it('should call onChanged', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - fetcher: k => k, - }); - - await cache.fetchMany(['foo', 'alpha']); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should call onChanged only for changed', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - fetcher: k => k, - }); - cache.add('foo', 'bar'); - - await cache.fetchMany(['foo', 'alpha']); - - expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should not emit event', async () => { - const cache = makeCache({ - fetcher: k => k, - }); - - await cache.fetchMany(['foo', 'alpha']); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - }); - - describe('refreshMany', () => { - it('should do nothing for empty input', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - }); - - const result = await cache.refreshMany([]); - - expect(result).toEqual([]); - expect(fakeOnChanged).not.toHaveBeenCalled(); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should call bulkFetcher for all keys', async () => { - const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); - const cache = makeCache({ - bulkFetcher: mockBulkFetcher, - }); - - const result = await cache.refreshMany(['foo', 'alpha']); - - expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); - expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(mockBulkFetcher).toHaveBeenCalledTimes(1); - }); - - it('should replace any existing keys', async () => { - const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); - const cache = makeCache({ - bulkFetcher: mockBulkFetcher, - }); - cache.add('foo', 'bar'); - - const result = await cache.refreshMany(['foo', 'alpha']); - - expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); - expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(mockBulkFetcher).toHaveBeenCalledTimes(1); - }); - - it('should call onChanged for all keys', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), - onChanged: fakeOnChanged, - }); - cache.add('foo', 'bar'); - - await cache.refreshMany(['foo', 'alpha']); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should emit event for all keys', async () => { - const cache = makeCache({ - name: 'fake', - bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), - }); - cache.add('foo', 'bar'); - - await cache.refreshMany(['foo', 'alpha']); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should throw FetchFailedError when bulk fetcher throws error', async () => { - const cache = makeCache({ - bulkFetcher: () => { throw new Error('test error'); }, - }); - - await expect(cache.refreshMany(['foo'])).rejects.toThrow(FetchFailedError); - }); - - it('should throw FetchFailedError when fallback fetcher throws error', async () => { - const cache = makeCache({ - fetcher: () => { throw new Error('test error'); }, - }); - - await expect(cache.refreshMany(['foo'])).rejects.toThrow(FetchFailedError); - }); - - it('should not throw when fallback fetcher returns null', async () => { - const cache = makeCache({ - fetcher: () => null, - }); - - const result = await cache.refreshMany(['foo']); - - expect(result).toHaveLength(0); - }); - - it('should not throw when fallback fetcher returns undefined', async () => { - const cache = makeCache({ - fetcher: () => undefined, - }); - - const result = await cache.refreshMany(['foo']); - - expect(result).toHaveLength(0); - }); - - it('should not throw when bulk fetcher returns empty', async () => { - const cache = makeCache({ - bulkFetcher: () => [], - }); - - const result = await cache.refreshMany(['foo']); - - expect(result).toHaveLength(0); - }); - }); - - describe('deleteMany', () => { - it('should remove keys from memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - await cache.deleteMany(['foo', 'alpha']); - - expect(cache.has('foo')).toBe(false); - expect(cache.has('alpha')).toBe(false); - }); - - it('should emit only one event', async () => { - const cache = makeCache({ - name: 'fake', - }); - - await cache.deleteMany(['foo', 'alpha']); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should call onChanged once with all items', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.deleteMany(['foo', 'alpha']); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); - expect(fakeOnChanged).toHaveBeenCalledTimes(1); - }); - - it('should do nothing if no keys are provided', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onChanged: fakeOnChanged, - }); - - await cache.deleteMany([]); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - }); - - describe('refresh', () => { - it('should populate the value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.refresh('foo'); - - const result = cache.has('foo'); - expect(result).toBe(true); - }); - - it('should return the value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - const result = await cache.refresh('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should replace the value if it exists', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.set('foo', 'bar'); - const result = await cache.refresh('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should call onChanged', async () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - onChanged: fakeOnChanged, - }); - - await cache.refresh('foo'); - - expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); - }); - - it('should emit event', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.refresh('foo'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); - }); - }); - - describe('reset', () => { - it('should erase all items', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - - await cache.reset(); - - expect(cache.size).toBe(0); - }); - - it('should call onReset', async () => { - const fakeOnReset = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onReset: fakeOnReset, - }); - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - - await cache.reset(); - - expect(fakeOnReset).toHaveBeenCalled(); - }); - - it('should emit event', async () => { - const cache = makeCache({ - name: 'fake', - }); - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - - await cache.reset(); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheReset', { name: 'fake' }]]); - }); - }); - - describe('add', () => { - it('should add the item', () => { - const cache = makeCache(); - cache.add('foo', 'bar'); - expect(cache.has('foo')).toBe(true); - }); - - it('should not emit event', () => { - const cache = makeCache({ - name: 'fake', - }); - - cache.add('foo', 'bar'); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should not call onChanged', () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - }); - - cache.add('foo', 'bar'); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - }); - }); - - describe('addMany', () => { - it('should add all items', () => { - const cache = makeCache(); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(cache.has('foo')).toBe(true); - expect(cache.has('alpha')).toBe(true); - }); - - it('should not emit event', () => { - const cache = makeCache({ - name: 'fake', - }); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should not call onChanged', () => { - const fakeOnChanged = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onChanged: fakeOnChanged, - }); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeOnChanged).not.toHaveBeenCalled(); - }); - }); - - describe('has', () => { - it('should return false when empty', () => { - const cache = makeCache(); - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should return false when value is not in memory', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.has('alpha'); - - expect(result).toBe(false); - }); - - it('should return true when value is in memory', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.has('foo'); - - expect(result).toBe(true); - }); - }); - - describe('size', () => { - it('should return 0 when empty', () => { - const cache = makeCache(); - expect(cache.size).toBe(0); - }); - - it('should return correct size when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - expect(cache.size).toBe(1); - }); - }); - - describe('entries', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.entries()); - - expect(result).toHaveLength(0); - }); - - it('should return all entries when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.entries()); - - expect(result).toEqual([['foo', 'bar']]); - }); - }); - - describe('keys', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.keys()); - - expect(result).toHaveLength(0); - }); - - it('should return all keys when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.keys()); - - expect(result).toEqual(['foo']); - }); - }); - - describe('values', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.values()); - - expect(result).toHaveLength(0); - }); - - it('should return all values when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.values()); - - expect(result).toEqual(['bar']); - }); - }); - - describe('[Symbol.iterator]', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache); - - expect(result).toHaveLength(0); - }); - - it('should return all entries when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache); - - expect(result).toEqual([['foo', 'bar']]); - }); - }); -}); diff --git a/packages/backend/test/unit/misc/QuantumKVCacheTests.ts b/packages/backend/test/unit/misc/QuantumKVCacheTests.ts new file mode 100644 index 0000000000..560e1491ef --- /dev/null +++ b/packages/backend/test/unit/misc/QuantumKVCacheTests.ts @@ -0,0 +1,1560 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { jest } from '@jest/globals'; +import { GodOfTimeService } from '../../misc/GodOfTimeService.js'; +import { MockInternalEventService } from '../../misc/MockInternalEventService.js'; +import * as assert from '../../misc/custom-assertions.js'; +import { QuantumKVCache, type QuantumKVOpts } from '@/misc/QuantumKVCache.js'; +import { KeyNotFoundError } from '@/misc/errors/KeyNotFoundError.js'; +import { FetchFailedError } from '@/misc/errors/FetchFailedError.js'; +import { DisposedError, DisposingError } from '@/misc/errors/DisposeError.js'; + +describe(QuantumKVCache, () => { + let mockTimeService: GodOfTimeService; + let mockInternalEventService: MockInternalEventService; + let madeCaches: QuantumKVCache[] = []; + + function makeCache(opts?: Partial> & { name?: string }): QuantumKVCache { + const _opts = { + name: expect.getState().currentTestName || 'test', + lifetime: Infinity, + fetcher: () => { throw new Error('not implemented'); }, + } satisfies QuantumKVOpts & { name: string }; + + if (opts) { + Object.assign(_opts, opts); + } + + const services = { + internalEventService: mockInternalEventService, + timeService: mockTimeService, + }; + + const cache = new QuantumKVCache(_opts.name, services, _opts); + madeCaches.push(cache); + return cache; + } + + beforeAll(() => { + mockTimeService = new GodOfTimeService(); + mockInternalEventService = new MockInternalEventService(); + }); + + afterEach(async () => { + for (const cache of madeCaches) { + await cache.dispose(); + } + madeCaches = []; + mockTimeService.reset(); + mockInternalEventService.mockReset(); + }); + + describe('dispose', () => { + it('should disconnect events', async () => { + const cache = makeCache(); + + await cache.dispose(); + + expect(mockInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', expect.anything()]]); + expect(mockInternalEventService._calls).toContainEqual(['off', ['quantumCacheReset', expect.anything()]]); + }); + + it('should clear memory cache', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + await cache.dispose(); + + expect(cache.size).toBe(0); + }); + + it('should prevent future calls', async () => { + const cache = makeCache(); + + await cache.dispose(); + + await assert.throwsAsync(DisposedError, async () => { + return await cache.set('foo', 'bar'); + }); + }); + + it('should pass dispose signal to fetchers', async () => { + let abortReason: unknown = undefined; + const cache = makeCache({ + fetcher: (key, meta) => { + meta.disposeSignal.addEventListener('abort', () => { + abortReason = meta.disposeSignal.reason; + }, { once: true }); + return `${key}#value`; + }, + }); + await cache.fetch('foo'); + + await cache.dispose(); + + expect(abortReason).toBeDefined(); + expect(abortReason).toBeInstanceOf(DisposingError); + }); + + it('should abort active fetches', async () => { + const testReady = Promise.withResolvers(); + const testComplete = Promise.withResolvers(); + const cache = makeCache({ + fetcher: async () => { + testReady.resolve(); + await testComplete.promise; + return 'test ending'; + }, + }); + const promise = cache.fetch('foo').finally(() => {}); + await testReady.promise; + + // must be in here: + await cache.dispose(); + + await assert.rejectsAsync(FetchFailedError, promise); + testComplete.resolve(); + }); + }); + + describe('set', () => { + it('should store in memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + const result1 = cache.get('foo'); + expect(result1).toBe('bar'); + const result2 = cache.get('alpha'); + expect(result2).toBe('omega'); + }); + + it('should emit event when storing', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); + }); + + it('should call onChanged when storing', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.set('foo', 'bar'); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], expect.objectContaining({ cache })); + }); + + it('should not emit event when storing unchanged value', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should not call onChanged when storing unchanged value', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + }); + + describe('constructor', () => { + it('should connect quantumCacheUpdated event', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onChanged: fakeOnChanged, + }); + await cache.set('foo', 'foo'); + await cache.set('bar', 'bar'); + + await mockInternalEventService.mockEmit('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); + + expect(cache.size).toBe(1); + expect(cache.has('foo')).toBe(false); + expect(cache.has('bar')).toBe(true); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], expect.objectContaining({ cache })); + expect(mockInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]); + }); + + it('should connect quantumCacheReset event', async () => { + const fakeOnReset = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onReset: fakeOnReset, + }); + await cache.set('foo', 'foo'); + await cache.set('bar', 'bar'); + + await mockInternalEventService.mockEmit('quantumCacheReset', { name: 'fake' }); + + expect(cache.size).toBe(0); + expect(fakeOnReset).toHaveBeenCalledWith(expect.objectContaining({ cache })); + expect(mockInternalEventService._calls).toContainEqual(['on', ['quantumCacheReset', expect.anything(), { ignoreLocal: true }]]); + }); + }); + + describe('get', () => { + it('should return value if present', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.get('foo'); + + expect(result).toBe('bar'); + }); + + it('should return undefined if missing', () => { + const cache = makeCache(); + + const result = cache.get('foo'); + + expect(result).toBe(undefined); + }); + }); + + describe('setMany', () => { + it('should populate all values', async () => { + const cache = makeCache(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(cache.has('foo')).toBe(true); + expect(cache.has('alpha')).toBe(true); + }); + + it('should emit one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onChanged once with all items', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should emit events only for changed items', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onChanged: fakeOnChanged, + }); + + await cache.set('foo', 'bar'); + fakeOnChanged.mockClear(); + mockInternalEventService.mockReset(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['alpha'] }]]); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + }); + + describe('getMany', () => { + it('should return empty for empty input', () => { + const cache = makeCache(); + const result = cache.getMany([]); + expect(result).toEqual([]); + }); + + it('should return the value for all keys', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + const result = cache.getMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); + }); + + it('should return undefined for missing keys', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + + const result = cache.getMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', undefined]]); + }); + }); + + describe('fetch', () => { + it('should fetch an unknown value', async () => { + const cache = makeCache({ + fetcher: key => `value#${key}`, + }); + + const result = await cache.fetch('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should store fetched value in memory cache', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.fetch('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should call onChanged', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + fetcher: key => `value#${key}`, + onChanged: fakeOnChanged, + }); + + await cache.fetch('foo'); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], expect.objectContaining({ cache })); + }); + + it('should not emit event', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.fetch('foo'); + + expect(mockInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); + }); + + it('should throw FetchFailedError when fetcher throws error', async () => { + const cache = makeCache({ + fetcher: () => { throw new Error('test error'); }, + }); + + await assert.throwsAsync(FetchFailedError, async () => { + return await cache.fetch('foo'); + }); + }); + + it('should throw KeyNotFoundError when fetcher returns null', async () => { + const cache = makeCache({ + fetcher: () => null, + }); + + await assert.throwsAsync(KeyNotFoundError, async () => { + return await cache.fetch('foo'); + }); + }); + + it('should throw KeyNotFoundError when fetcher returns undefined', async () => { + const cache = makeCache({ + fetcher: () => undefined, + }); + + await assert.throwsAsync(KeyNotFoundError, async () => { + return await cache.fetch('foo'); + }); + }); + + it('should respect fetcherConcurrency', async () => { + await testConcurrency( + { + fetcher: key => `value#${key}`, + fetcherConcurrency: 2, + }, + (cache, key) => cache.fetch(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should respect maxConcurrency', async () => { + await testConcurrency( + { + fetcher: key => `value#${key}`, + maxConcurrency: 2, + }, + (cache, key) => cache.fetch(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should de-duplicate calls', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const cache = makeCache({ fetcher: mockFetcher }); + + // Act + const fetch1 = cache.fetch('foo'); + const fetch2 = cache.fetch('foo'); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toBe('value#foo'); + await expect(fetch2).resolves.toBe('value#foo'); + expect(mockFetcher).toHaveBeenCalledTimes(1); + }); + }); + + describe('fetchMaybe', () => { + it('should return value when found by fetcher', async () => { + const cache = makeCache({ + fetcher: () => 'bar', + }); + + const result = await cache.fetchMaybe('foo'); + + expect(result).toBe('bar'); + }); + + it('should call onChanged when found by fetcher', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + fetcher: () => 'bar', + onChanged: fakeOnChanged, + }); + + await cache.fetchMaybe('foo'); + + expect(fakeOnChanged).toHaveBeenCalled(); + }); + + it('should return undefined when fetcher returns undefined', async () => { + const cache = makeCache({ + fetcher: () => undefined, + }); + + const result = await cache.fetchMaybe('foo'); + + expect(result).toBe(undefined); + }); + + it('should not call onChanged when fetcher returns undefined', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + fetcher: () => undefined, + onChanged: fakeOnChanged, + }); + + await cache.fetchMaybe('foo'); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + }); + + it('should return undefined when fetcher returns null', async () => { + const cache = makeCache({ + fetcher: () => null, + }); + + const result = await cache.fetchMaybe('foo'); + + expect(result).toBe(undefined); + }); + + it('should not call onChanged when fetcher returns null', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + fetcher: () => null, + onChanged: fakeOnChanged, + }); + + await cache.fetchMaybe('foo'); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + }); + + it('should throw FetchFailedError when fetcher throws error', async () => { + const cache = makeCache({ + fetcher: () => { throw new Error('test error'); }, + }); + + await assert.throwsAsync(FetchFailedError, async () => { + return await cache.fetchMaybe('foo'); + }); + }); + + it('should respect optionalFetcherConcurrency', async () => { + await testConcurrency( + { + optionalFetcher: key => `value#${key}`, + optionalFetcherConcurrency: 2, + }, + (cache, key) => cache.fetchMaybe(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should respect maxConcurrency', async () => { + await testConcurrency( + { + optionalFetcher: key => `value#${key}`, + maxConcurrency: 2, + }, + (cache, key) => cache.fetchMaybe(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should de-duplicate calls', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const cache = makeCache({ optionalFetcher: mockFetcher }); + + // Act + const fetch1 = cache.fetchMaybe('foo'); + const fetch2 = cache.fetchMaybe('foo'); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toBe('value#foo'); + await expect(fetch2).resolves.toBe('value#foo'); + expect(mockFetcher).toHaveBeenCalledTimes(1); + }); + }); + + describe('fetchMany', () => { + it('should do nothing for empty input', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.fetchMany([]); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should return existing items', async () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + const result = await cache.fetchMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); + }); + + it('should return existing items without events', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should call bulkFetcher for missing items', async () => { + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, `${k}#many`]), + fetcher: key => `${key}#single`, + }); + + const results = await cache.fetchMany(['foo', 'alpha']); + + expect(results).toEqual([['foo', 'foo#many'], ['alpha', 'alpha#many']]); + }); + + it('should call bulkFetcher only once', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + + await cache.fetchMany(['foo', 'bar']); + + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should call optionalFetcher for single item', async () => { + const cache = makeCache({ + optionalFetcher: () => 'good', + bulkFetcher: keys => keys.map(k => [k, 'bad']), + fetcher: () => 'bad', + }); + + const results = await cache.fetchMany(['foo']); + + expect(results).toEqual([['foo', 'good']]); + }); + + it('should call fetcher for single item when optionalFetcher is not defined', async () => { + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, 'bad']), + fetcher: () => 'good', + }); + + const results = await cache.fetchMany(['foo']); + + expect(results).toEqual([['foo', 'good']]); + }); + + it('should call fetcher when fetchMany is undefined', async () => { + const cache = makeCache({ + fetcher: key => `${key}#single`, + }); + + const results = await cache.fetchMany(['foo', 'alpha']); + + expect(results).toEqual([['foo', 'foo#single'], ['alpha', 'alpha#single']]); + }); + + it('should call onChanged', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + fetcher: k => k, + }); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should call onChanged only for changed', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + fetcher: k => k, + }); + cache.add('foo', 'bar'); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should not emit event', async () => { + const cache = makeCache({ + fetcher: k => k, + }); + + await cache.fetchMany(['foo', 'alpha']); + + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should respect bulkFetcherConcurrency', async () => { + await testConcurrency( + { + bulkFetcher: keys => [[keys[0], `value#${keys[0]}`]], + bulkFetcherConcurrency: 2, + }, + (cache, key) => cache.fetchMany([key, `${key}#dupe`]), + [[['foo', 'value#foo']], [['bar', 'value#bar']], [['baz', 'value#baz']]], + ); + }); + + it('should respect maxConcurrency', async () => { + await testConcurrency( + { + bulkFetcher: keys => [[keys[0], `value#${keys[0]}`]], + maxConcurrency: 2, + }, + (cache, key) => cache.fetchMany([key, `${key}#dupe`]), + [[['foo', 'value#foo']], [['bar', 'value#bar']], [['baz', 'value#baz']]], + ); + }); + + it('should de-duplicate calls using fetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + fetcher: mockFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetch('foo'); + const fetch2 = cache.fetchMany(['foo', 'bar', 'baz']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz']]); + expect(mockFetcher).toHaveBeenCalledTimes(1); + expect(mockFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['bar', 'baz'], expect.objectContaining({ cache })); + }); + + it('should de-duplicate calls using optionalFetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + optionalFetcher: mockFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetchMaybe('foo'); + const fetch2 = cache.fetchMany(['foo', 'bar', 'baz']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz']]); + expect(mockFetcher).toHaveBeenCalledTimes(1); + expect(mockFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['bar', 'baz'], expect.objectContaining({ cache })); + }); + + it('should de-duplicate calls using fetcher and optionalFetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockOptionalFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + fetcher: mockFetcher, + optionalFetcher: mockOptionalFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetch('foo'); + const fetch2 = cache.fetchMaybe('bar'); + const fetch3 = cache.fetchMany(['foo', 'bar', 'baz', 'wow']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual('value#bar'); + await expect(fetch3).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz'], ['wow', 'value#wow']]); + expect(mockFetcher).toHaveBeenCalledTimes(1); + expect(mockFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockOptionalFetcher).toHaveBeenCalledTimes(1); + expect(mockOptionalFetcher).toHaveBeenCalledWith('bar', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['baz', 'wow'], expect.objectContaining({ cache })); + }); + }); + + describe('refresh', () => { + it('should populate the value', async () => { + const cache = makeCache({ + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should return the value', async () => { + const cache = makeCache({ + fetcher: key => `value#${key}`, + }); + + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should replace the value if it exists', async () => { + const cache = makeCache({ + fetcher: key => `value#${key}`, + }); + + await cache.set('foo', 'bar'); + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should call onChanged', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + fetcher: key => `value#${key}`, + onChanged: fakeOnChanged, + }); + + await cache.refresh('foo'); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], expect.objectContaining({ cache })); + }); + + it('should emit event', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); + }); + + it('should respect fetcherConcurrency', async () => { + await testConcurrency( + { + fetcher: key => `value#${key}`, + fetcherConcurrency: 2, + }, + (cache, key) => cache.refresh(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should respect maxConcurrency', async () => { + await testConcurrency( + { + fetcher: key => `value#${key}`, + maxConcurrency: 2, + }, + (cache, key) => cache.refresh(key), + ['value#foo', 'value#bar', 'value#baz'], + ); + }); + + it('should de-duplicate calls', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const cache = makeCache({ fetcher: mockFetcher }); + + // Act + const fetch1 = cache.refresh('foo'); + const fetch2 = cache.refresh('foo'); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toBe('value#foo'); + await expect(fetch2).resolves.toBe('value#foo'); + expect(mockFetcher).toHaveBeenCalledTimes(1); + }); + }); + + describe('refreshMany', () => { + it('should do nothing for empty input', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + const result = await cache.refreshMany([]); + + expect(result).toEqual([]); + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should call bulkFetcher for all keys', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + + const result = await cache.refreshMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); + expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should replace any existing keys', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + cache.add('foo', 'bar'); + + const result = await cache.refreshMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); + expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should call onChanged for all keys', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), + onChanged: fakeOnChanged, + }); + cache.add('foo', 'bar'); + + await cache.refreshMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should emit event for all keys', async () => { + const cache = makeCache({ + name: 'fake', + bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), + }); + cache.add('foo', 'bar'); + + await cache.refreshMany(['foo', 'alpha']); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call optionalFetcher for single item', async () => { + const cache = makeCache({ + optionalFetcher: () => 'good', + bulkFetcher: keys => keys.map(k => [k, 'bad']), + fetcher: () => 'bad', + }); + + const results = await cache.refreshMany(['foo']); + + expect(results).toEqual([['foo', 'good']]); + }); + + it('should call fetcher for single item when optionalFetcher is not defined', async () => { + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, 'bad']), + fetcher: () => 'good', + }); + + const results = await cache.refreshMany(['foo']); + + expect(results).toEqual([['foo', 'good']]); + }); + + it('should throw FetchFailedError when bulk fetcher throws error', async () => { + const cache = makeCache({ + bulkFetcher: () => { throw new Error('test error'); }, + }); + + await assert.throwsAsync(FetchFailedError, async () => { + return await cache.refreshMany(['foo']); + }); + }); + + it('should throw FetchFailedError when fallback fetcher throws error', async () => { + const cache = makeCache({ + fetcher: () => { throw new Error('test error'); }, + }); + + await assert.throwsAsync(FetchFailedError, async () => { + return await cache.refreshMany(['foo']); + }); + }); + + it('should not throw when fallback fetcher returns null', async () => { + const cache = makeCache({ + fetcher: () => null, + }); + + const result = await cache.refreshMany(['foo']); + + expect(result).toHaveLength(0); + }); + + it('should not throw when fallback fetcher returns undefined', async () => { + const cache = makeCache({ + fetcher: () => undefined, + }); + + const result = await cache.refreshMany(['foo']); + + expect(result).toHaveLength(0); + }); + + it('should not throw when bulk fetcher returns empty', async () => { + const cache = makeCache({ + bulkFetcher: () => [], + }); + + const result = await cache.refreshMany(['foo', 'bar']); + + expect(result).toHaveLength(0); + }); + + it('should respect bulkFetcherConcurrency', async () => { + await testConcurrency( + { + bulkFetcher: keys => [[keys[0], `value#${keys[0]}`]], + bulkFetcherConcurrency: 2, + }, + (cache, key) => cache.refreshMany([key, `${key}#dupe`]), + [[['foo', 'value#foo']], [['bar', 'value#bar']], [['baz', 'value#baz']]], + ); + }); + + it('should respect maxConcurrency', async () => { + await testConcurrency( + { + bulkFetcher: keys => [[keys[0], `value#${keys[0]}`]], + maxConcurrency: 2, + }, + (cache, key) => cache.refreshMany([key, `${key}#dupe`]), + [[['foo', 'value#foo']], [['bar', 'value#bar']], [['baz', 'value#baz']]], + ); + }); + + it('should de-duplicate calls using fetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + fetcher: mockFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetch('foo'); + const fetch2 = cache.refreshMany(['foo', 'bar', 'baz']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz']]); + expect(mockFetcher).toHaveBeenCalledTimes(1); + expect(mockFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['bar', 'baz'], expect.objectContaining({ cache })); + }); + + it('should de-duplicate calls using optionalFetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockOptionalFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + optionalFetcher: mockOptionalFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetchMaybe('foo'); + const fetch2 = cache.refreshMany(['foo', 'bar', 'baz']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz']]); + expect(mockOptionalFetcher).toHaveBeenCalledTimes(1); + expect(mockOptionalFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['bar', 'baz'], expect.objectContaining({ cache })); + }); + + it('should de-duplicate calls using fetcher and optionalFetcher', async () => { + // Arrange + const testComplete = Promise.withResolvers(); + const mockFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockOptionalFetcher = jest.fn(async (key: string) => { + await testComplete.promise; + return `value#${key}`; + }); + const mockBulkFetcher = jest.fn(async (keys: string[]) => { + await testComplete.promise; + return keys.map(key => [key, `value#${key}`] as [string, string]); + }); + const cache = makeCache({ + fetcher: mockFetcher, + optionalFetcher: mockOptionalFetcher, + bulkFetcher: mockBulkFetcher, + }); + + // Act + const fetch1 = cache.fetch('foo'); + const fetch2 = cache.fetchMaybe('bar'); + const fetch3 = cache.refreshMany(['foo', 'bar', 'baz', 'wow']); + + // Assert + testComplete.resolve(); + await expect(fetch1).resolves.toEqual('value#foo'); + await expect(fetch2).resolves.toEqual('value#bar'); + await expect(fetch3).resolves.toEqual([['foo', 'value#foo'], ['bar', 'value#bar'], ['baz', 'value#baz'], ['wow', 'value#wow']]); + expect(mockFetcher).toHaveBeenCalledTimes(1); + expect(mockFetcher).toHaveBeenCalledWith('foo', expect.objectContaining({ cache })); + expect(mockOptionalFetcher).toHaveBeenCalledTimes(1); + expect(mockOptionalFetcher).toHaveBeenCalledWith('bar', expect.objectContaining({ cache })); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + expect(mockBulkFetcher).toHaveBeenCalledWith(['baz', 'wow'], expect.objectContaining({ cache })); + }); + }); + + describe('delete', () => { + it('should delete from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onChanged when deleting', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], expect.objectContaining({ cache })); + }); + + it('should emit event when deleting', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); + }); + }); + + describe('deleteMany', () => { + it('should remove keys from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + await cache.deleteMany(['foo', 'alpha']); + + expect(cache.has('foo')).toBe(false); + expect(cache.has('alpha')).toBe(false); + }); + + it('should emit only one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onChanged once with all items', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], expect.objectContaining({ cache })); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should do nothing if no keys are provided', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.deleteMany([]); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + }); + + describe('reset', () => { + it('should erase all items', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + await cache.reset(); + + expect(cache.size).toBe(0); + }); + + it('should call onReset', async () => { + const fakeOnReset = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onReset: fakeOnReset, + }); + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + await cache.reset(); + + expect(fakeOnReset).toHaveBeenCalled(); + }); + + it('should emit event', async () => { + const cache = makeCache({ + name: 'fake', + }); + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + await cache.reset(); + + expect(mockInternalEventService._calls).toContainEqual(['emit', ['quantumCacheReset', { name: 'fake' }]]); + }); + }); + + describe('add', () => { + it('should add the item', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + expect(cache.has('foo')).toBe(true); + }); + + it('should not emit event', () => { + const cache = makeCache(); + + cache.add('foo', 'bar'); + + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should not call onChanged', () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + cache.add('foo', 'bar'); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + }); + }); + + describe('addMany', () => { + it('should add all items', () => { + const cache = makeCache(); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(cache.has('foo')).toBe(true); + expect(cache.has('alpha')).toBe(true); + }); + + it('should not emit event', () => { + const cache = makeCache(); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(mockInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should not call onChanged', () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + }); + }); + + describe('has', () => { + it('should return false when empty', () => { + const cache = makeCache(); + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should return false when value is not in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('alpha'); + + expect(result).toBe(false); + }); + + it('should return true when value is in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('foo'); + + expect(result).toBe(true); + }); + }); + + describe('size', () => { + it('should return 0 when empty', () => { + const cache = makeCache(); + expect(cache.size).toBe(0); + }); + + it('should return correct size when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + expect(cache.size).toBe(1); + }); + }); + + describe('entries', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.entries()); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.entries()); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); + + describe('keys', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.keys()); + + expect(result).toHaveLength(0); + }); + + it('should return all keys when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.keys()); + + expect(result).toEqual(['foo']); + }); + }); + + describe('values', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.values()); + + expect(result).toHaveLength(0); + }); + + it('should return all values when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.values()); + + expect(result).toEqual(['bar']); + }); + }); + + describe('[Symbol.iterator]', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); + + async function testConcurrency(opts: Partial>, fetchCallback: (cache: QuantumKVCache, key: string) => Promise, expectedResults: unknown): Promise { + const fetcher = opts.fetcher; + const optionalFetcher = opts.optionalFetcher; + const bulkFetcher = opts.bulkFetcher; + + // Arrange + const fetches = {} as Record>; + const testReady = Promise.withResolvers(); + const cache = makeCache({ + fetcherConcurrency: 4, + optionalFetcherConcurrency: 4, + bulkFetcherConcurrency: 4, + maxConcurrency: 4, + + ...opts, + + fetcher: fetcher ? async (key, meta) => { + await waitForSignalBeforeFetch(testReady, key, fetches); + return fetcher(key, meta); + } : undefined, + optionalFetcher: optionalFetcher ? async (key, meta) => { + await waitForSignalBeforeFetch(testReady, key, fetches); + return optionalFetcher(key, meta); + } : undefined, + bulkFetcher: bulkFetcher ? async (keys, meta) => { + await waitForSignalBeforeFetch(testReady, keys[0], fetches); + return bulkFetcher(keys, meta); + } : undefined, + }); + for (const key of ['foo', 'bar', 'baz']) { + const fetcher = { + created: false, + creating: Promise.withResolvers(), + gate: Promise.withResolvers(), + promise: fetchCallback(cache, key), + execute: async () => { + await fetcher.creating.promise; + return await fetcher.execute(); + }, + complete: async () => { + if (!fetcher.created) throw new Error(`test error: cannot complete an unstarted fetcher for ${key}`); + + fetcher.gate.resolve(); + return await fetcher.promise; + }, + }; + fetches[key] = fetcher; + } + + // Act + testReady.resolve(); + + // Assert: should create fetchers up to the limit + await Promise.all([fetches.foo.creating.promise, fetches.bar.creating.promise]); + expect(fetches.foo.created).toBe(true); + expect(fetches.bar.created).toBe(true); + expect(fetches.baz.created).toBe(false); + + // Assert: when one completes, should create the next one + await fetches.foo.complete(); + await fetches.baz.creating.promise; + expect(fetches.baz.created).toBe(true); + + // Assert: when all complete, final results should be correct + const results = await Promise.all([ + fetches.foo.complete(), + fetches.bar.complete(), + fetches.baz.complete(), + ]); + expect(results).toEqual(expectedResults); + } +}); + +// used for concurrency tests +async function waitForSignalBeforeFetch(testReady: PromiseWithResolvers, key: string, fetches: Record>) { + await testReady.promise; + + const fetch = fetches[key]; + expect(fetch).toBeTruthy(); + + fetch.created = true; + fetch.creating.resolve(); + + await fetch.gate.promise; +} + +// used for concurrency tests +interface FetchController { + // create phase + /** set to true when fetch callback is executed */ + created: boolean, + /** triggered internally when the callback is executed */ + creating: PromiseWithResolvers, + + // execute phase + /** triggered externally to start the fetcher */ + gate: PromiseWithResolvers, + /** resolves when fetcher completes */ + promise: Promise, + + // controls + /** starts and executes the fetcher */ + complete: () => Promise; + /** awaits creation, then starts and executes the fetcher */ + execute: () => Promise; +}