fix disposal of ServerStatsService and QueueStatsService

This commit is contained in:
Hazelnoot 2025-06-25 22:16:06 -04:00
parent d4105dee0c
commit c79d66d48b
2 changed files with 117 additions and 38 deletions

View file

@ -13,13 +13,32 @@ import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { OnApplicationShutdown } from '@nestjs/common'; import type { OnApplicationShutdown } from '@nestjs/common';
export interface StatsEntry {
activeSincePrevTick: number,
active: number,
waiting: number,
delayed: number,
}
export interface Stats {
deliver: StatsEntry,
inbox: StatsEntry,
}
const ev = new Xev(); const ev = new Xev();
const interval = 10000; const interval = 10000;
@Injectable() @Injectable()
export class QueueStatsService implements OnApplicationShutdown { export class QueueStatsService implements OnApplicationShutdown {
private intervalId: NodeJS.Timeout; private intervalId?: NodeJS.Timeout;
private activeDeliverJobs = 0;
private activeInboxJobs = 0;
private deliverQueueEvents?: Bull.QueueEvents;
private inboxQueueEvents?: Bull.QueueEvents;
private log?: Stats[];
constructor( constructor(
@Inject(DI.config) @Inject(DI.config)
@ -29,30 +48,39 @@ export class QueueStatsService implements OnApplicationShutdown {
) { ) {
} }
@bindThis
private onDeliverActive() {
this.activeDeliverJobs++;
}
@bindThis
private onInboxActive() {
this.activeInboxJobs++;
}
@bindThis
private onRequestQueueStatsLog(x: { id: string, length?: number }) {
if (this.log) {
ev.emit(`queueStatsLog:${x.id}`, this.log.slice(0, x.length ?? 50));
}
}
/** /**
* Report queue stats regularly * Report queue stats regularly
*/ */
@bindThis @bindThis
public start(): void { public async start() {
const log = [] as any[]; // Just in case start gets called repeatedly
await this.stop();
ev.on('requestQueueStatsLog', x => { this.log = [];
ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50)); ev.on('requestQueueStatsLog', this.onRequestQueueStatsLog);
});
let activeDeliverJobs = 0; this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
let activeInboxJobs = 0; this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); this.deliverQueueEvents.on('active', this.onDeliverActive);
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); this.inboxQueueEvents.on('active', this.onInboxActive);
deliverQueueEvents.on('active', () => {
activeDeliverJobs++;
});
inboxQueueEvents.on('active', () => {
activeInboxJobs++;
});
const tick = async () => { const tick = async () => {
const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts(); const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts();
@ -60,13 +88,13 @@ export class QueueStatsService implements OnApplicationShutdown {
const stats = { const stats = {
deliver: { deliver: {
activeSincePrevTick: activeDeliverJobs, activeSincePrevTick: this.activeDeliverJobs,
active: deliverJobCounts.active, active: deliverJobCounts.active,
waiting: deliverJobCounts.waiting, waiting: deliverJobCounts.waiting,
delayed: deliverJobCounts.delayed, delayed: deliverJobCounts.delayed,
}, },
inbox: { inbox: {
activeSincePrevTick: activeInboxJobs, activeSincePrevTick: this.activeInboxJobs,
active: inboxJobCounts.active, active: inboxJobCounts.active,
waiting: inboxJobCounts.waiting, waiting: inboxJobCounts.waiting,
delayed: inboxJobCounts.delayed, delayed: inboxJobCounts.delayed,
@ -75,11 +103,13 @@ export class QueueStatsService implements OnApplicationShutdown {
ev.emit('queueStats', stats); ev.emit('queueStats', stats);
log.unshift(stats); if (this.log) {
if (log.length > 200) log.pop(); this.log.unshift(stats);
if (this.log.length > 200) this.log.pop();
}
activeDeliverJobs = 0; this.activeDeliverJobs = 0;
activeInboxJobs = 0; this.activeInboxJobs = 0;
}; };
tick(); tick();
@ -88,12 +118,32 @@ export class QueueStatsService implements OnApplicationShutdown {
} }
@bindThis @bindThis
public dispose(): void { public async stop() {
clearInterval(this.intervalId); if (this.intervalId) {
clearInterval(this.intervalId);
}
this.log = undefined;
ev.off('requestQueueStatsLog', this.onRequestQueueStatsLog);
this.deliverQueueEvents?.off('active', this.onDeliverActive);
this.inboxQueueEvents?.off('active', this.onInboxActive);
await this.deliverQueueEvents?.close();
await this.inboxQueueEvents?.close();
this.activeDeliverJobs = 0;
this.activeInboxJobs = 0;
} }
@bindThis @bindThis
public onApplicationShutdown(signal?: string | undefined): void { public async dispose() {
this.dispose(); await this.stop();
ev.dispose();
}
@bindThis
public async onApplicationShutdown(signal?: string | undefined) {
await this.dispose();
} }
} }

View file

@ -12,6 +12,22 @@ import type { OnApplicationShutdown } from '@nestjs/common';
import { MiMeta } from '@/models/_.js'; import { MiMeta } from '@/models/_.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
export interface Stats {
cpu: number,
mem: {
used: number,
active: number,
},
net: {
rx: number,
tx: number,
},
fs: {
r: number,
w: number,
},
}
const ev = new Xev(); const ev = new Xev();
const interval = 2000; const interval = 2000;
@ -23,12 +39,19 @@ const round = (num: number) => Math.round(num * 10) / 10;
export class ServerStatsService implements OnApplicationShutdown { export class ServerStatsService implements OnApplicationShutdown {
private intervalId: NodeJS.Timeout | null = null; private intervalId: NodeJS.Timeout | null = null;
private log: Stats[] = [];
constructor( constructor(
@Inject(DI.meta) @Inject(DI.meta)
private meta: MiMeta, private meta: MiMeta,
) { ) {
} }
@bindThis
private async onRequestStatsLog(x: { id: string, length: number }) {
ev.emit(`serverStatsLog:${x.id}`, this.log.slice(0, x.length));
}
/** /**
* Report server stats regularly * Report server stats regularly
*/ */
@ -36,11 +59,8 @@ export class ServerStatsService implements OnApplicationShutdown {
public async start(): Promise<void> { public async start(): Promise<void> {
if (!this.meta.enableServerMachineStats) return; if (!this.meta.enableServerMachineStats) return;
const log = [] as any[]; this.log = [];
ev.on('requestServerStatsLog', this.onRequestStatsLog);
ev.on('requestServerStatsLog', x => {
ev.emit(`serverStatsLog:${x.id}`, log.slice(0, x.length));
});
const tick = async () => { const tick = async () => {
const cpu = await cpuUsage(); const cpu = await cpuUsage();
@ -64,8 +84,8 @@ export class ServerStatsService implements OnApplicationShutdown {
}, },
}; };
ev.emit('serverStats', stats); ev.emit('serverStats', stats);
log.unshift(stats); this.log.unshift(stats);
if (log.length > 200) log.pop(); if (this.log.length > 200) this.log.pop();
}; };
tick(); tick();
@ -78,6 +98,11 @@ export class ServerStatsService implements OnApplicationShutdown {
if (this.intervalId) { if (this.intervalId) {
clearInterval(this.intervalId); clearInterval(this.intervalId);
} }
this.log = [];
ev.off('requestServerStatsLog', this.onRequestStatsLog);
ev.dispose();
} }
@bindThis @bindThis
@ -89,9 +114,13 @@ export class ServerStatsService implements OnApplicationShutdown {
// CPU STAT // CPU STAT
function cpuUsage(): Promise<number> { function cpuUsage(): Promise<number> {
return new Promise((res, rej) => { return new Promise((res, rej) => {
osUtils.cpuUsage((cpuUsage) => { try {
res(cpuUsage); osUtils.cpuUsage((cpuUsage) => {
}); res(cpuUsage);
});
} catch (err) {
rej(err);
}
}); });
} }