/* * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ import * as Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; import { TimeService } from '@/core/TimeService.js'; export interface RedisCacheServices extends MemoryCacheServices { readonly redisClient: Redis.Redis } export interface RedisKVCacheOpts { lifetime: number; memoryCacheLifetime: number; fetcher?: RedisKVCache['fetcher']; toRedisConverter?: RedisKVCache['toRedisConverter']; fromRedisConverter?: RedisKVCache['fromRedisConverter']; } export class RedisKVCache { private readonly redisClient: Redis.Redis; private readonly lifetime: number; private readonly memoryCache: MemoryKVCache; public readonly fetcher: (key: string) => Promise; public readonly toRedisConverter: (value: T) => string; public readonly fromRedisConverter: (value: string) => T | undefined; constructor( public name: string, services: RedisCacheServices, opts: RedisKVCacheOpts, ) { this.redisClient = services.redisClient; this.lifetime = opts.lifetime; // OK: we forward all management calls to the inner cache. // eslint-disable-next-line no-restricted-syntax this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime, services); this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); }); this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value)); this.fromRedisConverter = opts.fromRedisConverter ?? ((value) => JSON.parse(value)); } @bindThis public async set(key: string, value: T): Promise { this.memoryCache.set(key, value); if (this.lifetime === Infinity) { await this.redisClient.set( `kvcache:${this.name}:${key}`, this.toRedisConverter(value), ); } else if (this.lifetime > 0) { await this.redisClient.set( `kvcache:${this.name}:${key}`, this.toRedisConverter(value), 'EX', Math.round(this.lifetime / 1000), ); } } @bindThis public async get(key: string): Promise { const memoryCached = this.memoryCache.get(key); if (memoryCached !== undefined) return memoryCached; const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`); if (cached == null) return undefined; const value = this.fromRedisConverter(cached); if (value !== undefined) { this.memoryCache.set(key, value); } return value; } @bindThis public async delete(key: string): Promise { this.memoryCache.delete(key); await this.redisClient.del(`kvcache:${this.name}:${key}`); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons: * * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster. * * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value. * * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses. */ @bindThis public async fetch(key: string): Promise { const cachedValue = await this.get(key); if (cachedValue !== undefined) { // Cache HIT return cachedValue; } // Cache MISS const value = await this.fetcher(key); await this.set(key, value); return value; } @bindThis public async refresh(key: string) { const value = await this.fetcher(key); await this.set(key, value); // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする } @bindThis public clear() { this.memoryCache.clear(); } @bindThis public gc() { this.memoryCache.gc(); } @bindThis public dispose() { this.memoryCache.dispose(); } } export interface RedisSingleCacheOpts { lifetime: number; memoryCacheLifetime: number; fetcher?: RedisSingleCache['fetcher']; toRedisConverter?: RedisSingleCache['toRedisConverter']; fromRedisConverter?: RedisSingleCache['fromRedisConverter']; } export class RedisSingleCache { private readonly redisClient: Redis.Redis; private readonly lifetime: number; private readonly memoryCache: MemorySingleCache; public readonly fetcher: () => Promise; public readonly toRedisConverter: (value: T) => string; public readonly fromRedisConverter: (value: string) => T | undefined; constructor( public name: string, services: RedisCacheServices, opts: RedisSingleCacheOpts, ) { this.redisClient = services.redisClient; this.lifetime = opts.lifetime; // OK: we forward all management calls to the inner cache. // eslint-disable-next-line no-restricted-syntax this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime, services); this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); }); this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value)); this.fromRedisConverter = opts.fromRedisConverter ?? ((value) => JSON.parse(value)); } @bindThis public async set(value: T): Promise { this.memoryCache.set(value); if (this.lifetime === Infinity) { await this.redisClient.set( `singlecache:${this.name}`, this.toRedisConverter(value), ); } else if (this.lifetime > 0) { await this.redisClient.set( `singlecache:${this.name}`, this.toRedisConverter(value), 'EX', Math.round(this.lifetime / 1000), ); } } @bindThis public async get(): Promise { const memoryCached = this.memoryCache.get(); if (memoryCached !== undefined) return memoryCached; const cached = await this.redisClient.get(`singlecache:${this.name}`); if (cached == null) return undefined; const value = this.fromRedisConverter(cached); if (value !== undefined) { this.memoryCache.set(value); } return value; } @bindThis public gc(): void { this.memoryCache.gc(); } @bindThis public async delete(): Promise { this.memoryCache.delete(); await this.redisClient.del(`singlecache:${this.name}`); } @bindThis public clear(): void { this.memoryCache.clear(); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons: * * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster. * * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value. * * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses. */ @bindThis public async fetch(): Promise { const cachedValue = await this.get(); if (cachedValue !== undefined) { // Cache HIT return cachedValue; } // Cache MISS const value = await this.fetcher(); await this.set(value); return value; } @bindThis public async refresh() { const value = await this.fetcher(); await this.set(value); // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする } @bindThis public dispose(): void { this.clear(); this.memoryCache.dispose(); } } export interface MemoryCacheServices { readonly timeService: TimeService; } // TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする? export class MemoryKVCache { private readonly cache = new Map(); private readonly timeService: TimeService; constructor( private readonly lifetime: number, services: MemoryCacheServices, ) { this.timeService = services.timeService; } @bindThis /** * Mapにキャッシュをセットします * @deprecated これを直接呼び出すべきではない。InternalEventなどで変更を全てのプロセス/マシンに通知するべき */ public set(key: string, value: T): void { this.cache.set(key, { date: this.timeService.now, value, }); } @bindThis public get(key: string): T | undefined { const cached = this.cache.get(key); if (cached == null) return undefined; if ((this.timeService.now - cached.date) > this.lifetime) { this.cache.delete(key); return undefined; } return cached.value; } public has(key: string): boolean { const cached = this.cache.get(key); if (cached == null) return false; if ((this.timeService.now - cached.date) > this.lifetime) { this.cache.delete(key); return false; } return true; } @bindThis public delete(key: string): void { this.cache.delete(key); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetch(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); this.set(key, value); return value; } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetchMaybe(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); if (value !== undefined) { this.set(key, value); } return value; } @bindThis public gc(): void { const now = this.timeService.now; for (const [key, { date }] of this.cache.entries()) { // The map is ordered from oldest to youngest. // We can stop once we find an entry that's still active, because all following entries must *also* be active. const age = now - date; if (age < this.lifetime) break; this.cache.delete(key); } } /** * Removes all entries from the cache, but does not dispose it. */ @bindThis public clear(): void { this.cache.clear(); } @bindThis public dispose(): void { this.clear(); } public get size() { return this.cache.size; } public get entries() { return this.cache.entries(); } } export class MemorySingleCache { private readonly timeService: TimeService; private cachedAt: number | null = null; private value: T | undefined; constructor( private lifetime: number, services: MemoryCacheServices, ) { this.timeService = services.timeService; } @bindThis public set(value: T): void { this.cachedAt = this.timeService.now; this.value = value; } @bindThis public gc(): void { // Check if we have a valid, non-expired value. // This is a little convoluted but protects against edge cases and invalid states. if (this.value !== undefined && this.cachedAt != null) { const age = this.timeService.now - this.cachedAt; if (Number.isSafeInteger(age) && age <= this.lifetime) { return; } } // If we get here, then it's expired or otherwise invalid. // Whatever the case, we should clear everything back to zeros. this.delete(); } @bindThis public get(): T | undefined { this.gc(); return this.value; } @bindThis public delete() { this.value = undefined; this.cachedAt = null; } @bindThis public clear() { this.delete(); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetch(fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); this.set(value); return value; } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetchMaybe(fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); if (value !== undefined) { this.set(value); } return value; } @bindThis public dispose() { this.clear(); } }