diff --git a/packages/backend/src/core/AccountMoveService.ts b/packages/backend/src/core/AccountMoveService.ts index e107f02796..3c9f8bb518 100644 --- a/packages/backend/src/core/AccountMoveService.ts +++ b/packages/backend/src/core/AccountMoveService.ts @@ -27,6 +27,7 @@ import { SystemAccountService } from '@/core/SystemAccountService.js'; import { RoleService } from '@/core/RoleService.js'; import { AntennaService } from '@/core/AntennaService.js'; import { CacheService } from '@/core/CacheService.js'; +import { UserListService } from '@/core/UserListService.js'; @Injectable() export class AccountMoveService { @@ -70,6 +71,7 @@ export class AccountMoveService { private roleService: RoleService, private antennaService: AntennaService, private readonly cacheService: CacheService, + private readonly userListService: UserListService, ) { } @@ -263,45 +265,35 @@ export class AccountMoveService { @bindThis public async updateLists(src: ThinUser, dst: MiUser): Promise { // Return if there is no list to be updated. - const oldMemberships = await this.userListMembershipsRepository.find({ - where: { - userId: src.id, - }, - }); - if (oldMemberships.length === 0) return; + const [srcMemberships, dstMemberships] = await Promise.all([ + this.cacheService.userListMembershipsCache.fetch(src.id), + this.cacheService.userListMembershipsCache.fetch(dst.id), + ]); + if (srcMemberships.size === 0) return; - const existingUserListIds = await this.userListMembershipsRepository.find({ - where: { - userId: dst.id, - }, - }).then(memberships => memberships.map(membership => membership.userListId)); + const newMemberships = srcMemberships.values() + .filter(srcMembership => !dstMemberships.has(srcMembership.userListId)) + .map(srcMembership => ({ + userListId: srcMembership.userListId, + withReplies: srcMembership.withReplies, + })) + .toArray(); + const updatedMemberships = srcMemberships.values() + .filter(srcMembership => { + const dstMembership = dstMemberships.get(srcMembership.userListId); + return dstMembership != null && dstMembership.withReplies !== srcMembership.withReplies; + }) + .map(srcMembership => ({ + userListId: srcMembership.userListId, + withReplies: srcMembership.withReplies, + })) + .toArray(); - const newMemberships: Map = new Map(); - - // 重複しないようにIDを生成 - const genId = (): string => { - let id: string; - do { - id = this.idService.gen(); - } while (newMemberships.has(id)); - return id; - }; - for (const membership of oldMemberships) { - if (existingUserListIds.includes(membership.userListId)) continue; // skip if dst exists in this user's list - newMemberships.set(genId(), { - userId: dst.id, - userListId: membership.userListId, - userListUserId: membership.userListUserId, - }); + if (newMemberships.length > 0) { + await this.userListService.bulkAddMember(dst, newMemberships); } - - const arrayToInsert = Array.from(newMemberships.entries()).map(entry => ({ ...entry[1], id: entry[0] })); - await this.userListMembershipsRepository.insert(arrayToInsert); - - // Have the proxy account follow the new account in the same way as UserListService.push - if (this.userEntityService.isRemoteUser(dst)) { - const proxy = await this.systemAccountService.fetch('proxy'); - this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: dst.id } }]); + if (updatedMemberships.length > 0) { + await this.userListService.bulkUpdateMembership(dst, updatedMemberships); } } diff --git a/packages/backend/src/core/AntennaService.ts b/packages/backend/src/core/AntennaService.ts index 667df57943..660c97dd6a 100644 --- a/packages/backend/src/core/AntennaService.ts +++ b/packages/backend/src/core/AntennaService.ts @@ -139,12 +139,8 @@ export class AntennaService implements OnApplicationShutdown { // TODO } else if (antenna.src === 'list') { if (antenna.userListId == null) return false; - const exists = await this.userListMembershipsRepository.exists({ - where: { - userListId: antenna.userListId, - userId: note.userId, - }, - }); + const memberships = await this.cacheService.userListMembershipsCache.fetch(note.userId); + const exists = memberships.has(antenna.userListId); if (!exists) return false; } else if (antenna.src === 'users') { const accts = antenna.users.map(x => { diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index bad4b93c11..152e70972e 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -6,18 +6,20 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { In, IsNull, Not } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing, NoteThreadMutingsRepository, ChannelFollowingsRepository, MiInstance, InstancesRepository } from '@/models/_.js'; -import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; -import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, NoteThreadMutingsRepository, ChannelFollowingsRepository, UserListMembershipsRepository } from '@/models/_.js'; import type { MiLocalUser, MiRemoteUser, MiUser } from '@/models/User.js'; +import type { MiUserListMembership } from '@/models/UserListMembership.js'; +import { isLocalUser, isRemoteUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { UtilityService } from '@/core/UtilityService.js'; -import { IdService } from '@/core/IdService.js'; -import { diffArraysSimple } from '@/misc/diff-arrays.js'; +import { + CacheManagementService, + type ManagedMemoryKVCache, + type ManagedQuantumKVCache, +} from '@/core/CacheManagementService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export interface FollowStats { @@ -27,37 +29,26 @@ export interface FollowStats { remoteFollowers: number; } -export interface CachedTranslation { - sourceLang: string | undefined; - text: string | undefined; -} - -export interface CachedTranslationEntity { - l?: string; - t?: string; - u?: number; -} - @Injectable() export class CacheService implements OnApplicationShutdown { - public userByIdCache: MemoryKVCache; - public localUserByNativeTokenCache: MemoryKVCache; - public localUserByIdCache: MemoryKVCache; - public uriPersonCache: MemoryKVCache; - public userProfileCache: QuantumKVCache; - public userMutingsCache: QuantumKVCache>; - public userBlockingCache: QuantumKVCache>; - public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ - public renoteMutingsCache: QuantumKVCache>; - public threadMutingsCache: QuantumKVCache>; - public noteMutingsCache: QuantumKVCache>; - public userFollowingsCache: QuantumKVCache>>; - public userFollowersCache: QuantumKVCache>>; - public hibernatedUserCache: QuantumKVCache; - protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes - protected translationsCache: RedisKVCache; - public userFollowingChannelsCache: QuantumKVCache>; - public federatedInstanceCache: QuantumKVCache; + public readonly userByIdCache: ManagedMemoryKVCache; + public readonly localUserByNativeTokenCache: ManagedMemoryKVCache; + public readonly localUserByIdCache: ManagedMemoryKVCache; + public readonly uriPersonCache: ManagedMemoryKVCache; + public readonly userProfileCache: ManagedQuantumKVCache; + public readonly userMutingsCache: ManagedQuantumKVCache>; + public readonly userBlockingCache: ManagedQuantumKVCache>; + public readonly userBlockedCache: ManagedQuantumKVCache>; // NOTE: 「被」Blockキャッシュ + public readonly userListMembershipsCache: ManagedQuantumKVCache>; + public readonly listUserMembershipsCache: ManagedQuantumKVCache>; + public readonly renoteMutingsCache: ManagedQuantumKVCache>; + public readonly threadMutingsCache: ManagedQuantumKVCache>; + public readonly noteMutingsCache: ManagedQuantumKVCache>; + public readonly userFollowingsCache: ManagedQuantumKVCache>>; + public readonly userFollowersCache: ManagedQuantumKVCache>>; + public readonly hibernatedUserCache: ManagedQuantumKVCache; + public readonly userFollowStatsCache: ManagedMemoryKVCache; + public readonly userFollowingChannelsCache: ManagedQuantumKVCache>; constructor( @Inject(DI.redis) @@ -90,27 +81,26 @@ export class CacheService implements OnApplicationShutdown { @Inject(DI.channelFollowingsRepository) private readonly channelFollowingsRepository: ChannelFollowingsRepository, - @Inject(DI.instancesRepository) - private readonly instancesRepository: InstancesRepository, + @Inject(DI.userListMembershipsRepository) + private readonly userListMembershipsRepository: UserListMembershipsRepository, private readonly internalEventService: InternalEventService, - private readonly utilityService: UtilityService, - private readonly idService: IdService, + private readonly cacheManagementService: CacheManagementService, ) { //this.onMessage = this.onMessage.bind(this); - this.userByIdCache = new MemoryKVCache(1000 * 60 * 5); // 5m - this.localUserByNativeTokenCache = new MemoryKVCache(1000 * 60 * 5); // 5m - this.localUserByIdCache = new MemoryKVCache(1000 * 60 * 5); // 5m - this.uriPersonCache = new MemoryKVCache(1000 * 60 * 5); // 5m + this.userByIdCache = this.cacheManagementService.createMemoryKVCache(1000 * 60 * 5); // 5m + this.localUserByNativeTokenCache = this.cacheManagementService.createMemoryKVCache(1000 * 60 * 5); // 5m + this.localUserByIdCache = this.cacheManagementService.createMemoryKVCache(1000 * 60 * 5); // 5m + this.uriPersonCache = this.cacheManagementService.createMemoryKVCache(1000 * 60 * 5); // 5m - this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { + this.userProfileCache = this.cacheManagementService.createQuantumKVCache('userProfile', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), + fetcher: (key) => this.userProfilesRepository.findOneBy({ userId: key }), bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])), }); - this.userMutingsCache = new QuantumKVCache>(this.internalEventService, 'userMutings', { + this.userMutingsCache = this.cacheManagementService.createQuantumKVCache>('userMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), bulkFetcher: muterIds => this.mutingsRepository @@ -123,7 +113,7 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userBlockingCache = new QuantumKVCache>(this.internalEventService, 'userBlocking', { + this.userBlockingCache = this.cacheManagementService.createQuantumKVCache>('userBlocking', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), bulkFetcher: blockerIds => this.blockingsRepository @@ -136,7 +126,7 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])), }); - this.userBlockedCache = new QuantumKVCache>(this.internalEventService, 'userBlocked', { + this.userBlockedCache = this.cacheManagementService.createQuantumKVCache>('userBlocked', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), bulkFetcher: blockeeIds => this.blockingsRepository @@ -149,7 +139,41 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])), }); - this.renoteMutingsCache = new QuantumKVCache>(this.internalEventService, 'renoteMutings', { + this.userListMembershipsCache = this.cacheManagementService.createQuantumKVCache>('userListMemberships', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: async userId => await this.userListMembershipsRepository.findBy({ userId }).then(ms => new Map(ms.map(m => [m.id, m]))), + bulkFetcher: async userIds => await this.userListMembershipsRepository + .findBy({ userId: In(userIds) }) + .then(ms => ms + .reduce((groups, m) => { + let listsForUser = groups.get(m.userId); + if (!listsForUser) { + listsForUser = new Map(); + groups.set(m.userId, listsForUser); + } + listsForUser.set(m.userListId, m); + return groups; + }, new Map>)), + }); + + this.listUserMembershipsCache = this.cacheManagementService.createQuantumKVCache>('listUserMemberships', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: async userListId => await this.userListMembershipsRepository.findBy({ userListId }).then(ms => new Map(ms.map(m => [m.id, m]))), + bulkFetcher: async userListIds => await this.userListMembershipsRepository + .findBy({ userListId: In(userListIds) }) + .then(ms => ms + .reduce((groups, m) => { + let usersForList = groups.get(m.userListId); + if (!usersForList) { + usersForList = new Map(); + groups.set(m.userListId, usersForList); + } + usersForList.set(m.userId, m); + return groups; + }, new Map>)), + }); + + this.renoteMutingsCache = this.cacheManagementService.createQuantumKVCache>('renoteMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), bulkFetcher: muterIds => this.renoteMutingsRepository @@ -162,7 +186,7 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.threadMutingsCache = new QuantumKVCache>(this.internalEventService, 'threadMutings', { + this.threadMutingsCache = this.cacheManagementService.createQuantumKVCache>('threadMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: muterId => this.noteThreadMutingsRepository .find({ where: { userId: muterId, isPostMute: false }, select: { threadId: true } }) @@ -177,7 +201,7 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.userId, new Set(m.threadIds)])), }); - this.noteMutingsCache = new QuantumKVCache>(this.internalEventService, 'noteMutings', { + this.noteMutingsCache = this.cacheManagementService.createQuantumKVCache>('noteMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: muterId => this.noteThreadMutingsRepository .find({ where: { userId: muterId, isPostMute: true }, select: { threadId: true } }) @@ -192,7 +216,7 @@ export class CacheService implements OnApplicationShutdown { .then(ms => ms.map(m => [m.userId, new Set(m.threadIds)])), }); - this.userFollowingsCache = new QuantumKVCache>>(this.internalEventService, 'userFollowings', { + this.userFollowingsCache = this.cacheManagementService.createQuantumKVCache>>('userFollowings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))), bulkFetcher: followerIds => this.followingsRepository @@ -209,7 +233,7 @@ export class CacheService implements OnApplicationShutdown { }, new Map>>)), }); - this.userFollowersCache = new QuantumKVCache>>(this.internalEventService, 'userFollowers', { + this.userFollowersCache = this.cacheManagementService.createQuantumKVCache>>('userFollowers', { lifetime: 1000 * 60 * 30, // 30m fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))), bulkFetcher: followeeIds => this.followingsRepository @@ -226,14 +250,14 @@ export class CacheService implements OnApplicationShutdown { }, new Map>>)), }); - this.hibernatedUserCache = new QuantumKVCache(this.internalEventService, 'hibernatedUsers', { + this.hibernatedUserCache = this.cacheManagementService.createQuantumKVCache('hibernatedUsers', { lifetime: 1000 * 60 * 30, // 30m fetcher: async userId => { - const { isHibernated } = await this.usersRepository.findOneOrFail({ + const result = await this.usersRepository.findOne({ where: { id: userId }, select: { isHibernated: true }, }); - return isHibernated; + return result?.isHibernated; }, bulkFetcher: async userIds => { const results = await this.usersRepository.find({ @@ -278,12 +302,9 @@ export class CacheService implements OnApplicationShutdown { }, }); - this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { - lifetime: 1000 * 60 * 60 * 24 * 7, // 1 week, - memoryCacheLifetime: 1000 * 60, // 1 minute - }); + this.userFollowStatsCache = this.cacheManagementService.createMemoryKVCache(1000 * 60 * 10); // 10 minutes - this.userFollowingChannelsCache = new QuantumKVCache>(this.internalEventService, 'userFollowingChannels', { + this.userFollowingChannelsCache = this.cacheManagementService.createQuantumKVCache>('userFollowingChannels', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.channelFollowingsRepository.find({ where: { followerId: key }, @@ -292,38 +313,6 @@ export class CacheService implements OnApplicationShutdown { // TODO bulk fetcher }); - this.federatedInstanceCache = new QuantumKVCache(this.internalEventService, 'federatedInstance', { - lifetime: 1000 * 60 * 3, // 3 minutes - fetcher: async key => { - const host = this.utilityService.toPuny(key); - let instance = await this.instancesRepository.findOneBy({ host }); - if (instance == null) { - await this.instancesRepository.createQueryBuilder('instance') - .insert() - .values({ - id: this.idService.gen(), - host, - firstRetrievedAt: new Date(), - isBlocked: this.utilityService.isBlockedHost(host), - isSilenced: this.utilityService.isSilencedHost(host), - isMediaSilenced: this.utilityService.isMediaSilencedHost(host), - isAllowListed: this.utilityService.isAllowListedHost(host), - isBubbled: this.utilityService.isBubbledHost(host), - }) - .orIgnore() - .execute(); - - instance = await this.instancesRepository.findOneByOrFail({ host }); - } - return instance; - }, - bulkFetcher: async keys => { - const hosts = keys.map(key => this.utilityService.toPuny(key)); - const instances = await this.instancesRepository.findBy({ host: In(hosts) }); - return instances.map(i => [i.host, i]); - }, - }); - this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); this.internalEventService.on('userChangeDeletedState', this.onUserEvent); this.internalEventService.on('remoteUserUpdated', this.onUserEvent); @@ -334,7 +323,13 @@ export class CacheService implements OnApplicationShutdown { // For these, only listen to local events because quantum cache handles the sync. this.internalEventService.on('followChannel', this.onChannelEvent, { ignoreRemote: true }); this.internalEventService.on('unfollowChannel', this.onChannelEvent, { ignoreRemote: true }); - this.internalEventService.on('metaUpdated', this.onMetaEvent, { ignoreRemote: true }); + this.internalEventService.on('updateUserProfile', this.onProfileEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberAdded', this.onListMemberEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberUpdated', this.onListMemberEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberRemoved', this.onListMemberEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberBulkAdded', this.onListMemberEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberBulkUpdated', this.onListMemberEvent, { ignoreRemote: true }); + this.internalEventService.on('userListMemberBulkRemoved', this.onListMemberEvent, { ignoreRemote: true }); } @bindThis @@ -351,6 +346,14 @@ export class CacheService implements OnApplicationShutdown { this.uriPersonCache.delete(k); } } + + // Contains IDs of all lists where this user is a member. + const userListMemberships = this.listUserMembershipsCache + .entries() + .filter(e => e[1].has(body.id)) + .map(e => e[0]) + .toArray(); + if (isLocal) { await Promise.all([ this.userProfileCache.delete(body.id), @@ -363,6 +366,8 @@ export class CacheService implements OnApplicationShutdown { this.hibernatedUserCache.delete(body.id), this.threadMutingsCache.delete(body.id), this.noteMutingsCache.delete(body.id), + this.userListMembershipsCache.delete(body.id), + this.listUserMembershipsCache.deleteMany(userListMemberships), ]); } } else { @@ -436,21 +441,17 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - private async onMetaEvent(body: InternalEventTypes[E]): Promise { - const { before, after } = body; - const changed = ( - diffArraysSimple(before?.blockedHosts, after.blockedHosts) || - diffArraysSimple(before?.silencedHosts, after.silencedHosts) || - diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) || - diffArraysSimple(before?.federationHosts, after.federationHosts) || - diffArraysSimple(before?.bubbleInstances, after.bubbleInstances) - ); + private async onProfileEvent(body: InternalEventTypes[E]): Promise { + await this.userProfileCache.delete(body.userId); + } - if (changed) { - // We have to clear the whole thing, otherwise subdomains won't be synced. - // This gets fired in *each* process so don't do anything to trigger cache notifications! - this.federatedInstanceCache.clear(); - } + @bindThis + private async onListMemberEvent(body: InternalEventTypes[E]): Promise { + const userListIds = 'userListIds' in body ? body.userListIds : [body.userListId]; + await Promise.all([ + this.userListMembershipsCache.delete(body.memberId), + this.listUserMembershipsCache.deleteMany(userListIds), + ]); } @bindThis @@ -527,34 +528,6 @@ export class CacheService implements OnApplicationShutdown { }); } - @bindThis - public async getCachedTranslation(note: MiNote, targetLang: string): Promise { - const cacheKey = `${note.id}@${targetLang}`; - - // Use cached translation, if present and up-to-date - const cached = await this.translationsCache.get(cacheKey); - if (cached && cached.u === note.updatedAt?.valueOf()) { - return { - sourceLang: cached.l, - text: cached.t, - }; - } - - // No cache entry :( - return null; - } - - @bindThis - public async setCachedTranslation(note: MiNote, targetLang: string, translation: CachedTranslation): Promise { - const cacheKey = `${note.id}@${targetLang}`; - - await this.translationsCache.set(cacheKey, { - l: translation.sourceLang, - t: translation.text, - u: note.updatedAt?.valueOf(), - }); - } - @bindThis public async getUsers(userIds: Iterable): Promise> { const users = new Map; @@ -640,20 +613,7 @@ export class CacheService implements OnApplicationShutdown { @bindThis public clear(): void { - this.userByIdCache.clear(); - this.localUserByNativeTokenCache.clear(); - this.localUserByIdCache.clear(); - this.uriPersonCache.clear(); - this.userProfileCache.clear(); - this.userMutingsCache.clear(); - this.userBlockingCache.clear(); - this.userBlockedCache.clear(); - this.renoteMutingsCache.clear(); - this.userFollowingsCache.clear(); - this.userFollowStatsCache.clear(); - this.translationsCache.clear(); - this.userFollowingChannelsCache.clear(); - this.federatedInstanceCache.clear(); + this.cacheManagementService.clear(); } @bindThis @@ -667,30 +627,17 @@ export class CacheService implements OnApplicationShutdown { this.internalEventService.off('unfollow', this.onFollowEvent); this.internalEventService.off('followChannel', this.onChannelEvent); this.internalEventService.off('unfollowChannel', this.onChannelEvent); - this.internalEventService.off('metaUpdated', this.onMetaEvent); - - this.userByIdCache.dispose(); - this.localUserByNativeTokenCache.dispose(); - this.localUserByIdCache.dispose(); - this.uriPersonCache.dispose(); - this.userProfileCache.dispose(); - this.userMutingsCache.dispose(); - this.userBlockingCache.dispose(); - this.userBlockedCache.dispose(); - this.renoteMutingsCache.dispose(); - this.threadMutingsCache.dispose(); - this.noteMutingsCache.dispose(); - this.userFollowingsCache.dispose(); - this.userFollowersCache.dispose(); - this.hibernatedUserCache.dispose(); - this.userFollowStatsCache.dispose(); - this.translationsCache.dispose(); - this.userFollowingChannelsCache.dispose(); - this.federatedInstanceCache.dispose(); + this.internalEventService.off('updateUserProfile', this.onProfileEvent); + this.internalEventService.off('userListMemberAdded', this.onListMemberEvent); + this.internalEventService.off('userListMemberUpdated', this.onListMemberEvent); + this.internalEventService.off('userListMemberRemoved', this.onListMemberEvent); + this.internalEventService.off('userListMemberBulkAdded', this.onListMemberEvent); + this.internalEventService.off('userListMemberBulkUpdated', this.onListMemberEvent); + this.internalEventService.off('userListMemberBulkRemoved', this.onListMemberEvent); } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { + public onApplicationShutdown(): void { this.dispose(); } } diff --git a/packages/backend/src/core/FanoutTimelineEndpointService.ts b/packages/backend/src/core/FanoutTimelineEndpointService.ts index 27011bdeb5..ddb0ddb7d2 100644 --- a/packages/backend/src/core/FanoutTimelineEndpointService.ts +++ b/packages/backend/src/core/FanoutTimelineEndpointService.ts @@ -277,7 +277,7 @@ export class FanoutTimelineEndpointService { // Fetch everything and populate users const [users, instances] = await Promise.all([ this.cacheService.getUsers(usersToFetch), - this.cacheService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)), + this.federatedInstanceService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)), ]); for (const [id, user] of Array.from(users)) { users.set(id, { diff --git a/packages/backend/src/core/FederatedInstanceService.ts b/packages/backend/src/core/FederatedInstanceService.ts index ca714415f4..94958534b7 100644 --- a/packages/backend/src/core/FederatedInstanceService.ts +++ b/packages/backend/src/core/FederatedInstanceService.ts @@ -4,17 +4,24 @@ */ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import type { InstancesRepository, MiMeta } from '@/models/_.js'; +import { In } from 'typeorm'; +import type { InstancesRepository } from '@/models/_.js'; +import type { MiMeta } from '@/models/Meta.js'; import type { MiInstance } from '@/models/Instance.js'; +import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import { IdService } from '@/core/IdService.js'; import { DI } from '@/di-symbols.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { CacheManagementService, type ManagedQuantumKVCache } from '@/core/CacheManagementService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { diffArraysSimple } from '@/misc/diff-arrays.js'; import { bindThis } from '@/decorators.js'; -import { CacheService } from '@/core/CacheService.js'; import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; @Injectable() -export class FederatedInstanceService { +export class FederatedInstanceService implements OnApplicationShutdown { + public readonly federatedInstanceCache: ManagedQuantumKVCache; + constructor( @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, @@ -24,12 +31,49 @@ export class FederatedInstanceService { private utilityService: UtilityService, private idService: IdService, - private readonly cacheService: CacheService, - ) {} + private readonly internalEventService: InternalEventService, + + cacheManagementService: CacheManagementService, + ) { + this.federatedInstanceCache = cacheManagementService.createQuantumKVCache('federatedInstance', { + // TODO can we increase this? + lifetime: 1000 * 60 * 3, // 3 minutes + fetcher: async key => { + const host = this.utilityService.toPuny(key); + let instance = await this.instancesRepository.findOneBy({ host }); + if (instance == null) { + await this.instancesRepository.createQueryBuilder('instance') + .insert() + .values({ + id: this.idService.gen(), + host, + firstRetrievedAt: new Date(), + isBlocked: this.utilityService.isBlockedHost(host), + isSilenced: this.utilityService.isSilencedHost(host), + isMediaSilenced: this.utilityService.isMediaSilencedHost(host), + isAllowListed: this.utilityService.isAllowListedHost(host), + isBubbled: this.utilityService.isBubbledHost(host), + }) + .orIgnore() + .execute(); + + instance = await this.instancesRepository.findOneByOrFail({ host }); + } + return instance; + }, + bulkFetcher: async keys => { + const hosts = keys.map(key => this.utilityService.toPuny(key)); + const instances = await this.instancesRepository.findBy({ host: In(hosts) }); + return instances.map(i => [i.host, i]); + }, + }); + + this.internalEventService.on('metaUpdated', this.onMetaUpdated, { ignoreRemote: true }); + } @bindThis public async fetchOrRegister(host: string): Promise { - return this.cacheService.federatedInstanceCache.fetch(host); + return this.federatedInstanceCache.fetch(host); /* host = this.utilityService.toPuny(host); @@ -63,7 +107,7 @@ export class FederatedInstanceService { @bindThis public async fetch(host: string): Promise { - return this.cacheService.federatedInstanceCache.fetch(host); + return this.federatedInstanceCache.fetch(host); /* host = this.utilityService.toPuny(host); @@ -93,7 +137,7 @@ export class FederatedInstanceService { return response.raw[0] as MiInstance; }); - await this.cacheService.federatedInstanceCache.set(result.host, result); + await this.federatedInstanceCache.set(result.host, result); return result; } @@ -106,7 +150,7 @@ export class FederatedInstanceService { const allowedHosts = new Set(this.meta.federationHosts); this.meta.blockedHosts.forEach(h => allowedHosts.delete(h)); - const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.federationHosts); + const instances = await this.federatedInstanceCache.fetchMany(this.meta.federationHosts); return instances.map(i => i[1]); } @@ -115,7 +159,35 @@ export class FederatedInstanceService { */ @bindThis public async getDenyList(): Promise { - const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.blockedHosts); + const instances = await this.federatedInstanceCache.fetchMany(this.meta.blockedHosts); return instances.map(i => i[1]); } + + @bindThis + private async onMetaUpdated(body: InternalEventTypes['metaUpdated']): Promise { + const { before, after } = body; + const changed = ( + diffArraysSimple(before?.blockedHosts, after.blockedHosts) || + diffArraysSimple(before?.silencedHosts, after.silencedHosts) || + diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) || + diffArraysSimple(before?.federationHosts, after.federationHosts) || + diffArraysSimple(before?.bubbleInstances, after.bubbleInstances) + ); + + if (changed) { + // We have to clear the whole thing, otherwise subdomains won't be synced. + // This gets fired in *each* process so don't do anything to trigger cache notifications! + this.federatedInstanceCache.clear(); + } + } + + @bindThis + public dispose() { + this.internalEventService.off('metaUpdated', this.onMetaUpdated); + } + + @bindThis + public onApplicationShutdown() { + this.dispose(); + } } diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index 5e3f17f409..7a1410fc1f 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -20,12 +20,12 @@ import type { MiPage } from '@/models/Page.js'; import type { MiWebhook } from '@/models/Webhook.js'; import type { MiSystemWebhook } from '@/models/SystemWebhook.js'; import type { MiMeta } from '@/models/Meta.js'; -import { MiAvatarDecoration, MiChatMessage, MiChatRoom, MiReversiGame, MiRole, MiRoleAssignment } from '@/models/_.js'; +import type { MiAvatarDecoration, MiChatMessage, MiChatRoom, MiReversiGame, MiRole, MiRoleAssignment } from '@/models/_.js'; import type { Packed } from '@/misc/json-schema.js'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import { bindThis } from '@/decorators.js'; -import { Serialized } from '@/types.js'; +import type { Serialized } from '@/types.js'; import { InternalEventService } from '@/core/InternalEventService.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import type Emitter from 'strict-event-emitter-types'; @@ -262,11 +262,15 @@ export interface InternalEventTypes { metaUpdated: { before?: MiMeta; after: MiMeta; }; followChannel: { userId: MiUser['id']; channelId: MiChannel['id']; }; unfollowChannel: { userId: MiUser['id']; channelId: MiChannel['id']; }; - updateUserProfile: MiUserProfile; + updateUserProfile: { userId: MiUserProfile['userId'] }; mute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; }; + userListMemberUpdated: { userListId: MiUserList['id']; memberId: MiUser['id']; }; userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; }; + userListMemberBulkAdded: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; + userListMemberBulkUpdated: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; + userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; quantumCacheUpdated: { name: string, keys: string[] }; } @@ -405,8 +409,8 @@ export class GlobalEventService { } @bindThis - public publishUserListStream(listId: MiUserList['id'], type: K, value?: UserListEventTypes[K]): void { - this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value); + public async publishUserListStream(listId: MiUserList['id'], type: K, value?: UserListEventTypes[K]): Promise { + await this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value); } @bindThis diff --git a/packages/backend/src/core/NoteVisibilityService.ts b/packages/backend/src/core/NoteVisibilityService.ts index 0285847cf5..c43af9f94a 100644 --- a/packages/backend/src/core/NoteVisibilityService.ts +++ b/packages/backend/src/core/NoteVisibilityService.ts @@ -4,15 +4,18 @@ */ import { Inject, Injectable } from '@nestjs/common'; -import { CacheService } from '@/core/CacheService.js'; import type { MiNote } from '@/models/Note.js'; import type { MiUser } from '@/models/User.js'; -import { bindThis } from '@/decorators.js'; +import type { MiFollowing } from '@/models/Following.js'; +import type { MiInstance } from '@/models/Instance.js'; +import type { MiUserListMembership } from '@/models/UserListMembership.js'; +import type { NotesRepository } from '@/models/_.js'; import type { Packed } from '@/misc/json-schema.js'; -import { IdService } from '@/core/IdService.js'; -import { awaitAll } from '@/misc/prelude/await-all.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; -import type { MiFollowing, MiInstance, NotesRepository } from '@/models/_.js'; +import { CacheService } from '@/core/CacheService.js'; +import { IdService } from '@/core/IdService.js'; +import { bindThis } from '@/decorators.js'; +import { awaitAll } from '@/misc/prelude/await-all.js'; import { DI } from '@/di-symbols.js'; /** @@ -50,6 +53,12 @@ export interface NoteVisibilityFilters { * If false (default), then silence is enforced for all notes. */ includeSilencedAuthor?: boolean; + + /** + * Set to an ID to apply visibility from the context of a specific user list. + * Membership and "with replies" settings will be adopted from this list. + */ + listContext?: string | null; } @Injectable() @@ -151,7 +160,7 @@ export class NoteVisibilityService { } @bindThis - public async populateData(user: PopulatedMe, hint?: Partial): Promise { + public async populateData(user: PopulatedMe, hint?: Partial, filters?: NoteVisibilityFilters): Promise { // noinspection ES6MissingAwait const [ userBlockers, @@ -161,6 +170,7 @@ export class NoteVisibilityService { userMutedUsers, userMutedUserRenotes, userMutedInstances, + userListMemberships, ] = await Promise.all([ user ? (hint?.userBlockers ?? this.cacheService.userBlockedCache.fetch(user.id)) : null, user ? (hint?.userFollowings ?? this.cacheService.userFollowingsCache.fetch(user.id)) : null, @@ -169,6 +179,7 @@ export class NoteVisibilityService { user ? (hint?.userMutedUsers ?? this.cacheService.userMutingsCache.fetch(user.id)) : null, user ? (hint?.userMutedUserRenotes ?? this.cacheService.renoteMutingsCache.fetch(user.id)) : null, user ? (hint?.userMutedInstances ?? this.cacheService.userProfileCache.fetch(user.id).then(p => new Set(p.mutedInstances))) : null, + filters?.listContext ? (hint?.userListMemberships ?? this.cacheService.listUserMembershipsCache.fetch(filters.listContext)) : null, ]); return { @@ -179,6 +190,7 @@ export class NoteVisibilityService { userMutedUsers, userMutedUserRenotes, userMutedInstances, + userListMemberships, }; } @@ -396,6 +408,9 @@ export class NoteVisibilityService { // Don't silence if we follow w/ replies if (user && data.userFollowings?.get(user.id)?.withReplies) return false; + // Don't silence if we're viewing in a list with replies + if (data.userListMemberships?.get(note.userId)?.withReplies) return false; + // Silence otherwise return true; } @@ -409,6 +424,9 @@ export interface NoteVisibilityData extends NotePopulationData { userMutedUsers: Set | null; userMutedUserRenotes: Set | null; userMutedInstances: Set | null; + + // userId => membership (already scoped to listContext) + userListMemberships: Map | null; } export interface NotePopulationData { diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 2ce7bdb5a9..89b9e9dfed 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -139,7 +139,7 @@ export class NotificationService implements OnApplicationShutdown { return null; } } else if (recieveConfig?.type === 'list') { - const isMember = await this.userListService.membersCache.fetch(recieveConfig.userListId).then(members => members.has(notifierId)); + const isMember = await this.cacheService.listUserMembershipsCache.fetch(recieveConfig.userListId).then(members => members.has(notifierId)); if (!isMember) { return null; } diff --git a/packages/backend/src/core/UserBlockingService.ts b/packages/backend/src/core/UserBlockingService.ts index cf3395ad25..7463767513 100644 --- a/packages/backend/src/core/UserBlockingService.ts +++ b/packages/backend/src/core/UserBlockingService.ts @@ -19,7 +19,8 @@ import { LoggerService } from '@/core/LoggerService.js'; import { UserWebhookService } from '@/core/UserWebhookService.js'; import { bindThis } from '@/decorators.js'; import { CacheService } from '@/core/CacheService.js'; -import { UserFollowingService } from '@/core/UserFollowingService.js'; +import type { UserFollowingService } from '@/core/UserFollowingService.js'; +import { UserListService } from '@/core/UserListService.js'; @Injectable() export class UserBlockingService implements OnModuleInit { @@ -140,16 +141,12 @@ export class UserBlockingService implements OnModuleInit { @bindThis private async removeFromList(listOwner: MiUser, user: MiUser) { - const userLists = await this.userListsRepository.findBy({ - userId: listOwner.id, + const userLists = await this.userListsRepository.find({ + where: { userId: listOwner.id }, + select: { id: true }, }); - for (const userList of userLists) { - await this.userListMembershipsRepository.delete({ - userListId: userList.id, - userId: user.id, - }); - } + await this.userListService.bulkRemoveMember(user, userLists.map(l => l.id)); } @bindThis diff --git a/packages/backend/src/core/UserListService.ts b/packages/backend/src/core/UserListService.ts index b4486b9808..dd09e2f039 100644 --- a/packages/backend/src/core/UserListService.ts +++ b/packages/backend/src/core/UserListService.ts @@ -6,27 +6,26 @@ import { Inject, Injectable, OnApplicationShutdown, OnModuleInit } from '@nestjs/common'; import * as Redis from 'ioredis'; import { ModuleRef } from '@nestjs/core'; -import type { MiMeta, UserListMembershipsRepository } from '@/models/_.js'; +import { In } from 'typeorm'; +import type { MiMeta, UserListMembershipsRepository, UserListsRepository } from '@/models/_.js'; import type { MiUser } from '@/models/User.js'; import type { MiUserList } from '@/models/UserList.js'; import type { MiUserListMembership } from '@/models/UserListMembership.js'; import { IdService } from '@/core/IdService.js'; -import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import { QueueService } from '@/core/QueueService.js'; -import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; -import { RoleService } from '@/core/RoleService.js'; +import type { RoleService } from '@/core/RoleService.js'; import { SystemAccountService } from '@/core/SystemAccountService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() -export class UserListService implements OnApplicationShutdown, OnModuleInit { +export class UserListService implements OnModuleInit { public static TooManyUsersError = class extends Error {}; - public membersCache: QuantumKVCache>; private roleService: RoleService; constructor( @@ -41,6 +40,9 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { @Inject(DI.userListMembershipsRepository) private userListMembershipsRepository: UserListMembershipsRepository, + @Inject(DI.userListsRepository) + private readonly userListsRepository: UserListsRepository, + @Inject(DI.meta) private readonly meta: MiMeta, @@ -50,52 +52,21 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { private queueService: QueueService, private systemAccountService: SystemAccountService, private readonly internalEventService: InternalEventService, - ) { - this.membersCache = new QuantumKVCache>(this.internalEventService, 'userListMembers', { - lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.userListMembershipsRepository.find({ where: { userListId: key }, select: ['userId'] }).then(xs => new Set(xs.map(x => x.userId))), - }); - - this.internalEventService.on('userListMemberAdded', this.onMessage); - this.internalEventService.on('userListMemberRemoved', this.onMessage); - } + private readonly cacheService: CacheService, + ) {} + @bindThis async onModuleInit() { this.roleService = this.moduleRef.get('RoleService'); } - @bindThis - private async onMessage(body: InternalEventTypes[E], type: E): Promise { - { - switch (type) { - case 'userListMemberAdded': { - const { userListId, memberId } = body; - const members = this.membersCache.get(userListId); - if (members) { - members.add(memberId); - } - break; - } - case 'userListMemberRemoved': { - const { userListId, memberId } = body; - const members = this.membersCache.get(userListId); - if (members) { - members.delete(memberId); - } - break; - } - default: - break; - } - } - } - @bindThis public async addMember(target: MiUser, list: MiUserList, me: MiUser) { - const currentCount = await this.userListMembershipsRepository.countBy({ - userListId: list.id, - }); - if (currentCount >= (await this.roleService.getUserPolicies(me.id)).userEachUserListsLimit) { + const [current, policies] = await Promise.all([ + this.cacheService.listUserMembershipsCache.fetch(list.id), + this.roleService.getUserPolicies(me), + ]); + if (current.size >= policies.userEachUserListsLimit) { throw new UserListService.TooManyUsersError(); } @@ -106,13 +77,13 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { userListUserId: list.userId, } as MiUserListMembership); - this.globalEventService.publishInternalEvent('userListMemberAdded', { userListId: list.id, memberId: target.id }); + await this.internalEventService.emit('userListMemberAdded', { userListId: list.id, memberId: target.id }); this.globalEventService.publishUserListStream(list.id, 'userAdded', await this.userEntityService.pack(target)); // このインスタンス内にこのリモートユーザーをフォローしているユーザーがいなくても投稿を受け取るためにダミーのユーザーがフォローしたということにする if (this.userEntityService.isRemoteUser(target) && this.meta.enableProxyAccount) { const proxy = await this.systemAccountService.fetch('proxy'); - this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]); + await this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]); } } @@ -123,16 +94,15 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { userListId: list.id, }); - this.globalEventService.publishInternalEvent('userListMemberRemoved', { userListId: list.id, memberId: target.id }); + await this.internalEventService.emit('userListMemberRemoved', { userListId: list.id, memberId: target.id }); this.globalEventService.publishUserListStream(list.id, 'userRemoved', await this.userEntityService.pack(target)); } @bindThis public async updateMembership(target: MiUser, list: MiUserList, options: { withReplies?: boolean }) { - const membership = await this.userListMembershipsRepository.findOneBy({ - userId: target.id, - userListId: list.id, - }); + const membership = await this.cacheService.userListMembershipsCache + .fetchMaybe(target.id) + .then(ms => ms?.get(list.id)); if (membership == null) { throw new Error('User is not a member of the list'); @@ -143,17 +113,100 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { }, { withReplies: options.withReplies, }); + + await this.internalEventService.emit('userListMemberUpdated', { userListId: list.id, memberId: target.id }); } @bindThis - public dispose(): void { - this.internalEventService.off('userListMemberAdded', this.onMessage); - this.internalEventService.off('userListMemberRemoved', this.onMessage); - this.membersCache.dispose(); + public async bulkAddMember(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'], withReplies?: boolean }[]): Promise { + const userListIds = memberships.map(m => m.userListId); + + // Map userListId => userListUserId + const listUserIds = await this.userListsRepository + .find({ + where: { id: In(userListIds) }, + select: { id: true, userId: true }, + }) + .then(ls => new Map(ls.map(l => [l.id, l.userId]))); + + const toInsert = memberships.map(membership => ({ + id: this.idService.gen(), + userId: target.id, + userListId: membership.userListId, + userListUserId: listUserIds.get(membership.userListId), + withReplies: membership.withReplies, + })); + + await this.userListMembershipsRepository.insert(toInsert); + await this.internalEventService.emit('userListMemberBulkAdded', { + memberId: target.id, + userListIds, + }); + + const targetUser = await this.cacheService.findUserById(target.id); + const packedUser = await this.userEntityService.pack(targetUser); + await Promise.all(memberships.map(async membership => { + await this.globalEventService.publishUserListStream(membership.userListId, 'userAdded', packedUser); + })); + + // このインスタンス内にこのリモートユーザーをフォローしているユーザーがいなくても投稿を受け取るためにダミーのユーザーがフォローしたということにする + if (this.userEntityService.isRemoteUser(targetUser) && this.meta.enableProxyAccount) { + const proxy = await this.systemAccountService.fetch('proxy'); + await this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]); + } } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async bulkRemoveMember(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'] }[] | MiUserList['id'][]): Promise { + const userListIds = memberships.map(mem => typeof(mem) === 'object' ? mem.userListId : mem); + + await this.userListMembershipsRepository.delete({ + userId: target.id, + userListId: In(userListIds), + }); + + await this.internalEventService.emit('userListMemberBulkRemoved', { + userListIds, + memberId: target.id, + }); + + const targetUser = await this.cacheService.findUserById(target.id); + const packedUser = await this.userEntityService.pack(targetUser); + await Promise.all(userListIds.map(async userListId => { + await this.globalEventService.publishUserListStream(userListId, 'userRemoved', packedUser); + })); + } + + @bindThis + public async bulkUpdateMembership(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'], withReplies: boolean }[]): Promise { + const userListMemberships = await this.cacheService.userListMembershipsCache.fetch(target.id); + const membershipChanges = memberships + .map(mem => { + const old = userListMemberships.get(mem.userListId); + return { + new: mem, + id: old?.id ?? '', + old, + }; + }); + + const toAddReplies = membershipChanges + .filter(mem => mem.old != null && mem.new.withReplies && !mem.old.withReplies) + .map(mem => mem.id); + if (toAddReplies.length > 0) { + await this.userListMembershipsRepository.update({ id: In(toAddReplies) }, { withReplies: true }); + } + + const toRemoveReplies = membershipChanges + .filter(mem => mem.old != null && !mem.new.withReplies && mem.old.withReplies) + .map(mem => mem.id); + if (toRemoveReplies.length > 0) { + await this.userListMembershipsRepository.update({ id: In(toRemoveReplies) }, { withReplies: false }); + } + + await this.internalEventService.emit('userListMemberBulkUpdated', { + memberId: target.id, + userListIds: memberships.map(m => m.userListId), + }); } } diff --git a/packages/backend/src/core/entities/UserListEntityService.ts b/packages/backend/src/core/entities/UserListEntityService.ts index b77249c5cb..1693f67861 100644 --- a/packages/backend/src/core/entities/UserListEntityService.ts +++ b/packages/backend/src/core/entities/UserListEntityService.ts @@ -11,6 +11,7 @@ import type { } from '@/models/Blocking.js'; import type { MiUserList } from '@/models/UserList.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; +import { CacheService } from '@/core/CacheService.js'; import { UserEntityService } from './UserEntityService.js'; @Injectable() @@ -24,6 +25,7 @@ export class UserListEntityService { private userEntityService: UserEntityService, private idService: IdService, + private readonly cacheService: CacheService, ) { } @@ -31,17 +33,18 @@ export class UserListEntityService { public async pack( src: MiUserList['id'] | MiUserList, ): Promise> { - const userList = typeof src === 'object' ? src : await this.userListsRepository.findOneByOrFail({ id: src }); + const srcId = typeof(src) === 'object' ? src.id : src; - const users = await this.userListMembershipsRepository.findBy({ - userListId: userList.id, - }); + const [userList, users] = await Promise.all([ + typeof src === 'object' ? src : await this.userListsRepository.findOneByOrFail({ id: src }), + this.cacheService.listUserMembershipsCache.fetch(srcId), + ]); return { id: userList.id, createdAt: this.idService.parse(userList.id).date.toISOString(), name: userList.name, - userIds: users.map(x => x.userId), + userIds: users.keys().toArray(), isPublic: userList.isPublic, }; } diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index c54c27d661..1a5852067d 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -26,8 +26,6 @@ import type { DeliverJobData } from '../types.js'; @Injectable() export class DeliverProcessorService { private logger: Logger; - private suspendedHostsCache: MemorySingleCache; - private latest: string | null; constructor( @Inject(DI.meta) @@ -46,7 +44,6 @@ export class DeliverProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('deliver'); - this.suspendedHostsCache = new MemorySingleCache(1000 * 60 * 60); // 1h } @bindThis @@ -57,25 +54,15 @@ export class DeliverProcessorService { return 'skip (blocked)'; } - // isSuspendedなら中断 - let suspendedHosts = this.suspendedHostsCache.get(); - if (suspendedHosts == null) { - suspendedHosts = await this.instancesRepository.find({ - where: { - suspensionState: Not('none'), - }, - }); - this.suspendedHostsCache.set(suspendedHosts); - } - if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) { + const i = await this.federatedInstanceService.federatedInstanceCache.fetch(host); + if (i.suspensionState !== 'none') { return 'skip (suspended)'; } - const i = await (this.meta.enableStatsForFederatedInstances - ? this.federatedInstanceService.fetchOrRegister(host) - : this.federatedInstanceService.fetch(host)); + // Make sure info is up-to-date. + await this.fetchInstanceMetadataService.fetchInstanceMetadata(i); - // suspend server by software + // suspend server by software. if (i != null && this.utilityService.isDeliverSuspendedSoftware(i)) { return 'skip (software suspended)'; } @@ -97,10 +84,6 @@ export class DeliverProcessorService { }); } - if (this.meta.enableStatsForFederatedInstances) { - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); - } - if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, true); } diff --git a/packages/backend/src/server/api/endpoints/notes/translate.ts b/packages/backend/src/server/api/endpoints/notes/translate.ts index f8c29b60d4..4896459390 100644 --- a/packages/backend/src/server/api/endpoints/notes/translate.ts +++ b/packages/backend/src/server/api/endpoints/notes/translate.ts @@ -12,11 +12,12 @@ import { GetterService } from '@/server/api/GetterService.js'; import { RoleService } from '@/core/RoleService.js'; import type { MiMeta, MiNote } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; -import { CacheService } from '@/core/CacheService.js'; import { hasText } from '@/models/Note.js'; import { ApiLoggerService } from '@/server/api/ApiLoggerService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; -import { ApiError } from '../../error.js'; +import { CacheManagementService, type ManagedRedisKVCache } from '@/core/CacheManagementService.js'; +import { ApiError } from '@/server/api/error.js'; +import { bindThis } from '@/decorators.js'; export const meta = { tags: ['notes'], @@ -75,6 +76,8 @@ export const paramDef = { @Injectable() export default class extends Endpoint { // eslint-disable-line import/no-default-export + private readonly translationsCache: ManagedRedisKVCache; + constructor( @Inject(DI.meta) private serverSettings: MiMeta, @@ -83,9 +86,10 @@ export default class extends Endpoint { // eslint- private getterService: GetterService, private httpRequestService: HttpRequestService, private roleService: RoleService, - private readonly cacheService: CacheService, private readonly loggerService: ApiLoggerService, private readonly noteVisibilityService: NoteVisibilityService, + + cacheManagementService: CacheManagementService, ) { super(meta, paramDef, async (ps, me) => { const note = await this.getterService.getNote(ps.noteId).catch(err => { @@ -110,7 +114,7 @@ export default class extends Endpoint { // eslint- let targetLang = ps.targetLang; if (targetLang.includes('-')) targetLang = targetLang.split('-')[0]; - let response = await this.cacheService.getCachedTranslation(note, targetLang); + let response = await this.getCachedTranslation(note, targetLang); if (!response) { this.loggerService.logger.debug(`Fetching new translation for note=${note.id} lang=${targetLang}`); response = await this.fetchTranslation(note, targetLang); @@ -118,10 +122,15 @@ export default class extends Endpoint { // eslint- throw new ApiError(meta.errors.translationFailed); } - await this.cacheService.setCachedTranslation(note, targetLang, response); + await this.setCachedTranslation(note, targetLang, response); } return response; }); + + this.translationsCache = cacheManagementService.createRedisKVCache('translations', { + lifetime: 1000 * 60 * 60 * 24 * 7, // 1 week, + memoryCacheLifetime: 1000 * 60, // 1 minute + }); } private async fetchTranslation(note: MiNote & { text: string }, targetLang: string) { @@ -219,4 +228,43 @@ export default class extends Endpoint { // eslint- return null; } + + @bindThis + private async getCachedTranslation(note: MiNote, targetLang: string): Promise { + const cacheKey = `${note.id}@${targetLang}`; + + // Use cached translation, if present and up-to-date + const cached = await this.translationsCache.get(cacheKey); + if (cached && cached.u === note.updatedAt?.valueOf()) { + return { + sourceLang: cached.l, + text: cached.t, + }; + } + + // No cache entry :( + return null; + } + + @bindThis + private async setCachedTranslation(note: MiNote, targetLang: string, translation: CachedTranslation): Promise { + const cacheKey = `${note.id}@${targetLang}`; + + await this.translationsCache.set(cacheKey, { + l: translation.sourceLang, + t: translation.text, + u: note.updatedAt?.valueOf(), + }); + } +} + +interface CachedTranslation { + sourceLang: string | undefined; + text: string | undefined; +} + +interface CachedTranslationEntity { + l?: string; + t?: string; + u?: number; } diff --git a/packages/backend/src/server/api/endpoints/users/lists/push.ts b/packages/backend/src/server/api/endpoints/users/lists/push.ts index c717b3959c..4171b0a35b 100644 --- a/packages/backend/src/server/api/endpoints/users/lists/push.ts +++ b/packages/backend/src/server/api/endpoints/users/lists/push.ts @@ -9,6 +9,7 @@ import type { UserListsRepository, UserListMembershipsRepository, BlockingsRepos import { Endpoint } from '@/server/api/endpoint-base.js'; import { GetterService } from '@/server/api/GetterService.js'; import { UserListService } from '@/core/UserListService.js'; +import { CacheService } from '@/core/CacheService.js'; import { DI } from '@/di-symbols.js'; import { ApiError } from '../../../error.js'; @@ -84,44 +85,34 @@ export default class extends Endpoint { // eslint- private getterService: GetterService, private userListService: UserListService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { - // Fetch the list - const userList = await this.userListsRepository.findOneBy({ - id: ps.listId, - userId: me.id, - }); + const [user, blockings, userList, exist] = await Promise.all([ + this.cacheService.findOptionalUserById(ps.userId), + this.cacheService.userBlockingCache.fetch(ps.userId), + this.userListsRepository.findOneBy({ + id: ps.listId, + userId: me.id, + }), + this.cacheService.listUserMembershipsCache.fetch(ps.listId).then(ms => ms.has(ps.userId)), + ]); if (userList == null) { throw new ApiError(meta.errors.noSuchList); } - // Fetch the user - const user = await this.getterService.getUser(ps.userId).catch(err => { - if (err.id === '15348ddd-432d-49c2-8a5a-8069753becff') throw new ApiError(meta.errors.noSuchUser); - throw err; - }); + if (!user) { + throw new ApiError(meta.errors.noSuchUser); + } - // Check blocking if (user.id !== me.id) { - const blockExist = await this.blockingsRepository.exists({ - where: { - blockerId: user.id, - blockeeId: me.id, - }, - }); + const blockExist = blockings.has(me.id); if (blockExist) { throw new ApiError(meta.errors.youHaveBeenBlocked); } } - const exist = await this.userListMembershipsRepository.exists({ - where: { - userListId: userList.id, - userId: user.id, - }, - }); - if (exist) { throw new ApiError(meta.errors.alreadyAdded); } diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index fe5ec38195..3bba4fb002 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -19,8 +19,6 @@ class UserListChannel extends Channel { public static requireCredential = true as const; public static kind = 'read:account'; private listId: string; - private membershipsMap: Record | undefined> = {}; - private listUsersClock: NodeJS.Timeout; private withFiles: boolean; private withRenotes: boolean; @@ -57,27 +55,6 @@ class UserListChannel extends Channel { this.subscriber?.on(`userListStream:${this.listId}`, this.send); this.subscriber?.on('notesStream', this.onNote); - - this.updateListUsers(); - this.listUsersClock = setInterval(this.updateListUsers, 5000); - } - - @bindThis - private async updateListUsers() { - const memberships = await this.userListMembershipsRepository.find({ - where: { - userListId: this.listId, - }, - select: ['userId'], - }); - - const membershipsMap: Record | undefined> = {}; - for (const membership of memberships) { - membershipsMap[membership.userId] = { - withReplies: membership.withReplies, - }; - } - this.membershipsMap = membershipsMap; } @bindThis @@ -87,9 +64,10 @@ class UserListChannel extends Channel { if (this.withFiles && (note.fileIds == null || note.fileIds.length === 0)) return; - if (!Object.hasOwn(this.membershipsMap, note.userId)) return; + const memberships = await this.cacheService.listUserMembershipsCache.fetch(this.listId); + if (!memberships.has(note.userId)) return; - const { accessible, silence } = await this.checkNoteVisibility(note, { includeReplies: true }); + const { accessible, silence } = await this.checkNoteVisibility(note, { includeReplies: true, listContext: this.listId }); if (!accessible || silence) return; if (!this.withRenotes && isPackedPureRenote(note)) return; @@ -102,8 +80,6 @@ class UserListChannel extends Channel { // Unsubscribe events this.subscriber?.off(`userListStream:${this.listId}`, this.send); this.subscriber?.off('notesStream', this.onNote); - - clearInterval(this.listUsersClock); } }