use CollapsedQueueService in processPostDeliver

This commit is contained in:
Hazelnoot 2025-06-25 18:51:51 -04:00
parent 15e88d31ad
commit fceb0d0d50
2 changed files with 48 additions and 17 deletions

View file

@ -19,7 +19,10 @@ import { DI } from '@/di-symbols.js';
export type UpdateInstanceJob = {
latestRequestReceivedAt?: Date,
notRespondingSince?: Date | null,
shouldUnsuspend?: boolean,
shouldSuspendGone?: boolean,
shouldSuspendNotResponding?: boolean,
notesCountDelta?: number,
usersCountDelta?: number,
followingCountDelta?: number,
@ -72,6 +75,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
fiveMinuteInterval,
(oldJob, newJob) => ({
latestRequestReceivedAt: maxDate(oldJob.latestRequestReceivedAt, newJob.latestRequestReceivedAt),
notRespondingSince: maxDate(oldJob.notRespondingSince, newJob.notRespondingSince),
shouldUnsuspend: oldJob.shouldUnsuspend || newJob.shouldUnsuspend,
notesCountDelta: (oldJob.notesCountDelta ?? 0) + (newJob.notesCountDelta ?? 0),
usersCountDelta: (oldJob.usersCountDelta ?? 0) + (newJob.usersCountDelta ?? 0),
@ -79,9 +83,31 @@ export class CollapsedQueueService implements OnApplicationShutdown {
followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0),
}),
(id, job) => this.federatedInstanceService.update(id, {
// Direct update if defined
latestRequestReceivedAt: job.latestRequestReceivedAt,
isNotResponding: job.latestRequestReceivedAt ? false : undefined,
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
// null (responding) > Date (not responding)
notRespondingSince: job.latestRequestReceivedAt
? null
: job.notRespondingSince,
// false (responding) > true (not responding)
isNotResponding: job.latestRequestReceivedAt
? false
: job.notRespondingSince
? true
: undefined,
// gone > none > auto
suspensionState: job.shouldSuspendGone
? 'goneSuspended'
: job.shouldUnsuspend
? 'none'
: job.shouldSuspendNotResponding
? 'autoSuspendedForNotResponding'
: undefined,
// Increment if defined
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined,
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
@ -182,18 +208,27 @@ export class CollapsedQueueService implements OnApplicationShutdown {
}
}
function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined {
if (first && second) {
if (first.getTime() > second.getTime()) {
return first;
function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined;
function maxDate(first: Date | null | undefined, second: Date | null | undefined): Date | null | undefined;
function maxDate(first: Date | null | undefined, second: Date | null | undefined): Date | null | undefined {
if (first !== undefined && second !== undefined) {
if (first != null && second != null) {
if (first.getTime() > second.getTime()) {
return first;
} else {
return second;
}
} else {
return second;
// Null is considered infinitely in the future, and is therefore newer than any date.
return null;
}
} else if (first) {
} else if (first !== undefined) {
return first;
} else if (second) {
} else if (second !== undefined) {
return second;
} else {
// Undefined in considered infinitely in the past, and is therefore older than any date.
return undefined;
}
}

View file

@ -169,7 +169,7 @@ export class BackgroundTaskProcessorService {
}
private async processPostDeliver(task: PostDeliverBackgroundTask): Promise<string> {
let instance = await this.federatedInstanceService.fetchOrRegister(task.host);
const instance = await this.federatedInstanceService.fetchOrRegister(task.host);
if (instance.isBlocked) return `Skipping post-deliver task: instance ${task.host} is blocked`;
const success = task.result === 'success';
@ -188,14 +188,10 @@ export class BackgroundTaskProcessorService {
// This is messy, but we need to minimize updates to space in Postgres blocks.
if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) {
instance = await this.federatedInstanceService.update(instance.id, {
isNotResponding: updateNotResponding ? !success : undefined,
this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined,
suspensionState: updateGoneSuspended
? 'goneSuspended'
: updateAutoSuspended
? 'autoSuspendedForNotResponding'
: undefined,
shouldSuspendGone: updateGoneSuspended || undefined,
shouldSuspendNotResponding: updateAutoSuspended || undefined,
});
}