diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 9bc25a65c4..a6a55f9497 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -5,8 +5,8 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; -import { In, IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing, NoteThreadMutingsRepository } from '@/models/_.js'; +import { In, IsNull, Not } from 'typeorm'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing, NoteThreadMutingsRepository, ChannelFollowingsRepository, MiInstance, InstancesRepository } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiRemoteUser, MiUser } from '@/models/User.js'; @@ -15,6 +15,10 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; +import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { IdService } from '@/core/IdService.js'; +import { diffArraysSimple } from '@/misc/diff-arrays.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export interface FollowStats { @@ -53,6 +57,8 @@ export class CacheService implements OnApplicationShutdown { public hibernatedUserCache: QuantumKVCache; protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes protected translationsCache: RedisKVCache; + public userFollowingChannelsCache: QuantumKVCache>; + public federatedInstanceCache: QuantumKVCache; constructor( @Inject(DI.redis) @@ -82,8 +88,16 @@ export class CacheService implements OnApplicationShutdown { @Inject(DI.noteThreadMutingsRepository) private readonly noteThreadMutingsRepository: NoteThreadMutingsRepository, + @Inject(DI.channelFollowingsRepository) + private readonly channelFollowingsRepository: ChannelFollowingsRepository, + + @Inject(DI.instancesRepository) + private readonly instancesRepository: InstancesRepository, + private userEntityService: UserEntityService, private readonly internalEventService: InternalEventService, + private readonly utilityService: UtilityService, + private readonly idService: IdService, ) { //this.onMessage = this.onMessage.bind(this); @@ -271,7 +285,46 @@ export class CacheService implements OnApplicationShutdown { memoryCacheLifetime: 1000 * 60, // 1 minute }); - // NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている + this.userFollowingChannelsCache = new QuantumKVCache>(this.internalEventService, 'userFollowingChannels', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: (key) => this.channelFollowingsRepository.find({ + where: { followerId: key }, + select: ['followeeId'], + }).then(xs => new Set(xs.map(x => x.followeeId))), + // TODO bulk fetcher + }); + + this.federatedInstanceCache = new QuantumKVCache(this.internalEventService, 'federatedInstance', { + lifetime: 1000 * 60 * 3, // 3 minutes + fetcher: async key => { + const host = this.utilityService.toPuny(key); + let instance = await this.instancesRepository.findOneBy({ host }); + if (instance == null) { + await this.instancesRepository.createQueryBuilder('instance') + .insert() + .values({ + id: this.idService.gen(), + host, + firstRetrievedAt: new Date(), + isBlocked: this.utilityService.isBlockedHost(host), + isSilenced: this.utilityService.isSilencedHost(host), + isMediaSilenced: this.utilityService.isMediaSilencedHost(host), + isAllowListed: this.utilityService.isAllowListedHost(host), + isBubbled: this.utilityService.isBubbledHost(host), + }) + .orIgnore() + .execute(); + + instance = await this.instancesRepository.findOneByOrFail({ host }); + } + return instance; + }, + bulkFetcher: async keys => { + const hosts = keys.map(key => this.utilityService.toPuny(key)); + const instances = await this.instancesRepository.findBy({ host: In(hosts) }); + return instances.map(i => [i.host, i]); + }, + }); this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); this.internalEventService.on('userChangeDeletedState', this.onUserEvent); @@ -280,6 +333,10 @@ export class CacheService implements OnApplicationShutdown { this.internalEventService.on('userTokenRegenerated', this.onTokenEvent); this.internalEventService.on('follow', this.onFollowEvent); this.internalEventService.on('unfollow', this.onFollowEvent); + // For these, only listen to local events because quantum cache handles the sync. + this.internalEventService.on('followChannel', this.onChannelEvent, { ignoreRemote: true }); + this.internalEventService.on('unfollowChannel', this.onChannelEvent, { ignoreRemote: true }); + this.internalEventService.on('metaUpdated', this.onMetaEvent, { ignoreRemote: true }); } @bindThis @@ -343,6 +400,7 @@ export class CacheService implements OnApplicationShutdown { @bindThis private async onFollowEvent(body: InternalEventTypes[E], type: E): Promise { { + // TODO should we filter for local/remote events? switch (type) { case 'follow': { const follower = this.userByIdCache.get(body.followerId); @@ -374,6 +432,29 @@ export class CacheService implements OnApplicationShutdown { } } + @bindThis + private async onChannelEvent(body: InternalEventTypes[E]): Promise { + await this.userFollowingChannelsCache.delete(body.userId); + } + + @bindThis + private async onMetaEvent(body: InternalEventTypes[E]): Promise { + const { before, after } = body; + const changed = ( + diffArraysSimple(before?.blockedHosts, after.blockedHosts) || + diffArraysSimple(before?.silencedHosts, after.silencedHosts) || + diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) || + diffArraysSimple(before?.federationHosts, after.federationHosts) || + diffArraysSimple(before?.bubbleInstances, after.bubbleInstances) + ); + + if (changed) { + // We have to clear the whole thing, otherwise subdomains won't be synced. + // This gets fired in *each* process so don't do anything to trigger cache notifications! + this.federatedInstanceCache.clear(); + } + } + @bindThis public findUserById(userId: MiUser['id']) { return this.userByIdCache.fetch(userId, () => this.usersRepository.findOneByOrFail({ id: userId })); @@ -586,6 +667,10 @@ export class CacheService implements OnApplicationShutdown { this.internalEventService.off('userTokenRegenerated', this.onTokenEvent); this.internalEventService.off('follow', this.onFollowEvent); this.internalEventService.off('unfollow', this.onFollowEvent); + this.internalEventService.off('followChannel', this.onChannelEvent); + this.internalEventService.off('unfollowChannel', this.onChannelEvent); + this.internalEventService.off('metaUpdated', this.onMetaEvent); + this.userByIdCache.dispose(); this.localUserByNativeTokenCache.dispose(); this.localUserByIdCache.dispose(); @@ -602,6 +687,8 @@ export class CacheService implements OnApplicationShutdown { this.hibernatedUserCache.dispose(); this.userFollowStatsCache.dispose(); this.translationsCache.dispose(); + this.userFollowingChannelsCache.dispose(); + this.federatedInstanceCache.dispose(); } @bindThis diff --git a/packages/backend/src/core/ChannelFollowingService.ts b/packages/backend/src/core/ChannelFollowingService.ts index 430711fef1..39e5bdedab 100644 --- a/packages/backend/src/core/ChannelFollowingService.ts +++ b/packages/backend/src/core/ChannelFollowingService.ts @@ -9,16 +9,13 @@ import { DI } from '@/di-symbols.js'; import type { ChannelFollowingsRepository } from '@/models/_.js'; import { MiChannel } from '@/models/_.js'; import { IdService } from '@/core/IdService.js'; -import { GlobalEvents, GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js'; +import { GlobalEventService } from '@/core/GlobalEventService.js'; import { bindThis } from '@/decorators.js'; import type { MiLocalUser } from '@/models/User.js'; -import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import { InternalEventService } from './InternalEventService.js'; @Injectable() -export class ChannelFollowingService implements OnModuleInit { - public userFollowingChannelsCache: QuantumKVCache>; - +export class ChannelFollowingService { constructor( @Inject(DI.redis) private redisClient: Redis.Redis, @@ -29,21 +26,7 @@ export class ChannelFollowingService implements OnModuleInit { private idService: IdService, private globalEventService: GlobalEventService, private readonly internalEventService: InternalEventService, - ) { - this.userFollowingChannelsCache = new QuantumKVCache>(this.internalEventService, 'userFollowingChannels', { - lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.channelFollowingsRepository.find({ - where: { followerId: key }, - select: ['followeeId'], - }).then(xs => new Set(xs.map(x => x.followeeId))), - }); - - this.internalEventService.on('followChannel', this.onMessage); - this.internalEventService.on('unfollowChannel', this.onMessage); - } - - onModuleInit() { - } + ) {} @bindThis public async follow( @@ -56,7 +39,7 @@ export class ChannelFollowingService implements OnModuleInit { followeeId: targetChannel.id, }); - this.globalEventService.publishInternalEvent('followChannel', { + await this.internalEventService.emit('followChannel', { userId: requestUser.id, channelId: targetChannel.id, }); @@ -72,37 +55,9 @@ export class ChannelFollowingService implements OnModuleInit { followeeId: targetChannel.id, }); - this.globalEventService.publishInternalEvent('unfollowChannel', { + await this.internalEventService.emit('unfollowChannel', { userId: requestUser.id, channelId: targetChannel.id, }); } - - @bindThis - private async onMessage(body: InternalEventTypes[E], type: E): Promise { - { - switch (type) { - case 'followChannel': { - await this.userFollowingChannelsCache.delete(body.userId); - break; - } - case 'unfollowChannel': { - await this.userFollowingChannelsCache.delete(body.userId); - break; - } - } - } - } - - @bindThis - public dispose(): void { - this.internalEventService.off('followChannel', this.onMessage); - this.internalEventService.off('unfollowChannel', this.onMessage); - this.userFollowingChannelsCache.dispose(); - } - - @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); - } } diff --git a/packages/backend/src/core/FanoutTimelineEndpointService.ts b/packages/backend/src/core/FanoutTimelineEndpointService.ts index ddb0ddb7d2..27011bdeb5 100644 --- a/packages/backend/src/core/FanoutTimelineEndpointService.ts +++ b/packages/backend/src/core/FanoutTimelineEndpointService.ts @@ -277,7 +277,7 @@ export class FanoutTimelineEndpointService { // Fetch everything and populate users const [users, instances] = await Promise.all([ this.cacheService.getUsers(usersToFetch), - this.federatedInstanceService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)), + this.cacheService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)), ]); for (const [id, user] of Array.from(users)) { users.set(id, { diff --git a/packages/backend/src/core/FederatedInstanceService.ts b/packages/backend/src/core/FederatedInstanceService.ts index e549dbc93e..6f0ed5548e 100644 --- a/packages/backend/src/core/FederatedInstanceService.ts +++ b/packages/backend/src/core/FederatedInstanceService.ts @@ -4,23 +4,17 @@ */ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import * as Redis from 'ioredis'; -import { In } from 'typeorm'; import type { InstancesRepository, MiMeta } from '@/models/_.js'; import type { MiInstance } from '@/models/Instance.js'; import { IdService } from '@/core/IdService.js'; import { DI } from '@/di-symbols.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; -import { diffArraysSimple } from '@/misc/diff-arrays.js'; -import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; -import { InternalEventService } from '@/core/InternalEventService.js'; +import type { CacheService } from '@/core/CacheService.js'; import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; @Injectable() -export class FederatedInstanceService implements OnApplicationShutdown { - public readonly federatedInstanceCache: QuantumKVCache; - +export class FederatedInstanceService { constructor( @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, @@ -30,46 +24,12 @@ export class FederatedInstanceService implements OnApplicationShutdown { private utilityService: UtilityService, private idService: IdService, - private readonly internalEventService: InternalEventService, - ) { - this.federatedInstanceCache = new QuantumKVCache(this.internalEventService, 'federatedInstance', { - lifetime: 1000 * 60 * 3, // 3 minutes - fetcher: async key => { - const host = this.utilityService.toPuny(key); - let instance = await this.instancesRepository.findOneBy({ host }); - if (instance == null) { - await this.instancesRepository.createQueryBuilder('instance') - .insert() - .values({ - id: this.idService.gen(), - host, - firstRetrievedAt: new Date(), - isBlocked: this.utilityService.isBlockedHost(host), - isSilenced: this.utilityService.isSilencedHost(host), - isMediaSilenced: this.utilityService.isMediaSilencedHost(host), - isAllowListed: this.utilityService.isAllowListedHost(host), - isBubbled: this.utilityService.isBubbledHost(host), - }) - .orIgnore() - .execute(); - - instance = await this.instancesRepository.findOneByOrFail({ host }); - } - return instance; - }, - bulkFetcher: async keys => { - const hosts = keys.map(key => this.utilityService.toPuny(key)); - const instances = await this.instancesRepository.findBy({ host: In(hosts) }); - return instances.map(i => [i.host, i]); - }, - }); - - this.internalEventService.on('metaUpdated', this.onMetaUpdated); - } + private readonly cacheService: CacheService, + ) {} @bindThis public async fetchOrRegister(host: string): Promise { - return this.federatedInstanceCache.fetch(host); + return this.cacheService.federatedInstanceCache.fetch(host); /* host = this.utilityService.toPuny(host); @@ -103,7 +63,7 @@ export class FederatedInstanceService implements OnApplicationShutdown { @bindThis public async fetch(host: string): Promise { - return this.federatedInstanceCache.fetch(host); + return this.cacheService.federatedInstanceCache.fetch(host); /* host = this.utilityService.toPuny(host); @@ -133,7 +93,7 @@ export class FederatedInstanceService implements OnApplicationShutdown { return response.raw[0] as MiInstance; }); - await this.federatedInstanceCache.set(result.host, result); + await this.cacheService.federatedInstanceCache.set(result.host, result); return result; } @@ -146,7 +106,7 @@ export class FederatedInstanceService implements OnApplicationShutdown { const allowedHosts = new Set(this.meta.federationHosts); this.meta.blockedHosts.forEach(h => allowedHosts.delete(h)); - const instances = await this.federatedInstanceCache.fetchMany(this.meta.federationHosts); + const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.federationHosts); return instances.map(i => i[1]); } @@ -155,38 +115,7 @@ export class FederatedInstanceService implements OnApplicationShutdown { */ @bindThis public async getDenyList(): Promise { - const instances = await this.federatedInstanceCache.fetchMany(this.meta.blockedHosts); + const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.blockedHosts); return instances.map(i => i[1]); } - - // This gets fired *in each process* so don't do anything to trigger cache notifications! - private syncCache(before: MiMeta | undefined, after: MiMeta): void { - const changed = - diffArraysSimple(before?.blockedHosts, after.blockedHosts) || - diffArraysSimple(before?.silencedHosts, after.silencedHosts) || - diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) || - diffArraysSimple(before?.federationHosts, after.federationHosts) || - diffArraysSimple(before?.bubbleInstances, after.bubbleInstances); - - if (changed) { - // We have to clear the whole thing, otherwise subdomains won't be synced. - this.federatedInstanceCache.clear(); - } - } - - @bindThis - private async onMetaUpdated(body: { before?: MiMeta; after: MiMeta; }) { - this.syncCache(body.before, body.after); - } - - @bindThis - public dispose(): void { - this.internalEventService.off('metaUpdated', this.onMetaUpdated); - this.federatedInstanceCache.dispose(); - } - - @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); - } } diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 5bce5eda41..90657236d6 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -80,7 +80,7 @@ export default class Connection { const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes, threadMutings, noteMutings, myRecentReactions, myRecentFavorites, myRecentRenotes] = await Promise.all([ this.cacheService.userProfileCache.fetch(this.user.id), this.cacheService.userFollowingsCache.fetch(this.user.id), - this.channelFollowingService.userFollowingChannelsCache.fetch(this.user.id), + this.cacheService.userFollowingChannelsCache.fetch(this.user.id), this.cacheService.userMutingsCache.fetch(this.user.id), this.cacheService.userBlockedCache.fetch(this.user.id), this.cacheService.renoteMutingsCache.fetch(this.user.id), diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts index 208d51f23a..39a0d0ea62 100644 --- a/packages/backend/test/misc/noOpCaches.ts +++ b/packages/backend/test/misc/noOpCaches.ts @@ -6,7 +6,7 @@ import * as Redis from 'ioredis'; import { Inject } from '@nestjs/common'; import { FakeInternalEventService } from './FakeInternalEventService.js'; -import type { BlockingsRepository, FollowingsRepository, MiUser, MutingsRepository, NoteThreadMutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MiUser, MutingsRepository, NoteThreadMutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository, ChannelFollowingsRepository, InstancesRepository } from '@/models/_.js'; import type { MiLocalUser } from '@/models/User.js'; import { MemoryKVCache, MemorySingleCache, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; @@ -14,6 +14,8 @@ import { CacheService, FollowStats } from '@/core/CacheService.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { IdService } from '@/core/IdService.js'; export function noOpRedis() { return { @@ -53,8 +55,20 @@ export class NoOpCacheService extends CacheService { @Inject(DI.noteThreadMutingsRepository) noteThreadMutingsRepository: NoteThreadMutingsRepository, + @Inject(DI.channelFollowingsRepository) + channelFollowingsRepository: ChannelFollowingsRepository, + + @Inject(DI.instancesRepository) + instancesRepository: InstancesRepository, + @Inject(UserEntityService) userEntityService: UserEntityService, + + @Inject(UtilityService) + utilityService: UtilityService, + + @Inject(IdService) + idService: IdService, ) { const fakeRedis = noOpRedis(); const fakeInternalEventService = new FakeInternalEventService(); @@ -69,8 +83,12 @@ export class NoOpCacheService extends CacheService { renoteMutingsRepository, followingsRepository, noteThreadMutingsRepository, + channelFollowingsRepository, + instancesRepository, userEntityService, fakeInternalEventService, + utilityService, + idService, ); this.fakeRedis = fakeRedis; @@ -93,6 +111,7 @@ export class NoOpCacheService extends CacheService { this.hibernatedUserCache = NoOpQuantumKVCache.copy(this.hibernatedUserCache, fakeInternalEventService); this.userFollowStatsCache = new NoOpMemoryKVCache(); this.translationsCache = NoOpRedisKVCache.copy(this.translationsCache, fakeRedis); + this.userFollowingChannelsCache = NoOpQuantumKVCache.copy(this.userFollowingChannelsCache, fakeInternalEventService); } }