pass services into caches through constructor
This commit is contained in:
parent
4116c19e7e
commit
72da199f61
2 changed files with 83 additions and 38 deletions
|
|
@ -3,10 +3,13 @@
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
import { InternalEventTypes } from '@/core/GlobalEventService.js';
|
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||||
import { MemoryKVCache } from '@/misc/cache.js';
|
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||||
|
import { MemoryKVCache, type MemoryCacheServices } from '@/misc/cache.js';
|
||||||
|
import { makeKVPArray, type KVPArray } from '@/misc/kvp-array.js';
|
||||||
|
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||||
|
import { isRetryableSymbol } from '@/misc/is-retryable-error.js';
|
||||||
|
|
||||||
export interface QuantumKVOpts<T> {
|
export interface QuantumKVOpts<T> {
|
||||||
/**
|
/**
|
||||||
|
|
@ -35,6 +38,16 @@ export interface QuantumKVOpts<T> {
|
||||||
* Implementations may be synchronous or async.
|
* Implementations may be synchronous or async.
|
||||||
*/
|
*/
|
||||||
onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>;
|
onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>;
|
||||||
|
|
||||||
|
// TODO equality comparer
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QuantumCacheServices extends MemoryCacheServices {
|
||||||
|
/**
|
||||||
|
* Event bus to attach to.
|
||||||
|
* This can be mocked for easier testing under DI.
|
||||||
|
*/
|
||||||
|
readonly internalEventService: InternalEventService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -42,7 +55,9 @@ export interface QuantumKVOpts<T> {
|
||||||
* All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
|
* All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
|
||||||
* This ensures that a call to get() will never return stale data.
|
* This ensures that a call to get() will never return stale data.
|
||||||
*/
|
*/
|
||||||
export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
export class QuantumKVCache<T> implements Iterable<readonly [key: string, value: T]> {
|
||||||
|
private readonly internalEventService: InternalEventService;
|
||||||
|
|
||||||
private readonly memoryCache: MemoryKVCache<T>;
|
private readonly memoryCache: MemoryKVCache<T>;
|
||||||
|
|
||||||
public readonly fetcher: QuantumKVOpts<T>['fetcher'];
|
public readonly fetcher: QuantumKVOpts<T>['fetcher'];
|
||||||
|
|
@ -50,20 +65,22 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
|
||||||
public readonly onChanged: QuantumKVOpts<T>['onChanged'];
|
public readonly onChanged: QuantumKVOpts<T>['onChanged'];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param internalEventService Service bus to synchronize events.
|
|
||||||
* @param name Unique name of the cache - must be the same in all processes.
|
* @param name Unique name of the cache - must be the same in all processes.
|
||||||
|
* @param services DI services - internalEventService is required
|
||||||
* @param opts Cache options
|
* @param opts Cache options
|
||||||
*/
|
*/
|
||||||
constructor(
|
constructor(
|
||||||
private readonly internalEventService: InternalEventService,
|
// TODO validate to make sure this is unique
|
||||||
private readonly name: string,
|
public readonly name: string,
|
||||||
|
services: QuantumCacheServices,
|
||||||
opts: QuantumKVOpts<T>,
|
opts: QuantumKVOpts<T>,
|
||||||
) {
|
) {
|
||||||
this.memoryCache = new MemoryKVCache(opts.lifetime);
|
this.memoryCache = new MemoryKVCache(opts.lifetime, services);
|
||||||
this.fetcher = opts.fetcher;
|
this.fetcher = opts.fetcher;
|
||||||
this.bulkFetcher = opts.bulkFetcher;
|
this.bulkFetcher = opts.bulkFetcher;
|
||||||
this.onChanged = opts.onChanged;
|
this.onChanged = opts.onChanged;
|
||||||
|
|
||||||
|
this.internalEventService = services.internalEventService;
|
||||||
this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
|
this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
|
||||||
// Ignore our own events, otherwise we'll immediately erase any set value.
|
// Ignore our own events, otherwise we'll immediately erase any set value.
|
||||||
ignoreLocal: true,
|
ignoreLocal: true,
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,25 @@
|
||||||
|
|
||||||
import * as Redis from 'ioredis';
|
import * as Redis from 'ioredis';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
|
import { TimeService, NativeTimeService } from '@/core/TimeService.js';
|
||||||
|
|
||||||
|
// This matches the default DI implementation, but as a shared instance to avoid wasting memory.
|
||||||
|
const defaultTimeService: TimeService = new NativeTimeService();
|
||||||
|
|
||||||
|
export interface RedisCacheServices extends MemoryCacheServices {
|
||||||
|
readonly redisClient: Redis.Redis
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RedisKVCacheOpts<T> {
|
||||||
|
lifetime: number;
|
||||||
|
memoryCacheLifetime: number;
|
||||||
|
fetcher?: RedisKVCache<T>['fetcher'];
|
||||||
|
toRedisConverter?: RedisKVCache<T>['toRedisConverter'];
|
||||||
|
fromRedisConverter?: RedisKVCache<T>['fromRedisConverter'];
|
||||||
|
}
|
||||||
|
|
||||||
export class RedisKVCache<T> {
|
export class RedisKVCache<T> {
|
||||||
|
private readonly redisClient: Redis.Redis;
|
||||||
private readonly lifetime: number;
|
private readonly lifetime: number;
|
||||||
private readonly memoryCache: MemoryKVCache<T>;
|
private readonly memoryCache: MemoryKVCache<T>;
|
||||||
public readonly fetcher: (key: string) => Promise<T>;
|
public readonly fetcher: (key: string) => Promise<T>;
|
||||||
|
|
@ -14,18 +31,13 @@ export class RedisKVCache<T> {
|
||||||
public readonly fromRedisConverter: (value: string) => T | undefined;
|
public readonly fromRedisConverter: (value: string) => T | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private redisClient: Redis.Redis,
|
public name: string,
|
||||||
private name: string,
|
services: RedisCacheServices,
|
||||||
opts: {
|
opts: RedisKVCacheOpts<T>,
|
||||||
lifetime: RedisKVCache<T>['lifetime'];
|
|
||||||
memoryCacheLifetime: number;
|
|
||||||
fetcher?: RedisKVCache<T>['fetcher'];
|
|
||||||
toRedisConverter?: RedisKVCache<T>['toRedisConverter'];
|
|
||||||
fromRedisConverter?: RedisKVCache<T>['fromRedisConverter'];
|
|
||||||
},
|
|
||||||
) {
|
) {
|
||||||
|
this.redisClient = services.redisClient;
|
||||||
this.lifetime = opts.lifetime;
|
this.lifetime = opts.lifetime;
|
||||||
this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime);
|
this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime, services);
|
||||||
this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); });
|
this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); });
|
||||||
this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value));
|
this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value));
|
||||||
this.fromRedisConverter = opts.fromRedisConverter ?? ((value) => JSON.parse(value));
|
this.fromRedisConverter = opts.fromRedisConverter ?? ((value) => JSON.parse(value));
|
||||||
|
|
@ -115,7 +127,16 @@ export class RedisKVCache<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface RedisSingleCacheOpts<T> {
|
||||||
|
lifetime: number;
|
||||||
|
memoryCacheLifetime: number;
|
||||||
|
fetcher?: RedisSingleCache<T>['fetcher'];
|
||||||
|
toRedisConverter?: RedisSingleCache<T>['toRedisConverter'];
|
||||||
|
fromRedisConverter?: RedisSingleCache<T>['fromRedisConverter'];
|
||||||
|
}
|
||||||
|
|
||||||
export class RedisSingleCache<T> {
|
export class RedisSingleCache<T> {
|
||||||
|
private readonly redisClient: Redis.Redis;
|
||||||
private readonly lifetime: number;
|
private readonly lifetime: number;
|
||||||
private readonly memoryCache: MemorySingleCache<T>;
|
private readonly memoryCache: MemorySingleCache<T>;
|
||||||
public readonly fetcher: () => Promise<T>;
|
public readonly fetcher: () => Promise<T>;
|
||||||
|
|
@ -123,18 +144,13 @@ export class RedisSingleCache<T> {
|
||||||
public readonly fromRedisConverter: (value: string) => T | undefined;
|
public readonly fromRedisConverter: (value: string) => T | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private redisClient: Redis.Redis,
|
public name: string,
|
||||||
private name: string,
|
services: RedisCacheServices,
|
||||||
opts: {
|
opts: RedisSingleCacheOpts<T>,
|
||||||
lifetime: number;
|
|
||||||
memoryCacheLifetime: number;
|
|
||||||
fetcher?: RedisSingleCache<T>['fetcher'];
|
|
||||||
toRedisConverter?: RedisSingleCache<T>['toRedisConverter'];
|
|
||||||
fromRedisConverter?: RedisSingleCache<T>['fromRedisConverter'];
|
|
||||||
},
|
|
||||||
) {
|
) {
|
||||||
|
this.redisClient = services.redisClient;
|
||||||
this.lifetime = opts.lifetime;
|
this.lifetime = opts.lifetime;
|
||||||
this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime);
|
this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime, services);
|
||||||
|
|
||||||
this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); });
|
this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); });
|
||||||
this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value));
|
this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value));
|
||||||
|
|
@ -219,17 +235,25 @@ export class RedisSingleCache<T> {
|
||||||
this.clear();
|
this.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface MemoryCacheServices {
|
||||||
|
readonly timeService?: TimeService;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする?
|
// TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする?
|
||||||
|
|
||||||
export class MemoryKVCache<T> {
|
export class MemoryKVCache<T> {
|
||||||
private readonly cache = new Map<string, { date: number; value: T; }>();
|
private readonly cache = new Map<string, { date: number; value: T; }>();
|
||||||
private readonly gcIntervalHandle = setInterval(() => this.gc(), 1000 * 60 * 3); // 3m
|
private readonly gcIntervalHandle: symbol;
|
||||||
|
private readonly timeService: TimeService;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly lifetime: number,
|
private readonly lifetime: number,
|
||||||
) {}
|
services?: MemoryCacheServices,
|
||||||
|
) {
|
||||||
|
this.timeService = services?.timeService ?? defaultTimeService;
|
||||||
|
this.gcIntervalHandle = this.timeService.startTimer(() => this.gc(), 1000 * 60 * 3, { repeated: true }); // 3m
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
/**
|
/**
|
||||||
|
|
@ -238,7 +262,7 @@ export class MemoryKVCache<T> {
|
||||||
*/
|
*/
|
||||||
public set(key: string, value: T): void {
|
public set(key: string, value: T): void {
|
||||||
this.cache.set(key, {
|
this.cache.set(key, {
|
||||||
date: Date.now(),
|
date: this.timeService.now,
|
||||||
value,
|
value,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -247,7 +271,7 @@ export class MemoryKVCache<T> {
|
||||||
public get(key: string): T | undefined {
|
public get(key: string): T | undefined {
|
||||||
const cached = this.cache.get(key);
|
const cached = this.cache.get(key);
|
||||||
if (cached == null) return undefined;
|
if (cached == null) return undefined;
|
||||||
if ((Date.now() - cached.date) > this.lifetime) {
|
if ((this.timeService.now - cached.date) > this.lifetime) {
|
||||||
this.cache.delete(key);
|
this.cache.delete(key);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
@ -257,7 +281,7 @@ export class MemoryKVCache<T> {
|
||||||
public has(key: string): boolean {
|
public has(key: string): boolean {
|
||||||
const cached = this.cache.get(key);
|
const cached = this.cache.get(key);
|
||||||
if (cached == null) return false;
|
if (cached == null) return false;
|
||||||
if ((Date.now() - cached.date) > this.lifetime) {
|
if ((this.timeService.now - cached.date) > this.lifetime) {
|
||||||
this.cache.delete(key);
|
this.cache.delete(key);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -323,7 +347,7 @@ export class MemoryKVCache<T> {
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public gc(): void {
|
public gc(): void {
|
||||||
const now = Date.now();
|
const now = this.timeService.now;
|
||||||
|
|
||||||
for (const [key, { date }] of this.cache.entries()) {
|
for (const [key, { date }] of this.cache.entries()) {
|
||||||
// The map is ordered from oldest to youngest.
|
// The map is ordered from oldest to youngest.
|
||||||
|
|
@ -346,7 +370,7 @@ export class MemoryKVCache<T> {
|
||||||
@bindThis
|
@bindThis
|
||||||
public dispose(): void {
|
public dispose(): void {
|
||||||
this.clear();
|
this.clear();
|
||||||
clearInterval(this.gcIntervalHandle);
|
this.timeService.stopTimer(this.gcIntervalHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
public get size() {
|
public get size() {
|
||||||
|
|
@ -359,23 +383,27 @@ export class MemoryKVCache<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
export class MemorySingleCache<T> {
|
export class MemorySingleCache<T> {
|
||||||
|
private readonly timeService: TimeService;
|
||||||
private cachedAt: number | null = null;
|
private cachedAt: number | null = null;
|
||||||
private value: T | undefined;
|
private value: T | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private lifetime: number,
|
private lifetime: number,
|
||||||
) {}
|
services?: MemoryCacheServices,
|
||||||
|
) {
|
||||||
|
this.timeService = services?.timeService ?? defaultTimeService;
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public set(value: T): void {
|
public set(value: T): void {
|
||||||
this.cachedAt = Date.now();
|
this.cachedAt = this.timeService.now;
|
||||||
this.value = value;
|
this.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public get(): T | undefined {
|
public get(): T | undefined {
|
||||||
if (this.cachedAt == null) return undefined;
|
if (this.cachedAt == null) return undefined;
|
||||||
if ((Date.now() - this.cachedAt) > this.lifetime) {
|
if ((this.timeService.now - this.cachedAt) > this.lifetime) {
|
||||||
this.value = undefined;
|
this.value = undefined;
|
||||||
this.cachedAt = null;
|
this.cachedAt = null;
|
||||||
return undefined;
|
return undefined;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue