From 3a2c0fd9dae04d2b3440ad6d34b150148967656c Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 12:16:17 -0400 Subject: [PATCH] add name and error logging to CollapsedQueue --- .../backend/src/core/CollapsedQueueService.ts | 38 ++++++++++++++++--- packages/backend/src/misc/collapsed-queue.ts | 21 +++++++--- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index f2b920fb33..3f3b304618 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -33,11 +33,11 @@ export class CollapsedQueueService implements OnApplicationShutdown { ) { this.logger = loggerService.getLogger('collapsed-queue'); this.updateInstanceQueue = new CollapsedQueue( - this.envService.env.NODE_ENV !== 'test' - ? 60 * 1000 * 5 - : 0, + 'updateInstance', + this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, (oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob), (id, job) => this.performUpdateInstance(id, job), + this.onQueueError, ); } @@ -63,11 +63,37 @@ export class CollapsedQueueService implements OnApplicationShutdown { }); } - async dispose() { - await this.updateInstanceQueue.performAllNow().catch(err => this.logger.error(`Shutdown error in updateInstanceQueue: ${renderInlineError(err)}`)); + @bindThis + async performAllNow() { + this.logger.info('Persisting all collapsed queues...'); + + await this.performQueue(this.updateInstanceQueue); + + this.logger.info('Persistence complete.'); + } + + @bindThis + private async performQueue(queue: CollapsedQueue): Promise { + try { + const results = await queue.performAllNow(); + + const [succeeded, failed] = results.reduce((counts, result) => { + counts[result.status === 'fulfilled' ? 0 : 1]++; + return counts; + }, [0, 0]); + + this.logger.debug(`Persistence completed for ${queue.name}: ${succeeded} succeeded and ${failed} failed`); + } catch (err) { + this.logger.error(`Persistence failed for ${queue.name}: ${renderInlineError(err)}`); + } + } + + @bindThis + private onQueueError(queue: CollapsedQueue, error: unknown): void { + this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`); } async onApplicationShutdown() { - await this.dispose(); + await this.performAllNow(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index ee3f44af4b..af3796f7ed 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -15,10 +15,12 @@ export class CollapsedQueue { private jobs: Map> = new Map(); constructor( + public readonly name: string, protected readonly timeService: TimeService, - private timeout: number, - private collapse: (oldValue: V, newValue: V) => V, - private perform: (key: K, value: V) => Promise, + private readonly timeout: number, + private readonly collapse: (oldValue: V, newValue: V) => V, + private readonly perform: (key: K, value: V) => Promise, + private readonly onError?: (queue: CollapsedQueue, error: unknown) => void, ) {} enqueue(key: K, value: V) { @@ -30,7 +32,7 @@ export class CollapsedQueue { const timer = this.timeService.startTimer(() => { const job = this.jobs.get(key)!; this.jobs.delete(key); - this.perform(key, job.value); + this._perform(key, job.value); }, this.timeout); this.jobs.set(key, { value, timer }); } @@ -42,6 +44,15 @@ export class CollapsedQueue { for (const [_key, job] of entries) { this.timeService.stopTimer(job.timer); } - return await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); + return await Promise.allSettled(entries.map(([key, job]) => this._perform(key, job.value))); + } + + private async _perform(key: K, value: V) { + try { + await this.perform(key, value); + } catch (err) { + this.onError?.(this, err); + throw err; + } } }