diff --git a/packages/backend/src/core/AntennaService.ts b/packages/backend/src/core/AntennaService.ts index cc4b7dd719..8404da2a08 100644 --- a/packages/backend/src/core/AntennaService.ts +++ b/packages/backend/src/core/AntennaService.ts @@ -18,13 +18,15 @@ import type { AntennasRepository, UserListMembershipsRepository } from '@/models import type { MiAntenna } from '@/models/Antenna.js'; import type { MiNote } from '@/models/Note.js'; import type { MiUser } from '@/models/User.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import { CacheService } from './CacheService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; @Injectable() export class AntennaService implements OnApplicationShutdown { + // TODO implement QuantumSingleCache then replace this private antennasFetched: boolean; - private antennas: MiAntenna[]; + private antennas: Map; constructor( @Inject(DI.redisForTimelines) @@ -43,9 +45,10 @@ export class AntennaService implements OnApplicationShutdown { private utilityService: UtilityService, private globalEventService: GlobalEventService, private fanoutTimelineService: FanoutTimelineService, + private readonly internalEventService: InternalEventService, ) { this.antennasFetched = false; - this.antennas = []; + this.antennas = new Map(); this.redisForSub.on('message', this.onRedisMessage); } @@ -58,35 +61,16 @@ export class AntennaService implements OnApplicationShutdown { const { type, body } = obj.message as GlobalEvents['internal']['payload']; switch (type) { case 'antennaCreated': - this.antennas.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい + case 'antennaUpdated': + this.antennas.set(body.id, { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい ...body, lastUsedAt: new Date(body.lastUsedAt), user: null, // joinなカラムは通常取ってこないので userList: null, // joinなカラムは通常取ってこないので }); break; - case 'antennaUpdated': { - const idx = this.antennas.findIndex(a => a.id === body.id); - if (idx >= 0) { - this.antennas[idx] = { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい - ...body, - lastUsedAt: new Date(body.lastUsedAt), - user: null, // joinなカラムは通常取ってこないので - userList: null, // joinなカラムは通常取ってこないので - }; - } else { - // サーバ起動時にactiveじゃなかった場合、リストに持っていないので追加する必要あり - this.antennas.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい - ...body, - lastUsedAt: new Date(body.lastUsedAt), - user: null, // joinなカラムは通常取ってこないので - userList: null, // joinなカラムは通常取ってこないので - }); - } - } - break; case 'antennaDeleted': - this.antennas = this.antennas.filter(a => a.id !== body.id); + this.antennas.delete(body.id); break; default: break; @@ -94,6 +78,20 @@ export class AntennaService implements OnApplicationShutdown { } } + @bindThis + public async updateAntenna(id: string, data: Partial) { + await this.antennasRepository.update({ id }, data); + + const antenna = this.antennas.get(id) ?? await this.antennasRepository.findOneBy({ id }); + if (antenna) { + // This will be handled above to save result + await this.internalEventService.emit('antennaUpdated', { + ...antenna, + ...data, + }); + } + } + @bindThis public async addNoteToAntennas(note: MiNote, noteUser: { id: MiUser['id']; username: string; host: string | null; isBot: boolean; }): Promise { const antennas = await this.getAntennas(); @@ -212,13 +210,14 @@ export class AntennaService implements OnApplicationShutdown { @bindThis public async getAntennas() { if (!this.antennasFetched) { - this.antennas = await this.antennasRepository.findBy({ + const allAntennas = await this.antennasRepository.findBy({ isActive: true, }); + this.antennas = new Map(allAntennas.map(a => [a.id, a])); this.antennasFetched = true; } - return this.antennas; + return Array.from(this.antennas.values()); } @bindThis diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index 0082c55d0d..1a1def3291 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -14,8 +14,9 @@ import { bindThis } from '@/decorators.js'; import type { MiInstance } from '@/models/Instance.js'; import { InternalEventService } from '@/core/InternalEventService.js'; import { MiUser } from '@/models/User.js'; -import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository } from '@/models/_.js'; +import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; +import { AntennaService } from '@/core/AntennaService.js'; export type UpdateInstanceJob = { latestRequestReceivedAt?: Date, @@ -46,14 +47,30 @@ export type UpdateAccessTokenJob = { lastUsedAt: Date; }; +export type UpdateAntennaJob = { + isActive: boolean, + lastUsedAt?: Date, +}; + +// TODO sync cross-process: +// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data +// 2. On schedule, mark ID as deferred. +// 3. On perform, clear mark. +// 4. On performAll, skip deferred IDs. +// 5. On enqueue when ID is deferred, send data as event instead. +// 6. On delete when ID is deferred, clear mark + @Injectable() export class CollapsedQueueService implements OnApplicationShutdown { // Moved from InboxProcessorService public readonly updateInstanceQueue: CollapsedQueue; + // Moved from NoteCreateService, NoteEditService, and NoteDeleteService public readonly updateUserQueue: CollapsedQueue; + public readonly updateNoteQueue: CollapsedQueue; public readonly updateAccessTokenQueue: CollapsedQueue; + public readonly updateAntennaQueue: CollapsedQueue; private readonly logger: Logger; @@ -67,9 +84,13 @@ export class CollapsedQueueService implements OnApplicationShutdown { @Inject(DI.accessTokensRepository) public readonly accessTokensRepository: AccessTokensRepository, + @Inject(DI.antennasRepository) + public readonly antennasRepository: AntennasRepository, + private readonly federatedInstanceService: FederatedInstanceService, private readonly envService: EnvService, private readonly internalEventService: InternalEventService, + private readonly antennaService: AntennaService, loggerService: LoggerService, ) { @@ -136,15 +157,17 @@ export class CollapsedQueueService implements OnApplicationShutdown { followingCountDelta: (oldJob.followingCountDelta ?? 0) + (newJob.followingCountDelta ?? 0), followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0), }), - (id, job) => this.usersRepository.update({ id }, { - updatedAt: job.updatedAt, - notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, - followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, - followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, - }), + async (id, job) => { + await this.usersRepository.update({ id }, { + updatedAt: job.updatedAt, + notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, + followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, + followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, + }); + await this.internalEventService.emit('userUpdated', { id }); + }, { onError: this.onQueueError, - onPerform: (_, id) => this.internalEventService.emit('userUpdated', { id }), concurrency: 4, // High concurrency - this queue gets a lot of activity }, ); @@ -183,7 +206,26 @@ export class CollapsedQueueService implements OnApplicationShutdown { }, ); + this.updateAntennaQueue = new CollapsedQueue( + 'updateAntenna', + fiveMinuteInterval, + (oldJob, newJob) => ({ + isActive: oldJob.isActive || newJob.isActive, + lastUsedAt: maxDate(oldJob.lastUsedAt, newJob.lastUsedAt), + }), + (id, job) => this.antennaService.updateAntenna(id, { + isActive: job.isActive, + lastUsedAt: job.lastUsedAt, + }), + { + onError: this.onQueueError, + concurrency: 4, + }, + ); + this.internalEventService.on('userChangeDeletedState', this.onUserDeleted); + this.internalEventService.on('antennaDeleted', this.onAntennaDeleted); + this.internalEventService.on('antennaUpdated', this.onAntennaDeleted); } @bindThis @@ -194,6 +236,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { await this.performQueue(this.updateUserQueue); await this.performQueue(this.updateNoteQueue); await this.performQueue(this.updateAccessTokenQueue); + await this.performQueue(this.updateAntennaQueue); this.logger.info('Persistence complete.'); } @@ -226,8 +269,15 @@ export class CollapsedQueueService implements OnApplicationShutdown { } } + @bindThis + private onAntennaDeleted(data: MiAntenna) { + this.updateAntennaQueue.delete(data.id); + } + async onApplicationShutdown() { this.internalEventService.off('userChangeDeletedState', this.onUserDeleted); + this.internalEventService.off('antennaDeleted', this.onAntennaDeleted); + this.internalEventService.off('antennaUpdated', this.onAntennaDeleted); await this.performAllNow(); } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index af1657cc63..280373db79 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -24,7 +24,6 @@ export class CollapsedQueue { private readonly perform: (key: K, value: V) => Promise, private readonly opts?: { onError?: (queue: CollapsedQueue, error: unknown) => void | Promise, - onPerform?: (queue: CollapsedQueue, key: K, value: V) => void | Promise, concurrency?: number, }, ) { @@ -74,7 +73,6 @@ export class CollapsedQueue { } else { await this.perform(key, value); } - await this.opts?.onPerform?.(this, key, value); } catch (err) { await this.opts?.onError?.(this, err); throw err; diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts index f678801e08..eec468e3e4 100644 --- a/packages/backend/src/queue/processors/CleanProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -13,6 +13,7 @@ import { IdService } from '@/core/IdService.js'; import type { Config } from '@/config.js'; import { ReversiService } from '@/core/ReversiService.js'; import { TimeService } from '@/global/TimeService.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -37,6 +38,7 @@ export class CleanProcessorService { private reversiService: ReversiService, private idService: IdService, private readonly timeService: TimeService, + private readonly collapsedQueueService: CollapsedQueueService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('clean'); } @@ -51,6 +53,7 @@ export class CleanProcessorService { // 使われてないアンテナを停止 if (this.config.deactivateAntennaThreshold > 0) { + await this.collapsedQueueService.updateAntennaQueue.performAllNow(); await this.antennasRepository.update({ lastUsedAt: LessThan(new Date(this.timeService.now - this.config.deactivateAntennaThreshold)), }, { diff --git a/packages/backend/src/server/api/endpoints/antennas/notes.ts b/packages/backend/src/server/api/endpoints/antennas/notes.ts index 683d5b9e91..81c0fe5169 100644 --- a/packages/backend/src/server/api/endpoints/antennas/notes.ts +++ b/packages/backend/src/server/api/endpoints/antennas/notes.ts @@ -16,6 +16,7 @@ import { FanoutTimelineService } from '@/core/FanoutTimelineService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import ActiveUsersChart from '@/core/chart/charts/active-users.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -79,6 +80,7 @@ export default class extends Endpoint { // eslint- private globalEventService: GlobalEventService, private readonly activeUsersChart: ActiveUsersChart, private readonly timeService: TimeService, + private readonly collapsedQueueService: CollapsedQueueService, ) { super(meta, paramDef, async (ps, me) => { const untilId = ps.untilId ?? (ps.untilDate ? this.idService.gen(ps.untilDate!) : null); @@ -96,9 +98,10 @@ export default class extends Endpoint { // eslint- // falseだった場合はアンテナの配信先が増えたことを通知したい const needPublishEvent = !antenna.isActive; - antenna.isActive = true; - antenna.lastUsedAt = this.timeService.date; - trackPromise(this.antennasRepository.update(antenna.id, antenna)); + this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, { + isActive: true, + lastUsedAt: this.timeService.date, + }); if (needPublishEvent) { this.globalEventService.publishInternalEvent('antennaUpdated', antenna);