move UpdateInstanceQueue into CollapsedQueueService

This commit is contained in:
Hazelnoot 2025-06-25 11:59:00 -04:00
parent 15bb9b38e1
commit 0491ea55cc
5 changed files with 89 additions and 63 deletions

View file

@ -0,0 +1,73 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { LoggerService } from '@/core/LoggerService.js';
import type Logger from '@/logger.js';
import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { EnvService } from '@/core/EnvService.js';
import { bindThis } from '@/decorators.js';
import type { MiInstance } from '@/models/Instance.js';
export type UpdateInstanceJob = {
latestRequestReceivedAt: Date,
shouldUnsuspend: boolean,
};
@Injectable()
export class CollapsedQueueService implements OnApplicationShutdown {
// Moved from InboxProcessorService to allow access from ApInboxService
public readonly updateInstanceQueue: CollapsedQueue<MiInstance['id'], UpdateInstanceJob>;
private readonly logger: Logger;
constructor(
private readonly federatedInstanceService: FederatedInstanceService,
private readonly envService: EnvService,
loggerService: LoggerService,
) {
this.logger = loggerService.getLogger('collapsed-queue');
this.updateInstanceQueue = new CollapsedQueue(
this.envService.env.NODE_ENV !== 'test'
? 60 * 1000 * 5
: 0,
(oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob),
(id, job) => this.performUpdateInstance(id, job),
);
}
@bindThis
private collapseUpdateInstance(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
? newJob.latestRequestReceivedAt
: oldJob.latestRequestReceivedAt;
const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
return {
latestRequestReceivedAt,
shouldUnsuspend,
};
}
@bindThis
private async performUpdateInstance(id: string, job: UpdateInstanceJob) {
await this.federatedInstanceService.update(id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
// もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
});
}
async dispose() {
await this.updateInstanceQueue.performAllNow().catch(err => this.logger.error(`Shutdown error in updateInstanceQueue: ${renderInlineError(err)}`));
}
async onApplicationShutdown() {
await this.dispose();
}
}

View file

@ -17,7 +17,7 @@ import { WebhookTestService } from '@/core/WebhookTestService.js';
import { FlashService } from '@/core/FlashService.js'; import { FlashService } from '@/core/FlashService.js';
import { ApUtilityService } from '@/core/activitypub/ApUtilityService.js'; import { ApUtilityService } from '@/core/activitypub/ApUtilityService.js';
import { ApLogService } from '@/core/ApLogService.js'; import { ApLogService } from '@/core/ApLogService.js';
import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
import { InstanceStatsService } from '@/core/InstanceStatsService.js'; import { InstanceStatsService } from '@/core/InstanceStatsService.js';
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
import { AccountMoveService } from './AccountMoveService.js'; import { AccountMoveService } from './AccountMoveService.js';
@ -218,7 +218,7 @@ const $UserRenoteMutingService: Provider = { provide: 'UserRenoteMutingService',
const $UserSearchService: Provider = { provide: 'UserSearchService', useExisting: UserSearchService }; const $UserSearchService: Provider = { provide: 'UserSearchService', useExisting: UserSearchService };
const $UserSuspendService: Provider = { provide: 'UserSuspendService', useExisting: UserSuspendService }; const $UserSuspendService: Provider = { provide: 'UserSuspendService', useExisting: UserSuspendService };
const $UserAuthService: Provider = { provide: 'UserAuthService', useExisting: UserAuthService }; const $UserAuthService: Provider = { provide: 'UserAuthService', useExisting: UserAuthService };
const $UpdateInstanceQueue: Provider = { provide: 'UpdateInstanceQueue', useExisting: UpdateInstanceQueue }; const $CollapsedQueueService: Provider = { provide: 'CollapsedQueueService', useExisting: CollapsedQueueService };
const $VideoProcessingService: Provider = { provide: 'VideoProcessingService', useExisting: VideoProcessingService }; const $VideoProcessingService: Provider = { provide: 'VideoProcessingService', useExisting: VideoProcessingService };
const $UserWebhookService: Provider = { provide: 'UserWebhookService', useExisting: UserWebhookService }; const $UserWebhookService: Provider = { provide: 'UserWebhookService', useExisting: UserWebhookService };
const $SystemWebhookService: Provider = { provide: 'SystemWebhookService', useExisting: SystemWebhookService }; const $SystemWebhookService: Provider = { provide: 'SystemWebhookService', useExisting: SystemWebhookService };
@ -377,7 +377,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
UserSearchService, UserSearchService,
UserSuspendService, UserSuspendService,
UserAuthService, UserAuthService,
UpdateInstanceQueue, CollapsedQueueService,
VideoProcessingService, VideoProcessingService,
UserWebhookService, UserWebhookService,
SystemWebhookService, SystemWebhookService,
@ -531,7 +531,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
$UserSearchService, $UserSearchService,
$UserSuspendService, $UserSuspendService,
$UserAuthService, $UserAuthService,
$UpdateInstanceQueue, $CollapsedQueueService,
$VideoProcessingService, $VideoProcessingService,
$UserWebhookService, $UserWebhookService,
$SystemWebhookService, $SystemWebhookService,
@ -686,7 +686,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
UserSearchService, UserSearchService,
UserSuspendService, UserSuspendService,
UserAuthService, UserAuthService,
UpdateInstanceQueue, CollapsedQueueService,
VideoProcessingService, VideoProcessingService,
UserWebhookService, UserWebhookService,
SystemWebhookService, SystemWebhookService,
@ -839,7 +839,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
$UserSearchService, $UserSearchService,
$UserSuspendService, $UserSuspendService,
$UserAuthService, $UserAuthService,
$UpdateInstanceQueue, $CollapsedQueueService,
$VideoProcessingService, $VideoProcessingService,
$UserWebhookService, $UserWebhookService,
$SystemWebhookService, $SystemWebhookService,

View file

@ -1,54 +0,0 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { bindThis } from '@/decorators.js';
import { MiNote } from '@/models/Note.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { TimeService } from '@/global/TimeService.js';
type UpdateInstanceJob = {
latestRequestReceivedAt: Date,
shouldUnsuspend: boolean,
};
// Moved from InboxProcessorService to allow access from ApInboxService
@Injectable()
export class UpdateInstanceQueue extends CollapsedQueue<MiNote['id'], UpdateInstanceJob> implements OnApplicationShutdown {
constructor(
private readonly federatedInstanceService: FederatedInstanceService,
timeService: TimeService,
) {
super(timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, (id, job) => this.collapseUpdateInstanceJobs(id, job), (id, job) => this.performUpdateInstance(id, job));
}
@bindThis
private collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
? newJob.latestRequestReceivedAt
: oldJob.latestRequestReceivedAt;
const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
return {
latestRequestReceivedAt,
shouldUnsuspend,
};
}
@bindThis
private async performUpdateInstance(id: string, job: UpdateInstanceJob) {
await this.federatedInstanceService.update(id, {
latestRequestReceivedAt: this.timeService.date,
isNotResponding: false,
// もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
});
}
@bindThis
async onApplicationShutdown() {
await this.performAllNow();
}
}

