normalize userFollowingsCache / userFollowersCache and add hibernatedUserCache to reduce the number of cache-clears and allow use of caching in many more places
This commit is contained in:
parent
372714c9b6
commit
fa68751a19
28 changed files with 816 additions and 581 deletions
|
|
@ -21,18 +21,18 @@ export interface QuantumKVOpts<T> {
|
|||
fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>;
|
||||
|
||||
/**
|
||||
* Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster.
|
||||
* This is called *after* the cache state is updated.
|
||||
* Optional callback to fetch the value for multiple keys that weren't found in the cache.
|
||||
* May be synchronous or async.
|
||||
* If not provided, then the implementation will fall back on repeated calls to fetcher().
|
||||
*/
|
||||
onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
|
||||
bulkFetcher?: (keys: string[], cache: QuantumKVCache<T>) => Iterable<[key: string, value: T]> | Promise<Iterable<[key: string, value: T]>>;
|
||||
|
||||
/**
|
||||
* Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster.
|
||||
* Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster.
|
||||
* This is called *after* the cache state is updated.
|
||||
* May be synchronous or async.
|
||||
* Implementations may be synchronous or async.
|
||||
*/
|
||||
onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
|
||||
onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -44,8 +44,8 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
private readonly memoryCache: MemoryKVCache<T>;
|
||||
|
||||
public readonly fetcher: QuantumKVOpts<T>['fetcher'];
|
||||
public readonly onSet: QuantumKVOpts<T>['onSet'];
|
||||
public readonly onDelete: QuantumKVOpts<T>['onDelete'];
|
||||
public readonly bulkFetcher: QuantumKVOpts<T>['bulkFetcher'];
|
||||
public readonly onChanged: QuantumKVOpts<T>['onChanged'];
|
||||
|
||||
/**
|
||||
* @param internalEventService Service bus to synchronize events.
|
||||
|
|
@ -59,8 +59,8 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
) {
|
||||
this.memoryCache = new MemoryKVCache(opts.lifetime);
|
||||
this.fetcher = opts.fetcher;
|
||||
this.onSet = opts.onSet;
|
||||
this.onDelete = opts.onDelete;
|
||||
this.bulkFetcher = opts.bulkFetcher;
|
||||
this.onChanged = opts.onChanged;
|
||||
|
||||
this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
|
||||
// Ignore our own events, otherwise we'll immediately erase any set value.
|
||||
|
|
@ -122,10 +122,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
|
||||
this.memoryCache.set(key, value);
|
||||
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] });
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] });
|
||||
|
||||
if (this.onSet) {
|
||||
await this.onSet(key, this);
|
||||
if (this.onChanged) {
|
||||
await this.onChanged([key], this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -146,12 +146,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
}
|
||||
|
||||
if (changedKeys.length > 0) {
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys });
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: changedKeys });
|
||||
|
||||
if (this.onSet) {
|
||||
for (const key of changedKeys) {
|
||||
await this.onSet(key, this);
|
||||
}
|
||||
if (this.onChanged) {
|
||||
await this.onChanged(changedKeys, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -180,12 +178,26 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
|
||||
/**
|
||||
* Gets a value from the local memory cache, or returns undefined if not found.
|
||||
* Returns cached data only - does not make any fetches.
|
||||
*/
|
||||
@bindThis
|
||||
public get(key: string): T | undefined {
|
||||
return this.memoryCache.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets multiple values from the local memory cache; returning undefined for any missing keys.
|
||||
* Returns cached data only - does not make any fetches.
|
||||
*/
|
||||
@bindThis
|
||||
public getMany(keys: Iterable<string>): [key: string, value: T | undefined][] {
|
||||
const results: [key: string, value: T | undefined][] = [];
|
||||
for (const key of keys) {
|
||||
results.push([key, this.get(key)]);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or fetches a value from the cache.
|
||||
* Fires an onSet event, but does not emit an update event to other processes.
|
||||
|
|
@ -197,13 +209,49 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
value = await this.fetcher(key, this);
|
||||
this.memoryCache.set(key, value);
|
||||
|
||||
if (this.onSet) {
|
||||
await this.onSet(key, this);
|
||||
if (this.onChanged) {
|
||||
await this.onChanged([key], this);
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or fetches multiple values from the cache.
|
||||
* Fires onSet events, but does not emit any update events to other processes.
|
||||
*/
|
||||
@bindThis
|
||||
public async fetchMany(keys: Iterable<string>): Promise<[key: string, value: T][]> {
|
||||
const results: [key: string, value: T][] = [];
|
||||
const toFetch: string[] = [];
|
||||
|
||||
// Spliterate into cached results / uncached keys.
|
||||
for (const key of keys) {
|
||||
const fromCache = this.get(key);
|
||||
if (fromCache) {
|
||||
results.push([key, fromCache]);
|
||||
} else {
|
||||
toFetch.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch any uncached keys
|
||||
if (toFetch.length > 0) {
|
||||
const fetched = await this.bulkFetch(toFetch);
|
||||
|
||||
// Add to cache and return set
|
||||
this.addMany(fetched);
|
||||
results.push(...fetched);
|
||||
|
||||
// Emit event
|
||||
if (this.onChanged) {
|
||||
await this.onChanged(toFetch, this);
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true is a key exists in memory.
|
||||
* This applies to the local subset view, not the cross-cluster cache state.
|
||||
|
|
@ -221,10 +269,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
public async delete(key: string): Promise<void> {
|
||||
this.memoryCache.delete(key);
|
||||
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] });
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] });
|
||||
|
||||
if (this.onDelete) {
|
||||
await this.onDelete(key, this);
|
||||
if (this.onChanged) {
|
||||
await this.onChanged([key], this);
|
||||
}
|
||||
}
|
||||
/**
|
||||
|
|
@ -233,21 +281,22 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
* Skips if the input is empty.
|
||||
*/
|
||||
@bindThis
|
||||
public async deleteMany(keys: string[]): Promise<void> {
|
||||
if (keys.length === 0) {
|
||||
return;
|
||||
}
|
||||
public async deleteMany(keys: Iterable<string>): Promise<void> {
|
||||
const deleted: string[] = [];
|
||||
|
||||
for (const key of keys) {
|
||||
this.memoryCache.delete(key);
|
||||
deleted.push(key);
|
||||
}
|
||||
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys });
|
||||
if (deleted.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.onDelete) {
|
||||
for (const key of keys) {
|
||||
await this.onDelete(key, this);
|
||||
}
|
||||
await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: deleted });
|
||||
|
||||
if (this.onChanged) {
|
||||
await this.onChanged(deleted, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -262,6 +311,13 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
return value;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async refreshMany(keys: Iterable<string>): Promise<[key: string, value: T][]> {
|
||||
const values = await this.bulkFetch(keys);
|
||||
await this.setMany(values);
|
||||
return values;
|
||||
}
|
||||
|
||||
/**
|
||||
* Erases all entries from the local memory cache.
|
||||
* Does not send any events or update other processes.
|
||||
|
|
@ -291,19 +347,30 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
|||
this.memoryCache.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async bulkFetch(keys: Iterable<string>): Promise<[key: string, value: T][]> {
|
||||
if (this.bulkFetcher) {
|
||||
const results = await this.bulkFetcher(Array.from(keys), this);
|
||||
return Array.from(results);
|
||||
}
|
||||
|
||||
const results: [key: string, value: T][] = [];
|
||||
for (const key of keys) {
|
||||
const value = await this.fetcher(key, this);
|
||||
results.push([key, value]);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
|
||||
if (data.name === this.name) {
|
||||
for (const key of data.keys) {
|
||||
this.memoryCache.delete(key);
|
||||
}
|
||||
|
||||
if (data.op === 's' && this.onSet) {
|
||||
await this.onSet(key, this);
|
||||
}
|
||||
|
||||
if (data.op === 'd' && this.onDelete) {
|
||||
await this.onDelete(key, this);
|
||||
}
|
||||
if (this.onChanged) {
|
||||
await this.onChanged(data.keys, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@
|
|||
|
||||
import * as Redis from 'ioredis';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import { InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
|
||||
export class RedisKVCache<T> {
|
||||
private readonly lifetime: number;
|
||||
|
|
@ -120,9 +118,9 @@ export class RedisKVCache<T> {
|
|||
export class RedisSingleCache<T> {
|
||||
private readonly lifetime: number;
|
||||
private readonly memoryCache: MemorySingleCache<T>;
|
||||
private readonly fetcher: () => Promise<T>;
|
||||
private readonly toRedisConverter: (value: T) => string;
|
||||
private readonly fromRedisConverter: (value: string) => T | undefined;
|
||||
public readonly fetcher: () => Promise<T>;
|
||||
public readonly toRedisConverter: (value: T) => string;
|
||||
public readonly fromRedisConverter: (value: string) => T | undefined;
|
||||
|
||||
constructor(
|
||||
private redisClient: Redis.Redis,
|
||||
|
|
@ -245,6 +243,16 @@ export class MemoryKVCache<T> {
|
|||
return cached.value;
|
||||
}
|
||||
|
||||
public has(key: string): boolean {
|
||||
const cached = this.cache.get(key);
|
||||
if (cached == null) return false;
|
||||
if ((Date.now() - cached.date) > this.lifetime) {
|
||||
this.cache.delete(key);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public delete(key: string): void {
|
||||
this.cache.delete(key);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue