use TimeService everywhere in the backend
This commit is contained in:
parent
ed750fd990
commit
6cceca90f9
123 changed files with 550 additions and 285 deletions
|
|
@ -24,6 +24,7 @@ import { LoggerService } from '@/core/LoggerService.js';
|
|||
import type Logger from '@/logger.js';
|
||||
import { SkRateLimiterService } from '@/server/SkRateLimiterService.js';
|
||||
import { QueryService } from '@/core/QueryService.js';
|
||||
import { TimeService, type TimerHandle } from '@/core/TimeService.js';
|
||||
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
|
||||
import MainStreamConnection from './stream/Connection.js';
|
||||
import { ChannelsService } from './stream/ChannelsService.js';
|
||||
|
|
@ -38,7 +39,7 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
#wss?: WebSocket.WebSocketServer;
|
||||
#connections = new Map<WebSocket.WebSocket, number>();
|
||||
#connectionsByClient = new Map<string, Set<WebSocket.WebSocket>>(); // key: IP / user ID -> value: connection
|
||||
#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
|
||||
#cleanConnectionsIntervalId: TimerHandle | null = null;
|
||||
readonly #globalEv = new EventEmitter();
|
||||
#logger: Logger;
|
||||
|
||||
|
|
@ -58,7 +59,9 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
@Inject(DI.noteFavoritesRepository)
|
||||
private readonly noteFavoritesRepository: NoteFavoritesRepository,
|
||||
|
||||
private readonly queryService: QueryService,
|
||||
@Inject(DI.config)
|
||||
private config: Config,
|
||||
|
||||
private cacheService: CacheService,
|
||||
private authenticateService: AuthenticateService,
|
||||
private channelsService: ChannelsService,
|
||||
|
|
@ -67,9 +70,8 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
private channelFollowingService: ChannelFollowingService,
|
||||
private rateLimiterService: SkRateLimiterService,
|
||||
private loggerService: LoggerService,
|
||||
|
||||
@Inject(DI.config)
|
||||
private config: Config,
|
||||
private readonly queryService: QueryService,
|
||||
private readonly timeService: TimeService,
|
||||
) {
|
||||
this.redisForSub.on('message', this.onRedis);
|
||||
this.#logger = loggerService.getLogger('streaming', 'coral');
|
||||
|
|
@ -205,6 +207,7 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
this.notificationService,
|
||||
this.cacheService,
|
||||
this.channelFollowingService,
|
||||
this.timeService,
|
||||
this.loggerService,
|
||||
user, app, requestIp,
|
||||
rateLimiter,
|
||||
|
|
@ -265,17 +268,17 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
|
||||
await stream.listen(ev, connection);
|
||||
|
||||
this.#connections.set(connection, Date.now());
|
||||
this.#connections.set(connection, this.timeService.now);
|
||||
|
||||
// TODO use collapsed queue
|
||||
const userUpdateIntervalId = user ? setInterval(() => {
|
||||
const userUpdateIntervalId = user ? this.timeService.startTimer(() => {
|
||||
this.usersService.updateLastActiveDate(user);
|
||||
}, 1000 * 60 * 5) : null;
|
||||
}, 1000 * 60 * 5, { repeated: true }) : null;
|
||||
if (user) {
|
||||
this.usersService.updateLastActiveDate(user);
|
||||
}
|
||||
const pong = () => {
|
||||
this.#connections.set(connection, Date.now());
|
||||
this.#connections.set(connection, this.timeService.now);
|
||||
};
|
||||
|
||||
connection.once('close', () => {
|
||||
|
|
@ -285,7 +288,7 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
stream.dispose();
|
||||
this.#globalEv.off('message', onRedisMessage);
|
||||
this.#connections.delete(connection);
|
||||
if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
|
||||
if (userUpdateIntervalId) this.timeService.stopTimer(userUpdateIntervalId);
|
||||
});
|
||||
|
||||
connection.on('error', this.onWsError);
|
||||
|
|
@ -293,8 +296,8 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
});
|
||||
|
||||
// 一定期間通信が無いコネクションは実際には切断されている可能性があるため定期的にterminateする
|
||||
this.#cleanConnectionsIntervalId = setInterval(() => {
|
||||
const now = Date.now();
|
||||
this.#cleanConnectionsIntervalId = this.timeService.startTimer(() => {
|
||||
const now = this.timeService.now;
|
||||
for (const [connection, lastActive] of this.#connections.entries()) {
|
||||
if (now - lastActive > 1000 * 60 * 2) {
|
||||
connection.terminate();
|
||||
|
|
@ -303,13 +306,13 @@ export class StreamingApiServerService implements OnApplicationShutdown {
|
|||
connection.ping();
|
||||
}
|
||||
}
|
||||
}, 1000 * 60);
|
||||
}, 1000 * 60, { repeated: true });
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async detach(): Promise<void> {
|
||||
if (this.#cleanConnectionsIntervalId) {
|
||||
clearInterval(this.#cleanConnectionsIntervalId);
|
||||
this.timeService.stopTimer(this.#cleanConnectionsIntervalId);
|
||||
this.#cleanConnectionsIntervalId = null;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue