From ea80af9243c27959e5bb0e01ec64c59397a21258 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 1 Oct 2025 12:18:04 -0400 Subject: [PATCH] fix webhook caching --- .../backend/src/core/GlobalEventService.ts | 16 ++-- packages/backend/src/core/QueueService.ts | 4 +- .../backend/src/core/SystemWebhookService.ts | 69 +++++----------- .../backend/src/core/UserWebhookService.ts | 81 +++++++++---------- 4 files changed, 72 insertions(+), 98 deletions(-) diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index 7a1410fc1f..79634acb8d 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -247,12 +247,12 @@ export interface InternalEventTypes { roleUpdated: MiRole; userRoleAssigned: MiRoleAssignment; userRoleUnassigned: MiRoleAssignment; - webhookCreated: MiWebhook; - webhookDeleted: MiWebhook; - webhookUpdated: MiWebhook; - systemWebhookCreated: MiSystemWebhook; - systemWebhookDeleted: MiSystemWebhook; - systemWebhookUpdated: MiSystemWebhook; + webhookCreated: { id: MiWebhook['id'] }; + webhookDeleted: { id: MiWebhook['id'] }; + webhookUpdated: { id: MiWebhook['id'] }; + systemWebhookCreated: { id: MiSystemWebhook['id'] }; + systemWebhookDeleted: { id: MiSystemWebhook['id'] }; + systemWebhookUpdated: { id: MiSystemWebhook['id'] }; antennaCreated: MiAntenna; antennaDeleted: MiAntenna; antennaUpdated: MiAntenna; @@ -386,8 +386,8 @@ export class GlobalEventService { } @bindThis - public publishBroadcastStream(type: K, value?: BroadcastTypes[K]): void { - this.publish('broadcast', type, typeof value === 'undefined' ? null : value); + public async publishBroadcastStream(type: K, value?: BroadcastTypes[K]): Promise { + await this.publish('broadcast', type, typeof value === 'undefined' ? null : value); } @bindThis diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index d1bfe83222..f80c968cb8 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -16,8 +16,8 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import { ApRequestCreator } from '@/core/activitypub/ApRequestService.js'; -import { type SystemWebhookPayload } from '@/core/SystemWebhookService.js'; -import { MiNote } from '@/models/Note.js'; +import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js'; +import type { MiNote } from '@/models/Note.js'; import { type UserWebhookPayload } from './UserWebhookService.js'; import type { DbJobData, diff --git a/packages/backend/src/core/SystemWebhookService.ts b/packages/backend/src/core/SystemWebhookService.ts index 8239490adc..bce1a6b41f 100644 --- a/packages/backend/src/core/SystemWebhookService.ts +++ b/packages/backend/src/core/SystemWebhookService.ts @@ -18,6 +18,8 @@ import Logger from '@/logger.js'; import { Packed } from '@/misc/json-schema.js'; import { AbuseReportResolveType } from '@/models/AbuseUserReport.js'; import { ModeratorInactivityRemainingTime } from '@/queue/processors/CheckModeratorsActivityProcessorService.js'; +import { CacheManagementService, type ManagedMemorySingleCache } from '@/core/CacheManagementService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export type AbuseReportPayload = { @@ -50,8 +52,7 @@ export type SystemWebhookPayload = @Injectable() export class SystemWebhookService implements OnApplicationShutdown { - private activeSystemWebhooksFetched = false; - private activeSystemWebhooks: MiSystemWebhook[] = []; + private readonly activeSystemWebhooks: ManagedMemorySingleCache; constructor( @Inject(DI.redisForSub) @@ -62,20 +63,24 @@ export class SystemWebhookService implements OnApplicationShutdown { private queueService: QueueService, private moderationLogService: ModerationLogService, private globalEventService: GlobalEventService, + private readonly internalEventService: InternalEventService, + + cacheManagementService: CacheManagementService, ) { - this.redisForSub.on('message', this.onMessage); + this.activeSystemWebhooks = cacheManagementService.createMemorySingleCache(1000 * 60 * 60 * 12); // 12h + + this.internalEventService.on('systemWebhookCreated', this.onWebhookEvent); + this.internalEventService.on('systemWebhookUpdated', this.onWebhookEvent); + this.internalEventService.on('systemWebhookDeleted', this.onWebhookEvent); } @bindThis public async fetchActiveSystemWebhooks() { - if (!this.activeSystemWebhooksFetched) { - this.activeSystemWebhooks = await this.systemWebhooksRepository.findBy({ + return await this.activeSystemWebhooks.fetch(async () => { + return await this.systemWebhooksRepository.findBy({ isActive: true, }); - this.activeSystemWebhooksFetched = true; - } - - return this.activeSystemWebhooks; + }); } /** @@ -124,7 +129,7 @@ export class SystemWebhookService implements OnApplicationShutdown { }); const webhook = await this.systemWebhooksRepository.findOneByOrFail({ id }); - this.globalEventService.publishInternalEvent('systemWebhookCreated', webhook); + await this.internalEventService.emit('systemWebhookCreated', { id: webhook.id }); this.moderationLogService .log(updater, 'createSystemWebhook', { systemWebhookId: webhook.id, @@ -160,7 +165,7 @@ export class SystemWebhookService implements OnApplicationShutdown { }); const afterEntity = await this.systemWebhooksRepository.findOneByOrFail({ id: beforeEntity.id }); - this.globalEventService.publishInternalEvent('systemWebhookUpdated', afterEntity); + await this.internalEventService.emit('systemWebhookUpdated', { id: afterEntity.id }); this.moderationLogService .log(updater, 'updateSystemWebhook', { systemWebhookId: beforeEntity.id, @@ -179,7 +184,7 @@ export class SystemWebhookService implements OnApplicationShutdown { const webhook = await this.systemWebhooksRepository.findOneByOrFail({ id }); await this.systemWebhooksRepository.delete(id); - this.globalEventService.publishInternalEvent('systemWebhookDeleted', webhook); + await this.internalEventService.emit('systemWebhookDeleted', { id: webhook.id }); this.moderationLogService .log(updater, 'deleteSystemWebhook', { systemWebhookId: webhook.id, @@ -211,45 +216,15 @@ export class SystemWebhookService implements OnApplicationShutdown { } @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - if (obj.channel !== 'internal') { - return; - } - - const { type, body } = obj.message as GlobalEvents['internal']['payload']; - switch (type) { - case 'systemWebhookCreated': { - if (body.isActive) { - this.activeSystemWebhooks.push(MiSystemWebhook.deserialize(body)); - } - break; - } - case 'systemWebhookUpdated': { - if (body.isActive) { - const i = this.activeSystemWebhooks.findIndex(a => a.id === body.id); - if (i > -1) { - this.activeSystemWebhooks[i] = MiSystemWebhook.deserialize(body); - } else { - this.activeSystemWebhooks.push(MiSystemWebhook.deserialize(body)); - } - } else { - this.activeSystemWebhooks = this.activeSystemWebhooks.filter(a => a.id !== body.id); - } - break; - } - case 'systemWebhookDeleted': { - this.activeSystemWebhooks = this.activeSystemWebhooks.filter(a => a.id !== body.id); - break; - } - default: - break; - } + private onWebhookEvent(): void { + this.activeSystemWebhooks.delete(); } @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('systemWebhookCreated', this.onWebhookEvent); + this.internalEventService.off('systemWebhookUpdated', this.onWebhookEvent); + this.internalEventService.off('systemWebhookDeleted', this.onWebhookEvent); } @bindThis diff --git a/packages/backend/src/core/UserWebhookService.ts b/packages/backend/src/core/UserWebhookService.ts index 2f79eb429a..edf6ed6bda 100644 --- a/packages/backend/src/core/UserWebhookService.ts +++ b/packages/backend/src/core/UserWebhookService.ts @@ -9,9 +9,11 @@ import { MiUser, type WebhooksRepository } from '@/models/_.js'; import { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; -import { GlobalEvents } from '@/core/GlobalEventService.js'; +import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import type { Packed } from '@/misc/json-schema.js'; import { QueueService } from '@/core/QueueService.js'; +import { CacheManagementService, type ManagedMemorySingleCache } from '@/core/CacheManagementService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export type UserWebhookPayload = @@ -27,8 +29,7 @@ export type UserWebhookPayload = @Injectable() export class UserWebhookService implements OnApplicationShutdown { - private activeWebhooksFetched = false; - private activeWebhooks: MiWebhook[] = []; + private readonly activeWebhooks: ManagedMemorySingleCache; constructor( @Inject(DI.redisForSub) @@ -36,20 +37,24 @@ export class UserWebhookService implements OnApplicationShutdown { @Inject(DI.webhooksRepository) private webhooksRepository: WebhooksRepository, private queueService: QueueService, + private readonly internalEventService: InternalEventService, + + cacheManagementService: CacheManagementService, ) { - this.redisForSub.on('message', this.onMessage); + this.activeWebhooks = cacheManagementService.createMemorySingleCache(1000 * 60 * 60 * 12); // 12h + + this.internalEventService.on('webhookCreated', this.onWebhookEvent); + this.internalEventService.on('webhookUpdated', this.onWebhookEvent); + this.internalEventService.on('webhookDeleted', this.onWebhookEvent); } @bindThis public async getActiveWebhooks() { - if (!this.activeWebhooksFetched) { - this.activeWebhooks = await this.webhooksRepository.findBy({ + return await this.activeWebhooks.fetch(async () => { + return await this.webhooksRepository.findBy({ active: true, }); - this.activeWebhooksFetched = true; - } - - return this.activeWebhooks; + }); } /** @@ -97,57 +102,51 @@ export class UserWebhookService implements OnApplicationShutdown { } @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - if (obj.channel !== 'internal') { + private async onWebhookEvent(body: InternalEventTypes[E], type: E): Promise { + const cache = this.activeWebhooks.get(); + if (!cache) { return; } - const { type, body } = obj.message as GlobalEvents['internal']['payload']; switch (type) { case 'webhookCreated': { - if (body.active) { - this.activeWebhooks.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい - ...body, - latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null, - user: null, // joinなカラムは通常取ってこないので - }); + // Add + const webhook = await this.webhooksRepository.findOneBy({ id: body.id }); + if (webhook) { + cache.push(webhook); } break; } case 'webhookUpdated': { - if (body.active) { - const i = this.activeWebhooks.findIndex(a => a.id === body.id); - if (i > -1) { - this.activeWebhooks[i] = { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい - ...body, - latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null, - user: null, // joinなカラムは通常取ってこないので - }; - } else { - this.activeWebhooks.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい - ...body, - latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null, - user: null, // joinなカラムは通常取ってこないので - }); - } - } else { - this.activeWebhooks = this.activeWebhooks.filter(a => a.id !== body.id); + // Delete + const index = cache.findIndex(webhook => webhook.id === body.id); + if (index > -1) { + cache.splice(index, 1); + } + + // Add + const webhook = await this.webhooksRepository.findOneBy({ id: body.id }); + if (webhook) { + cache.push(webhook); } break; } case 'webhookDeleted': { - this.activeWebhooks = this.activeWebhooks.filter(a => a.id !== body.id); + // Delete + const index = cache.findIndex(webhook => webhook.id === body.id); + if (index > -1) { + cache.splice(index, 1); + } break; } - default: - break; } } @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('webhookCreated', this.onWebhookEvent); + this.internalEventService.off('webhookUpdated', this.onWebhookEvent); + this.internalEventService.off('webhookDeleted', this.onWebhookEvent); } @bindThis