fix list and instance caching, manage all CacheService caches, and fix list "with replies" setting

This commit is contained in:
Hazelnoot 2025-10-01 12:14:51 -04:00
parent bb0925224d
commit 2daf5c16ec
15 changed files with 466 additions and 386 deletions

View file

@ -27,6 +27,7 @@ import { SystemAccountService } from '@/core/SystemAccountService.js';
import { RoleService } from '@/core/RoleService.js';
import { AntennaService } from '@/core/AntennaService.js';
import { CacheService } from '@/core/CacheService.js';
import { UserListService } from '@/core/UserListService.js';
@Injectable()
export class AccountMoveService {
@ -70,6 +71,7 @@ export class AccountMoveService {
private roleService: RoleService,
private antennaService: AntennaService,
private readonly cacheService: CacheService,
private readonly userListService: UserListService,
) {
}
@ -263,45 +265,35 @@ export class AccountMoveService {
@bindThis
public async updateLists(src: ThinUser, dst: MiUser): Promise<void> {
// Return if there is no list to be updated.
const oldMemberships = await this.userListMembershipsRepository.find({
where: {
userId: src.id,
},
});
if (oldMemberships.length === 0) return;
const [srcMemberships, dstMemberships] = await Promise.all([
this.cacheService.userListMembershipsCache.fetch(src.id),
this.cacheService.userListMembershipsCache.fetch(dst.id),
]);
if (srcMemberships.size === 0) return;
const existingUserListIds = await this.userListMembershipsRepository.find({
where: {
userId: dst.id,
},
}).then(memberships => memberships.map(membership => membership.userListId));
const newMemberships = srcMemberships.values()
.filter(srcMembership => !dstMemberships.has(srcMembership.userListId))
.map(srcMembership => ({
userListId: srcMembership.userListId,
withReplies: srcMembership.withReplies,
}))
.toArray();
const updatedMemberships = srcMemberships.values()
.filter(srcMembership => {
const dstMembership = dstMemberships.get(srcMembership.userListId);
return dstMembership != null && dstMembership.withReplies !== srcMembership.withReplies;
})
.map(srcMembership => ({
userListId: srcMembership.userListId,
withReplies: srcMembership.withReplies,
}))
.toArray();
const newMemberships: Map<string, { userId: string; userListId: string; userListUserId: string; }> = new Map();
// 重複しないようにIDを生成
const genId = (): string => {
let id: string;
do {
id = this.idService.gen();
} while (newMemberships.has(id));
return id;
};
for (const membership of oldMemberships) {
if (existingUserListIds.includes(membership.userListId)) continue; // skip if dst exists in this user's list
newMemberships.set(genId(), {
userId: dst.id,
userListId: membership.userListId,
userListUserId: membership.userListUserId,
});
if (newMemberships.length > 0) {
await this.userListService.bulkAddMember(dst, newMemberships);
}
const arrayToInsert = Array.from(newMemberships.entries()).map(entry => ({ ...entry[1], id: entry[0] }));
await this.userListMembershipsRepository.insert(arrayToInsert);
// Have the proxy account follow the new account in the same way as UserListService.push
if (this.userEntityService.isRemoteUser(dst)) {
const proxy = await this.systemAccountService.fetch('proxy');
this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: dst.id } }]);
if (updatedMemberships.length > 0) {
await this.userListService.bulkUpdateMembership(dst, updatedMemberships);
}
}

View file

