move uriPersonCache to ApPersonService
This commit is contained in:
parent
0047396cf2
commit
0556d403e6
4 changed files with 67 additions and 51 deletions
|
|
@ -35,7 +35,6 @@ export interface FollowStats {
|
||||||
export class CacheService implements OnApplicationShutdown {
|
export class CacheService implements OnApplicationShutdown {
|
||||||
public readonly userByIdCache: ManagedQuantumKVCache<MiUser>;
|
public readonly userByIdCache: ManagedQuantumKVCache<MiUser>;
|
||||||
public readonly nativeTokenCache: ManagedQuantumKVCache<string>; // Token -> UserId
|
public readonly nativeTokenCache: ManagedQuantumKVCache<string>; // Token -> UserId
|
||||||
public readonly uriPersonCache: ManagedQuantumKVCache<string>; // URI -> UserId
|
|
||||||
public readonly userByAcctCache: ManagedQuantumKVCache<string>; // Acct -> UserId
|
public readonly userByAcctCache: ManagedQuantumKVCache<string>; // Acct -> UserId
|
||||||
public readonly userProfileCache: ManagedQuantumKVCache<MiUserProfile>;
|
public readonly userProfileCache: ManagedQuantumKVCache<MiUserProfile>;
|
||||||
public readonly userMutingsCache: ManagedQuantumKVCache<Set<string>>;
|
public readonly userMutingsCache: ManagedQuantumKVCache<Set<string>>;
|
||||||
|
|
@ -119,12 +118,12 @@ export class CacheService implements OnApplicationShutdown {
|
||||||
this.nativeTokenCache = this.cacheManagementService.createQuantumKVCache('localUserByNativeToken', {
|
this.nativeTokenCache = this.cacheManagementService.createQuantumKVCache('localUserByNativeToken', {
|
||||||
lifetime: 1000 * 60 * 5, // 5m
|
lifetime: 1000 * 60 * 5, // 5m
|
||||||
fetcher: async (token) => {
|
fetcher: async (token) => {
|
||||||
const user = await this.usersRepository
|
const { id } = await this.usersRepository
|
||||||
.createQueryBuilder('user')
|
.createQueryBuilder('user')
|
||||||
.select('user.id')
|
.select('user.id')
|
||||||
.where({ token })
|
.where({ token })
|
||||||
.getOne() as { id: string } | null;
|
.getOneOrFail() as { id: string };
|
||||||
return user?.id ?? null;
|
return id;
|
||||||
},
|
},
|
||||||
bulkFetcher: async (tokens) => {
|
bulkFetcher: async (tokens) => {
|
||||||
const users = await this.usersRepository
|
const users = await this.usersRepository
|
||||||
|
|
@ -133,29 +132,7 @@ export class CacheService implements OnApplicationShutdown {
|
||||||
.addSelect('user.token')
|
.addSelect('user.token')
|
||||||
.where({ token: In(tokens) })
|
.where({ token: In(tokens) })
|
||||||
.getMany() as { id: string, token: string }[];
|
.getMany() as { id: string, token: string }[];
|
||||||
const userMap = new Map(users.map(u => [u.token, u.id]));
|
return users.map(u => [u.token, u.id]);
|
||||||
return tokens.map(token => [token, userMap.get(token) ?? null]);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
this.uriPersonCache = this.cacheManagementService.createQuantumKVCache('uriPerson', {
|
|
||||||
lifetime: 1000 * 60 * 30, // 30m
|
|
||||||
fetcher: async (uri) => {
|
|
||||||
const user = await this.usersRepository
|
|
||||||
.createQueryBuilder('user')
|
|
||||||
.select('user.id')
|
|
||||||
.where({ uri })
|
|
||||||
.getOneOrFail() as { id: string };
|
|
||||||
return user.id;
|
|
||||||
},
|
|
||||||
bulkFetcher: async (uris) => {
|
|
||||||
const users = await this.usersRepository
|
|
||||||
.createQueryBuilder('user')
|
|
||||||
.select('user.id')
|
|
||||||
.addSelect('user.uri')
|
|
||||||
.where({ uri: In(uris) })
|
|
||||||
.getMany() as { id: string, uri: string }[];
|
|
||||||
return users.map(u => [u.uri, u.id]);
|
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -636,7 +613,7 @@ export class CacheService implements OnApplicationShutdown {
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public findOptionalUserById(userId: MiUser['id']) {
|
public findOptionalUserById(userId: MiUser['id']) {
|
||||||
return this.userByIdCache.fetchMaybe(userId, async () => await this.usersRepository.findOneBy({ id: userId }) ?? undefined);
|
return this.userByIdCache.fetchMaybe(userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,7 @@ export class RemoteUserResolveService {
|
||||||
}, {
|
}, {
|
||||||
uri: self.href,
|
uri: self.href,
|
||||||
});
|
});
|
||||||
|
await this.apPersonService.uriPersonCache.delete(user.uri); // Unmap the old URI
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Corrected URI for ${acctLower} from ${user.uri} to ${self.href}`);
|
this.logger.info(`Corrected URI for ${acctLower} from ${user.uri} to ${self.href}`);
|
||||||
|
|
|
||||||
|
|
@ -63,9 +63,12 @@ type Field = Record<'name' | 'value', string>;
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class ApPersonService implements OnModuleInit {
|
export class ApPersonService implements OnModuleInit {
|
||||||
|
// Moved from CacheService
|
||||||
|
public readonly uriPersonCache: ManagedQuantumKVCache<string>;
|
||||||
|
|
||||||
// Moved from ApDbResolverService
|
// Moved from ApDbResolverService
|
||||||
private publicKeyByKeyIdCache: ManagedQuantumKVCache<MiUserPublickey>;
|
private readonly publicKeyByKeyIdCache: ManagedQuantumKVCache<MiUserPublickey>;
|
||||||
private publicKeyByUserIdCache: ManagedQuantumKVCache<MiUserPublickey>;
|
private readonly publicKeyByUserIdCache: ManagedQuantumKVCache<MiUserPublickey>;
|
||||||
|
|
||||||
private driveFileEntityService: DriveFileEntityService;
|
private driveFileEntityService: DriveFileEntityService;
|
||||||
private globalEventService: GlobalEventService;
|
private globalEventService: GlobalEventService;
|
||||||
|
|
@ -122,11 +125,34 @@ export class ApPersonService implements OnModuleInit {
|
||||||
apLoggerService: ApLoggerService,
|
apLoggerService: ApLoggerService,
|
||||||
) {
|
) {
|
||||||
this.logger = apLoggerService.logger;
|
this.logger = apLoggerService.logger;
|
||||||
|
|
||||||
|
this.uriPersonCache = this.cacheManagementService.createQuantumKVCache('uriPerson', {
|
||||||
|
lifetime: 1000 * 60 * 30, // 30m
|
||||||
|
fetcher: async (uri) => {
|
||||||
|
const { id } = await this.usersRepository
|
||||||
|
.createQueryBuilder('user')
|
||||||
|
.select('user.id')
|
||||||
|
.where({ uri })
|
||||||
|
.getOneOrFail() as { id: string };
|
||||||
|
return id;
|
||||||
|
},
|
||||||
|
bulkFetcher: async (uris) => {
|
||||||
|
const users = await this.usersRepository
|
||||||
|
.createQueryBuilder('user')
|
||||||
|
.select('user.id')
|
||||||
|
.addSelect('user.uri')
|
||||||
|
.where({ uri: In(uris) })
|
||||||
|
.getMany() as { id: string, uri: string }[];
|
||||||
|
return users.map(u => [u.uri, u.id]);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
this.publicKeyByKeyIdCache = this.cacheManagementService.createQuantumKVCache<MiUserPublickey>('publicKeyByKeyId', {
|
this.publicKeyByKeyIdCache = this.cacheManagementService.createQuantumKVCache<MiUserPublickey>('publicKeyByKeyId', {
|
||||||
lifetime: 1000 * 60 * 60 * 12, // 12h
|
lifetime: 1000 * 60 * 60 * 12, // 12h
|
||||||
fetcher: async (keyId) => await this.userPublickeysRepository.findOneBy({ keyId }),
|
fetcher: async (keyId) => await this.userPublickeysRepository.findOneBy({ keyId }),
|
||||||
bulkFetcher: async (keyIds) => await this.userPublickeysRepository.findBy({ keyId: In(keyIds) }).then(ks => ks.map(k => [k.keyId, k])),
|
bulkFetcher: async (keyIds) => await this.userPublickeysRepository.findBy({ keyId: In(keyIds) }).then(ks => ks.map(k => [k.keyId, k])),
|
||||||
});
|
});
|
||||||
|
|
||||||
this.publicKeyByUserIdCache = this.cacheManagementService.createQuantumKVCache<MiUserPublickey>('publicKeyByUserId', {
|
this.publicKeyByUserIdCache = this.cacheManagementService.createQuantumKVCache<MiUserPublickey>('publicKeyByUserId', {
|
||||||
lifetime: 1000 * 60 * 60 * 12, // 12h
|
lifetime: 1000 * 60 * 60 * 12, // 12h
|
||||||
fetcher: async (userId) => await this.userPublickeysRepository.findOneBy({ userId }),
|
fetcher: async (userId) => await this.userPublickeysRepository.findOneBy({ userId }),
|
||||||
|
|
@ -251,28 +277,38 @@ export class ApPersonService implements OnModuleInit {
|
||||||
* Misskeyに対象のPersonが登録されていればそれを返し、登録がなければnullを返します。
|
* Misskeyに対象のPersonが登録されていればそれを返し、登録がなければnullを返します。
|
||||||
*/
|
*/
|
||||||
@bindThis
|
@bindThis
|
||||||
public async fetchPerson(uri: string): Promise<MiLocalUser | MiRemoteUser | null> {
|
public async fetchPerson(uri: string, opts?: { withDeleted?: boolean, withSuspended?: boolean }): Promise<MiLocalUser | MiRemoteUser | null> {
|
||||||
const cached = this.cacheService.uriPersonCache.get(uri) as MiLocalUser | MiRemoteUser | null | undefined;
|
const _opts = {
|
||||||
if (cached) return cached;
|
withDeleted: opts?.withDeleted ?? false,
|
||||||
|
withSuspended: opts?.withSuspended ?? true,
|
||||||
|
};
|
||||||
|
|
||||||
// URIがこのサーバーを指しているならデータベースからフェッチ
|
let userId;
|
||||||
if (uri.startsWith(`${this.config.url}/`)) {
|
|
||||||
const id = uri.split('/').pop();
|
// Resolve URI -> User ID
|
||||||
const u = await this.usersRepository.findOneBy({ id }) as MiLocalUser | null;
|
const parsed = this.utilityService.parseUri(uri);
|
||||||
if (u) this.cacheService.uriPersonCache.set(uri, u);
|
if (parsed.local) {
|
||||||
return u;
|
userId = parsed.type === 'users' ? parsed.id : null;
|
||||||
|
} else {
|
||||||
|
userId = await this.uriPersonCache.fetch(uri).catch(() => null);
|
||||||
}
|
}
|
||||||
|
|
||||||
//#region このサーバーに既に登録されていたらそれを返す
|
// No match
|
||||||
const exist = await this.usersRepository.findOneBy({ uri }) as MiLocalUser | MiRemoteUser | null;
|
if (!userId) {
|
||||||
|
return null;
|
||||||
if (exist) {
|
|
||||||
this.cacheService.uriPersonCache.set(uri, exist);
|
|
||||||
return exist;
|
|
||||||
}
|
}
|
||||||
//#endregion
|
|
||||||
|
|
||||||
return null;
|
const user = await this.cacheService.findUserById(userId)
|
||||||
|
.catch(() => null) as MiLocalUser | MiRemoteUser | null;
|
||||||
|
|
||||||
|
if (user?.isDeleted && !_opts.withDeleted) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (user?.isSuspended && !_opts.withSuspended) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return user;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async resolveAvatarAndBanner(user: MiRemoteUser, icon: any, image: any, bgimg: any): Promise<Partial<Pick<MiRemoteUser, 'avatarId' | 'bannerId' | 'backgroundId' | 'avatarUrl' | 'bannerUrl' | 'backgroundUrl' | 'avatarBlurhash' | 'bannerBlurhash' | 'backgroundBlurhash'>>> {
|
private async resolveAvatarAndBanner(user: MiRemoteUser, icon: any, image: any, bgimg: any): Promise<Partial<Pick<MiRemoteUser, 'avatarId' | 'bannerId' | 'backgroundId' | 'avatarUrl' | 'bannerUrl' | 'backgroundUrl' | 'avatarBlurhash' | 'bannerBlurhash' | 'backgroundBlurhash'>>> {
|
||||||
|
|
@ -508,7 +544,7 @@ export class ApPersonService implements OnModuleInit {
|
||||||
if (user == null) throw new Error(`failed to create user - user is null: ${uri}`);
|
if (user == null) throw new Error(`failed to create user - user is null: ${uri}`);
|
||||||
|
|
||||||
// Register to the cache
|
// Register to the cache
|
||||||
await this.cacheService.uriPersonCache.set(user.uri, user.id);
|
await this.uriPersonCache.set(user.uri, user.id);
|
||||||
|
|
||||||
// Register public key to the cache.
|
// Register public key to the cache.
|
||||||
if (publicKey) {
|
if (publicKey) {
|
||||||
|
|
@ -541,7 +577,7 @@ export class ApPersonService implements OnModuleInit {
|
||||||
user = { ...user, ...updates };
|
user = { ...user, ...updates };
|
||||||
|
|
||||||
// Register to the cache
|
// Register to the cache
|
||||||
await this.cacheService.uriPersonCache.set(user.uri, user.id);
|
await this.uriPersonCache.set(user.uri, user.id);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Permanent error implies hidden or inaccessible, which is a normal thing.
|
// Permanent error implies hidden or inaccessible, which is a normal thing.
|
||||||
if (isRetryableError(err)) {
|
if (isRetryableError(err)) {
|
||||||
|
|
@ -809,7 +845,7 @@ export class ApPersonService implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
//#region このサーバーに既に登録されていたらそれを返す
|
//#region このサーバーに既に登録されていたらそれを返す
|
||||||
const exist = await this.fetchPerson(uri);
|
const exist = await this.fetchPerson(uri, { withDeleted: true });
|
||||||
if (exist) return exist;
|
if (exist) return exist;
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import { ReactionService } from '@/core/ReactionService.js';
|
||||||
import { QueueService } from '@/core/QueueService.js';
|
import { QueueService } from '@/core/QueueService.js';
|
||||||
import { CacheService } from '@/core/CacheService.js';
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
||||||
|
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
|
||||||
import * as Acct from '@/misc/acct.js';
|
import * as Acct from '@/misc/acct.js';
|
||||||
import type * as Bull from 'bullmq';
|
import type * as Bull from 'bullmq';
|
||||||
import type { DbUserDeleteJobData } from '../types.js';
|
import type { DbUserDeleteJobData } from '../types.js';
|
||||||
|
|
@ -97,6 +98,7 @@ export class DeleteAccountProcessorService {
|
||||||
private reactionService: ReactionService,
|
private reactionService: ReactionService,
|
||||||
private readonly apLogService: ApLogService,
|
private readonly apLogService: ApLogService,
|
||||||
private readonly cacheService: CacheService,
|
private readonly cacheService: CacheService,
|
||||||
|
private readonly apPersonService: ApPersonService,
|
||||||
) {
|
) {
|
||||||
this.logger = this.queueLoggerService.logger.createSubLogger('delete-account');
|
this.logger = this.queueLoggerService.logger.createSubLogger('delete-account');
|
||||||
}
|
}
|
||||||
|
|
@ -160,7 +162,7 @@ export class DeleteAccountProcessorService {
|
||||||
await this.cacheService.nativeTokenCache.delete(user.token);
|
await this.cacheService.nativeTokenCache.delete(user.token);
|
||||||
}
|
}
|
||||||
if (user.uri) {
|
if (user.uri) {
|
||||||
await this.cacheService.uriPersonCache.delete(user.uri);
|
await this.apPersonService.uriPersonCache.delete(user.uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.followingsRepository.delete({
|
await this.followingsRepository.delete({
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue