merge: Enable NestJS shutdown hooks for clean exit (!1145)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1145 Approved-by: dakkar <dakkar@thenautilus.net> Approved-by: Marie <github@yuugi.dev>
This commit is contained in:
commit
be1063238f
9 changed files with 173 additions and 52 deletions
|
|
@ -14,6 +14,7 @@ import { createPostgresDataSource } from './postgres.js';
|
|||
import { RepositoryModule } from './models/RepositoryModule.js';
|
||||
import { allSettled } from './misc/promise-tracker.js';
|
||||
import { GlobalEvents } from './core/GlobalEventService.js';
|
||||
import Logger from './logger.js';
|
||||
import type { Provider, OnApplicationShutdown } from '@nestjs/common';
|
||||
|
||||
const $config: Provider = {
|
||||
|
|
@ -169,6 +170,8 @@ const $meta: Provider = {
|
|||
exports: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, $redisForRateLimit, RepositoryModule],
|
||||
})
|
||||
export class GlobalModule implements OnApplicationShutdown {
|
||||
private readonly logger = new Logger('global');
|
||||
|
||||
constructor(
|
||||
@Inject(DI.db) private db: DataSource,
|
||||
@Inject(DI.redis) private redisClient: Redis.Redis,
|
||||
|
|
@ -181,8 +184,10 @@ export class GlobalModule implements OnApplicationShutdown {
|
|||
|
||||
public async dispose(): Promise<void> {
|
||||
// Wait for all potential DB queries
|
||||
this.logger.info('Finalizing active promises...');
|
||||
await allSettled();
|
||||
// And then disconnect from DB
|
||||
this.logger.info('Disconnected from data sources...');
|
||||
await this.db.destroy();
|
||||
this.redisClient.disconnect();
|
||||
this.redisForPub.disconnect();
|
||||
|
|
@ -190,6 +195,7 @@ export class GlobalModule implements OnApplicationShutdown {
|
|||
this.redisForTimelines.disconnect();
|
||||
this.redisForReactions.disconnect();
|
||||
this.redisForRateLimit.disconnect();
|
||||
this.logger.info('Global module disposed.');
|
||||
}
|
||||
|
||||
async onApplicationShutdown(signal: string): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ export async function server() {
|
|||
const app = await NestFactory.createApplicationContext(MainModule, {
|
||||
logger: new NestLogger(),
|
||||
});
|
||||
app.enableShutdownHooks();
|
||||
|
||||
const serverService = app.get(ServerService);
|
||||
await serverService.launch();
|
||||
|
|
@ -39,6 +40,7 @@ export async function jobQueue() {
|
|||
const jobQueue = await NestFactory.createApplicationContext(QueueProcessorModule, {
|
||||
logger: new NestLogger(),
|
||||
});
|
||||
jobQueue.enableShutdownHooks();
|
||||
|
||||
jobQueue.get(QueueProcessorService).start();
|
||||
jobQueue.get(ChartManagementService).start();
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import { DI } from '@/di-symbols.js';
|
|||
import type { Config } from '@/config.js';
|
||||
import { baseQueueOptions, QUEUE } from '@/queue/const.js';
|
||||
import { allSettled } from '@/misc/promise-tracker.js';
|
||||
import Logger from '@/logger.js';
|
||||
import {
|
||||
DeliverJobData,
|
||||
EndedPollNotificationJobData,
|
||||
|
|
@ -120,6 +121,8 @@ const $scheduleNotePost: Provider = {
|
|||
],
|
||||
})
|
||||
export class QueueModule implements OnApplicationShutdown {
|
||||
private readonly logger = new Logger('queue');
|
||||
|
||||
constructor(
|
||||
@Inject('queue:system') public systemQueue: SystemQueue,
|
||||
@Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue,
|
||||
|
|
@ -135,8 +138,10 @@ export class QueueModule implements OnApplicationShutdown {
|
|||
|
||||
public async dispose(): Promise<void> {
|
||||
// Wait for all potential queue jobs
|
||||
this.logger.info('Finalizing active promises...');
|
||||
await allSettled();
|
||||
// And then close all queues
|
||||
this.logger.info('Closing BullMQ queues...');
|
||||
await Promise.all([
|
||||
this.systemQueue.close(),
|
||||
this.endedPollNotificationQueue.close(),
|
||||
|
|
@ -149,6 +154,7 @@ export class QueueModule implements OnApplicationShutdown {
|
|||
this.systemWebhookDeliverQueue.close(),
|
||||
this.scheduleNotePostQueue.close(),
|
||||
]);
|
||||
this.logger.info('Queue module disposed.');
|
||||
}
|
||||
|
||||
async onApplicationShutdown(signal: string): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ export class ChartManagementService implements OnApplicationShutdown {
|
|||
public async dispose(): Promise<void> {
|
||||
clearInterval(this.saveIntervalId);
|
||||
if (process.env.NODE_ENV !== 'test') {
|
||||
this.logger.info('Saving charts for shutdown...');
|
||||
for (const chart of this.charts) {
|
||||
await chart.save();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,13 +13,32 @@ import type { Config } from '@/config.js';
|
|||
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
|
||||
import type { OnApplicationShutdown } from '@nestjs/common';
|
||||
|
||||
export interface StatsEntry {
|
||||
activeSincePrevTick: number,
|
||||
active: number,
|
||||
waiting: number,
|
||||
delayed: number,
|
||||
}
|
||||
|
||||
export interface Stats {
|
||||
deliver: StatsEntry,
|
||||
inbox: StatsEntry,
|
||||
}
|
||||
|
||||
const ev = new Xev();
|
||||
|
||||
const interval = 10000;
|
||||
|
||||
@Injectable()
|
||||
export class QueueStatsService implements OnApplicationShutdown {
|
||||
private intervalId: NodeJS.Timeout;
|
||||
private intervalId?: NodeJS.Timeout;
|
||||
private activeDeliverJobs = 0;
|
||||
private activeInboxJobs = 0;
|
||||
|
||||
private deliverQueueEvents?: Bull.QueueEvents;
|
||||
private inboxQueueEvents?: Bull.QueueEvents;
|
||||
|
||||
private log?: Stats[];
|
||||
|
||||
constructor(
|
||||
@Inject(DI.config)
|
||||
|
|
@ -29,30 +48,39 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
) {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onDeliverActive() {
|
||||
this.activeDeliverJobs++;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onInboxActive() {
|
||||
this.activeInboxJobs++;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onRequestQueueStatsLog(x: { id: string, length?: number }) {
|
||||
if (this.log) {
|
||||
ev.emit(`queueStatsLog:${x.id}`, this.log.slice(0, x.length ?? 50));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report queue stats regularly
|
||||
*/
|
||||
@bindThis
|
||||
public start(): void {
|
||||
const log = [] as any[];
|
||||
public async start() {
|
||||
// Just in case start gets called repeatedly
|
||||
await this.stop();
|
||||
|
||||
ev.on('requestQueueStatsLog', x => {
|
||||
ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50));
|
||||
});
|
||||
this.log = [];
|
||||
ev.on('requestQueueStatsLog', this.onRequestQueueStatsLog);
|
||||
|
||||
let activeDeliverJobs = 0;
|
||||
let activeInboxJobs = 0;
|
||||
this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
|
||||
this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
|
||||
|
||||
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
|
||||
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
|
||||
|
||||
deliverQueueEvents.on('active', () => {
|
||||
activeDeliverJobs++;
|
||||
});
|
||||
|
||||
inboxQueueEvents.on('active', () => {
|
||||
activeInboxJobs++;
|
||||
});
|
||||
this.deliverQueueEvents.on('active', this.onDeliverActive);
|
||||
this.inboxQueueEvents.on('active', this.onInboxActive);
|
||||
|
||||
const tick = async () => {
|
||||
const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts();
|
||||
|
|
@ -60,13 +88,13 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
|
||||
const stats = {
|
||||
deliver: {
|
||||
activeSincePrevTick: activeDeliverJobs,
|
||||
activeSincePrevTick: this.activeDeliverJobs,
|
||||
active: deliverJobCounts.active,
|
||||
waiting: deliverJobCounts.waiting,
|
||||
delayed: deliverJobCounts.delayed,
|
||||
},
|
||||
inbox: {
|
||||
activeSincePrevTick: activeInboxJobs,
|
||||
activeSincePrevTick: this.activeInboxJobs,
|
||||
active: inboxJobCounts.active,
|
||||
waiting: inboxJobCounts.waiting,
|
||||
delayed: inboxJobCounts.delayed,
|
||||
|
|
@ -75,11 +103,13 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
|
||||
ev.emit('queueStats', stats);
|
||||
|
||||
log.unshift(stats);
|
||||
if (log.length > 200) log.pop();
|
||||
if (this.log) {
|
||||
this.log.unshift(stats);
|
||||
if (this.log.length > 200) this.log.pop();
|
||||
}
|
||||
|
||||
activeDeliverJobs = 0;
|
||||
activeInboxJobs = 0;
|
||||
this.activeDeliverJobs = 0;
|
||||
this.activeInboxJobs = 0;
|
||||
};
|
||||
|
||||
tick();
|
||||
|
|
@ -88,12 +118,32 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
clearInterval(this.intervalId);
|
||||
public async stop() {
|
||||
if (this.intervalId) {
|
||||
clearInterval(this.intervalId);
|
||||
}
|
||||
|
||||
this.log = undefined;
|
||||
ev.off('requestQueueStatsLog', this.onRequestQueueStatsLog);
|
||||
|
||||
this.deliverQueueEvents?.off('active', this.onDeliverActive);
|
||||
this.inboxQueueEvents?.off('active', this.onInboxActive);
|
||||
|
||||
await this.deliverQueueEvents?.close();
|
||||
await this.inboxQueueEvents?.close();
|
||||
|
||||
this.activeDeliverJobs = 0;
|
||||
this.activeInboxJobs = 0;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public onApplicationShutdown(signal?: string | undefined): void {
|
||||
this.dispose();
|
||||
public async dispose() {
|
||||
await this.stop();
|
||||
ev.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async onApplicationShutdown(signal?: string | undefined) {
|
||||
await this.dispose();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,22 @@ import type { OnApplicationShutdown } from '@nestjs/common';
|
|||
import { MiMeta } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
|
||||
export interface Stats {
|
||||
cpu: number,
|
||||
mem: {
|
||||
used: number,
|
||||
active: number,
|
||||
},
|
||||
net: {
|
||||
rx: number,
|
||||
tx: number,
|
||||
},
|
||||
fs: {
|
||||
r: number,
|
||||
w: number,
|
||||
},
|
||||
}
|
||||
|
||||
const ev = new Xev();
|
||||
|
||||
const interval = 2000;
|
||||
|
|
@ -23,12 +39,19 @@ const round = (num: number) => Math.round(num * 10) / 10;
|
|||
export class ServerStatsService implements OnApplicationShutdown {
|
||||
private intervalId: NodeJS.Timeout | null = null;
|
||||
|
||||
private log: Stats[] = [];
|
||||
|
||||
constructor(
|
||||
@Inject(DI.meta)
|
||||
private meta: MiMeta,
|
||||
) {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onRequestStatsLog(x: { id: string, length: number }) {
|
||||
ev.emit(`serverStatsLog:${x.id}`, this.log.slice(0, x.length));
|
||||
}
|
||||
|
||||
/**
|
||||
* Report server stats regularly
|
||||
*/
|
||||
|
|
@ -36,11 +59,8 @@ export class ServerStatsService implements OnApplicationShutdown {
|
|||
public async start(): Promise<void> {
|
||||
if (!this.meta.enableServerMachineStats) return;
|
||||
|
||||
const log = [] as any[];
|
||||
|
||||
ev.on('requestServerStatsLog', x => {
|
||||
ev.emit(`serverStatsLog:${x.id}`, log.slice(0, x.length));
|
||||
});
|
||||
this.log = [];
|
||||
ev.on('requestServerStatsLog', this.onRequestStatsLog);
|
||||
|
||||
const tick = async () => {
|
||||
const cpu = await cpuUsage();
|
||||
|
|
@ -64,8 +84,8 @@ export class ServerStatsService implements OnApplicationShutdown {
|
|||
},
|
||||
};
|
||||
ev.emit('serverStats', stats);
|
||||
log.unshift(stats);
|
||||
if (log.length > 200) log.pop();
|
||||
this.log.unshift(stats);
|
||||
if (this.log.length > 200) this.log.pop();
|
||||
};
|
||||
|
||||
tick();
|
||||
|
|
@ -78,6 +98,11 @@ export class ServerStatsService implements OnApplicationShutdown {
|
|||
if (this.intervalId) {
|
||||
clearInterval(this.intervalId);
|
||||
}
|
||||
|
||||
this.log = [];
|
||||
ev.off('requestServerStatsLog', this.onRequestStatsLog);
|
||||
|
||||
ev.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -89,9 +114,13 @@ export class ServerStatsService implements OnApplicationShutdown {
|
|||
// CPU STAT
|
||||
function cpuUsage(): Promise<number> {
|
||||
return new Promise((res, rej) => {
|
||||
osUtils.cpuUsage((cpuUsage) => {
|
||||
res(cpuUsage);
|
||||
});
|
||||
try {
|
||||
osUtils.cpuUsage((cpuUsage) => {
|
||||
res(cpuUsage);
|
||||
});
|
||||
} catch (err) {
|
||||
rej(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -612,6 +612,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
|
||||
@bindThis
|
||||
public async onApplicationShutdown(signal?: string | undefined): Promise<void> {
|
||||
this.logger.info('Stopping BullMQ workers...');
|
||||
await this.stop();
|
||||
this.logger.info('Workers disposed.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -309,8 +309,13 @@ export class ServerService implements OnApplicationShutdown {
|
|||
|
||||
@bindThis
|
||||
public async dispose(): Promise<void> {
|
||||
this.logger.info('Disconnecting WebSocket clients...');
|
||||
await this.streamingApiServerService.detach();
|
||||
|
||||
this.logger.info('Disconnecting HTTP clients....;');
|
||||
await this.#fastify.close();
|
||||
|
||||
this.logger.info('Server disposed.');
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import * as Redis from 'ioredis';
|
||||
import * as WebSocket from 'ws';
|
||||
import proxyAddr from 'proxy-addr';
|
||||
|
|
@ -32,11 +32,12 @@ import type * as http from 'node:http';
|
|||
const MAX_CONNECTIONS_PER_CLIENT = 32;
|
||||
|
||||
@Injectable()
|
||||
export class StreamingApiServerService {
|
||||
export class StreamingApiServerService implements OnApplicationShutdown {
|
||||
#wss: WebSocket.WebSocketServer;
|
||||
#connections = new Map<WebSocket.WebSocket, number>();
|
||||
#connectionsByClient = new Map<string, Set<WebSocket.WebSocket>>(); // key: IP / user ID -> value: connection
|
||||
#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
|
||||
readonly #globalEv = new EventEmitter();
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redisForSub)
|
||||
|
|
@ -67,6 +68,14 @@ export class StreamingApiServerService {
|
|||
@Inject(DI.config)
|
||||
private config: Config,
|
||||
) {
|
||||
this.redisForSub.on('message', this.onRedis);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
onApplicationShutdown() {
|
||||
this.redisForSub.off('message', this.onRedis);
|
||||
this.#globalEv.removeAllListeners();
|
||||
// Other shutdown logic is handled by detach(), which gets called by ServerServer's own shutdown handler.
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -79,6 +88,12 @@ export class StreamingApiServerService {
|
|||
return rateLimit.blocked;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onRedis(_: string, data: string) {
|
||||
const parsed = JSON.parse(data);
|
||||
this.#globalEv.emit('message', parsed);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public attach(server: http.Server): void {
|
||||
this.#wss = new WebSocket.WebSocketServer({
|
||||
|
|
@ -213,13 +228,6 @@ export class StreamingApiServerService {
|
|||
});
|
||||
});
|
||||
|
||||
const globalEv = new EventEmitter();
|
||||
|
||||
this.redisForSub.on('message', (_: string, data: string) => {
|
||||
const parsed = JSON.parse(data);
|
||||
globalEv.emit('message', parsed);
|
||||
});
|
||||
|
||||
this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: {
|
||||
stream: MainStreamConnection,
|
||||
user: MiLocalUser | null;
|
||||
|
|
@ -233,12 +241,13 @@ export class StreamingApiServerService {
|
|||
ev.emit(data.channel, data.message);
|
||||
}
|
||||
|
||||
globalEv.on('message', onRedisMessage);
|
||||
this.#globalEv.on('message', onRedisMessage);
|
||||
|
||||
await stream.listen(ev, connection);
|
||||
|
||||
this.#connections.set(connection, Date.now());
|
||||
|
||||
// TODO use collapsed queue
|
||||
const userUpdateIntervalId = user ? setInterval(() => {
|
||||
this.usersService.updateLastActiveDate(user);
|
||||
}, 1000 * 60 * 5) : null;
|
||||
|
|
@ -249,7 +258,7 @@ export class StreamingApiServerService {
|
|||
connection.once('close', () => {
|
||||
ev.removeAllListeners();
|
||||
stream.dispose();
|
||||
globalEv.off('message', onRedisMessage);
|
||||
this.#globalEv.off('message', onRedisMessage);
|
||||
this.#connections.delete(connection);
|
||||
if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
|
||||
});
|
||||
|
|
@ -274,13 +283,24 @@ export class StreamingApiServerService {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public detach(): Promise<void> {
|
||||
public async detach(): Promise<void> {
|
||||
if (this.#cleanConnectionsIntervalId) {
|
||||
clearInterval(this.#cleanConnectionsIntervalId);
|
||||
this.#cleanConnectionsIntervalId = null;
|
||||
}
|
||||
return new Promise((resolve) => {
|
||||
this.#wss.close(() => resolve());
|
||||
|
||||
for (const connection of this.#connections.keys()) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
this.#connections.clear();
|
||||
this.#connectionsByClient.clear();
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
this.#wss.close(err => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue