From 00e7473f1c416d751955a83e6abfa1e36aaf7671 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Mon, 15 Sep 2025 18:02:45 -0400 Subject: [PATCH] remove localUserByNativeTokenCache, add userByAcctCache, and convert userByIdCache/nativeTokenCache/uriPersonCache to quantum caches --- packages/backend/src/core/CacheService.ts | 130 ++++++++++++------ .../activitypub/models/ApPersonService.ts | 6 +- .../DeleteAccountProcessorService.ts | 13 +- .../src/server/api/AuthenticateService.ts | 4 +- .../api/endpoints/users/report-abuse.ts | 5 +- 5 files changed, 103 insertions(+), 55 deletions(-) diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index adb9b17061..e3f92b3a68 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -14,6 +14,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import { InternalEventService } from '@/global/InternalEventService.js'; +import * as Acct from '@/misc/acct.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { TimeService } from '@/global/TimeService.js'; import { @@ -32,10 +33,10 @@ export interface FollowStats { @Injectable() export class CacheService implements OnApplicationShutdown { - public readonly userByIdCache: ManagedMemoryKVCache; - public readonly localUserByNativeTokenCache: ManagedMemoryKVCache; - public readonly localUserByIdCache: ManagedMemoryKVCache; - public readonly uriPersonCache: ManagedMemoryKVCache; + public readonly userByIdCache: ManagedQuantumKVCache; + public readonly nativeTokenCache: ManagedQuantumKVCache; // Token -> UserId + public readonly uriPersonCache: ManagedQuantumKVCache; // URI -> UserId + public readonly userByAcctCache: ManagedQuantumKVCache; // Acct -> UserId public readonly userProfileCache: ManagedQuantumKVCache; public readonly userMutingsCache: ManagedQuantumKVCache>; public readonly userMutedCache: ManagedQuantumKVCache>; @@ -109,10 +110,72 @@ export class CacheService implements OnApplicationShutdown { ) { //this.onMessage = this.onMessage.bind(this); - this.userByIdCache = this.cacheManagementService.createMemoryKVCache('userById', 1000 * 60 * 5); // 5m - this.localUserByNativeTokenCache = this.cacheManagementService.createMemoryKVCache('localUserByNativeToken', 1000 * 60 * 5); // 5m - this.localUserByIdCache = this.cacheManagementService.createMemoryKVCache('localUserById', 1000 * 60 * 5); // 5m - this.uriPersonCache = this.cacheManagementService.createMemoryKVCache('uriPerson', 1000 * 60 * 5); // 5m + this.userByIdCache = this.cacheManagementService.createQuantumKVCache('userById', { + lifetime: 1000 * 60 * 5, // 5m + fetcher: async (userId) => await this.usersRepository.findOneByOrFail({ id: userId }), + bulkFetcher: async (userIds) => await this.usersRepository.findBy({ id: In(userIds) }).then(us => us.map(u => [u.id, u])), + }); + + this.nativeTokenCache = this.cacheManagementService.createQuantumKVCache('localUserByNativeToken', { + lifetime: 1000 * 60 * 5, // 5m + fetcher: async (token) => { + const user = await this.usersRepository + .createQueryBuilder('user') + .select('user.id') + .where({ token }) + .getOne() as { id: string } | null; + return user?.id ?? null; + }, + bulkFetcher: async (tokens) => { + const users = await this.usersRepository + .createQueryBuilder('user') + .select('user.id') + .addSelect('user.token') + .where({ token: In(tokens) }) + .getMany() as { id: string, token: string }[]; + const userMap = new Map(users.map(u => [u.token, u.id])); + return tokens.map(token => [token, userMap.get(token) ?? null]); + }, + }); + + this.uriPersonCache = this.cacheManagementService.createQuantumKVCache('uriPerson', { + lifetime: 1000 * 60 * 5, // 5m + fetcher: async (uri) => { + const user = await this.usersRepository + .createQueryBuilder('user') + .select('user.id') + .where({ uri }) + .getOne() as { id: string } | null; + return user?.id ?? null; + }, + bulkFetcher: async (uris) => { + const users = await this.usersRepository + .createQueryBuilder('user') + .select('user.id') + .addSelect('user.uri') + .where({ uri: In(uris) }) + .getMany() as { id: string, uri: string }[]; + const userMap = new Map(users.map(u => [u.uri, u.id])); + return uris.map(uri => [uri, userMap.get(uri) ?? null]); + }, + }); + + this.userByAcctCache = this.cacheManagementService.createQuantumKVCache('userByAcct', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: async (acct) => { + const parsed = Acct.parse(acct); + const { id } = await this.usersRepository + .createQueryBuilder('user') + .select('user.id') + .where({ + usernameLower: parsed.username.toLowerCase(), + host: parsed.host ?? IsNull(), + }) + .getOneOrFail(); + return id; + }, + // No bulk fetcher for this + }); this.userProfileCache = this.cacheManagementService.createQuantumKVCache('userProfile', { lifetime: 1000 * 60 * 30, // 30m @@ -339,9 +402,6 @@ export class CacheService implements OnApplicationShutdown { for (const uid of userIds) { const toAdd: MiUser[] = []; - const localUserById = this.localUserByIdCache.get(uid); - if (localUserById) toAdd.push(localUserById); - const userById = this.userByIdCache.get(uid); if (userById) toAdd.push(userById); @@ -414,6 +474,7 @@ export class CacheService implements OnApplicationShutdown { .toArray(); await Promise.all([ + this.userByIdCache.deleteMany(ids), this.userProfileCache.deleteMany(ids), this.userMutingsCache.deleteMany(ids), this.userMutedCache.deleteMany(ids), @@ -471,13 +532,17 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - private async onTokenEvent(body: InternalEventTypes[E]): Promise { + private async onTokenEvent(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise { { { { - const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser; - this.localUserByNativeTokenCache.delete(body.oldToken); - this.localUserByNativeTokenCache.set(body.newToken, user); + // Local instance is responsible for expanding these events into the appropriate Quantum events + if (!isLocal) return; + + await Promise.all([ + this.nativeTokenCache.delete(body.oldToken), + this.nativeTokenCache.set(body.newToken, body.id), + ]); } } } @@ -538,8 +603,14 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - public findUserById(userId: MiUser['id']) { - return this.userByIdCache.fetch(userId, () => this.usersRepository.findOneByOrFail({ id: userId })); + public async findUserById(userId: MiUser['id']): Promise { + return await this.userByIdCache.fetch(userId); + } + + @bindThis + public async findUserByAcct(acct: string): Promise { + const id = await this.userByAcctCache.fetch(acct); + return await this.findUserById(id); } @bindThis @@ -617,30 +688,7 @@ export class CacheService implements OnApplicationShutdown { @bindThis public async getUsers(userIds: Iterable): Promise> { - const users = new Map; - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userByIdCache.get(userId); - if (fromCache) { - users.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetched = await this.usersRepository.findBy({ - id: In(toFetch), - }); - - for (const user of fetched) { - users.set(user.id, user); - this.userByIdCache.set(user.id, user); - } - } - - return users; + return new Map(await this.userByIdCache.fetchMany(userIds)); } @bindThis diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index d53b04d87b..e78f3c931e 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -508,7 +508,7 @@ export class ApPersonService implements OnModuleInit { if (user == null) throw new Error(`failed to create user - user is null: ${uri}`); // Register to the cache - this.cacheService.uriPersonCache.set(user.uri, user); + await this.cacheService.uriPersonCache.set(user.uri, user.id); // Register public key to the cache. if (publicKey) { @@ -541,7 +541,7 @@ export class ApPersonService implements OnModuleInit { user = { ...user, ...updates }; // Register to the cache - this.cacheService.uriPersonCache.set(user.uri, user); + await this.cacheService.uriPersonCache.set(user.uri, user.id); } catch (err) { // Permanent error implies hidden or inaccessible, which is a normal thing. if (isRetryableError(err)) { @@ -772,8 +772,6 @@ export class ApPersonService implements OnModuleInit { const updated = { ...exist, ...updates }; - this.cacheService.uriPersonCache.set(uri, updated); - // 移行処理を行う if (updated.movedAt && ( // 初めて移行する場合はmovedAtがnullなので移行処理を許可 diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 5bf64e4f04..cbc843e208 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -19,7 +19,8 @@ import { ApLogService } from '@/core/ApLogService.js'; import { ReactionService } from '@/core/ReactionService.js'; import { QueueService } from '@/core/QueueService.js'; import { CacheService } from '@/core/CacheService.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; +import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; +import * as Acct from '@/misc/acct.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; @@ -152,10 +153,14 @@ export class DeleteAccountProcessorService { await this.cacheService.hibernatedUserCache.delete(user.id); await this.cacheService.renoteMutingsCache.delete(user.id); await this.cacheService.userProfileCache.delete(user.id); - this.cacheService.userByIdCache.delete(user.id); - this.cacheService.localUserByIdCache.delete(user.id); + await this.cacheService.userByIdCache.delete(user.id); + await this.cacheService.userByAcctCache.delete(Acct.toString({ username: user.usernameLower, host: user.host })); + await this.cacheService.userFollowStatsCache.delete(user.id); if (user.token) { - this.cacheService.localUserByNativeTokenCache.delete(user.token); + await this.cacheService.nativeTokenCache.delete(user.token); + } + if (user.uri) { + await this.cacheService.uriPersonCache.delete(user.uri); } await this.followingsRepository.delete({ diff --git a/packages/backend/src/server/api/AuthenticateService.ts b/packages/backend/src/server/api/AuthenticateService.ts index e3bddbe0e8..ab8bf1522e 100644 --- a/packages/backend/src/server/api/AuthenticateService.ts +++ b/packages/backend/src/server/api/AuthenticateService.ts @@ -53,8 +53,8 @@ export class AuthenticateService { } if (isNativeUserToken(token)) { - const user = await this.cacheService.localUserByNativeTokenCache.fetch(token, - () => this.usersRepository.findOneBy({ token }) as Promise); + const userId = await this.cacheService.nativeTokenCache.fetch(token); + const user = userId ? await this.cacheService.findLocalUserById(userId) : null; if (user == null) { throw new AuthenticationError('user not found'); diff --git a/packages/backend/src/server/api/endpoints/users/report-abuse.ts b/packages/backend/src/server/api/endpoints/users/report-abuse.ts index fc2b57c4a5..a60d624726 100644 --- a/packages/backend/src/server/api/endpoints/users/report-abuse.ts +++ b/packages/backend/src/server/api/endpoints/users/report-abuse.ts @@ -65,10 +65,7 @@ export default class extends Endpoint { // eslint- ) { super(meta, paramDef, async (ps, me) => { // Lookup user - const targetUser = await this.cacheService.findOptionalUserById(ps.userId); - if (!targetUser) { - throw new ApiError(meta.errors.noSuchUser); - } + const targetUser = await this.cacheService.findUserById(ps.userId); if (targetUser.id === me.id) { throw new ApiError(meta.errors.cannotReportYourself);