@ -139,12 +139,8 @@ export class AntennaService implements OnApplicationShutdown {
// TODO
} else if (antenna.src === 'list') {
if (antenna.userListId == null) return false;
const exists = await this.userListMembershipsRepository.exists({
where: {
userListId: antenna.userListId,
userId: note.userId,
},
});
const memberships = await this.cacheService.userListMembershipsCache.fetch(note.userId);
const exists = memberships.has(antenna.userListId);
if (!exists) return false;
} else if (antenna.src === 'users') {
const accts = antenna.users.map(x => {

View file

@ -6,18 +6,20 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Redis from 'ioredis';
import { In, IsNull, Not } from 'typeorm';
import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing, NoteThreadMutingsRepository, ChannelFollowingsRepository, MiInstance, InstancesRepository } from '@/models/_.js';
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
import { QuantumKVCache } from '@/misc/QuantumKVCache.js';
import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, NoteThreadMutingsRepository, ChannelFollowingsRepository, UserListMembershipsRepository } from '@/models/_.js';
import type { MiLocalUser, MiRemoteUser, MiUser } from '@/models/User.js';
import type { MiUserListMembership } from '@/models/UserListMembership.js';
import { isLocalUser, isRemoteUser } from '@/models/User.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { IdService } from '@/core/IdService.js';
import { diffArraysSimple } from '@/misc/diff-arrays.js';
import {
CacheManagementService,
type ManagedMemoryKVCache,
type ManagedQuantumKVCache,
} from '@/core/CacheManagementService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
export interface FollowStats {
@ -27,37 +29,26 @@ export interface FollowStats {
remoteFollowers: number;
}
export interface CachedTranslation {
sourceLang: string | undefined;
text: string | undefined;
}
export interface CachedTranslationEntity {
l?: string;
t?: string;
u?: number;
}
@Injectable()
export class CacheService implements OnApplicationShutdown {
public userByIdCache: MemoryKVCache<MiUser>;
public localUserByNativeTokenCache: MemoryKVCache<MiLocalUser | null>;
public localUserByIdCache: MemoryKVCache<MiLocalUser>;
public uriPersonCache: MemoryKVCache<MiUser | null>;
public userProfileCache: QuantumKVCache<MiUserProfile>;
public userMutingsCache: QuantumKVCache<Set<string>>;
public userBlockingCache: QuantumKVCache<Set<string>>;
public userBlockedCache: QuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
public renoteMutingsCache: QuantumKVCache<Set<string>>;
public threadMutingsCache: QuantumKVCache<Set<string>>;
public noteMutingsCache: QuantumKVCache<Set<string>>;
public userFollowingsCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
public userFollowersCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
public hibernatedUserCache: QuantumKVCache<boolean>;
protected userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes
protected translationsCache: RedisKVCache<CachedTranslationEntity>;
public userFollowingChannelsCache: QuantumKVCache<Set<string>>;
public federatedInstanceCache: QuantumKVCache<MiInstance>;
public readonly userByIdCache: ManagedMemoryKVCache<MiUser>;
public readonly localUserByNativeTokenCache: ManagedMemoryKVCache<MiLocalUser | null>;
public readonly localUserByIdCache: ManagedMemoryKVCache<MiLocalUser>;
public readonly uriPersonCache: ManagedMemoryKVCache<MiUser | null>;
public readonly userProfileCache: ManagedQuantumKVCache<MiUserProfile>;
public readonly userMutingsCache: ManagedQuantumKVCache<Set<string>>;
public readonly userBlockingCache: ManagedQuantumKVCache<Set<string>>;
public readonly userBlockedCache: ManagedQuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
public readonly userListMembershipsCache: ManagedQuantumKVCache<Map<string, MiUserListMembership>>;
public readonly listUserMembershipsCache: ManagedQuantumKVCache<Map<string, MiUserListMembership>>;
public readonly renoteMutingsCache: ManagedQuantumKVCache<Set<string>>;
public readonly threadMutingsCache: ManagedQuantumKVCache<Set<string>>;
public readonly noteMutingsCache: ManagedQuantumKVCache<Set<string>>;
public readonly userFollowingsCache: ManagedQuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
public readonly userFollowersCache: ManagedQuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
public readonly hibernatedUserCache: ManagedQuantumKVCache<boolean>;
public readonly userFollowStatsCache: ManagedMemoryKVCache<FollowStats>;
public readonly userFollowingChannelsCache: ManagedQuantumKVCache<Set<string>>;
constructor(
@Inject(DI.redis)
@ -90,27 +81,26 @@ export class CacheService implements OnApplicationShutdown {
@Inject(DI.channelFollowingsRepository)
private readonly channelFollowingsRepository: ChannelFollowingsRepository,
@Inject(DI.instancesRepository)
private readonly instancesRepository: InstancesRepository,
@Inject(DI.userListMembershipsRepository)
private readonly userListMembershipsRepository: UserListMembershipsRepository,
private readonly internalEventService: InternalEventService,
private readonly utilityService: UtilityService,
private readonly idService: IdService,
private readonly cacheManagementService: CacheManagementService,
) {
//this.onMessage = this.onMessage.bind(this);
this.userByIdCache = new MemoryKVCache<MiUser>(1000 * 60 * 5); // 5m
this.localUserByNativeTokenCache = new MemoryKVCache<MiLocalUser | null>(1000 * 60 * 5); // 5m
this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m
this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m
this.userByIdCache = this.cacheManagementService.createMemoryKVCache<MiUser>(1000 * 60 * 5); // 5m
this.localUserByNativeTokenCache = this.cacheManagementService.createMemoryKVCache<MiLocalUser | null>(1000 * 60 * 5); // 5m
this.localUserByIdCache = this.cacheManagementService.createMemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m
this.uriPersonCache = this.cacheManagementService.createMemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m
this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', {
this.userProfileCache = this.cacheManagementService.createQuantumKVCache('userProfile', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }),
fetcher: (key) => this.userProfilesRepository.findOneBy({ userId: key }),
bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])),
});
this.userMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userMutings', {
this.userMutingsCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('userMutings', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
bulkFetcher: muterIds => this.mutingsRepository
@ -123,7 +113,7 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])),
});
this.userBlockingCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocking', {
this.userBlockingCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('userBlocking', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))),
bulkFetcher: blockerIds => this.blockingsRepository
@ -136,7 +126,7 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])),
});
this.userBlockedCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocked', {
this.userBlockedCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('userBlocked', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))),
bulkFetcher: blockeeIds => this.blockingsRepository
@ -149,7 +139,41 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])),
});
this.renoteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'renoteMutings', {
this.userListMembershipsCache = this.cacheManagementService.createQuantumKVCache<Map<string, MiUserListMembership>>('userListMemberships', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: async userId => await this.userListMembershipsRepository.findBy({ userId }).then(ms => new Map(ms.map(m => [m.id, m]))),
bulkFetcher: async userIds => await this.userListMembershipsRepository
.findBy({ userId: In(userIds) })
.then(ms => ms
.reduce((groups, m) => {
let listsForUser = groups.get(m.userId);
if (!listsForUser) {
listsForUser = new Map();
groups.set(m.userId, listsForUser);
}
listsForUser.set(m.userListId, m);
return groups;
}, new Map<string, Map<string, MiUserListMembership>>)),
});
this.listUserMembershipsCache = this.cacheManagementService.createQuantumKVCache<Map<string, MiUserListMembership>>('listUserMemberships', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: async userListId => await this.userListMembershipsRepository.findBy({ userListId }).then(ms => new Map(ms.map(m => [m.id, m]))),
bulkFetcher: async userListIds => await this.userListMembershipsRepository
.findBy({ userListId: In(userListIds) })
.then(ms => ms
.reduce((groups, m) => {
let usersForList = groups.get(m.userListId);
if (!usersForList) {
usersForList = new Map();
groups.set(m.userListId, usersForList);
}
usersForList.set(m.userId, m);
return groups;
}, new Map<string, Map<string, MiUserListMembership>>)),
});
this.renoteMutingsCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('renoteMutings', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
bulkFetcher: muterIds => this.renoteMutingsRepository
@ -162,7 +186,7 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])),
});
this.threadMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'threadMutings', {
this.threadMutingsCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('threadMutings', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: muterId => this.noteThreadMutingsRepository
.find({ where: { userId: muterId, isPostMute: false }, select: { threadId: true } })
@ -177,7 +201,7 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.userId, new Set(m.threadIds)])),
});
this.noteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'noteMutings', {
this.noteMutingsCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('noteMutings', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: muterId => this.noteThreadMutingsRepository
.find({ where: { userId: muterId, isPostMute: true }, select: { threadId: true } })
@ -192,7 +216,7 @@ export class CacheService implements OnApplicationShutdown {
.then(ms => ms.map(m => [m.userId, new Set(m.threadIds)])),
});
this.userFollowingsCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowings', {
this.userFollowingsCache = this.cacheManagementService.createQuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>('userFollowings', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))),
bulkFetcher: followerIds => this.followingsRepository
@ -209,7 +233,7 @@ export class CacheService implements OnApplicationShutdown {
}, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)),
});
this.userFollowersCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowers', {
this.userFollowersCache = this.cacheManagementService.createQuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>('userFollowers', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))),
bulkFetcher: followeeIds => this.followingsRepository
@ -226,14 +250,14 @@ export class CacheService implements OnApplicationShutdown {
}, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)),
});
this.hibernatedUserCache = new QuantumKVCache<boolean>(this.internalEventService, 'hibernatedUsers', {
this.hibernatedUserCache = this.cacheManagementService.createQuantumKVCache<boolean>('hibernatedUsers', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: async userId => {
const { isHibernated } = await this.usersRepository.findOneOrFail({
const result = await this.usersRepository.findOne({
where: { id: userId },
select: { isHibernated: true },
});
return isHibernated;
return result?.isHibernated;
},
bulkFetcher: async userIds => {
const results = await this.usersRepository.find({
@ -278,12 +302,9 @@ export class CacheService implements OnApplicationShutdown {
},
});
this.translationsCache = new RedisKVCache<CachedTranslationEntity>(this.redisClient, 'translations', {
lifetime: 1000 * 60 * 60 * 24 * 7, // 1 week,
memoryCacheLifetime: 1000 * 60, // 1 minute
});
this.userFollowStatsCache = this.cacheManagementService.createMemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes
this.userFollowingChannelsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userFollowingChannels', {
this.userFollowingChannelsCache = this.cacheManagementService.createQuantumKVCache<Set<string>>('userFollowingChannels', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.channelFollowingsRepository.find({
where: { followerId: key },
@ -292,38 +313,6 @@ export class CacheService implements OnApplicationShutdown {
// TODO bulk fetcher
});
this.federatedInstanceCache = new QuantumKVCache(this.internalEventService, 'federatedInstance', {
lifetime: 1000 * 60 * 3, // 3 minutes
fetcher: async key => {
const host = this.utilityService.toPuny(key);
let instance = await this.instancesRepository.findOneBy({ host });
if (instance == null) {
await this.instancesRepository.createQueryBuilder('instance')
.insert()
.values({
id: this.idService.gen(),
host,
firstRetrievedAt: new Date(),
isBlocked: this.utilityService.isBlockedHost(host),
isSilenced: this.utilityService.isSilencedHost(host),
isMediaSilenced: this.utilityService.isMediaSilencedHost(host),
isAllowListed: this.utilityService.isAllowListedHost(host),
isBubbled: this.utilityService.isBubbledHost(host),
})
.orIgnore()
.execute();
instance = await this.instancesRepository.findOneByOrFail({ host });
}
return instance;
},
bulkFetcher: async keys => {
const hosts = keys.map(key => this.utilityService.toPuny(key));
const instances = await this.instancesRepository.findBy({ host: In(hosts) });
return instances.map(i => [i.host, i]);
},
});
this.internalEventService.on('userChangeSuspendedState', this.onUserEvent);
this.internalEventService.on('userChangeDeletedState', this.onUserEvent);
this.internalEventService.on('remoteUserUpdated', this.onUserEvent);
@ -334,7 +323,13 @@ export class CacheService implements OnApplicationShutdown {
// For these, only listen to local events because quantum cache handles the sync.
this.internalEventService.on('followChannel', this.onChannelEvent, { ignoreRemote: true });
this.internalEventService.on('unfollowChannel', this.onChannelEvent, { ignoreRemote: true });
this.internalEventService.on('metaUpdated', this.onMetaEvent, { ignoreRemote: true });
this.internalEventService.on('updateUserProfile', this.onProfileEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberAdded', this.onListMemberEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberUpdated', this.onListMemberEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberRemoved', this.onListMemberEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberBulkAdded', this.onListMemberEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberBulkUpdated', this.onListMemberEvent, { ignoreRemote: true });
this.internalEventService.on('userListMemberBulkRemoved', this.onListMemberEvent, { ignoreRemote: true });
}
@bindThis
@ -351,6 +346,14 @@ export class CacheService implements OnApplicationShutdown {
this.uriPersonCache.delete(k);
}
}
// Contains IDs of all lists where this user is a member.
const userListMemberships = this.listUserMembershipsCache
.entries()
.filter(e => e[1].has(body.id))
.map(e => e[0])
.toArray();
if (isLocal) {
await Promise.all([
this.userProfileCache.delete(body.id),
@ -363,6 +366,8 @@ export class CacheService implements OnApplicationShutdown {
this.hibernatedUserCache.delete(body.id),
this.threadMutingsCache.delete(body.id),
this.noteMutingsCache.delete(body.id),
this.userListMembershipsCache.delete(body.id),
this.listUserMembershipsCache.deleteMany(userListMemberships),
]);
}
} else {
@ -436,21 +441,17 @@ export class CacheService implements OnApplicationShutdown {
}
@bindThis
private async onMetaEvent<E extends 'metaUpdated'>(body: InternalEventTypes[E]): Promise<void> {
const { before, after } = body;
const changed = (
diffArraysSimple(before?.blockedHosts, after.blockedHosts) ||
diffArraysSimple(before?.silencedHosts, after.silencedHosts) ||
diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) ||
diffArraysSimple(before?.federationHosts, after.federationHosts) ||
diffArraysSimple(before?.bubbleInstances, after.bubbleInstances)
);
private async onProfileEvent<E extends 'updateUserProfile'>(body: InternalEventTypes[E]): Promise<void> {
await this.userProfileCache.delete(body.userId);
}
if (changed) {
// We have to clear the whole thing, otherwise subdomains won't be synced.
// This gets fired in *each* process so don't do anything to trigger cache notifications!
this.federatedInstanceCache.clear();
}
@bindThis
private async onListMemberEvent<E extends 'userListMemberAdded' | 'userListMemberUpdated' | 'userListMemberRemoved' | 'userListMemberBulkAdded' | 'userListMemberBulkUpdated' | 'userListMemberBulkRemoved'>(body: InternalEventTypes[E]): Promise<void> {
const userListIds = 'userListIds' in body ? body.userListIds : [body.userListId];
await Promise.all([
this.userListMembershipsCache.delete(body.memberId),
this.listUserMembershipsCache.deleteMany(userListIds),
]);
}
@bindThis
@ -527,34 +528,6 @@ export class CacheService implements OnApplicationShutdown {
});
}
@bindThis
public async getCachedTranslation(note: MiNote, targetLang: string): Promise<CachedTranslation | null> {
const cacheKey = `${note.id}@${targetLang}`;
// Use cached translation, if present and up-to-date
const cached = await this.translationsCache.get(cacheKey);
if (cached && cached.u === note.updatedAt?.valueOf()) {
return {
sourceLang: cached.l,
text: cached.t,
};
}
// No cache entry :(
return null;
}
@bindThis
public async setCachedTranslation(note: MiNote, targetLang: string, translation: CachedTranslation): Promise<void> {
const cacheKey = `${note.id}@${targetLang}`;
await this.translationsCache.set(cacheKey, {
l: translation.sourceLang,
t: translation.text,
u: note.updatedAt?.valueOf(),
});
}
@bindThis
public async getUsers(userIds: Iterable<string>): Promise<Map<string, MiUser>> {
const users = new Map<string, MiUser>;
@ -640,20 +613,7 @@ export class CacheService implements OnApplicationShutdown {
@bindThis
public clear(): void {
this.userByIdCache.clear();
this.localUserByNativeTokenCache.clear();
this.localUserByIdCache.clear();
this.uriPersonCache.clear();
this.userProfileCache.clear();
this.userMutingsCache.clear();
this.userBlockingCache.clear();
this.userBlockedCache.clear();
this.renoteMutingsCache.clear();
this.userFollowingsCache.clear();
this.userFollowStatsCache.clear();
this.translationsCache.clear();
this.userFollowingChannelsCache.clear();
this.federatedInstanceCache.clear();
this.cacheManagementService.clear();
}
@bindThis
@ -667,30 +627,17 @@ export class CacheService implements OnApplicationShutdown {
this.internalEventService.off('unfollow', this.onFollowEvent);
this.internalEventService.off('followChannel', this.onChannelEvent);
this.internalEventService.off('unfollowChannel', this.onChannelEvent);
this.internalEventService.off('metaUpdated', this.onMetaEvent);
this.userByIdCache.dispose();
this.localUserByNativeTokenCache.dispose();
this.localUserByIdCache.dispose();
this.uriPersonCache.dispose();
this.userProfileCache.dispose();
this.userMutingsCache.dispose();
this.userBlockingCache.dispose();
this.userBlockedCache.dispose();
this.renoteMutingsCache.dispose();
this.threadMutingsCache.dispose();
this.noteMutingsCache.dispose();
this.userFollowingsCache.dispose();
this.userFollowersCache.dispose();
this.hibernatedUserCache.dispose();
this.userFollowStatsCache.dispose();
this.translationsCache.dispose();
this.userFollowingChannelsCache.dispose();
this.federatedInstanceCache.dispose();
this.internalEventService.off('updateUserProfile', this.onProfileEvent);
this.internalEventService.off('userListMemberAdded', this.onListMemberEvent);
this.internalEventService.off('userListMemberUpdated', this.onListMemberEvent);
this.internalEventService.off('userListMemberRemoved', this.onListMemberEvent);
this.internalEventService.off('userListMemberBulkAdded', this.onListMemberEvent);
this.internalEventService.off('userListMemberBulkUpdated', this.onListMemberEvent);
this.internalEventService.off('userListMemberBulkRemoved', this.onListMemberEvent);
}
@bindThis
public onApplicationShutdown(signal?: string | undefined): void {
public onApplicationShutdown(): void {
this.dispose();
}
}

View file

@ -277,7 +277,7 @@ export class FanoutTimelineEndpointService {
// Fetch everything and populate users
const [users, instances] = await Promise.all([
this.cacheService.getUsers(usersToFetch),
this.cacheService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)),
this.federatedInstanceService.federatedInstanceCache.fetchMany(instancesToFetch).then(i => new Map(i)),
]);
for (const [id, user] of Array.from(users)) {
users.set(id, {

View file

@ -4,17 +4,24 @@
*/
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import type { InstancesRepository, MiMeta } from '@/models/_.js';
import { In } from 'typeorm';
import type { InstancesRepository } from '@/models/_.js';
import type { MiMeta } from '@/models/Meta.js';
import type { MiInstance } from '@/models/Instance.js';
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
import { IdService } from '@/core/IdService.js';
import { DI } from '@/di-symbols.js';
import { UtilityService } from '@/core/UtilityService.js';
import { CacheManagementService, type ManagedQuantumKVCache } from '@/core/CacheManagementService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { diffArraysSimple } from '@/misc/diff-arrays.js';
import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService.js';
import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js';
@Injectable()
export class FederatedInstanceService {
export class FederatedInstanceService implements OnApplicationShutdown {
public readonly federatedInstanceCache: ManagedQuantumKVCache<MiInstance>;
constructor(
@Inject(DI.instancesRepository)
private instancesRepository: InstancesRepository,
@ -24,12 +31,49 @@ export class FederatedInstanceService {
private utilityService: UtilityService,
private idService: IdService,
private readonly cacheService: CacheService,
) {}
private readonly internalEventService: InternalEventService,
cacheManagementService: CacheManagementService,
) {
this.federatedInstanceCache = cacheManagementService.createQuantumKVCache('federatedInstance', {
// TODO can we increase this?
lifetime: 1000 * 60 * 3, // 3 minutes
fetcher: async key => {
const host = this.utilityService.toPuny(key);
let instance = await this.instancesRepository.findOneBy({ host });
if (instance == null) {
await this.instancesRepository.createQueryBuilder('instance')
.insert()
.values({
id: this.idService.gen(),
host,
firstRetrievedAt: new Date(),
isBlocked: this.utilityService.isBlockedHost(host),
isSilenced: this.utilityService.isSilencedHost(host),
isMediaSilenced: this.utilityService.isMediaSilencedHost(host),
isAllowListed: this.utilityService.isAllowListedHost(host),
isBubbled: this.utilityService.isBubbledHost(host),
})
.orIgnore()
.execute();
instance = await this.instancesRepository.findOneByOrFail({ host });
}
return instance;
},
bulkFetcher: async keys => {
const hosts = keys.map(key => this.utilityService.toPuny(key));
const instances = await this.instancesRepository.findBy({ host: In(hosts) });
return instances.map(i => [i.host, i]);
},
});
this.internalEventService.on('metaUpdated', this.onMetaUpdated, { ignoreRemote: true });
}
@bindThis
public async fetchOrRegister(host: string): Promise<MiInstance> {
return this.cacheService.federatedInstanceCache.fetch(host);
return this.federatedInstanceCache.fetch(host);
/*
host = this.utilityService.toPuny(host);
@ -63,7 +107,7 @@ export class FederatedInstanceService {
@bindThis
public async fetch(host: string): Promise<MiInstance> {
return this.cacheService.federatedInstanceCache.fetch(host);
return this.federatedInstanceCache.fetch(host);
/*
host = this.utilityService.toPuny(host);
@ -93,7 +137,7 @@ export class FederatedInstanceService {
return response.raw[0] as MiInstance;
});
await this.cacheService.federatedInstanceCache.set(result.host, result);
await this.federatedInstanceCache.set(result.host, result);
return result;
}
@ -106,7 +150,7 @@ export class FederatedInstanceService {
const allowedHosts = new Set(this.meta.federationHosts);
this.meta.blockedHosts.forEach(h => allowedHosts.delete(h));
const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.federationHosts);
const instances = await this.federatedInstanceCache.fetchMany(this.meta.federationHosts);
return instances.map(i => i[1]);
}
@ -115,7 +159,35 @@ export class FederatedInstanceService {
*/
@bindThis
public async getDenyList(): Promise<MiInstance[]> {
const instances = await this.cacheService.federatedInstanceCache.fetchMany(this.meta.blockedHosts);
const instances = await this.federatedInstanceCache.fetchMany(this.meta.blockedHosts);
return instances.map(i => i[1]);
}
@bindThis
private async onMetaUpdated(body: InternalEventTypes['metaUpdated']): Promise<void> {
const { before, after } = body;
const changed = (
diffArraysSimple(before?.blockedHosts, after.blockedHosts) ||
diffArraysSimple(before?.silencedHosts, after.silencedHosts) ||
diffArraysSimple(before?.mediaSilencedHosts, after.mediaSilencedHosts) ||
diffArraysSimple(before?.federationHosts, after.federationHosts) ||
diffArraysSimple(before?.bubbleInstances, after.bubbleInstances)
);
if (changed) {
// We have to clear the whole thing, otherwise subdomains won't be synced.
// This gets fired in *each* process so don't do anything to trigger cache notifications!
this.federatedInstanceCache.clear();
}
}
@bindThis
public dispose() {
this.internalEventService.off('metaUpdated', this.onMetaUpdated);
}
@bindThis
public onApplicationShutdown() {
this.dispose();
}
}

View file

@ -20,12 +20,12 @@ import type { MiPage } from '@/models/Page.js';
import type { MiWebhook } from '@/models/Webhook.js';
import type { MiSystemWebhook } from '@/models/SystemWebhook.js';
import type { MiMeta } from '@/models/Meta.js';
import { MiAvatarDecoration, MiChatMessage, MiChatRoom, MiReversiGame, MiRole, MiRoleAssignment } from '@/models/_.js';
import type { MiAvatarDecoration, MiChatMessage, MiChatRoom, MiReversiGame, MiRole, MiRoleAssignment } from '@/models/_.js';
import type { Packed } from '@/misc/json-schema.js';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { bindThis } from '@/decorators.js';
import { Serialized } from '@/types.js';
import type { Serialized } from '@/types.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import type Emitter from 'strict-event-emitter-types';
@ -262,11 +262,15 @@ export interface InternalEventTypes {
metaUpdated: { before?: MiMeta; after: MiMeta; };
followChannel: { userId: MiUser['id']; channelId: MiChannel['id']; };
unfollowChannel: { userId: MiUser['id']; channelId: MiChannel['id']; };
updateUserProfile: MiUserProfile;
updateUserProfile: { userId: MiUserProfile['userId'] };
mute: { muterId: MiUser['id']; muteeId: MiUser['id']; };
unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; };
userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; };
userListMemberUpdated: { userListId: MiUserList['id']; memberId: MiUser['id']; };
userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; };
userListMemberBulkAdded: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
userListMemberBulkUpdated: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
quantumCacheUpdated: { name: string, keys: string[] };
}
@ -405,8 +409,8 @@ export class GlobalEventService {
}
@bindThis
public publishUserListStream<K extends keyof UserListEventTypes>(listId: MiUserList['id'], type: K, value?: UserListEventTypes[K]): void {
this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value);
public async publishUserListStream<K extends keyof UserListEventTypes>(listId: MiUserList['id'], type: K, value?: UserListEventTypes[K]): Promise<void> {
await this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value);
}
@bindThis

View file

@ -4,15 +4,18 @@
*/
import { Inject, Injectable } from '@nestjs/common';
import { CacheService } from '@/core/CacheService.js';
import type { MiNote } from '@/models/Note.js';
import type { MiUser } from '@/models/User.js';
import { bindThis } from '@/decorators.js';
import type { MiFollowing } from '@/models/Following.js';
import type { MiInstance } from '@/models/Instance.js';
import type { MiUserListMembership } from '@/models/UserListMembership.js';
import type { NotesRepository } from '@/models/_.js';
import type { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
import { awaitAll } from '@/misc/prelude/await-all.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import type { MiFollowing, MiInstance, NotesRepository } from '@/models/_.js';
import { CacheService } from '@/core/CacheService.js';
import { IdService } from '@/core/IdService.js';
import { bindThis } from '@/decorators.js';
import { awaitAll } from '@/misc/prelude/await-all.js';
import { DI } from '@/di-symbols.js';
/**
@ -50,6 +53,12 @@ export interface NoteVisibilityFilters {
* If false (default), then silence is enforced for all notes.
*/
includeSilencedAuthor?: boolean;
/**
* Set to an ID to apply visibility from the context of a specific user list.
* Membership and "with replies" settings will be adopted from this list.
*/
listContext?: string | null;
}
@Injectable()
@ -151,7 +160,7 @@ export class NoteVisibilityService {
}
@bindThis
public async populateData(user: PopulatedMe, hint?: Partial<NoteVisibilityData>): Promise<NoteVisibilityData> {
public async populateData(user: PopulatedMe, hint?: Partial<NoteVisibilityData>, filters?: NoteVisibilityFilters): Promise<NoteVisibilityData> {
// noinspection ES6MissingAwait
const [
userBlockers,
@ -161,6 +170,7 @@ export class NoteVisibilityService {
userMutedUsers,
userMutedUserRenotes,
userMutedInstances,
userListMemberships,
] = await Promise.all([
user ? (hint?.userBlockers ?? this.cacheService.userBlockedCache.fetch(user.id)) : null,
user ? (hint?.userFollowings ?? this.cacheService.userFollowingsCache.fetch(user.id)) : null,
@ -169,6 +179,7 @@ export class NoteVisibilityService {
user ? (hint?.userMutedUsers ?? this.cacheService.userMutingsCache.fetch(user.id)) : null,
user ? (hint?.userMutedUserRenotes ?? this.cacheService.renoteMutingsCache.fetch(user.id)) : null,
user ? (hint?.userMutedInstances ?? this.cacheService.userProfileCache.fetch(user.id).then(p => new Set(p.mutedInstances))) : null,
filters?.listContext ? (hint?.userListMemberships ?? this.cacheService.listUserMembershipsCache.fetch(filters.listContext)) : null,
]);
return {
@ -179,6 +190,7 @@ export class NoteVisibilityService {
userMutedUsers,
userMutedUserRenotes,
userMutedInstances,
userListMemberships,
};
}
@ -396,6 +408,9 @@ export class NoteVisibilityService {
// Don't silence if we follow w/ replies
if (user && data.userFollowings?.get(user.id)?.withReplies) return false;
// Don't silence if we're viewing in a list with replies
if (data.userListMemberships?.get(note.userId)?.withReplies) return false;
// Silence otherwise
return true;
}
@ -409,6 +424,9 @@ export interface NoteVisibilityData extends NotePopulationData {
userMutedUsers: Set<string> | null;
userMutedUserRenotes: Set<string> | null;
userMutedInstances: Set<string> | null;
// userId => membership (already scoped to listContext)
userListMemberships: Map<string, MiUserListMembership> | null;
}
export interface NotePopulationData {

View file

@ -139,7 +139,7 @@ export class NotificationService implements OnApplicationShutdown {
return null;
}
} else if (recieveConfig?.type === 'list') {
const isMember = await this.userListService.membersCache.fetch(recieveConfig.userListId).then(members => members.has(notifierId));
const isMember = await this.cacheService.listUserMembershipsCache.fetch(recieveConfig.userListId).then(members => members.has(notifierId));
if (!isMember) {
return null;
}

View file

@ -19,7 +19,8 @@ import { LoggerService } from '@/core/LoggerService.js';
import { UserWebhookService } from '@/core/UserWebhookService.js';
import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService.js';
import { UserFollowingService } from '@/core/UserFollowingService.js';
import type { UserFollowingService } from '@/core/UserFollowingService.js';
import { UserListService } from '@/core/UserListService.js';
@Injectable()
export class UserBlockingService implements OnModuleInit {
@ -140,16 +141,12 @@ export class UserBlockingService implements OnModuleInit {
@bindThis
private async removeFromList(listOwner: MiUser, user: MiUser) {
const userLists = await this.userListsRepository.findBy({
userId: listOwner.id,
const userLists = await this.userListsRepository.find({
where: { userId: listOwner.id },
select: { id: true },
});
for (const userList of userLists) {
await this.userListMembershipsRepository.delete({
userListId: userList.id,
userId: user.id,
});
}
await this.userListService.bulkRemoveMember(user, userLists.map(l => l.id));
}
@bindThis

View file

@ -6,27 +6,26 @@
import { Inject, Injectable, OnApplicationShutdown, OnModuleInit } from '@nestjs/common';
import * as Redis from 'ioredis';
import { ModuleRef } from '@nestjs/core';
import type { MiMeta, UserListMembershipsRepository } from '@/models/_.js';
import { In } from 'typeorm';
import type { MiMeta, UserListMembershipsRepository, UserListsRepository } from '@/models/_.js';
import type { MiUser } from '@/models/User.js';
import type { MiUserList } from '@/models/UserList.js';
import type { MiUserListMembership } from '@/models/UserListMembership.js';
import { IdService } from '@/core/IdService.js';
import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js';
import { UserEntityService } from '@/core/entities/UserEntityService.js';
import { bindThis } from '@/decorators.js';
import { QueueService } from '@/core/QueueService.js';
import { QuantumKVCache } from '@/misc/QuantumKVCache.js';
import { RoleService } from '@/core/RoleService.js';
import type { RoleService } from '@/core/RoleService.js';
import { SystemAccountService } from '@/core/SystemAccountService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { CacheService } from '@/core/CacheService.js';
@Injectable()
export class UserListService implements OnApplicationShutdown, OnModuleInit {
export class UserListService implements OnModuleInit {
public static TooManyUsersError = class extends Error {};
public membersCache: QuantumKVCache<Set<string>>;
private roleService: RoleService;
constructor(
@ -41,6 +40,9 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
@Inject(DI.userListMembershipsRepository)
private userListMembershipsRepository: UserListMembershipsRepository,
@Inject(DI.userListsRepository)
private readonly userListsRepository: UserListsRepository,
@Inject(DI.meta)
private readonly meta: MiMeta,
@ -50,52 +52,21 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
private queueService: QueueService,
private systemAccountService: SystemAccountService,
private readonly internalEventService: InternalEventService,
) {
this.membersCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userListMembers', {
lifetime: 1000 * 60 * 30, // 30m
fetcher: (key) => this.userListMembershipsRepository.find({ where: { userListId: key }, select: ['userId'] }).then(xs => new Set(xs.map(x => x.userId))),
});
this.internalEventService.on('userListMemberAdded', this.onMessage);
this.internalEventService.on('userListMemberRemoved', this.onMessage);
}
private readonly cacheService: CacheService,
) {}
@bindThis
async onModuleInit() {
this.roleService = this.moduleRef.get('RoleService');
}
@bindThis
private async onMessage<E extends 'userListMemberAdded' | 'userListMemberRemoved'>(body: InternalEventTypes[E], type: E): Promise<void> {
{
switch (type) {
case 'userListMemberAdded': {
const { userListId, memberId } = body;
const members = this.membersCache.get(userListId);
if (members) {
members.add(memberId);
}
break;
}
case 'userListMemberRemoved': {
const { userListId, memberId } = body;
const members = this.membersCache.get(userListId);
if (members) {
members.delete(memberId);
}
break;
}
default:
break;
}
}
}
@bindThis
public async addMember(target: MiUser, list: MiUserList, me: MiUser) {
const currentCount = await this.userListMembershipsRepository.countBy({
userListId: list.id,
});
if (currentCount >= (await this.roleService.getUserPolicies(me.id)).userEachUserListsLimit) {
const [current, policies] = await Promise.all([
this.cacheService.listUserMembershipsCache.fetch(list.id),
this.roleService.getUserPolicies(me),
]);
if (current.size >= policies.userEachUserListsLimit) {
throw new UserListService.TooManyUsersError();
}
@ -106,13 +77,13 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
userListUserId: list.userId,
} as MiUserListMembership);
this.globalEventService.publishInternalEvent('userListMemberAdded', { userListId: list.id, memberId: target.id });
await this.internalEventService.emit('userListMemberAdded', { userListId: list.id, memberId: target.id });
this.globalEventService.publishUserListStream(list.id, 'userAdded', await this.userEntityService.pack(target));
// このインスタンス内にこのリモートユーザーをフォローしているユーザーがいなくても投稿を受け取るためにダミーのユーザーがフォローしたということにする
if (this.userEntityService.isRemoteUser(target) && this.meta.enableProxyAccount) {
const proxy = await this.systemAccountService.fetch('proxy');
this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]);
await this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]);
}
}
@ -123,16 +94,15 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
userListId: list.id,
});
this.globalEventService.publishInternalEvent('userListMemberRemoved', { userListId: list.id, memberId: target.id });
await this.internalEventService.emit('userListMemberRemoved', { userListId: list.id, memberId: target.id });
this.globalEventService.publishUserListStream(list.id, 'userRemoved', await this.userEntityService.pack(target));
}
@bindThis
public async updateMembership(target: MiUser, list: MiUserList, options: { withReplies?: boolean }) {
const membership = await this.userListMembershipsRepository.findOneBy({
userId: target.id,
userListId: list.id,
});
const membership = await this.cacheService.userListMembershipsCache
.fetchMaybe(target.id)
.then(ms => ms?.get(list.id));
if (membership == null) {
throw new Error('User is not a member of the list');
@ -143,17 +113,100 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
}, {
withReplies: options.withReplies,
});
await this.internalEventService.emit('userListMemberUpdated', { userListId: list.id, memberId: target.id });
}
@bindThis
public dispose(): void {
this.internalEventService.off('userListMemberAdded', this.onMessage);
this.internalEventService.off('userListMemberRemoved', this.onMessage);
this.membersCache.dispose();
public async bulkAddMember(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'], withReplies?: boolean }[]): Promise<void> {
const userListIds = memberships.map(m => m.userListId);
// Map userListId => userListUserId
const listUserIds = await this.userListsRepository
.find({
where: { id: In(userListIds) },
select: { id: true, userId: true },
})
.then(ls => new Map(ls.map(l => [l.id, l.userId])));
const toInsert = memberships.map(membership => ({
id: this.idService.gen(),
userId: target.id,
userListId: membership.userListId,
userListUserId: listUserIds.get(membership.userListId),
withReplies: membership.withReplies,
}));
await this.userListMembershipsRepository.insert(toInsert);
await this.internalEventService.emit('userListMemberBulkAdded', {
memberId: target.id,
userListIds,
});
const targetUser = await this.cacheService.findUserById(target.id);
const packedUser = await this.userEntityService.pack(targetUser);
await Promise.all(memberships.map(async membership => {
await this.globalEventService.publishUserListStream(membership.userListId, 'userAdded', packedUser);
}));
// このインスタンス内にこのリモートユーザーをフォローしているユーザーがいなくても投稿を受け取るためにダミーのユーザーがフォローしたということにする
if (this.userEntityService.isRemoteUser(targetUser) && this.meta.enableProxyAccount) {
const proxy = await this.systemAccountService.fetch('proxy');
await this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]);
}
}
@bindThis
public onApplicationShutdown(signal?: string | undefined): void {
this.dispose();
public async bulkRemoveMember(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'] }[] | MiUserList['id'][]): Promise<void> {
const userListIds = memberships.map(mem => typeof(mem) === 'object' ? mem.userListId : mem);
await this.userListMembershipsRepository.delete({
userId: target.id,
userListId: In(userListIds),
});
await this.internalEventService.emit('userListMemberBulkRemoved', {
userListIds,
memberId: target.id,
});
const targetUser = await this.cacheService.findUserById(target.id);
const packedUser = await this.userEntityService.pack(targetUser);
await Promise.all(userListIds.map(async userListId => {
await this.globalEventService.publishUserListStream(userListId, 'userRemoved', packedUser);
}));
}
@bindThis
public async bulkUpdateMembership(target: { id: MiUser['id'] }, memberships: { userListId: MiUserList['id'], withReplies: boolean }[]): Promise<void> {
const userListMemberships = await this.cacheService.userListMembershipsCache.fetch(target.id);
const membershipChanges = memberships
.map(mem => {
const old = userListMemberships.get(mem.userListId);
return {
new: mem,
id: old?.id ?? '',
old,
};
});
const toAddReplies = membershipChanges
.filter(mem => mem.old != null && mem.new.withReplies && !mem.old.withReplies)
.map(mem => mem.id);
if (toAddReplies.length > 0) {
await this.userListMembershipsRepository.update({ id: In(toAddReplies) }, { withReplies: true });
}
const toRemoveReplies = membershipChanges
.filter(mem => mem.old != null && !mem.new.withReplies && mem.old.withReplies)
.map(mem => mem.id);
if (toRemoveReplies.length > 0) {
await this.userListMembershipsRepository.update({ id: In(toRemoveReplies) }, { withReplies: false });
}
await this.internalEventService.emit('userListMemberBulkUpdated', {
memberId: target.id,
userListIds: memberships.map(m => m.userListId),
});
}
}

View file

@ -11,6 +11,7 @@ import type { } from '@/models/Blocking.js';
import type { MiUserList } from '@/models/UserList.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
import { CacheService } from '@/core/CacheService.js';
import { UserEntityService } from './UserEntityService.js';
@Injectable()
@ -24,6 +25,7 @@ export class UserListEntityService {
private userEntityService: UserEntityService,
private idService: IdService,
private readonly cacheService: CacheService,
) {
}
@ -31,17 +33,18 @@ export class UserListEntityService {
public async pack(
src: MiUserList['id'] | MiUserList,
): Promise<Packed<'UserList'>> {
const userList = typeof src === 'object' ? src : await this.userListsRepository.findOneByOrFail({ id: src });
const srcId = typeof(src) === 'object' ? src.id : src;
const users = await this.userListMembershipsRepository.findBy({
userListId: userList.id,
});
const [userList, users] = await Promise.all([
typeof src === 'object' ? src : await this.userListsRepository.findOneByOrFail({ id: src }),
this.cacheService.listUserMembershipsCache.fetch(srcId),
]);
return {
id: userList.id,
createdAt: this.idService.parse(userList.id).date.toISOString(),
name: userList.name,
userIds: users.map(x => x.userId),
userIds: users.keys().toArray(),
isPublic: userList.isPublic,
};
}

View file

@ -26,8 +26,6 @@ import type { DeliverJobData } from '../types.js';
@Injectable()
export class DeliverProcessorService {
private logger: Logger;
private suspendedHostsCache: MemorySingleCache<MiInstance[]>;
private latest: string | null;
constructor(
@Inject(DI.meta)
@ -46,7 +44,6 @@ export class DeliverProcessorService {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60); // 1h
}
@bindThis
@ -57,25 +54,15 @@ export class DeliverProcessorService {
return 'skip (blocked)';
}
// isSuspendedなら中断
let suspendedHosts = this.suspendedHostsCache.get();
if (suspendedHosts == null) {
suspendedHosts = await this.instancesRepository.find({
where: {
suspensionState: Not('none'),
},
});
this.suspendedHostsCache.set(suspendedHosts);
}
if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) {
const i = await this.federatedInstanceService.federatedInstanceCache.fetch(host);
if (i.suspensionState !== 'none') {
return 'skip (suspended)';
}
const i = await (this.meta.enableStatsForFederatedInstances
? this.federatedInstanceService.fetchOrRegister(host)
: this.federatedInstanceService.fetch(host));
// Make sure info is up-to-date.
await this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
// suspend server by software
// suspend server by software.
if (i != null && this.utilityService.isDeliverSuspendedSoftware(i)) {
return 'skip (software suspended)';
}
@ -97,10 +84,6 @@ export class DeliverProcessorService {
});
}
if (this.meta.enableStatsForFederatedInstances) {
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
}
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, true);
}

View file

@ -12,11 +12,12 @@ import { GetterService } from '@/server/api/GetterService.js';
import { RoleService } from '@/core/RoleService.js';
import type { MiMeta, MiNote } from '@/models/_.js';
import { DI } from '@/di-symbols.js';
import { CacheService } from '@/core/CacheService.js';
import { hasText } from '@/models/Note.js';
import { ApiLoggerService } from '@/server/api/ApiLoggerService.js';
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
import { ApiError } from '../../error.js';
import { CacheManagementService, type ManagedRedisKVCache } from '@/core/CacheManagementService.js';
import { ApiError } from '@/server/api/error.js';
import { bindThis } from '@/decorators.js';
export const meta = {
tags: ['notes'],
@ -75,6 +76,8 @@ export const paramDef = {
@Injectable()
export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
private readonly translationsCache: ManagedRedisKVCache<CachedTranslationEntity>;
constructor(
@Inject(DI.meta)
private serverSettings: MiMeta,
@ -83,9 +86,10 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private getterService: GetterService,
private httpRequestService: HttpRequestService,
private roleService: RoleService,
private readonly cacheService: CacheService,
private readonly loggerService: ApiLoggerService,
private readonly noteVisibilityService: NoteVisibilityService,
cacheManagementService: CacheManagementService,
) {
super(meta, paramDef, async (ps, me) => {
const note = await this.getterService.getNote(ps.noteId).catch(err => {
@ -110,7 +114,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
let targetLang = ps.targetLang;
if (targetLang.includes('-')) targetLang = targetLang.split('-')[0];
let response = await this.cacheService.getCachedTranslation(note, targetLang);
let response = await this.getCachedTranslation(note, targetLang);
if (!response) {
this.loggerService.logger.debug(`Fetching new translation for note=${note.id} lang=${targetLang}`);
response = await this.fetchTranslation(note, targetLang);
@ -118,10 +122,15 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
throw new ApiError(meta.errors.translationFailed);
}
await this.cacheService.setCachedTranslation(note, targetLang, response);
await this.setCachedTranslation(note, targetLang, response);
}
return response;
});
this.translationsCache = cacheManagementService.createRedisKVCache<CachedTranslationEntity>('translations', {
lifetime: 1000 * 60 * 60 * 24 * 7, // 1 week,
memoryCacheLifetime: 1000 * 60, // 1 minute
});
}
private async fetchTranslation(note: MiNote & { text: string }, targetLang: string) {
@ -219,4 +228,43 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
return null;
}
@bindThis
private async getCachedTranslation(note: MiNote, targetLang: string): Promise<CachedTranslation | null> {
const cacheKey = `${note.id}@${targetLang}`;
// Use cached translation, if present and up-to-date
const cached = await this.translationsCache.get(cacheKey);
if (cached && cached.u === note.updatedAt?.valueOf()) {
return {
sourceLang: cached.l,
text: cached.t,
};
}
// No cache entry :(
return null;
}
@bindThis
private async setCachedTranslation(note: MiNote, targetLang: string, translation: CachedTranslation): Promise<void> {
const cacheKey = `${note.id}@${targetLang}`;
await this.translationsCache.set(cacheKey, {
l: translation.sourceLang,
t: translation.text,
u: note.updatedAt?.valueOf(),
});
}
}
interface CachedTranslation {
sourceLang: string | undefined;
text: string | undefined;
}
interface CachedTranslationEntity {
l?: string;
t?: string;
u?: number;
}

View file

@ -9,6 +9,7 @@ import type { UserListsRepository, UserListMembershipsRepository, BlockingsRepos
import { Endpoint } from '@/server/api/endpoint-base.js';
import { GetterService } from '@/server/api/GetterService.js';
import { UserListService } from '@/core/UserListService.js';
import { CacheService } from '@/core/CacheService.js';
import { DI } from '@/di-symbols.js';
import { ApiError } from '../../../error.js';
@ -84,44 +85,34 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private getterService: GetterService,
private userListService: UserListService,
private readonly cacheService: CacheService,
) {
super(meta, paramDef, async (ps, me) => {
// Fetch the list
const userList = await this.userListsRepository.findOneBy({
id: ps.listId,
userId: me.id,
});
const [user, blockings, userList, exist] = await Promise.all([
this.cacheService.findOptionalUserById(ps.userId),
this.cacheService.userBlockingCache.fetch(ps.userId),
this.userListsRepository.findOneBy({
id: ps.listId,
userId: me.id,
}),
this.cacheService.listUserMembershipsCache.fetch(ps.listId).then(ms => ms.has(ps.userId)),
]);
if (userList == null) {
throw new ApiError(meta.errors.noSuchList);
}
// Fetch the user
const user = await this.getterService.getUser(ps.userId).catch(err => {
if (err.id === '15348ddd-432d-49c2-8a5a-8069753becff') throw new ApiError(meta.errors.noSuchUser);
throw err;
});
if (!user) {
throw new ApiError(meta.errors.noSuchUser);
}
// Check blocking
if (user.id !== me.id) {
const blockExist = await this.blockingsRepository.exists({
where: {
blockerId: user.id,
blockeeId: me.id,
},
});
const blockExist = blockings.has(me.id);
if (blockExist) {
throw new ApiError(meta.errors.youHaveBeenBlocked);
}
}
const exist = await this.userListMembershipsRepository.exists({
where: {
userListId: userList.id,
userId: user.id,
},
});
if (exist) {
throw new ApiError(meta.errors.alreadyAdded);
}

View file

@ -19,8 +19,6 @@ class UserListChannel extends Channel {
public static requireCredential = true as const;
public static kind = 'read:account';
private listId: string;
private membershipsMap: Record<string, Pick<MiUserListMembership, 'withReplies'> | undefined> = {};
private listUsersClock: NodeJS.Timeout;
private withFiles: boolean;
private withRenotes: boolean;
@ -57,27 +55,6 @@ class UserListChannel extends Channel {
this.subscriber?.on(`userListStream:${this.listId}`, this.send);
this.subscriber?.on('notesStream', this.onNote);
this.updateListUsers();
this.listUsersClock = setInterval(this.updateListUsers, 5000);
}
@bindThis
private async updateListUsers() {
const memberships = await this.userListMembershipsRepository.find({
where: {
userListId: this.listId,
},
select: ['userId'],
});
const membershipsMap: Record<string, Pick<MiUserListMembership, 'withReplies'> | undefined> = {};
for (const membership of memberships) {
membershipsMap[membership.userId] = {
withReplies: membership.withReplies,
};
}
this.membershipsMap = membershipsMap;
}
@bindThis
@ -87,9 +64,10 @@ class UserListChannel extends Channel {
if (this.withFiles && (note.fileIds == null || note.fileIds.length === 0)) return;
if (!Object.hasOwn(this.membershipsMap, note.userId)) return;
const memberships = await this.cacheService.listUserMembershipsCache.fetch(this.listId);
if (!memberships.has(note.userId)) return;
const { accessible, silence } = await this.checkNoteVisibility(note, { includeReplies: true });
const { accessible, silence } = await this.checkNoteVisibility(note, { includeReplies: true, listContext: this.listId });
if (!accessible || silence) return;
if (!this.withRenotes && isPackedPureRenote(note)) return;
@ -102,8 +80,6 @@ class UserListChannel extends Channel {
// Unsubscribe events
this.subscriber?.off(`userListStream:${this.listId}`, this.send);
this.subscriber?.off('notesStream', this.onNote);
clearInterval(this.listUsersClock);
}
}