diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index a7ddc9bb35..d4ff7b7f84 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -14,6 +14,7 @@ import { createPostgresDataSource } from './postgres.js'; import { RepositoryModule } from './models/RepositoryModule.js'; import { allSettled } from './misc/promise-tracker.js'; import { GlobalEvents } from './core/GlobalEventService.js'; +import Logger from './logger.js'; import type { Provider, OnApplicationShutdown } from '@nestjs/common'; const $config: Provider = { @@ -169,6 +170,8 @@ const $meta: Provider = { exports: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, $redisForRateLimit, RepositoryModule], }) export class GlobalModule implements OnApplicationShutdown { + private readonly logger = new Logger('global'); + constructor( @Inject(DI.db) private db: DataSource, @Inject(DI.redis) private redisClient: Redis.Redis, @@ -181,8 +184,10 @@ export class GlobalModule implements OnApplicationShutdown { public async dispose(): Promise { // Wait for all potential DB queries + this.logger.info('Finalizing active promises...'); await allSettled(); // And then disconnect from DB + this.logger.info('Disconnected from data sources...'); await this.db.destroy(); this.redisClient.disconnect(); this.redisForPub.disconnect(); @@ -190,6 +195,7 @@ export class GlobalModule implements OnApplicationShutdown { this.redisForTimelines.disconnect(); this.redisForReactions.disconnect(); this.redisForRateLimit.disconnect(); + this.logger.info('Global module disposed.'); } async onApplicationShutdown(signal: string): Promise { diff --git a/packages/backend/src/boot/common.ts b/packages/backend/src/boot/common.ts index 2f97980e9a..a10597c7a7 100644 --- a/packages/backend/src/boot/common.ts +++ b/packages/backend/src/boot/common.ts @@ -19,6 +19,7 @@ export async function server() { const app = await NestFactory.createApplicationContext(MainModule, { logger: new NestLogger(), }); + app.enableShutdownHooks(); const serverService = app.get(ServerService); await serverService.launch(); @@ -39,6 +40,7 @@ export async function jobQueue() { const jobQueue = await NestFactory.createApplicationContext(QueueProcessorModule, { logger: new NestLogger(), }); + jobQueue.enableShutdownHooks(); jobQueue.get(QueueProcessorService).start(); jobQueue.get(ChartManagementService).start(); diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 6dd48927c1..a48ffaab43 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -9,6 +9,7 @@ import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import { baseQueueOptions, QUEUE } from '@/queue/const.js'; import { allSettled } from '@/misc/promise-tracker.js'; +import Logger from '@/logger.js'; import { DeliverJobData, EndedPollNotificationJobData, @@ -120,6 +121,8 @@ const $scheduleNotePost: Provider = { ], }) export class QueueModule implements OnApplicationShutdown { + private readonly logger = new Logger('queue'); + constructor( @Inject('queue:system') public systemQueue: SystemQueue, @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, @@ -135,8 +138,10 @@ export class QueueModule implements OnApplicationShutdown { public async dispose(): Promise { // Wait for all potential queue jobs + this.logger.info('Finalizing active promises...'); await allSettled(); // And then close all queues + this.logger.info('Closing BullMQ queues...'); await Promise.all([ this.systemQueue.close(), this.endedPollNotificationQueue.close(), @@ -149,6 +154,7 @@ export class QueueModule implements OnApplicationShutdown { this.systemWebhookDeliverQueue.close(), this.scheduleNotePostQueue.close(), ]); + this.logger.info('Queue module disposed.'); } async onApplicationShutdown(signal: string): Promise { diff --git a/packages/backend/src/core/chart/ChartManagementService.ts b/packages/backend/src/core/chart/ChartManagementService.ts index 81495c8a6c..4f151ff73d 100644 --- a/packages/backend/src/core/chart/ChartManagementService.ts +++ b/packages/backend/src/core/chart/ChartManagementService.ts @@ -75,6 +75,7 @@ export class ChartManagementService implements OnApplicationShutdown { public async dispose(): Promise { clearInterval(this.saveIntervalId); if (process.env.NODE_ENV !== 'test') { + this.logger.info('Saving charts for shutdown...'); for (const chart of this.charts) { await chart.save(); } diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index ede104b9fe..9888ae3942 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -13,13 +13,32 @@ import type { Config } from '@/config.js'; import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { OnApplicationShutdown } from '@nestjs/common'; +export interface StatsEntry { + activeSincePrevTick: number, + active: number, + waiting: number, + delayed: number, +} + +export interface Stats { + deliver: StatsEntry, + inbox: StatsEntry, +} + const ev = new Xev(); const interval = 10000; @Injectable() export class QueueStatsService implements OnApplicationShutdown { - private intervalId: NodeJS.Timeout; + private intervalId?: NodeJS.Timeout; + private activeDeliverJobs = 0; + private activeInboxJobs = 0; + + private deliverQueueEvents?: Bull.QueueEvents; + private inboxQueueEvents?: Bull.QueueEvents; + + private log?: Stats[]; constructor( @Inject(DI.config) @@ -29,30 +48,39 @@ export class QueueStatsService implements OnApplicationShutdown { ) { } + @bindThis + private onDeliverActive() { + this.activeDeliverJobs++; + } + + @bindThis + private onInboxActive() { + this.activeInboxJobs++; + } + + @bindThis + private onRequestQueueStatsLog(x: { id: string, length?: number }) { + if (this.log) { + ev.emit(`queueStatsLog:${x.id}`, this.log.slice(0, x.length ?? 50)); + } + } + /** * Report queue stats regularly */ @bindThis - public start(): void { - const log = [] as any[]; + public async start() { + // Just in case start gets called repeatedly + await this.stop(); - ev.on('requestQueueStatsLog', x => { - ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50)); - }); + this.log = []; + ev.on('requestQueueStatsLog', this.onRequestQueueStatsLog); - let activeDeliverJobs = 0; - let activeInboxJobs = 0; + this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); + this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); - const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); - - deliverQueueEvents.on('active', () => { - activeDeliverJobs++; - }); - - inboxQueueEvents.on('active', () => { - activeInboxJobs++; - }); + this.deliverQueueEvents.on('active', this.onDeliverActive); + this.inboxQueueEvents.on('active', this.onInboxActive); const tick = async () => { const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts(); @@ -60,13 +88,13 @@ export class QueueStatsService implements OnApplicationShutdown { const stats = { deliver: { - activeSincePrevTick: activeDeliverJobs, + activeSincePrevTick: this.activeDeliverJobs, active: deliverJobCounts.active, waiting: deliverJobCounts.waiting, delayed: deliverJobCounts.delayed, }, inbox: { - activeSincePrevTick: activeInboxJobs, + activeSincePrevTick: this.activeInboxJobs, active: inboxJobCounts.active, waiting: inboxJobCounts.waiting, delayed: inboxJobCounts.delayed, @@ -75,11 +103,13 @@ export class QueueStatsService implements OnApplicationShutdown { ev.emit('queueStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + if (this.log) { + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); + } - activeDeliverJobs = 0; - activeInboxJobs = 0; + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; }; tick(); @@ -88,12 +118,32 @@ export class QueueStatsService implements OnApplicationShutdown { } @bindThis - public dispose(): void { - clearInterval(this.intervalId); + public async stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + + this.log = undefined; + ev.off('requestQueueStatsLog', this.onRequestQueueStatsLog); + + this.deliverQueueEvents?.off('active', this.onDeliverActive); + this.inboxQueueEvents?.off('active', this.onInboxActive); + + await this.deliverQueueEvents?.close(); + await this.inboxQueueEvents?.close(); + + this.activeDeliverJobs = 0; + this.activeInboxJobs = 0; } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async dispose() { + await this.stop(); + ev.dispose(); + } + + @bindThis + public async onApplicationShutdown(signal?: string | undefined) { + await this.dispose(); } } diff --git a/packages/backend/src/daemons/ServerStatsService.ts b/packages/backend/src/daemons/ServerStatsService.ts index 6e9d29dcbd..9fafa54b04 100644 --- a/packages/backend/src/daemons/ServerStatsService.ts +++ b/packages/backend/src/daemons/ServerStatsService.ts @@ -12,6 +12,22 @@ import type { OnApplicationShutdown } from '@nestjs/common'; import { MiMeta } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; +export interface Stats { + cpu: number, + mem: { + used: number, + active: number, + }, + net: { + rx: number, + tx: number, + }, + fs: { + r: number, + w: number, + }, +} + const ev = new Xev(); const interval = 2000; @@ -23,12 +39,19 @@ const round = (num: number) => Math.round(num * 10) / 10; export class ServerStatsService implements OnApplicationShutdown { private intervalId: NodeJS.Timeout | null = null; + private log: Stats[] = []; + constructor( @Inject(DI.meta) private meta: MiMeta, ) { } + @bindThis + private async onRequestStatsLog(x: { id: string, length: number }) { + ev.emit(`serverStatsLog:${x.id}`, this.log.slice(0, x.length)); + } + /** * Report server stats regularly */ @@ -36,11 +59,8 @@ export class ServerStatsService implements OnApplicationShutdown { public async start(): Promise { if (!this.meta.enableServerMachineStats) return; - const log = [] as any[]; - - ev.on('requestServerStatsLog', x => { - ev.emit(`serverStatsLog:${x.id}`, log.slice(0, x.length)); - }); + this.log = []; + ev.on('requestServerStatsLog', this.onRequestStatsLog); const tick = async () => { const cpu = await cpuUsage(); @@ -64,8 +84,8 @@ export class ServerStatsService implements OnApplicationShutdown { }, }; ev.emit('serverStats', stats); - log.unshift(stats); - if (log.length > 200) log.pop(); + this.log.unshift(stats); + if (this.log.length > 200) this.log.pop(); }; tick(); @@ -78,6 +98,11 @@ export class ServerStatsService implements OnApplicationShutdown { if (this.intervalId) { clearInterval(this.intervalId); } + + this.log = []; + ev.off('requestServerStatsLog', this.onRequestStatsLog); + + ev.dispose(); } @bindThis @@ -89,9 +114,13 @@ export class ServerStatsService implements OnApplicationShutdown { // CPU STAT function cpuUsage(): Promise { return new Promise((res, rej) => { - osUtils.cpuUsage((cpuUsage) => { - res(cpuUsage); - }); + try { + osUtils.cpuUsage((cpuUsage) => { + res(cpuUsage); + }); + } catch (err) { + rej(err); + } }); } diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 4c1a6a1d9e..76a617f027 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -612,6 +612,8 @@ export class QueueProcessorService implements OnApplicationShutdown { @bindThis public async onApplicationShutdown(signal?: string | undefined): Promise { + this.logger.info('Stopping BullMQ workers...'); await this.stop(); + this.logger.info('Workers disposed.'); } } diff --git a/packages/backend/src/server/ServerService.ts b/packages/backend/src/server/ServerService.ts index 77b4519570..caa82d1ce8 100644 --- a/packages/backend/src/server/ServerService.ts +++ b/packages/backend/src/server/ServerService.ts @@ -309,8 +309,13 @@ export class ServerService implements OnApplicationShutdown { @bindThis public async dispose(): Promise { + this.logger.info('Disconnecting WebSocket clients...'); await this.streamingApiServerService.detach(); + + this.logger.info('Disconnecting HTTP clients....;'); await this.#fastify.close(); + + this.logger.info('Server disposed.'); } /** diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index de24a32eed..a3ac8f5447 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -4,7 +4,7 @@ */ import { EventEmitter } from 'events'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Redis from 'ioredis'; import * as WebSocket from 'ws'; import proxyAddr from 'proxy-addr'; @@ -32,11 +32,12 @@ import type * as http from 'node:http'; const MAX_CONNECTIONS_PER_CLIENT = 32; @Injectable() -export class StreamingApiServerService { +export class StreamingApiServerService implements OnApplicationShutdown { #wss: WebSocket.WebSocketServer; #connections = new Map(); #connectionsByClient = new Map>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; + readonly #globalEv = new EventEmitter(); constructor( @Inject(DI.redisForSub) @@ -67,6 +68,14 @@ export class StreamingApiServerService { @Inject(DI.config) private config: Config, ) { + this.redisForSub.on('message', this.onRedis); + } + + @bindThis + onApplicationShutdown() { + this.redisForSub.off('message', this.onRedis); + this.#globalEv.removeAllListeners(); + // Other shutdown logic is handled by detach(), which gets called by ServerServer's own shutdown handler. } @bindThis @@ -79,6 +88,12 @@ export class StreamingApiServerService { return rateLimit.blocked; } + @bindThis + private onRedis(_: string, data: string) { + const parsed = JSON.parse(data); + this.#globalEv.emit('message', parsed); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -213,13 +228,6 @@ export class StreamingApiServerService { }); }); - const globalEv = new EventEmitter(); - - this.redisForSub.on('message', (_: string, data: string) => { - const parsed = JSON.parse(data); - globalEv.emit('message', parsed); - }); - this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: { stream: MainStreamConnection, user: MiLocalUser | null; @@ -233,12 +241,13 @@ export class StreamingApiServerService { ev.emit(data.channel, data.message); } - globalEv.on('message', onRedisMessage); + this.#globalEv.on('message', onRedisMessage); await stream.listen(ev, connection); this.#connections.set(connection, Date.now()); + // TODO use collapsed queue const userUpdateIntervalId = user ? setInterval(() => { this.usersService.updateLastActiveDate(user); }, 1000 * 60 * 5) : null; @@ -249,7 +258,7 @@ export class StreamingApiServerService { connection.once('close', () => { ev.removeAllListeners(); stream.dispose(); - globalEv.off('message', onRedisMessage); + this.#globalEv.off('message', onRedisMessage); this.#connections.delete(connection); if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); }); @@ -274,13 +283,24 @@ export class StreamingApiServerService { } @bindThis - public detach(): Promise { + public async detach(): Promise { if (this.#cleanConnectionsIntervalId) { clearInterval(this.#cleanConnectionsIntervalId); this.#cleanConnectionsIntervalId = null; } - return new Promise((resolve) => { - this.#wss.close(() => resolve()); + + for (const connection of this.#connections.keys()) { + connection.close(); + } + + this.#connections.clear(); + this.#connectionsByClient.clear(); + + await new Promise((resolve, reject) => { + this.#wss.close(err => { + if (err) reject(err); + else resolve(); + }); }); } }