View file

@ -17,7 +17,6 @@ import { MiMeta } from '@/models/Meta.js';
import InstanceChart from '@/core/chart/charts/instance.js'; import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js'; import FederationChart from '@/core/chart/charts/federation.js';
import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js';
import { NoteCreateService } from '@/core/NoteCreateService.js'; import { NoteCreateService } from '@/core/NoteCreateService.js';
import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js';
import { MiUser } from '@/models/_.js'; import { MiUser } from '@/models/_.js';
@ -29,6 +28,7 @@ import { trackTask } from '@/misc/promise-tracker.js';
import { UserSuspendService } from '@/core/UserSuspendService.js'; import { UserSuspendService } from '@/core/UserSuspendService.js';
import { ApLogService } from '@/core/ApLogService.js'; import { ApLogService } from '@/core/ApLogService.js';
import { InternalEventService } from '@/core/InternalEventService.js'; import { InternalEventService } from '@/core/InternalEventService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
@Injectable() @Injectable()
export class BackgroundTaskProcessorService { export class BackgroundTaskProcessorService {
@ -60,7 +60,7 @@ export class BackgroundTaskProcessorService {
private readonly instanceChart: InstanceChart, private readonly instanceChart: InstanceChart,
private readonly apRequestChart: ApRequestChart, private readonly apRequestChart: ApRequestChart,
private readonly federationChart: FederationChart, private readonly federationChart: FederationChart,
private readonly updateInstanceQueue: UpdateInstanceQueue, private readonly collapsedQueueService: CollapsedQueueService,
private readonly noteCreateService: NoteCreateService, private readonly noteCreateService: NoteCreateService,
private readonly noteEditService: NoteEditService, private readonly noteEditService: NoteEditService,
private readonly hashtagService: HashtagService, private readonly hashtagService: HashtagService,
@ -240,7 +240,7 @@ export class BackgroundTaskProcessorService {
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance); await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance);
// Unsuspend instance (deferred) // Unsuspend instance (deferred)
this.updateInstanceQueue.enqueue(instance.id, { this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
latestRequestReceivedAt: new Date(), latestRequestReceivedAt: new Date(),
shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding', shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding',
}); });

View file

@ -38,14 +38,18 @@ import { trackPromise } from '@/misc/promise-tracker.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js'; import type { InboxJobData } from '../types.js';
// Moved to CollapsedQueueService
/*
type UpdateInstanceJob = { type UpdateInstanceJob = {
latestRequestReceivedAt: Date, latestRequestReceivedAt: Date,
shouldUnsuspend: boolean, shouldUnsuspend: boolean,
}; };
*/
@Injectable() @Injectable()
export class InboxProcessorService implements OnApplicationShutdown { export class InboxProcessorService implements OnApplicationShutdown {
private logger: Logger; private logger: Logger;
// Moved to CollapsedQueueService
//private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; //private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
constructor( constructor(
@ -295,6 +299,8 @@ export class InboxProcessorService implements OnApplicationShutdown {
return 'ok'; return 'ok';
} }
// Moved to CollapsedQueueService
/*
@bindThis @bindThis
public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
@ -316,6 +322,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
suspensionState: job.shouldUnsuspend ? 'none' : undefined, suspensionState: job.shouldUnsuspend ? 'none' : undefined,
}); });
} }
*/
@bindThis @bindThis
public async dispose(): Promise<void> {} public async dispose(): Promise<void> {}