merge: Better error handling in WebSocket (!1224)

View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1224

Approved-by: dakkar <dakkar@thenautilus.net>
Approved-by: Marie <github@yuugi.dev>
This commit is contained in:
Hazelnoot 2025-09-13 14:32:45 -04:00
commit aee99a4000
2 changed files with 30 additions and 8 deletions

View file

@ -105,6 +105,10 @@ export class StreamingApiServerService implements OnApplicationShutdown {
perMessageDeflate: this.config.websocketCompression,
});
// ws library will kill the process if we don't catch unhandled exceptions.
// https://github.com/websockets/ws/issues/1354#issuecomment-1343117738
this.#wss.on('error', this.onWsError);
server.on('upgrade', async (request, socket, head) => {
if (request.url == null) {
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
@ -224,19 +228,23 @@ export class StreamingApiServerService implements OnApplicationShutdown {
stream.dispose();
});
ws.once('error', (e) => {
this.#logger.error(`Unhandled error in Streaming Api: ${renderInlineError(e)}`);
ws.terminate();
});
if (dieInstantly !== null) {
ws.close(...dieInstantly);
return;
}
// Special handler to hard-terminate the connection if it fails during initialization.
// Disconnect immediately after because the connection() handler below defines its own error handler.
const onWsInitError = (error: unknown) => {
this.onWsError(error);
ws.terminate();
};
ws.on('error', onWsInitError);
this.#wss.emit('connection', ws, request, {
stream, user, app,
});
ws.off('error', onWsInitError);
});
});
@ -266,8 +274,13 @@ export class StreamingApiServerService implements OnApplicationShutdown {
if (user) {
this.usersService.updateLastActiveDate(user);
}
const pong = () => {
this.#connections.set(connection, Date.now());
};
connection.once('close', () => {
connection.off('error', this.onWsError);
connection.off('pong', pong);
ev.removeAllListeners();
stream.dispose();
this.#globalEv.off('message', onRedisMessage);
@ -275,9 +288,8 @@ export class StreamingApiServerService implements OnApplicationShutdown {
if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
});
connection.on('pong', () => {
this.#connections.set(connection, Date.now());
});
connection.on('error', this.onWsError);
connection.on('pong', pong);
});
// 一定期間通信が無いコネクションは実際には切断されている可能性があるため定期的にterminateする
@ -314,5 +326,14 @@ export class StreamingApiServerService implements OnApplicationShutdown {
else resolve();
});
});
// Don't disconnect this until *after* close returns
this.#wss.off('error', this.onWsError);
}
@bindThis
private async onWsError(error: unknown) {
this.#logger.error(`Unhandled error in streaming api: ${renderInlineError(error)}`);
this.#logger.debug('Error details:', { error });
}
}

View file

@ -382,6 +382,7 @@ export default class Connection {
for (const k of this.subscribingNotes.keys()) {
this.subscriber.off(`noteStream:${k}`, this.onNoteStreamMessage);
}
this.wsConnection.off('message', this.onWsConnectionMessage);
this.fetchIntervalId = null;
this.channels.clear();