add name and error logging to CollapsedQueue
This commit is contained in:
parent
a3bcc474b7
commit
3a2c0fd9da
2 changed files with 48 additions and 11 deletions
|
|
@ -33,11 +33,11 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
) {
|
||||
this.logger = loggerService.getLogger('collapsed-queue');
|
||||
this.updateInstanceQueue = new CollapsedQueue(
|
||||
this.envService.env.NODE_ENV !== 'test'
|
||||
? 60 * 1000 * 5
|
||||
: 0,
|
||||
'updateInstance',
|
||||
this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0,
|
||||
(oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob),
|
||||
(id, job) => this.performUpdateInstance(id, job),
|
||||
this.onQueueError,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -63,11 +63,37 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
|
||||
async dispose() {
|
||||
await this.updateInstanceQueue.performAllNow().catch(err => this.logger.error(`Shutdown error in updateInstanceQueue: ${renderInlineError(err)}`));
|
||||
@bindThis
|
||||
async performAllNow() {
|
||||
this.logger.info('Persisting all collapsed queues...');
|
||||
|
||||
await this.performQueue(this.updateInstanceQueue);
|
||||
|
||||
this.logger.info('Persistence complete.');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async performQueue<K, V>(queue: CollapsedQueue<K, V>): Promise<void> {
|
||||
try {
|
||||
const results = await queue.performAllNow();
|
||||
|
||||
const [succeeded, failed] = results.reduce((counts, result) => {
|
||||
counts[result.status === 'fulfilled' ? 0 : 1]++;
|
||||
return counts;
|
||||
}, [0, 0]);
|
||||
|
||||
this.logger.debug(`Persistence completed for ${queue.name}: ${succeeded} succeeded and ${failed} failed`);
|
||||
} catch (err) {
|
||||
this.logger.error(`Persistence failed for ${queue.name}: ${renderInlineError(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onQueueError<K, V>(queue: CollapsedQueue<K, V>, error: unknown): void {
|
||||
this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`);
|
||||
}
|
||||
|
||||
async onApplicationShutdown() {
|
||||
await this.dispose();
|
||||
await this.performAllNow();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,10 +15,12 @@ export class CollapsedQueue<K, V> {
|
|||
private jobs: Map<K, Job<V>> = new Map();
|
||||
|
||||
constructor(
|
||||
public readonly name: string,
|
||||
protected readonly timeService: TimeService,
|
||||
private timeout: number,
|
||||
private collapse: (oldValue: V, newValue: V) => V,
|
||||
private perform: (key: K, value: V) => Promise<void>,
|
||||
private readonly timeout: number,
|
||||
private readonly collapse: (oldValue: V, newValue: V) => V,
|
||||
private readonly perform: (key: K, value: V) => Promise<void>,
|
||||
private readonly onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void,
|
||||
) {}
|
||||
|
||||
enqueue(key: K, value: V) {
|
||||
|
|
@ -30,7 +32,7 @@ export class CollapsedQueue<K, V> {
|
|||
const timer = this.timeService.startTimer(() => {
|
||||
const job = this.jobs.get(key)!;
|
||||
this.jobs.delete(key);
|
||||
this.perform(key, job.value);
|
||||
this._perform(key, job.value);
|
||||
}, this.timeout);
|
||||
this.jobs.set(key, { value, timer });
|
||||
}
|
||||
|
|
@ -42,6 +44,15 @@ export class CollapsedQueue<K, V> {
|
|||
for (const [_key, job] of entries) {
|
||||
this.timeService.stopTimer(job.timer);
|
||||
}
|
||||
return await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value)));
|
||||
return await Promise.allSettled(entries.map(([key, job]) => this._perform(key, job.value)));
|
||||
}
|
||||
|
||||
private async _perform(key: K, value: V) {
|
||||
try {
|
||||
await this.perform(key, value);
|
||||
} catch (err) {
|
||||
this.onError?.(this, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue