implement QuantumKVCache.reset

This commit is contained in:
Hazelnoot 2025-06-18 01:32:46 -04:00
parent 64b415c469
commit 3e1668348e
3 changed files with 78 additions and 1 deletions

View file

@ -276,6 +276,7 @@ export interface InternalEventTypes {
userListMemberBulkUpdated: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; userListMemberBulkUpdated: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
quantumCacheUpdated: { name: string, keys: string[] }; quantumCacheUpdated: { name: string, keys: string[] };
quantumCacheReset: { name: string };
} }
type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>; type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>;

View file

@ -41,6 +41,13 @@ export interface QuantumKVOpts<T> {
*/ */
onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>; onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>;
/**
* Optional callback when all values are removed from the cache, either locally or elsewhere in the cluster.
* This is called *after* the cache state is updated.
* Implementations may be synchronous or async.
*/
onReset?: (cache: QuantumKVCache<T>) => void | Promise<void>;
// TODO equality comparer // TODO equality comparer
} }
@ -65,6 +72,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
public readonly fetcher: QuantumKVOpts<T>['fetcher']; public readonly fetcher: QuantumKVOpts<T>['fetcher'];
public readonly bulkFetcher: QuantumKVOpts<T>['bulkFetcher']; public readonly bulkFetcher: QuantumKVOpts<T>['bulkFetcher'];
public readonly onChanged: QuantumKVOpts<T>['onChanged']; public readonly onChanged: QuantumKVOpts<T>['onChanged'];
public readonly onReset: QuantumKVOpts<T>['onReset'];
/** /**
* @param name Unique name of the cache - must be the same in all processes. * @param name Unique name of the cache - must be the same in all processes.
@ -83,12 +91,17 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
this.fetcher = opts.fetcher; this.fetcher = opts.fetcher;
this.bulkFetcher = opts.bulkFetcher; this.bulkFetcher = opts.bulkFetcher;
this.onChanged = opts.onChanged; this.onChanged = opts.onChanged;
this.onReset = opts.onReset;
this.internalEventService = services.internalEventService; this.internalEventService = services.internalEventService;
this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
// Ignore our own events, otherwise we'll immediately erase any set value. // Ignore our own events, otherwise we'll immediately erase any set value.
ignoreLocal: true, ignoreLocal: true,
}); });
this.internalEventService.on('quantumCacheReset', this.onQuantumCacheReset, {
// Ignore our own events, otherwise we'll immediately erase any set value.
ignoreLocal: true,
});
} }
/** /**
@ -376,10 +389,24 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
* Does not send any events or update other processes. * Does not send any events or update other processes.
*/ */
@bindThis @bindThis
public clear() { public clear(): void {
this.memoryCache.clear(); this.memoryCache.clear();
} }
/**
* Erases all entries from the cache.
* Fires an onReset event and updates other processes.
*/
public async reset(): Promise<void> {
this.clear();
await this.internalEventService.emit('quantumCacheReset', { name: this.name });
if (this.onReset) {
await this.onReset(this);
}
}
/** /**
* Removes expired cache entries from the local view. * Removes expired cache entries from the local view.
* Does not send any events or update other processes. * Does not send any events or update other processes.
@ -396,6 +423,7 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
@bindThis @bindThis
public dispose() { public dispose() {
this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated); this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
this.internalEventService.off('quantumCacheReset', this.onQuantumCacheReset);
this.memoryCache.dispose(); this.memoryCache.dispose();
} }
@ -461,6 +489,17 @@ export class QuantumKVCache<T> implements Iterable<readonly [key: string, value:
} }
} }
@bindThis
private async onQuantumCacheReset(data: InternalEventTypes['quantumCacheReset']): Promise<void> {
if (data.name === this.name) {
this.clear();
if (this.onReset) {
await this.onReset(this);
}
}
}
/** /**
* Iterates all [key, value] pairs in memory. * Iterates all [key, value] pairs in memory.
* This applies to the local subset view, not the cross-cluster cache state. * This applies to the local subset view, not the cross-cluster cache state.

View file

@ -778,6 +778,43 @@ describe(QuantumKVCache, () => {
}); });
}); });
describe('refresh', () => {
it('should erase all items', async () => {
const cache = makeCache<string>();
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(cache.size).toBe(0);
});
it('should call onReset', async () => {
const fakeOnReset = jest.fn(() => Promise.resolve());
const cache = makeCache<string>({
onReset: fakeOnReset,
});
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(fakeOnReset).toHaveBeenCalled();
});
it('should emit event', async () => {
const cache = makeCache<string>({
name: 'fake',
});
await cache.set('foo', 'bar');
await cache.set('alpha', 'omega');
await cache.reset();
expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheReset', { name: 'fake' }]]);
});
});
describe('add', () => { describe('add', () => {
it('should add the item', () => { it('should add the item', () => {
const cache = makeCache(); const cache = makeCache();