From 0491ea55cce5c41050b50a40792f8f4d2f3a3f60 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 11:59:00 -0400 Subject: [PATCH] move UpdateInstanceQueue into CollapsedQueueService --- .../backend/src/core/CollapsedQueueService.ts | 73 +++++++++++++++++++ packages/backend/src/core/CoreModule.ts | 12 +-- .../backend/src/core/UpdateInstanceQueue.ts | 54 -------------- .../BackgroundTaskProcessorService.ts | 6 +- .../queue/processors/InboxProcessorService.ts | 7 ++ 5 files changed, 89 insertions(+), 63 deletions(-) create mode 100644 packages/backend/src/core/CollapsedQueueService.ts delete mode 100644 packages/backend/src/core/UpdateInstanceQueue.ts diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts new file mode 100644 index 0000000000..f2b920fb33 --- /dev/null +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -0,0 +1,73 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; +import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; +import { EnvService } from '@/core/EnvService.js'; +import { bindThis } from '@/decorators.js'; +import type { MiInstance } from '@/models/Instance.js'; + +export type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; + +@Injectable() +export class CollapsedQueueService implements OnApplicationShutdown { + // Moved from InboxProcessorService to allow access from ApInboxService + public readonly updateInstanceQueue: CollapsedQueue; + + private readonly logger: Logger; + + constructor( + private readonly federatedInstanceService: FederatedInstanceService, + private readonly envService: EnvService, + + loggerService: LoggerService, + ) { + this.logger = loggerService.getLogger('collapsed-queue'); + this.updateInstanceQueue = new CollapsedQueue( + this.envService.env.NODE_ENV !== 'test' + ? 60 * 1000 * 5 + : 0, + (oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob), + (id, job) => this.performUpdateInstance(id, job), + ); + } + + @bindThis + private collapseUpdateInstance(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + private async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + async dispose() { + await this.updateInstanceQueue.performAllNow().catch(err => this.logger.error(`Shutdown error in updateInstanceQueue: ${renderInlineError(err)}`)); + } + + async onApplicationShutdown() { + await this.dispose(); + } +} diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index 11119c52a3..a75c3b3abf 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -17,7 +17,7 @@ import { WebhookTestService } from '@/core/WebhookTestService.js'; import { FlashService } from '@/core/FlashService.js'; import { ApUtilityService } from '@/core/activitypub/ApUtilityService.js'; import { ApLogService } from '@/core/ApLogService.js'; -import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; import { InstanceStatsService } from '@/core/InstanceStatsService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; import { AccountMoveService } from './AccountMoveService.js'; @@ -218,7 +218,7 @@ const $UserRenoteMutingService: Provider = { provide: 'UserRenoteMutingService', const $UserSearchService: Provider = { provide: 'UserSearchService', useExisting: UserSearchService }; const $UserSuspendService: Provider = { provide: 'UserSuspendService', useExisting: UserSuspendService }; const $UserAuthService: Provider = { provide: 'UserAuthService', useExisting: UserAuthService }; -const $UpdateInstanceQueue: Provider = { provide: 'UpdateInstanceQueue', useExisting: UpdateInstanceQueue }; +const $CollapsedQueueService: Provider = { provide: 'CollapsedQueueService', useExisting: CollapsedQueueService }; const $VideoProcessingService: Provider = { provide: 'VideoProcessingService', useExisting: VideoProcessingService }; const $UserWebhookService: Provider = { provide: 'UserWebhookService', useExisting: UserWebhookService }; const $SystemWebhookService: Provider = { provide: 'SystemWebhookService', useExisting: SystemWebhookService }; @@ -377,7 +377,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp UserSearchService, UserSuspendService, UserAuthService, - UpdateInstanceQueue, + CollapsedQueueService, VideoProcessingService, UserWebhookService, SystemWebhookService, @@ -531,7 +531,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $UserSearchService, $UserSuspendService, $UserAuthService, - $UpdateInstanceQueue, + $CollapsedQueueService, $VideoProcessingService, $UserWebhookService, $SystemWebhookService, @@ -686,7 +686,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp UserSearchService, UserSuspendService, UserAuthService, - UpdateInstanceQueue, + CollapsedQueueService, VideoProcessingService, UserWebhookService, SystemWebhookService, @@ -839,7 +839,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $UserSearchService, $UserSuspendService, $UserAuthService, - $UpdateInstanceQueue, + $CollapsedQueueService, $VideoProcessingService, $UserWebhookService, $SystemWebhookService, diff --git a/packages/backend/src/core/UpdateInstanceQueue.ts b/packages/backend/src/core/UpdateInstanceQueue.ts deleted file mode 100644 index c136241344..0000000000 --- a/packages/backend/src/core/UpdateInstanceQueue.ts +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-FileCopyrightText: syuilo and misskey-project - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { Injectable, OnApplicationShutdown } from '@nestjs/common'; -import { CollapsedQueue } from '@/misc/collapsed-queue.js'; -import { bindThis } from '@/decorators.js'; -import { MiNote } from '@/models/Note.js'; -import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; -import { TimeService } from '@/global/TimeService.js'; - -type UpdateInstanceJob = { - latestRequestReceivedAt: Date, - shouldUnsuspend: boolean, -}; - -// Moved from InboxProcessorService to allow access from ApInboxService -@Injectable() -export class UpdateInstanceQueue extends CollapsedQueue implements OnApplicationShutdown { - constructor( - private readonly federatedInstanceService: FederatedInstanceService, - timeService: TimeService, - ) { - super(timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, (id, job) => this.collapseUpdateInstanceJobs(id, job), (id, job) => this.performUpdateInstance(id, job)); - } - - @bindThis - private collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { - const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt - ? newJob.latestRequestReceivedAt - : oldJob.latestRequestReceivedAt; - const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; - return { - latestRequestReceivedAt, - shouldUnsuspend, - }; - } - - @bindThis - private async performUpdateInstance(id: string, job: UpdateInstanceJob) { - await this.federatedInstanceService.update(id, { - latestRequestReceivedAt: this.timeService.date, - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: job.shouldUnsuspend ? 'none' : undefined, - }); - } - - @bindThis - async onApplicationShutdown() { - await this.performAllNow(); - } -} diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts index 3d82dfa89b..19a2e355c8 100644 --- a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -17,7 +17,6 @@ import { MiMeta } from '@/models/Meta.js'; import InstanceChart from '@/core/chart/charts/instance.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import FederationChart from '@/core/chart/charts/federation.js'; -import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { NoteCreateService } from '@/core/NoteCreateService.js'; import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; import { MiUser } from '@/models/_.js'; @@ -29,6 +28,7 @@ import { trackTask } from '@/misc/promise-tracker.js'; import { UserSuspendService } from '@/core/UserSuspendService.js'; import { ApLogService } from '@/core/ApLogService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; +import { CollapsedQueueService } from '@/core/CollapsedQueueService.js'; @Injectable() export class BackgroundTaskProcessorService { @@ -60,7 +60,7 @@ export class BackgroundTaskProcessorService { private readonly instanceChart: InstanceChart, private readonly apRequestChart: ApRequestChart, private readonly federationChart: FederationChart, - private readonly updateInstanceQueue: UpdateInstanceQueue, + private readonly collapsedQueueService: CollapsedQueueService, private readonly noteCreateService: NoteCreateService, private readonly noteEditService: NoteEditService, private readonly hashtagService: HashtagService, @@ -240,7 +240,7 @@ export class BackgroundTaskProcessorService { await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance); // Unsuspend instance (deferred) - this.updateInstanceQueue.enqueue(instance.id, { + this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { latestRequestReceivedAt: new Date(), shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding', }); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 1a2389dac4..0495501567 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -38,14 +38,18 @@ import { trackPromise } from '@/misc/promise-tracker.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; +// Moved to CollapsedQueueService +/* type UpdateInstanceJob = { latestRequestReceivedAt: Date, shouldUnsuspend: boolean, }; + */ @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + // Moved to CollapsedQueueService //private updateInstanceQueue: CollapsedQueue; constructor( @@ -295,6 +299,8 @@ export class InboxProcessorService implements OnApplicationShutdown { return 'ok'; } + // Moved to CollapsedQueueService + /* @bindThis public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt @@ -316,6 +322,7 @@ export class InboxProcessorService implements OnApplicationShutdown { suspensionState: job.shouldUnsuspend ? 'none' : undefined, }); } + */ @bindThis public async dispose(): Promise {}