diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index c146811331..5e3f17f409 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -26,6 +26,8 @@ import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import { bindThis } from '@/decorators.js'; import { Serialized } from '@/types.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; import type Emitter from 'strict-event-emitter-types'; import type { EventEmitter } from 'events'; @@ -350,6 +352,8 @@ export class GlobalEventService { @Inject(DI.redisForPub) private redisForPub: Redis.Redis, + + private readonly internalEventService: InternalEventService, ) { } @@ -365,14 +369,16 @@ export class GlobalEventService { })); } + /** @deprecated use InternalEventService instead */ @bindThis - public publishInternalEvent(type: K, value?: InternalEventTypes[K]): void { - this.publish('internal', type, typeof value === 'undefined' ? null : value); + public publishInternalEvent(type: K, value: InternalEventTypes[K]): void { + trackPromise(this.internalEventService.emit(type, value)); } + /** @deprecated use InternalEventService instead */ @bindThis - public async publishInternalEventAsync(type: K, value?: InternalEventTypes[K]): Promise { - await this.publish('internal', type, typeof value === 'undefined' ? null : value); + public async publishInternalEventAsync(type: K, value: InternalEventTypes[K]): Promise { + await this.internalEventService.emit(type, value); } @bindThis diff --git a/packages/backend/src/core/InternalEventService.ts b/packages/backend/src/core/InternalEventService.ts index 5b164b605e..be685e6b48 100644 --- a/packages/backend/src/core/InternalEventService.ts +++ b/packages/backend/src/core/InternalEventService.ts @@ -6,8 +6,8 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import Redis from 'ioredis'; import { DI } from '@/di-symbols.js'; -import { GlobalEventService } from '@/core/GlobalEventService.js'; import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; +import type { Config } from '@/config.js'; import { bindThis } from '@/decorators.js'; export type Listener = (value: InternalEventTypes[K], key: K, isLocal: boolean) => void | Promise; @@ -25,7 +25,11 @@ export class InternalEventService implements OnApplicationShutdown { @Inject(DI.redisForSub) private readonly redisForSub: Redis.Redis, - private readonly globalEventService: GlobalEventService, + @Inject(DI.redis) + private readonly redisForPub: Redis.Redis, + + @Inject(DI.config) + private readonly config: Pick, ) { this.redisForSub.on('message', this.onMessage); } @@ -50,7 +54,10 @@ export class InternalEventService implements OnApplicationShutdown { @bindThis public async emit(type: K, value: InternalEventTypes[K]): Promise { await this.emitInternal(type, value, true); - await this.globalEventService.publishInternalEventAsync(type, { ...value, _pid: process.pid }); + await this.redisForPub.publish(this.config.host, JSON.stringify({ + channel: 'internal', + message: { type: type, body: value }, + })); } @bindThis