From c79d66d48b62b5af42d5f6a7d7b986f7e1c572fe Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 22:16:06 -0400 Subject: [PATCH] fix disposal of ServerStatsService and QueueStatsService --- .../backend/src/daemons/QueueStatsService.ts | 106 +++++++++++++----- .../backend/src/daemons/ServerStatsService.ts | 49 ++++++-- 2 files changed, 117 insertions(+), 38 deletions(-) diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index ede104b9fe..9888ae3942 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -13,13 +13,32 @@ import type { Config } from '@/config.js'; import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { OnApplicationShutdown } from '@nestjs/common'; +export interface StatsEntry { + activeSincePrevTick: number, + active: number, + waiting: number, + delayed: number, +} + +export interface Stats { + deliver: StatsEntry, + inbox: StatsEntry, +} + const ev = new Xev(); const interval = 10000; @Injectable() export class QueueStatsService implements OnApplicationShutdown { - private intervalId: NodeJS.Timeout; + private intervalId?: NodeJS.Timeout; + private activeDeliverJobs = 0; + private activeInboxJobs = 0; + + private deliverQueueEvents?: Bull.QueueEvents; + private inboxQueueEvents?: Bull.QueueEvents; + + private log?: Stats[]; constructor( @Inject(DI.config) @@ -29,30 +48,39 @@ export class QueueStatsService implements OnApplicationShutdown { ) { } + @bindThis + private onDeliverActive() { + this.activeDeliverJobs++; + } + + @bindThis + private onInboxActive() { + this.activeInboxJobs++; + } + + @bindThis + private onRequestQueueStatsLog(x: { id: string, length?: number }) { + if (this.log) { + ev.emit(`queueStatsLog:${x.id}`, this.log.slice(0, x.length ?? 50)); + } + } + /** * Report queue stats regularly */ @bindThis - public start(): void { - const log = [] as any[]; + public async start() { + // Just in case start gets called repeatedly + await this.stop(); - ev.on('requestQueueStatsLog', x => { - ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50)); - }); + this.log = []; + ev.on('requestQueueStatsLog', this.onRequestQueueStatsLog); - let activeDeliverJobs = 0; - let activeInboxJobs = 0; + this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); + this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); - const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - - deliverQueueEvents.on('active', () => { - activeDeliverJobs++; - }); - - inboxQueueEvents.on('active', () => { - activeInboxJobs++; - }); + this.deliverQueueEvents.on('active', this.onDeliverActive); + this.inboxQueueEvents.on('active', this.onInboxActive); const tick = async () => { const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts(); @@ -60,13 +88,13 @@ export class QueueStatsService implements OnApplicationShutdown { const stats = { deliver: { - activeSincePrevTick: activeDeliverJobs, + activeSincePrevTick: this.activeDeliverJobs, active: deliverJobCounts.active, waiting: deliverJobCounts.waiting, delayed: deliverJobCounts.delayed, }, inbox: { - activeSincePrevTick: activeInboxJobs, + activeSincePrevTick: this.activeInboxJobs, active: inboxJobCounts.active, waiting: inboxJobCounts.waiting, delayed: inboxJobCounts.delayed, @@ -75,11 +103,13 @@ export class QueueStatsService implements OnApplicationShutdown { ev.emit('queueStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + if (this.log) { + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); + } - activeDeliverJobs = 0; - activeInboxJobs = 0; + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; }; tick(); @@ -88,12 +118,32 @@ export class QueueStatsService implements OnApplicationShutdown { } @bindThis - public dispose(): void { - clearInterval(this.intervalId); + public async stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + + this.log = undefined; + ev.off('requestQueueStatsLog', this.onRequestQueueStatsLog); + + this.deliverQueueEvents?.off('active', this.onDeliverActive); + this.inboxQueueEvents?.off('active', this.onInboxActive); + + await this.deliverQueueEvents?.close(); + await this.inboxQueueEvents?.close(); + + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async dispose() { + await this.stop(); + ev.dispose(); + } + + @bindThis + public async onApplicationShutdown(signal?: string | undefined) { + await this.dispose(); } } diff --git a/packages/backend/src/daemons/ServerStatsService.ts b/packages/backend/src/daemons/ServerStatsService.ts index 6e9d29dcbd..9fafa54b04 100644 --- a/packages/backend/src/daemons/ServerStatsService.ts +++ b/packages/backend/src/daemons/ServerStatsService.ts @@ -12,6 +12,22 @@ import type { OnApplicationShutdown } from '@nestjs/common'; import { MiMeta } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; +export interface Stats { + cpu: number, + mem: { + used: number, + active: number, + }, + net: { + rx: number, + tx: number, + }, + fs: { + r: number, + w: number, + }, +} + const ev = new Xev(); const interval = 2000; @@ -23,12 +39,19 @@ const round = (num: number) => Math.round(num * 10) / 10; export class ServerStatsService implements OnApplicationShutdown { private intervalId: NodeJS.Timeout | null = null; + private log: Stats[] = []; + constructor( @Inject(DI.meta) private meta: MiMeta, ) { } + @bindThis + private async onRequestStatsLog(x: { id: string, length: number }) { + ev.emit(`serverStatsLog:${x.id}`, this.log.slice(0, x.length)); + } + /** * Report server stats regularly */ @@ -36,11 +59,8 @@ export class ServerStatsService implements OnApplicationShutdown { public async start(): Promise { if (!this.meta.enableServerMachineStats) return; - const log = [] as any[]; - - ev.on('requestServerStatsLog', x => { - ev.emit(`serverStatsLog:${x.id}`, log.slice(0, x.length)); - }); + this.log = []; + ev.on('requestServerStatsLog', this.onRequestStatsLog); const tick = async () => { const cpu = await cpuUsage(); @@ -64,8 +84,8 @@ export class ServerStatsService implements OnApplicationShutdown { }, }; ev.emit('serverStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); }; tick(); @@ -78,6 +98,11 @@ export class ServerStatsService implements OnApplicationShutdown { if (this.intervalId) { clearInterval(this.intervalId); } + + this.log = []; + ev.off('requestServerStatsLog', this.onRequestStatsLog); + + ev.dispose(); } @bindThis @@ -89,9 +114,13 @@ export class ServerStatsService implements OnApplicationShutdown { // CPU STAT function cpuUsage(): Promise { return new Promise((res, rej) => { - osUtils.cpuUsage((cpuUsage) => { - res(cpuUsage); - }); + try { + osUtils.cpuUsage((cpuUsage) => { + res(cpuUsage); + }); + } catch (err) { + rej(err); + } }); }