add concurrency limit to CollapsedQueue

This commit is contained in:
Hazelnoot 2025-06-25 12:43:13 -04:00
parent 45df03ab14
commit e884e3f6a3
2 changed files with 28 additions and 7 deletions

View file

@ -60,7 +60,10 @@ export class CollapsedQueueService implements OnApplicationShutdown {
isNotResponding: false,
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
}),
this.onQueueError,
{
onError: this.onQueueError,
concurrency: 2, // Low concurrency, this table is slow for some reason
},
);
this.updateUserQueue = new CollapsedQueue(
@ -70,7 +73,10 @@ export class CollapsedQueueService implements OnApplicationShutdown {
updatedAt: new Date(Math.max(oldJob.updatedAt.getTime(), newJob.updatedAt.getTime())),
}),
(id, job) => this.usersRepository.update({ id }, { updatedAt: job.updatedAt }),
this.onQueueError,
{
onError: this.onQueueError,
concurrency: 4,
},
);
this.internalEventService.on('localUserUpdated', this.onUserUpdated);

View file

@ -3,7 +3,8 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
import type { TimeService, TimerHandle } from '@/global/TimeService.js';
import { TimeService, type TimerHandle } from '@/global/TimeService.js';
import promiseLimit from 'promise-limit';
type Job<V> = {
value: V;
@ -12,6 +13,7 @@ type Job<V> = {
// TODO: redis使えるようにする
export class CollapsedQueue<K, V> {
private readonly limiter?: ReturnType<typeof promiseLimit<void>>;
private jobs: Map<K, Job<V>> = new Map();
constructor(
@ -20,8 +22,15 @@ export class CollapsedQueue<K, V> {
private readonly timeout: number,
private readonly collapse: (oldValue: V, newValue: V) => V,
private readonly perform: (key: K, value: V) => Promise<void | unknown>,
private readonly onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void,
) {}
private readonly opts?: {
onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void,
concurrency?: number,
},
) {
if (opts?.concurrency) {
this.limiter = promiseLimit<void>(opts.concurrency);
}
}
enqueue(key: K, value: V) {
if (this.jobs.has(key)) {
@ -49,9 +58,15 @@ export class CollapsedQueue<K, V> {
private async _perform(key: K, value: V) {
try {
await this.perform(key, value);
if (this.limiter) {
await this.limiter(async () => {
await this.perform(key, value);
});
} else {
await this.perform(key, value);
}
} catch (err) {
this.onError?.(this, err);
this.opts?.onError?.(this, err);
throw err;
}
}