From d4105dee0c110f37c732336d6822c6900b60f763 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 16:28:07 -0400 Subject: [PATCH 1/6] enable NestJS shutdown hooks --- packages/backend/src/boot/common.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/backend/src/boot/common.ts b/packages/backend/src/boot/common.ts index 2f97980e9a..a10597c7a7 100644 --- a/packages/backend/src/boot/common.ts +++ b/packages/backend/src/boot/common.ts @@ -19,6 +19,7 @@ export async function server() { const app = await NestFactory.createApplicationContext(MainModule, { logger: new NestLogger(), }); + app.enableShutdownHooks(); const serverService = app.get(ServerService); await serverService.launch(); @@ -39,6 +40,7 @@ export async function jobQueue() { const jobQueue = await NestFactory.createApplicationContext(QueueProcessorModule, { logger: new NestLogger(), }); + jobQueue.enableShutdownHooks(); jobQueue.get(QueueProcessorService).start(); jobQueue.get(ChartManagementService).start(); From c79d66d48b62b5af42d5f6a7d7b986f7e1c572fe Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 22:16:06 -0400 Subject: [PATCH 2/6] fix disposal of ServerStatsService and QueueStatsService --- .../backend/src/daemons/QueueStatsService.ts | 106 +++++++++++++----- .../backend/src/daemons/ServerStatsService.ts | 49 ++++++-- 2 files changed, 117 insertions(+), 38 deletions(-) diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index ede104b9fe..9888ae3942 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -13,13 +13,32 @@ import type { Config } from '@/config.js'; import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { OnApplicationShutdown } from '@nestjs/common'; +export interface StatsEntry { + activeSincePrevTick: number, + active: number, + waiting: number, + delayed: number, +} + +export interface Stats { + deliver: StatsEntry, + inbox: StatsEntry, +} + const ev = new Xev(); const interval = 10000; @Injectable() export class QueueStatsService implements OnApplicationShutdown { - private intervalId: NodeJS.Timeout; + private intervalId?: NodeJS.Timeout; + private activeDeliverJobs = 0; + private activeInboxJobs = 0; + + private deliverQueueEvents?: Bull.QueueEvents; + private inboxQueueEvents?: Bull.QueueEvents; + + private log?: Stats[]; constructor( @Inject(DI.config) @@ -29,30 +48,39 @@ export class QueueStatsService implements OnApplicationShutdown { ) { } + @bindThis + private onDeliverActive() { + this.activeDeliverJobs++; + } + + @bindThis + private onInboxActive() { + this.activeInboxJobs++; + } + + @bindThis + private onRequestQueueStatsLog(x: { id: string, length?: number }) { + if (this.log) { + ev.emit(`queueStatsLog:${x.id}`, this.log.slice(0, x.length ?? 50)); + } + } + /** * Report queue stats regularly */ @bindThis - public start(): void { - const log = [] as any[]; + public async start() { + // Just in case start gets called repeatedly + await this.stop(); - ev.on('requestQueueStatsLog', x => { - ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50)); - }); + this.log = []; + ev.on('requestQueueStatsLog', this.onRequestQueueStatsLog); - let activeDeliverJobs = 0; - let activeInboxJobs = 0; + this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); + this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); - const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - - deliverQueueEvents.on('active', () => { - activeDeliverJobs++; - }); - - inboxQueueEvents.on('active', () => { - activeInboxJobs++; - }); + this.deliverQueueEvents.on('active', this.onDeliverActive); + this.inboxQueueEvents.on('active', this.onInboxActive); const tick = async () => { const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts(); @@ -60,13 +88,13 @@ export class QueueStatsService implements OnApplicationShutdown { const stats = { deliver: { - activeSincePrevTick: activeDeliverJobs, + activeSincePrevTick: this.activeDeliverJobs, active: deliverJobCounts.active, waiting: deliverJobCounts.waiting, delayed: deliverJobCounts.delayed, }, inbox: { - activeSincePrevTick: activeInboxJobs, + activeSincePrevTick: this.activeInboxJobs, active: inboxJobCounts.active, waiting: inboxJobCounts.waiting, delayed: inboxJobCounts.delayed, @@ -75,11 +103,13 @@ export class QueueStatsService implements OnApplicationShutdown { ev.emit('queueStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + if (this.log) { + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); + } - activeDeliverJobs = 0; - activeInboxJobs = 0; + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; }; tick(); @@ -88,12 +118,32 @@ export class QueueStatsService implements OnApplicationShutdown { } @bindThis - public dispose(): void { - clearInterval(this.intervalId); + public async stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + + this.log = undefined; + ev.off('requestQueueStatsLog', this.onRequestQueueStatsLog); + + this.deliverQueueEvents?.off('active', this.onDeliverActive); + this.inboxQueueEvents?.off('active', this.onInboxActive); + + await this.deliverQueueEvents?.close(); + await this.inboxQueueEvents?.close(); + + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async dispose() { + await this.stop(); + ev.dispose(); + } + + @bindThis + public async onApplicationShutdown(signal?: string | undefined) { + await this.dispose(); } } diff --git a/packages/backend/src/daemons/ServerStatsService.ts b/packages/backend/src/daemons/ServerStatsService.ts index 6e9d29dcbd..9fafa54b04 100644 --- a/packages/backend/src/daemons/ServerStatsService.ts +++ b/packages/backend/src/daemons/ServerStatsService.ts @@ -12,6 +12,22 @@ import type { OnApplicationShutdown } from '@nestjs/common'; import { MiMeta } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; +export interface Stats { + cpu: number, + mem: { + used: number, + active: number, + }, + net: { + rx: number, + tx: number, + }, + fs: { + r: number, + w: number, + }, +} + const ev = new Xev(); const interval = 2000; @@ -23,12 +39,19 @@ const round = (num: number) => Math.round(num * 10) / 10; export class ServerStatsService implements OnApplicationShutdown { private intervalId: NodeJS.Timeout | null = null; + private log: Stats[] = []; + constructor( @Inject(DI.meta) private meta: MiMeta, ) { } + @bindThis + private async onRequestStatsLog(x: { id: string, length: number }) { + ev.emit(`serverStatsLog:${x.id}`, this.log.slice(0, x.length)); + } + /** * Report server stats regularly */ @@ -36,11 +59,8 @@ export class ServerStatsService implements OnApplicationShutdown { public async start(): Promise { if (!this.meta.enableServerMachineStats) return; - const log = [] as any[]; - - ev.on('requestServerStatsLog', x => { - ev.emit(`serverStatsLog:${x.id}`, log.slice(0, x.length)); - }); + this.log = []; + ev.on('requestServerStatsLog', this.onRequestStatsLog); const tick = async () => { const cpu = await cpuUsage(); @@ -64,8 +84,8 @@ export class ServerStatsService implements OnApplicationShutdown { }, }; ev.emit('serverStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); }; tick(); @@ -78,6 +98,11 @@ export class ServerStatsService implements OnApplicationShutdown { if (this.intervalId) { clearInterval(this.intervalId); } + + this.log = []; + ev.off('requestServerStatsLog', this.onRequestStatsLog); + + ev.dispose(); } @bindThis @@ -89,9 +114,13 @@ export class ServerStatsService implements OnApplicationShutdown { // CPU STAT function cpuUsage(): Promise { return new Promise((res, rej) => { - osUtils.cpuUsage((cpuUsage) => { - res(cpuUsage); - }); + try { + osUtils.cpuUsage((cpuUsage) => { + res(cpuUsage); + }); + } catch (err) { + rej(err); + } }); } From 4e609478f88207da8d0378c1ce587e43e6702e6e Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 21:39:09 -0400 Subject: [PATCH 3/6] add additional shutdown logging --- packages/backend/src/GlobalModule.ts | 6 ++++++ packages/backend/src/core/QueueModule.ts | 6 ++++++ packages/backend/src/core/chart/ChartManagementService.ts | 1 + packages/backend/src/queue/QueueProcessorService.ts | 2 ++ packages/backend/src/server/ServerService.ts | 5 +++++ 5 files changed, 20 insertions(+) diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index 90dbdaf2a6..988da44512 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -14,6 +14,7 @@ import { createPostgresDataSource } from './postgres.js'; import { RepositoryModule } from './models/RepositoryModule.js'; import { allSettled } from './misc/promise-tracker.js'; import { GlobalEvents } from './core/GlobalEventService.js'; +import Logger from './logger.js'; import type { Provider, OnApplicationShutdown } from '@nestjs/common'; const $config: Provider = { @@ -164,6 +165,8 @@ const $meta: Provider = { exports: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, $redisForRateLimit, RepositoryModule], }) export class GlobalModule implements OnApplicationShutdown { + private readonly logger = new Logger('global'); + constructor( @Inject(DI.db) private db: DataSource, @Inject(DI.redis) private redisClient: Redis.Redis, @@ -176,8 +179,10 @@ export class GlobalModule implements OnApplicationShutdown { public async dispose(): Promise { // Wait for all potential DB queries + this.logger.info('Finalizing active promises...'); await allSettled(); // And then disconnect from DB + this.logger.info('Disconnected from data sources...'); await this.db.destroy(); this.redisClient.disconnect(); this.redisForPub.disconnect(); @@ -185,6 +190,7 @@ export class GlobalModule implements OnApplicationShutdown { this.redisForTimelines.disconnect(); this.redisForReactions.disconnect(); this.redisForRateLimit.disconnect(); + this.logger.info('Global module disposed.'); } async onApplicationShutdown(signal: string): Promise { diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 6dd48927c1..a48ffaab43 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -9,6 +9,7 @@ import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import { baseQueueOptions, QUEUE } from '@/queue/const.js'; import { allSettled } from '@/misc/promise-tracker.js'; +import Logger from '@/logger.js'; import { DeliverJobData, EndedPollNotificationJobData, @@ -120,6 +121,8 @@ const $scheduleNotePost: Provider = { ], }) export class QueueModule implements OnApplicationShutdown { + private readonly logger = new Logger('queue'); + constructor( @Inject('queue:system') public systemQueue: SystemQueue, @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, @@ -135,8 +138,10 @@ export class QueueModule implements OnApplicationShutdown { public async dispose(): Promise { // Wait for all potential queue jobs + this.logger.info('Finalizing active promises...'); await allSettled(); // And then close all queues + this.logger.info('Closing BullMQ queues...'); await Promise.all([ this.systemQueue.close(), this.endedPollNotificationQueue.close(), @@ -149,6 +154,7 @@ export class QueueModule implements OnApplicationShutdown { this.systemWebhookDeliverQueue.close(), this.scheduleNotePostQueue.close(), ]); + this.logger.info('Queue module disposed.'); } async onApplicationShutdown(signal: string): Promise { diff --git a/packages/backend/src/core/chart/ChartManagementService.ts b/packages/backend/src/core/chart/ChartManagementService.ts index 81495c8a6c..4f151ff73d 100644 --- a/packages/backend/src/core/chart/ChartManagementService.ts +++ b/packages/backend/src/core/chart/ChartManagementService.ts @@ -75,6 +75,7 @@ export class ChartManagementService implements OnApplicationShutdown { public async dispose(): Promise { clearInterval(this.saveIntervalId); if (process.env.NODE_ENV !== 'test') { + this.logger.info('Saving charts for shutdown...'); for (const chart of this.charts) { await chart.save(); } diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 4c1a6a1d9e..76a617f027 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -612,6 +612,8 @@ export class QueueProcessorService implements OnApplicationShutdown { @bindThis public async onApplicationShutdown(signal?: string | undefined): Promise { + this.logger.info('Stopping BullMQ workers...'); await this.stop(); + this.logger.info('Workers disposed.'); } } diff --git a/packages/backend/src/server/ServerService.ts b/packages/backend/src/server/ServerService.ts index 77b4519570..caa82d1ce8 100644 --- a/packages/backend/src/server/ServerService.ts +++ b/packages/backend/src/server/ServerService.ts @@ -309,8 +309,13 @@ export class ServerService implements OnApplicationShutdown { @bindThis public async dispose(): Promise { + this.logger.info('Disconnecting WebSocket clients...'); await this.streamingApiServerService.detach(); + + this.logger.info('Disconnecting HTTP clients....;'); await this.#fastify.close(); + + this.logger.info('Server disposed.'); } /** From 168a364162207c309ab94f30f0bb56646b13d8df Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 22:51:16 -0400 Subject: [PATCH 4/6] fix redis listener leak in StreamingApiServerService.ts --- .../backend/src/server/api/StreamingApiServerService.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index eaeaecb1c2..34d62e8f09 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -201,10 +201,12 @@ export class StreamingApiServerService { const globalEv = new EventEmitter(); - this.redisForSub.on('message', (_: string, data: string) => { + const onRedis = (_: string, data: string) => { const parsed = JSON.parse(data); globalEv.emit('message', parsed); - }); + }; + + this.redisForSub.on('message', onRedis); this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: { stream: MainStreamConnection, @@ -235,6 +237,7 @@ export class StreamingApiServerService { connection.once('close', () => { ev.removeAllListeners(); stream.dispose(); + this.redisForSub.off('message', onRedis); globalEv.off('message', onRedisMessage); this.#connections.delete(connection); if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); From 088fe15be5ff52b1bb595b7ca49cc550fceaf2a4 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 25 Jun 2025 22:51:32 -0400 Subject: [PATCH 5/6] improve shutdown logic somewhat --- .../server/api/StreamingApiServerService.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 34d62e8f09..35855ab9bf 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -227,6 +227,7 @@ export class StreamingApiServerService { this.#connections.set(connection, Date.now()); + // TODO use collapsed queue const userUpdateIntervalId = user ? setInterval(() => { this.usersService.updateLastActiveDate(user); }, 1000 * 60 * 5) : null; @@ -263,13 +264,24 @@ export class StreamingApiServerService { } @bindThis - public detach(): Promise { + public async detach(): Promise { if (this.#cleanConnectionsIntervalId) { clearInterval(this.#cleanConnectionsIntervalId); this.#cleanConnectionsIntervalId = null; } - return new Promise((resolve) => { - this.#wss.close(() => resolve()); + + for (const connection of this.#connections.keys()) { + connection.close(); + } + + this.#connections.clear(); + this.#connectionsByClient.clear(); + + await new Promise((resolve, reject) => { + this.#wss.close(err => { + if (err) reject(err); + else resolve(); + }); }); } } From d7b94e756d8a69501015b8725243fb1bf18c5356 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 26 Jun 2025 09:32:14 -0400 Subject: [PATCH 6/6] fix websockets not working --- .../server/api/StreamingApiServerService.ts | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 35855ab9bf..a4ddf0d4b2 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -4,7 +4,7 @@ */ import { EventEmitter } from 'events'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Redis from 'ioredis'; import * as WebSocket from 'ws'; import proxyAddr from 'proxy-addr'; @@ -32,11 +32,12 @@ import type * as http from 'node:http'; const MAX_CONNECTIONS_PER_CLIENT = 32; @Injectable() -export class StreamingApiServerService { +export class StreamingApiServerService implements OnApplicationShutdown { #wss: WebSocket.WebSocketServer; #connections = new Map(); #connectionsByClient = new Map>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; + readonly #globalEv = new EventEmitter(); constructor( @Inject(DI.redisForSub) @@ -57,6 +58,14 @@ export class StreamingApiServerService { @Inject(DI.config) private config: Config, ) { + this.redisForSub.on('message', this.onRedis); + } + + @bindThis + onApplicationShutdown() { + this.redisForSub.off('message', this.onRedis); + this.#globalEv.removeAllListeners(); + // Other shutdown logic is handled by detach(), which gets called by ServerServer's own shutdown handler. } @bindThis @@ -69,6 +78,12 @@ export class StreamingApiServerService { return rateLimit.blocked; } + @bindThis + private onRedis(_: string, data: string) { + const parsed = JSON.parse(data); + this.#globalEv.emit('message', parsed); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -199,15 +214,6 @@ export class StreamingApiServerService { }); }); - const globalEv = new EventEmitter(); - - const onRedis = (_: string, data: string) => { - const parsed = JSON.parse(data); - globalEv.emit('message', parsed); - }; - - this.redisForSub.on('message', onRedis); - this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: { stream: MainStreamConnection, user: MiLocalUser | null; @@ -221,7 +227,7 @@ export class StreamingApiServerService { ev.emit(data.channel, data.message); } - globalEv.on('message', onRedisMessage); + this.#globalEv.on('message', onRedisMessage); await stream.listen(ev, connection); @@ -238,8 +244,7 @@ export class StreamingApiServerService { connection.once('close', () => { ev.removeAllListeners(); stream.dispose(); - this.redisForSub.off('message', onRedis); - globalEv.off('message', onRedisMessage); + this.#globalEv.off('message', onRedisMessage); this.#connections.delete(connection); if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); });