diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index 4b619320e3..a5781d815a 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -19,7 +19,10 @@ import { DI } from '@/di-symbols.js'; export type UpdateInstanceJob = { latestRequestReceivedAt?: Date, + notRespondingSince?: Date | null, shouldUnsuspend?: boolean, + shouldSuspendGone?: boolean, + shouldSuspendNotResponding?: boolean, notesCountDelta?: number, usersCountDelta?: number, followingCountDelta?: number, @@ -72,6 +75,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { fiveMinuteInterval, (oldJob, newJob) => ({ latestRequestReceivedAt: maxDate(oldJob.latestRequestReceivedAt, newJob.latestRequestReceivedAt), + notRespondingSince: maxDate(oldJob.notRespondingSince, newJob.notRespondingSince), shouldUnsuspend: oldJob.shouldUnsuspend || newJob.shouldUnsuspend, notesCountDelta: (oldJob.notesCountDelta ?? 0) + (newJob.notesCountDelta ?? 0), usersCountDelta: (oldJob.usersCountDelta ?? 0) + (newJob.usersCountDelta ?? 0), @@ -79,9 +83,31 @@ export class CollapsedQueueService implements OnApplicationShutdown { followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0), }), (id, job) => this.federatedInstanceService.update(id, { + // Direct update if defined latestRequestReceivedAt: job.latestRequestReceivedAt, - isNotResponding: job.latestRequestReceivedAt ? false : undefined, - suspensionState: job.shouldUnsuspend ? 'none' : undefined, + + // null (responding) > Date (not responding) + notRespondingSince: job.latestRequestReceivedAt + ? null + : job.notRespondingSince, + + // false (responding) > true (not responding) + isNotResponding: job.latestRequestReceivedAt + ? false + : job.notRespondingSince + ? true + : undefined, + + // gone > none > auto + suspensionState: job.shouldSuspendGone + ? 'goneSuspended' + : job.shouldUnsuspend + ? 'none' + : job.shouldSuspendNotResponding + ? 'autoSuspendedForNotResponding' + : undefined, + + // Increment if defined notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined, followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, @@ -182,18 +208,27 @@ export class CollapsedQueueService implements OnApplicationShutdown { } } -function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined { - if (first && second) { - if (first.getTime() > second.getTime()) { - return first; +function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined; +function maxDate(first: Date | null | undefined, second: Date | null | undefined): Date | null | undefined; + +function maxDate(first: Date | null | undefined, second: Date | null | undefined): Date | null | undefined { + if (first !== undefined && second !== undefined) { + if (first != null && second != null) { + if (first.getTime() > second.getTime()) { + return first; + } else { + return second; + } } else { - return second; + // Null is considered infinitely in the future, and is therefore newer than any date. + return null; } - } else if (first) { + } else if (first !== undefined) { return first; - } else if (second) { + } else if (second !== undefined) { return second; } else { + // Undefined in considered infinitely in the past, and is therefore older than any date. return undefined; } } diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts index 4863b815d7..bacfed3473 100644 --- a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -169,7 +169,7 @@ export class BackgroundTaskProcessorService { } private async processPostDeliver(task: PostDeliverBackgroundTask): Promise { - let instance = await this.federatedInstanceService.fetchOrRegister(task.host); + const instance = await this.federatedInstanceService.fetchOrRegister(task.host); if (instance.isBlocked) return `Skipping post-deliver task: instance ${task.host} is blocked`; const success = task.result === 'success'; @@ -188,14 +188,10 @@ export class BackgroundTaskProcessorService { // This is messy, but we need to minimize updates to space in Postgres blocks. if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) { - instance = await this.federatedInstanceService.update(instance.id, { - isNotResponding: updateNotResponding ? !success : undefined, + this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined, - suspensionState: updateGoneSuspended - ? 'goneSuspended' - : updateAutoSuspended - ? 'autoSuspendedForNotResponding' - : undefined, + shouldSuspendGone: updateGoneSuspended || undefined, + shouldSuspendNotResponding: updateAutoSuspended || undefined, }); }