implement QuantumKVCache.setMany and QuantumKVCache.seleteMany

This commit is contained in:
Hazelnoot 2025-06-05 14:28:19 -04:00
parent 46a6612dc0
commit 207abaff88
7 changed files with 201 additions and 51 deletions

View file

@ -17,7 +17,6 @@ import { InternalEventService } from './InternalEventService.js';
@Injectable()
export class ChannelFollowingService implements OnModuleInit {
// TODO check for regs
public userFollowingChannelsCache: QuantumKVCache<Set<string>>;
constructor(

View file

@ -265,7 +265,7 @@ export interface InternalEventTypes {
unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; };
userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; };
userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; };
quantumCacheUpdated: { name: string, key: string, op: 's' | 'd' };
quantumCacheUpdated: { name: string, keys: string[], op: 's' | 'd' };
}
type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>;

View file

@ -70,16 +70,16 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
switch (type) {
case 'userListMemberAdded': {
const { userListId, memberId } = body;
if (this.membersCache.has(userListId)) {
const members = await this.membersCache.get(userListId);
const members = this.membersCache.get(userListId);
if (members) {
members.add(memberId);
}
break;
}
case 'userListMemberRemoved': {
const { userListId, memberId } = body;
if (this.membersCache.has(userListId)) {
const members = await this.membersCache.get(userListId);
const members = this.membersCache.get(userListId);
if (members) {
members.delete(memberId);
}
break;

View file

@ -43,8 +43,6 @@ export class UserMutingService {
id: In(mutings.map(m => m.id)),
});
await Promise.all(Array
.from(new Set(mutings.map(m => m.muterId)))
.map(muterId => this.cacheService.userMutingsCache.delete(muterId)));
await this.cacheService.userMutingsCache.deleteMany(mutings.map(m => m.muterId));
}
}

View file

@ -44,8 +44,6 @@ export class UserRenoteMutingService {
id: In(mutings.map(m => m.id)),
});
await Promise.all(Array
.from(new Set(mutings.map(m => m.muterId)))
.map(muterId => this.cacheService.renoteMutingsCache.delete(muterId)));
await this.cacheService.renoteMutingsCache.deleteMany(mutings.map(m => m.muterId));
}
}

View file

@ -531,19 +531,54 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
this.memoryCache.set(key, value);
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', key });
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] });
if (this.onSet) {
await this.onSet(key, this);
}
}
/**
* Creates or updates multiple value in the cache, and erases any stale caches across the cluster.
* Fires an onSet for each changed item event after the cache has been updated in all processes.
* Skips if all values are unchanged.
*/
@bindThis
public async setMany(items: Iterable<[key: string, value: T]>): Promise<void> {
const changedKeys: string[] = [];
for (const item of items) {
if (this.memoryCache.get(item[0]) !== item[1]) {
changedKeys.push(item[0]);
this.memoryCache.set(item[0], item[1]);
}
}
if (changedKeys.length > 0) {
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys });
if (this.onSet) {
for (const key of changedKeys) {
await this.onSet(key, this);
}
}
}
}
/**
* Gets a value from the local memory cache, or returns undefined if not found.
*/
@bindThis
public get(key: string): T | undefined {
return this.memoryCache.get(key);
}
/**
* Gets or fetches a value from the cache.
* Fires an onSet event, but does not emit an update event to other processes.
*/
@bindThis
public async get(key: string): Promise<T> {
public async fetch(key: string): Promise<T> {
let value = this.memoryCache.get(key);
if (value === undefined) {
value = await this.fetcher(key, this);
@ -556,15 +591,6 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
return value;
}
/**
* Alias to get(), included for backwards-compatibility with RedisKVCache.
* @deprecated use get() instead
*/
@bindThis
public async fetch(key: string): Promise<T> {
return await this.get(key);
}
/**
* Returns true is a key exists in memory.
* This applies to the local subset view, not the cross-cluster cache state.
@ -582,12 +608,35 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
public async delete(key: string): Promise<void> {
this.memoryCache.delete(key);
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', key });
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] });
if (this.onDelete) {
await this.onDelete(key, this);
}
}
/**
* Deletes multiple values from the cache, and erases any stale caches across the cluster.
* Fires an onDelete event for each key after the cache has been updated in all processes.
* Skips if the input is empty.
*/
@bindThis
public async deleteMany(keys: string[]): Promise<void> {
if (keys.length === 0) {
return;
}
for (const key of keys) {
this.memoryCache.delete(key);
}
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys });
if (this.onDelete) {
for (const key of keys) {
await this.onDelete(key, this);
}
}
}
/**
* Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster.
@ -623,14 +672,16 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
@bindThis
private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
if (data.name === this.name) {
this.memoryCache.delete(data.key);
for (const key of data.keys) {
this.memoryCache.delete(key);
if (data.op === 's' && this.onSet) {
await this.onSet(data.key, this);
}
if (data.op === 's' && this.onSet) {
await this.onSet(key, this);
}
if (data.op === 'd' && this.onDelete) {
await this.onDelete(data.key, this);
if (data.op === 'd' && this.onDelete) {
await this.onDelete(key, this);
}
}
}
}