remove localUserByNativeTokenCache, add userByAcctCache, and convert userByIdCache/nativeTokenCache/uriPersonCache to quantum caches
This commit is contained in:
parent
6f23ecd71b
commit
00e7473f1c
5 changed files with 103 additions and 55 deletions
|
|
@ -14,6 +14,7 @@ import { DI } from '@/di-symbols.js';
|
|||
import { bindThis } from '@/decorators.js';
|
||||
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
import { InternalEventService } from '@/global/InternalEventService.js';
|
||||
import * as Acct from '@/misc/acct.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
import { TimeService } from '@/global/TimeService.js';
|
||||
import {
|
||||
|
|
@ -32,10 +33,10 @@ export interface FollowStats {
|
|||
|
||||
@Injectable()
|
||||
export class CacheService implements OnApplicationShutdown {
|
||||
public readonly userByIdCache: ManagedMemoryKVCache<MiUser>;
|
||||
public readonly localUserByNativeTokenCache: ManagedMemoryKVCache<MiLocalUser | null>;
|
||||
public readonly localUserByIdCache: ManagedMemoryKVCache<MiLocalUser>;
|
||||
public readonly uriPersonCache: ManagedMemoryKVCache<MiUser | null>;
|
||||
public readonly userByIdCache: ManagedQuantumKVCache<MiUser>;
|
||||
public readonly nativeTokenCache: ManagedQuantumKVCache<string | null>; // Token -> UserId
|
||||
public readonly uriPersonCache: ManagedQuantumKVCache<string | null>; // URI -> UserId
|
||||
public readonly userByAcctCache: ManagedQuantumKVCache<string>; // Acct -> UserId
|
||||
public readonly userProfileCache: ManagedQuantumKVCache<MiUserProfile>;
|
||||
public readonly userMutingsCache: ManagedQuantumKVCache<Set<string>>;
|
||||
public readonly userMutedCache: ManagedQuantumKVCache<Set<string>>;
|
||||
|
|
@ -109,10 +110,72 @@ export class CacheService implements OnApplicationShutdown {
|
|||
) {
|
||||
//this.onMessage = this.onMessage.bind(this);
|
||||
|
||||
this.userByIdCache = this.cacheManagementService.createMemoryKVCache<MiUser>('userById', 1000 * 60 * 5); // 5m
|
||||
this.localUserByNativeTokenCache = this.cacheManagementService.createMemoryKVCache<MiLocalUser | null>('localUserByNativeToken', 1000 * 60 * 5); // 5m
|
||||
this.localUserByIdCache = this.cacheManagementService.createMemoryKVCache<MiLocalUser>('localUserById', 1000 * 60 * 5); // 5m
|
||||
this.uriPersonCache = this.cacheManagementService.createMemoryKVCache<MiUser | null>('uriPerson', 1000 * 60 * 5); // 5m
|
||||
this.userByIdCache = this.cacheManagementService.createQuantumKVCache('userById', {
|
||||
lifetime: 1000 * 60 * 5, // 5m
|
||||
fetcher: async (userId) => await this.usersRepository.findOneByOrFail({ id: userId }),
|
||||
bulkFetcher: async (userIds) => await this.usersRepository.findBy({ id: In(userIds) }).then(us => us.map(u => [u.id, u])),
|
||||
});
|
||||
|
||||
this.nativeTokenCache = this.cacheManagementService.createQuantumKVCache('localUserByNativeToken', {
|
||||
lifetime: 1000 * 60 * 5, // 5m
|
||||
fetcher: async (token) => {
|
||||
const user = await this.usersRepository
|
||||
.createQueryBuilder('user')
|
||||
.select('user.id')
|
||||
.where({ token })
|
||||
.getOne() as { id: string } | null;
|
||||
return user?.id ?? null;
|
||||
},
|
||||
bulkFetcher: async (tokens) => {
|
||||
const users = await this.usersRepository
|
||||
.createQueryBuilder('user')
|
||||
.select('user.id')
|
||||
.addSelect('user.token')
|
||||
.where({ token: In(tokens) })
|
||||
.getMany() as { id: string, token: string }[];
|
||||
const userMap = new Map(users.map(u => [u.token, u.id]));
|
||||
return tokens.map(token => [token, userMap.get(token) ?? null]);
|
||||
},
|
||||
});
|
||||
|
||||
this.uriPersonCache = this.cacheManagementService.createQuantumKVCache('uriPerson', {
|
||||
lifetime: 1000 * 60 * 5, // 5m
|
||||
fetcher: async (uri) => {
|
||||
const user = await this.usersRepository
|
||||
.createQueryBuilder('user')
|
||||
.select('user.id')
|
||||
.where({ uri })
|
||||
.getOne() as { id: string } | null;
|
||||
return user?.id ?? null;
|
||||
},
|
||||
bulkFetcher: async (uris) => {
|
||||
const users = await this.usersRepository
|
||||
.createQueryBuilder('user')
|
||||
.select('user.id')
|
||||
.addSelect('user.uri')
|
||||
.where({ uri: In(uris) })
|
||||
.getMany() as { id: string, uri: string }[];
|
||||
const userMap = new Map(users.map(u => [u.uri, u.id]));
|
||||
return uris.map(uri => [uri, userMap.get(uri) ?? null]);
|
||||
},
|
||||
});
|
||||
|
||||
this.userByAcctCache = this.cacheManagementService.createQuantumKVCache('userByAcct', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
fetcher: async (acct) => {
|
||||
const parsed = Acct.parse(acct);
|
||||
const { id } = await this.usersRepository
|
||||
.createQueryBuilder('user')
|
||||
.select('user.id')
|
||||
.where({
|
||||
usernameLower: parsed.username.toLowerCase(),
|
||||
host: parsed.host ?? IsNull(),
|
||||
})
|
||||
.getOneOrFail();
|
||||
return id;
|
||||
},
|
||||
// No bulk fetcher for this
|
||||
});
|
||||
|
||||
this.userProfileCache = this.cacheManagementService.createQuantumKVCache('userProfile', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
|
|
@ -339,9 +402,6 @@ export class CacheService implements OnApplicationShutdown {
|
|||
for (const uid of userIds) {
|
||||
const toAdd: MiUser[] = [];
|
||||
|
||||
const localUserById = this.localUserByIdCache.get(uid);
|
||||
if (localUserById) toAdd.push(localUserById);
|
||||
|
||||
const userById = this.userByIdCache.get(uid);
|
||||
if (userById) toAdd.push(userById);
|
||||
|
||||
|
|
@ -414,6 +474,7 @@ export class CacheService implements OnApplicationShutdown {
|
|||
.toArray();
|
||||
|
||||
await Promise.all([
|
||||
this.userByIdCache.deleteMany(ids),
|
||||
this.userProfileCache.deleteMany(ids),
|
||||
this.userMutingsCache.deleteMany(ids),
|
||||
this.userMutedCache.deleteMany(ids),
|
||||
|
|
@ -471,13 +532,17 @@ export class CacheService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E]): Promise<void> {
|
||||
private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise<void> {
|
||||
{
|
||||
{
|
||||
{
|
||||
const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser;
|
||||
this.localUserByNativeTokenCache.delete(body.oldToken);
|
||||
this.localUserByNativeTokenCache.set(body.newToken, user);
|
||||
// Local instance is responsible for expanding these events into the appropriate Quantum events
|
||||
if (!isLocal) return;
|
||||
|
||||
await Promise.all([
|
||||
this.nativeTokenCache.delete(body.oldToken),
|
||||
this.nativeTokenCache.set(body.newToken, body.id),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -538,8 +603,14 @@ export class CacheService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public findUserById(userId: MiUser['id']) {
|
||||
return this.userByIdCache.fetch(userId, () => this.usersRepository.findOneByOrFail({ id: userId }));
|
||||
public async findUserById(userId: MiUser['id']): Promise<MiUser> {
|
||||
return await this.userByIdCache.fetch(userId);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async findUserByAcct(acct: string): Promise<MiUser> {
|
||||
const id = await this.userByAcctCache.fetch(acct);
|
||||
return await this.findUserById(id);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -617,30 +688,7 @@ export class CacheService implements OnApplicationShutdown {
|
|||
|
||||
@bindThis
|
||||
public async getUsers(userIds: Iterable<string>): Promise<Map<string, MiUser>> {
|
||||
const users = new Map<string, MiUser>;
|
||||
|
||||
const toFetch: string[] = [];
|
||||
for (const userId of userIds) {
|
||||
const fromCache = this.userByIdCache.get(userId);
|
||||
if (fromCache) {
|
||||
users.set(userId, fromCache);
|
||||
} else {
|
||||
toFetch.push(userId);
|
||||
}
|
||||
}
|
||||
|
||||
if (toFetch.length > 0) {
|
||||
const fetched = await this.usersRepository.findBy({
|
||||
id: In(toFetch),
|
||||
});
|
||||
|
||||
for (const user of fetched) {
|
||||
users.set(user.id, user);
|
||||
this.userByIdCache.set(user.id, user);
|
||||
}
|
||||
}
|
||||
|
||||
return users;
|
||||
return new Map(await this.userByIdCache.fetchMany(userIds));
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
|
|
@ -508,7 +508,7 @@ export class ApPersonService implements OnModuleInit {
|
|||
if (user == null) throw new Error(`failed to create user - user is null: ${uri}`);
|
||||
|
||||
// Register to the cache
|
||||
this.cacheService.uriPersonCache.set(user.uri, user);
|
||||
await this.cacheService.uriPersonCache.set(user.uri, user.id);
|
||||
|
||||
// Register public key to the cache.
|
||||
if (publicKey) {
|
||||
|
|
@ -541,7 +541,7 @@ export class ApPersonService implements OnModuleInit {
|
|||
user = { ...user, ...updates };
|
||||
|
||||
// Register to the cache
|
||||
this.cacheService.uriPersonCache.set(user.uri, user);
|
||||
await this.cacheService.uriPersonCache.set(user.uri, user.id);
|
||||
} catch (err) {
|
||||
// Permanent error implies hidden or inaccessible, which is a normal thing.
|
||||
if (isRetryableError(err)) {
|
||||
|
|
@ -772,8 +772,6 @@ export class ApPersonService implements OnModuleInit {
|
|||
|
||||
const updated = { ...exist, ...updates };
|
||||
|
||||
this.cacheService.uriPersonCache.set(uri, updated);
|
||||
|
||||
// 移行処理を行う
|
||||
if (updated.movedAt && (
|
||||
// 初めて移行する場合はmovedAtがnullなので移行処理を許可
|
||||
|
|
|
|||
|
|
@ -19,7 +19,8 @@ import { ApLogService } from '@/core/ApLogService.js';
|
|||
import { ReactionService } from '@/core/ReactionService.js';
|
||||
import { QueueService } from '@/core/QueueService.js';
|
||||
import { CacheService } from '@/core/CacheService.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
||||
import * as Acct from '@/misc/acct.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbUserDeleteJobData } from '../types.js';
|
||||
|
||||
|
|
@ -152,10 +153,14 @@ export class DeleteAccountProcessorService {
|
|||
await this.cacheService.hibernatedUserCache.delete(user.id);
|
||||
await this.cacheService.renoteMutingsCache.delete(user.id);
|
||||
await this.cacheService.userProfileCache.delete(user.id);
|
||||
this.cacheService.userByIdCache.delete(user.id);
|
||||
this.cacheService.localUserByIdCache.delete(user.id);
|
||||
await this.cacheService.userByIdCache.delete(user.id);
|
||||
await this.cacheService.userByAcctCache.delete(Acct.toString({ username: user.usernameLower, host: user.host }));
|
||||
await this.cacheService.userFollowStatsCache.delete(user.id);
|
||||
if (user.token) {
|
||||
this.cacheService.localUserByNativeTokenCache.delete(user.token);
|
||||
await this.cacheService.nativeTokenCache.delete(user.token);
|
||||
}
|
||||
if (user.uri) {
|
||||
await this.cacheService.uriPersonCache.delete(user.uri);
|
||||
}
|
||||
|
||||
await this.followingsRepository.delete({
|
||||
|
|
|
|||
|
|
@ -53,8 +53,8 @@ export class AuthenticateService {
|
|||
}
|
||||
|
||||
if (isNativeUserToken(token)) {
|
||||
const user = await this.cacheService.localUserByNativeTokenCache.fetch(token,
|
||||
() => this.usersRepository.findOneBy({ token }) as Promise<MiLocalUser | null>);
|
||||
const userId = await this.cacheService.nativeTokenCache.fetch(token);
|
||||
const user = userId ? await this.cacheService.findLocalUserById(userId) : null;
|
||||
|
||||
if (user == null) {
|
||||
throw new AuthenticationError('user not found');
|
||||
|
|
|
|||
|
|
@ -65,10 +65,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
) {
|
||||
super(meta, paramDef, async (ps, me) => {
|
||||
// Lookup user
|
||||
const targetUser = await this.cacheService.findOptionalUserById(ps.userId);
|
||||
if (!targetUser) {
|
||||
throw new ApiError(meta.errors.noSuchUser);
|
||||
}
|
||||
const targetUser = await this.cacheService.findUserById(ps.userId);
|
||||
|
||||
if (targetUser.id === me.id) {
|
||||
throw new ApiError(meta.errors.cannotReportYourself);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue