From 45df03ab148db0484e1fe8dff2f4fecb075dfa48 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 12:34:40 -0400 Subject: [PATCH] replace MarkUserUpdatedBackgroundTask with updateUserQueue --- .../backend/src/core/CollapsedQueueService.ts | 71 ++++++++++++------- .../backend/src/core/NoteCreateService.ts | 6 +- .../backend/src/core/NoteDeleteService.ts | 8 +-- packages/backend/src/core/NoteEditService.ts | 4 +- packages/backend/src/core/QueueService.ts | 5 -- packages/backend/src/core/ReactionService.ts | 8 +-- packages/backend/src/misc/collapsed-queue.ts | 2 +- .../BackgroundTaskProcessorService.ts | 28 +------- packages/backend/src/queue/types.ts | 8 +-- 9 files changed, 67 insertions(+), 73 deletions(-) diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index 3f3b304618..aa21f7578f 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { LoggerService } from '@/core/LoggerService.js'; import type Logger from '@/logger.js'; import { CollapsedQueue } from '@/misc/collapsed-queue.js'; @@ -12,55 +12,69 @@ import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { EnvService } from '@/core/EnvService.js'; import { bindThis } from '@/decorators.js'; import type { MiInstance } from '@/models/Instance.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { MiUser } from '@/models/User.js'; +import type { UsersRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; export type UpdateInstanceJob = { latestRequestReceivedAt: Date, shouldUnsuspend: boolean, }; +export type UpdateUserJob = { + updatedAt: Date, +}; + @Injectable() export class CollapsedQueueService implements OnApplicationShutdown { // Moved from InboxProcessorService to allow access from ApInboxService public readonly updateInstanceQueue: CollapsedQueue; + public readonly updateUserQueue: CollapsedQueue; private readonly logger: Logger; constructor( + @Inject(DI.usersRepository) + public readonly usersRepository: UsersRepository, + private readonly federatedInstanceService: FederatedInstanceService, private readonly envService: EnvService, + private readonly internalEventService: InternalEventService, loggerService: LoggerService, ) { this.logger = loggerService.getLogger('collapsed-queue'); + + const fiveMinuteInterval = this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0; + this.updateInstanceQueue = new CollapsedQueue( 'updateInstance', - this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, - (oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob), - (id, job) => this.performUpdateInstance(id, job), + fiveMinuteInterval, + (oldJob, newJob) => ({ + latestRequestReceivedAt: new Date(Math.max(oldJob.latestRequestReceivedAt.getTime(), newJob.latestRequestReceivedAt.getTime())), + shouldUnsuspend: oldJob.shouldUnsuspend || newJob.shouldUnsuspend, + }), + (id, job) => this.federatedInstanceService.update(id, { + latestRequestReceivedAt: job.latestRequestReceivedAt, + isNotResponding: false, + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }), this.onQueueError, ); - } - @bindThis - private collapseUpdateInstance(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { - const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt - ? newJob.latestRequestReceivedAt - : oldJob.latestRequestReceivedAt; - const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; - return { - latestRequestReceivedAt, - shouldUnsuspend, - }; - } + this.updateUserQueue = new CollapsedQueue( + 'updateUser', + fiveMinuteInterval, + (oldJob, newJob) => ({ + updatedAt: new Date(Math.max(oldJob.updatedAt.getTime(), newJob.updatedAt.getTime())), + }), + (id, job) => this.usersRepository.update({ id }, { updatedAt: job.updatedAt }), + this.onQueueError, + ); - @bindThis - private async performUpdateInstance(id: string, job: UpdateInstanceJob) { - await this.federatedInstanceService.update(id, { - latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: job.shouldUnsuspend ? 'none' : undefined, - }); + this.internalEventService.on('localUserUpdated', this.onUserUpdated); + this.internalEventService.on('remoteUserUpdated', this.onUserUpdated); } @bindThis @@ -68,6 +82,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { this.logger.info('Persisting all collapsed queues...'); await this.performQueue(this.updateInstanceQueue); + await this.performQueue(this.updateUserQueue); this.logger.info('Persistence complete.'); } @@ -93,7 +108,15 @@ export class CollapsedQueueService implements OnApplicationShutdown { this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`); } + @bindThis + private onUserUpdated(data: { id: string }) { + this.updateUserQueue.enqueue(data.id, { updatedAt: new Date() }); + } + async onApplicationShutdown() { + this.internalEventService.off('localUserUpdated', this.onUserUpdated); + this.internalEventService.off('remoteUserUpdated', this.onUserUpdated); + await this.performAllNow(); } } diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index c4d5c17803..e4a57880ad 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -59,6 +59,7 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { CacheService } from '@/core/CacheService.js'; import { TimeService } from '@/global/TimeService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -226,6 +227,7 @@ export class NoteCreateService implements OnApplicationShutdown { private latestNoteService: LatestNoteService, private readonly timeService: TimeService, private readonly noteVisibilityService: NoteVisibilityService, + private readonly collapsedQueueService: CollapsedQueueService, ) { this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount); } @@ -604,10 +606,10 @@ export class NoteCreateService implements OnApplicationShutdown { if (!this.isRenote(note) || this.isQuote(note)) { // Increment notes count (user) await this.incNotesCountOfUser(user); - } else { - await this.queueService.createMarkUserUpdatedJob(user.id); } + this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.pushToTl(note, user); await this.antennaService.addNoteToAntennas({ diff --git a/packages/backend/src/core/NoteDeleteService.ts b/packages/backend/src/core/NoteDeleteService.ts index 21dce62744..4384b29a99 100644 --- a/packages/backend/src/core/NoteDeleteService.ts +++ b/packages/backend/src/core/NoteDeleteService.ts @@ -27,7 +27,7 @@ import { LatestNoteService } from '@/core/LatestNoteService.js'; import { ApLogService } from '@/core/ApLogService.js'; import { TimeService } from '@/global/TimeService.js'; import { trackTask } from '@/misc/promise-tracker.js'; -import { QueueService } from '@/core/QueueService.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; import { CacheService } from '@/core/CacheService.js'; @Injectable() @@ -61,7 +61,7 @@ export class NoteDeleteService { private latestNoteService: LatestNoteService, private readonly apLogService: ApLogService, private readonly timeService: TimeService, - private readonly queueService: QueueService, + private readonly collapsedQueueService: CollapsedQueueService, private readonly cacheService: CacheService, ) {} @@ -142,10 +142,10 @@ export class NoteDeleteService { if (!isPureRenote(note)) { // Decrement notes count (user) promises.push(this.decNotesCountOfUser(user)); - } else { - promises.push(this.queueService.createMarkUserUpdatedJob(user.id)); } + this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + for (const cascade of cascadingNotes) { if (!isPureRenote(cascade)) { promises.push(this.decNotesCountOfUser(cascade.user)); diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index dc2990c64f..54d712c042 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -55,6 +55,7 @@ import { NoteCreateService } from '@/core/NoteCreateService.js'; import { TimeService } from '@/global/TimeService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; import { isPureRenote } from '@/misc/is-renote.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention' | 'edited'; @@ -224,6 +225,7 @@ export class NoteEditService implements OnApplicationShutdown { private noteCreateService: NoteCreateService, private readonly timeService: TimeService, private readonly noteVisibilityService: NoteVisibilityService, + private readonly collapsedQueueService: CollapsedQueueService, ) { this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount); } @@ -627,7 +629,7 @@ export class NoteEditService implements OnApplicationShutdown { } } - await this.queueService.createMarkUserUpdatedJob(user.id); + this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); // ハッシュタグ更新 await this.pushToTl(note, user); diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 9cd9754f21..99911f38a7 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -935,11 +935,6 @@ export class QueueService implements OnModuleInit { return await this.createBackgroundTask({ type: 'delete-ap-logs', dataType, data }); } - @bindThis - public async createMarkUserUpdatedJob(userId: string) { - return await this.createBackgroundTask({ type: 'mark-user-updated', userId }, userId); - } - private async createBackgroundTask(data: T, duplication?: string | { id: string, ttl?: number }) { return await this.backgroundTaskQueue.add( data.type, diff --git a/packages/backend/src/core/ReactionService.ts b/packages/backend/src/core/ReactionService.ts index 8f642e014e..1e2e30f219 100644 --- a/packages/backend/src/core/ReactionService.ts +++ b/packages/backend/src/core/ReactionService.ts @@ -33,7 +33,7 @@ import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js'; import { CacheService } from '@/core/CacheService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; import { TimeService } from '@/global/TimeService.js'; -import { QueueService } from '@/core/QueueService.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; import type { DataSource } from 'typeorm'; const FALLBACK = '\u2764'; @@ -111,7 +111,7 @@ export class ReactionService implements OnModuleInit { private readonly cacheService: CacheService, private readonly noteVisibilityService: NoteVisibilityService, private readonly timeService: TimeService, - private readonly queueService: QueueService, + private readonly collapsedQueueService: CollapsedQueueService, ) { } @@ -226,7 +226,7 @@ export class ReactionService implements OnModuleInit { .execute(); } - await this.queueService.createMarkUserUpdatedJob(user.id); + this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); // 30%の確率、セルフではない、3日以内に投稿されたノートの場合ハイライト用ランキング更新 if ( @@ -342,7 +342,7 @@ export class ReactionService implements OnModuleInit { .execute(); } - await this.queueService.createMarkUserUpdatedJob(user.id); + this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); this.globalEventService.publishNoteStream(note.id, 'unreacted', { reaction: this.decodeReaction(exist.reaction).reaction, diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index af3796f7ed..8b70bcc505 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -19,7 +19,7 @@ export class CollapsedQueue { protected readonly timeService: TimeService, private readonly timeout: number, private readonly collapse: (oldValue: V, newValue: V) => V, - private readonly perform: (key: K, value: V) => Promise, + private readonly perform: (key: K, value: V) => Promise, private readonly onError?: (queue: CollapsedQueue, error: unknown) => void, ) {} diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts index 19a2e355c8..4863b815d7 100644 --- a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -5,7 +5,7 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Bull from 'bullmq'; -import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask, DeleteApLogsBackgroundTask, MarkUserUpdatedBackgroundTask } from '@/queue/types.js'; +import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask, DeleteApLogsBackgroundTask } from '@/queue/types.js'; import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; import Logger from '@/logger.js'; @@ -18,7 +18,7 @@ import InstanceChart from '@/core/chart/charts/instance.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import FederationChart from '@/core/chart/charts/federation.js'; import { NoteCreateService } from '@/core/NoteCreateService.js'; -import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; +import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository } from '@/models/_.js'; import { MiUser } from '@/models/_.js'; import { NoteEditService } from '@/core/NoteEditService.js'; import { HashtagService } from '@/core/HashtagService.js'; @@ -27,7 +27,6 @@ import { LatestNoteService } from '@/core/LatestNoteService.js'; import { trackTask } from '@/misc/promise-tracker.js'; import { UserSuspendService } from '@/core/UserSuspendService.js'; import { ApLogService } from '@/core/ApLogService.js'; -import { InternalEventService } from '@/core/InternalEventService.js'; import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; @Injectable() @@ -47,9 +46,6 @@ export class BackgroundTaskProcessorService { @Inject(DI.noteEditsRepository) private readonly noteEditsRepository: NoteEditsRepository, - @Inject(DI.usersRepository) - private readonly usersRepository: UsersRepository, - @Inject(DI.pollsRepository) private readonly pollsRepository: PollsRepository, @@ -68,7 +64,6 @@ export class BackgroundTaskProcessorService { private readonly latestNoteService: LatestNoteService, private readonly userSuspendService: UserSuspendService, private readonly apLogService: ApLogService, - private readonly internalEventService: InternalEventService, queueLoggerService: QueueLoggerService, ) { @@ -102,11 +97,9 @@ export class BackgroundTaskProcessorService { return await this.processPostSuspend(job.data); } else if (job.data.type === 'post-unsuspend') { return await this.processPostUnsuspend(job.data); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (job.data.type === 'delete-ap-logs') { return await this.processDeleteApLogs(job.data); - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - } else if (job.data.type === 'mark-user-updated') { - return await this.processMarkUserUpdated(job.data); } else { this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data); throw new Error(`Unknown job type ${job.data}, see system logs for details`); @@ -352,19 +345,4 @@ export class BackgroundTaskProcessorService { return 'ok'; } - - private async processMarkUserUpdated(task: MarkUserUpdatedBackgroundTask): Promise { - const user = await this.cacheService.findOptionalUserById(task.userId); - if (!user || user.isDeleted) return `Skipping post-unsuspend task: user ${task.userId} has been deleted`; - - await this.usersRepository.update({ id: user.id }, { updatedAt: new Date() }); - - if (user.host == null) { - await this.internalEventService.emit('localUserUpdated', { id: user.id }); - } else { - await this.internalEventService.emit('remoteUserUpdated', { id: user.id }); - } - - return 'ok'; - } } diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 671500a275..cb31e5a3e1 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -184,8 +184,7 @@ export type BackgroundTaskJobData = UpdateLatestNoteBackgroundTask | PostSuspendBackgroundTask | PostUnsuspendBackgroundTask | - DeleteApLogsBackgroundTask | - MarkUserUpdatedBackgroundTask; + DeleteApLogsBackgroundTask; export type UpdateUserBackgroundTask = { type: 'update-user'; @@ -262,8 +261,3 @@ export type DeleteApLogsBackgroundTask = { dataType: 'inbox' | 'object'; data: string | string[]; }; - -export type MarkUserUpdatedBackgroundTask = { - type: 'mark-user-updated'; - userId: string; -};