From 1baa9b2f9d1994706fc602377eb296f5400d32a7 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 22:34:36 -0400 Subject: [PATCH] fix sporadic update failures caused by lack of data --- .../backend/src/core/CollapsedQueueService.ts | 101 +++++++++++------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index d566f40adb..406f92f103 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -104,37 +104,52 @@ export class CollapsedQueueService implements OnApplicationShutdown { followingCountDelta: (oldJob.followingCountDelta ?? 0) + (newJob.followingCountDelta ?? 0), followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0), }), - async (id, job) => await this.federatedInstanceService.update(id, { - // Direct update if defined - latestRequestReceivedAt: job.latestRequestReceivedAt, + async (id, job) => { + // Have to check this because all properties are optional + if ( + job.latestRequestReceivedAt || + job.notRespondingSince !== undefined || + job.shouldSuspendNotResponding || + job.shouldSuspendGone || + job.shouldUnsuspend || + job.notesCountDelta || + job.usersCountDelta || + job.followingCountDelta || + job.followersCountDelta + ) { + await this.federatedInstanceService.update(id, { + // Direct update if defined + latestRequestReceivedAt: job.latestRequestReceivedAt, - // null (responding) > Date (not responding) - notRespondingSince: job.latestRequestReceivedAt - ? null - : job.notRespondingSince, + // null (responding) > Date (not responding) + notRespondingSince: job.latestRequestReceivedAt + ? null + : job.notRespondingSince, - // false (responding) > true (not responding) - isNotResponding: job.latestRequestReceivedAt - ? false - : job.notRespondingSince - ? true - : undefined, + // false (responding) > true (not responding) + isNotResponding: job.latestRequestReceivedAt + ? false + : job.notRespondingSince + ? true + : undefined, - // gone > none > auto - suspensionState: job.shouldSuspendGone - ? 'goneSuspended' - : job.shouldUnsuspend - ? 'none' - : job.shouldSuspendNotResponding - ? 'autoSuspendedForNotResponding' - : undefined, + // gone > none > auto + suspensionState: job.shouldSuspendGone + ? 'goneSuspended' + : job.shouldUnsuspend + ? 'none' + : job.shouldSuspendNotResponding + ? 'autoSuspendedForNotResponding' + : undefined, - // Increment if defined - notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, - usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined, - followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, - followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, - }), + // Increment if defined + notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, + usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined, + followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, + followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, + }); + } + }, { onError: this.onQueueError, concurrency: 2, // Low concurrency, this table is slow for some reason @@ -161,13 +176,16 @@ export class CollapsedQueueService implements OnApplicationShutdown { followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0), }), async (id, job) => { - await this.usersRepository.update({ id }, { - updatedAt: job.updatedAt, - notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, - followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, - followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, - }); - await this.internalEventService.emit('userUpdated', { id }); + // Have to check this because all properties are optional + if (job.updatedAt || job.notesCountDelta || job.followingCountDelta || job.followersCountDelta) { + await this.usersRepository.update({ id }, { + updatedAt: job.updatedAt, + notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, + followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, + followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, + }); + await this.internalEventService.emit('userUpdated', { id }); + } }, { onError: this.onQueueError, @@ -190,11 +208,16 @@ export class CollapsedQueueService implements OnApplicationShutdown { renoteCountDelta: (oldJob.renoteCountDelta ?? 0) + (newJob.renoteCountDelta ?? 0), clippedCountDelta: (oldJob.clippedCountDelta ?? 0) + (newJob.clippedCountDelta ?? 0), }), - async (id, job) => await this.notesRepository.update({ id }, { - repliesCount: job.repliesCountDelta ? () => `"repliesCount" + ${job.repliesCountDelta}` : undefined, - renoteCount: job.renoteCountDelta ? () => `"renoteCount" + ${job.renoteCountDelta}` : undefined, - clippedCount: job.clippedCountDelta ? () => `"clippedCount" + ${job.clippedCountDelta}` : undefined, - }), + async (id, job) => { + // Have to check this because all properties are optional + if (job.repliesCountDelta || job.renoteCountDelta || job.clippedCountDelta) { + await this.notesRepository.update({ id }, { + repliesCount: job.repliesCountDelta ? () => `"repliesCount" + ${job.repliesCountDelta}` : undefined, + renoteCount: job.renoteCountDelta ? () => `"renoteCount" + ${job.renoteCountDelta}` : undefined, + clippedCount: job.clippedCountDelta ? () => `"clippedCount" + ${job.clippedCountDelta}` : undefined, + }); + } + }, { onError: this.onQueueError, concurrency: 4, // High concurrency - this queue gets a lot of activity