diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index aa21f7578f..36c2c6962d 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -60,7 +60,10 @@ export class CollapsedQueueService implements OnApplicationShutdown { isNotResponding: false, suspensionState: job.shouldUnsuspend ? 'none' : undefined, }), - this.onQueueError, + { + onError: this.onQueueError, + concurrency: 2, // Low concurrency, this table is slow for some reason + }, ); this.updateUserQueue = new CollapsedQueue( @@ -70,7 +73,10 @@ export class CollapsedQueueService implements OnApplicationShutdown { updatedAt: new Date(Math.max(oldJob.updatedAt.getTime(), newJob.updatedAt.getTime())), }), (id, job) => this.usersRepository.update({ id }, { updatedAt: job.updatedAt }), - this.onQueueError, + { + onError: this.onQueueError, + concurrency: 4, + }, ); this.internalEventService.on('localUserUpdated', this.onUserUpdated); diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 8b70bcc505..a2b1166faf 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -3,7 +3,8 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import type { TimeService, TimerHandle } from '@/global/TimeService.js'; +import { TimeService, type TimerHandle } from '@/global/TimeService.js'; +import promiseLimit from 'promise-limit'; type Job = { value: V; @@ -12,6 +13,7 @@ type Job = { // TODO: redis使えるようにする export class CollapsedQueue { + private readonly limiter?: ReturnType>; private jobs: Map> = new Map(); constructor( @@ -20,8 +22,15 @@ export class CollapsedQueue { private readonly timeout: number, private readonly collapse: (oldValue: V, newValue: V) => V, private readonly perform: (key: K, value: V) => Promise, - private readonly onError?: (queue: CollapsedQueue, error: unknown) => void, - ) {} + private readonly opts?: { + onError?: (queue: CollapsedQueue, error: unknown) => void, + concurrency?: number, + }, + ) { + if (opts?.concurrency) { + this.limiter = promiseLimit(opts.concurrency); + } + } enqueue(key: K, value: V) { if (this.jobs.has(key)) { @@ -49,9 +58,15 @@ export class CollapsedQueue { private async _perform(key: K, value: V) { try { - await this.perform(key, value); + if (this.limiter) { + await this.limiter(async () => { + await this.perform(key, value); + }); + } else { + await this.perform(key, value); + } } catch (err) { - this.onError?.(this, err); + this.opts?.onError?.(this, err); throw err; } }