diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 783f6af34e..42c0ef58f2 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -105,6 +105,10 @@ export class StreamingApiServerService implements OnApplicationShutdown { perMessageDeflate: this.config.websocketCompression, }); + // ws library will kill the process if we don't catch unhandled exceptions. + // https://github.com/websockets/ws/issues/1354#issuecomment-1343117738 + this.#wss.on('error', this.onWsError); + server.on('upgrade', async (request, socket, head) => { if (request.url == null) { socket.write('HTTP/1.1 400 Bad Request\r\n\r\n'); @@ -224,19 +228,23 @@ export class StreamingApiServerService implements OnApplicationShutdown { stream.dispose(); }); - ws.once('error', (e) => { - this.#logger.error(`Unhandled error in Streaming Api: ${renderInlineError(e)}`); - ws.terminate(); - }); - if (dieInstantly !== null) { ws.close(...dieInstantly); return; } + // Special handler to hard-terminate the connection if it fails during initialization. + // Disconnect immediately after because the connection() handler below defines its own error handler. + const onWsInitError = (error: unknown) => { + this.onWsError(error); + ws.terminate(); + }; + + ws.on('error', onWsInitError); this.#wss.emit('connection', ws, request, { stream, user, app, }); + ws.off('error', onWsInitError); }); }); @@ -266,8 +274,13 @@ export class StreamingApiServerService implements OnApplicationShutdown { if (user) { this.usersService.updateLastActiveDate(user); } + const pong = () => { + this.#connections.set(connection, Date.now()); + }; connection.once('close', () => { + connection.off('error', this.onWsError); + connection.off('pong', pong); ev.removeAllListeners(); stream.dispose(); this.#globalEv.off('message', onRedisMessage); @@ -275,9 +288,8 @@ export class StreamingApiServerService implements OnApplicationShutdown { if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); }); - connection.on('pong', () => { - this.#connections.set(connection, Date.now()); - }); + connection.on('error', this.onWsError); + connection.on('pong', pong); }); // 一定期間通信が無いコネクションは実際には切断されている可能性があるため定期的にterminateする @@ -314,5 +326,14 @@ export class StreamingApiServerService implements OnApplicationShutdown { else resolve(); }); }); + + // Don't disconnect this until *after* close returns + this.#wss.off('error', this.onWsError); + } + + @bindThis + private async onWsError(error: unknown) { + this.#logger.error(`Unhandled error in streaming api: ${renderInlineError(error)}`); + this.#logger.debug('Error details:', { error }); } } diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 8d8f211af3..7c73014012 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -382,6 +382,7 @@ export default class Connection { for (const k of this.subscribingNotes.keys()) { this.subscriber.off(`noteStream:${k}`, this.onNoteStreamMessage); } + this.wsConnection.off('message', this.onWsConnectionMessage); this.fetchIntervalId = null; this.channels.clear();