fix webhook caching

This commit is contained in:
Hazelnoot 2025-10-01 12:18:04 -04:00
parent 2daf5c16ec
commit ea80af9243
4 changed files with 72 additions and 98 deletions

View file

@ -247,12 +247,12 @@ export interface InternalEventTypes {
roleUpdated: MiRole;
userRoleAssigned: MiRoleAssignment;
userRoleUnassigned: MiRoleAssignment;
webhookCreated: MiWebhook;
webhookDeleted: MiWebhook;
webhookUpdated: MiWebhook;
systemWebhookCreated: MiSystemWebhook;
systemWebhookDeleted: MiSystemWebhook;
systemWebhookUpdated: MiSystemWebhook;
webhookCreated: { id: MiWebhook['id'] };
webhookDeleted: { id: MiWebhook['id'] };
webhookUpdated: { id: MiWebhook['id'] };
systemWebhookCreated: { id: MiSystemWebhook['id'] };
systemWebhookDeleted: { id: MiSystemWebhook['id'] };
systemWebhookUpdated: { id: MiSystemWebhook['id'] };
antennaCreated: MiAntenna;
antennaDeleted: MiAntenna;
antennaUpdated: MiAntenna;
@ -386,8 +386,8 @@ export class GlobalEventService {
}
@bindThis
public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void {
this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
public async publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): Promise<void> {
await this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
}
@bindThis

View file

@ -16,8 +16,8 @@ import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import { ApRequestCreator } from '@/core/activitypub/ApRequestService.js';
import { type SystemWebhookPayload } from '@/core/SystemWebhookService.js';
import { MiNote } from '@/models/Note.js';
import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js';
import type { MiNote } from '@/models/Note.js';
import { type UserWebhookPayload } from './UserWebhookService.js';
import type {
DbJobData,

View file

@ -18,6 +18,8 @@ import Logger from '@/logger.js';
import { Packed } from '@/misc/json-schema.js';
import { AbuseReportResolveType } from '@/models/AbuseUserReport.js';
import { ModeratorInactivityRemainingTime } from '@/queue/processors/CheckModeratorsActivityProcessorService.js';
import { CacheManagementService, type ManagedMemorySingleCache } from '@/core/CacheManagementService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
export type AbuseReportPayload = {
@ -50,8 +52,7 @@ export type SystemWebhookPayload<T extends SystemWebhookEventType> =
@Injectable()
export class SystemWebhookService implements OnApplicationShutdown {
private activeSystemWebhooksFetched = false;
private activeSystemWebhooks: MiSystemWebhook[] = [];
private readonly activeSystemWebhooks: ManagedMemorySingleCache<MiSystemWebhook[]>;
constructor(
@Inject(DI.redisForSub)
@ -62,20 +63,24 @@ export class SystemWebhookService implements OnApplicationShutdown {
private queueService: QueueService,
private moderationLogService: ModerationLogService,
private globalEventService: GlobalEventService,
private readonly internalEventService: InternalEventService,
cacheManagementService: CacheManagementService,
) {
this.redisForSub.on('message', this.onMessage);
this.activeSystemWebhooks = cacheManagementService.createMemorySingleCache<MiSystemWebhook[]>(1000 * 60 * 60 * 12); // 12h
this.internalEventService.on('systemWebhookCreated', this.onWebhookEvent);
this.internalEventService.on('systemWebhookUpdated', this.onWebhookEvent);
this.internalEventService.on('systemWebhookDeleted', this.onWebhookEvent);
}
@bindThis
public async fetchActiveSystemWebhooks() {
if (!this.activeSystemWebhooksFetched) {
this.activeSystemWebhooks = await this.systemWebhooksRepository.findBy({
return await this.activeSystemWebhooks.fetch(async () => {
return await this.systemWebhooksRepository.findBy({
isActive: true,
});
this.activeSystemWebhooksFetched = true;
}
return this.activeSystemWebhooks;
});
}
/**
@ -124,7 +129,7 @@ export class SystemWebhookService implements OnApplicationShutdown {
});
const webhook = await this.systemWebhooksRepository.findOneByOrFail({ id });
this.globalEventService.publishInternalEvent('systemWebhookCreated', webhook);
await this.internalEventService.emit('systemWebhookCreated', { id: webhook.id });
this.moderationLogService
.log(updater, 'createSystemWebhook', {
systemWebhookId: webhook.id,
@ -160,7 +165,7 @@ export class SystemWebhookService implements OnApplicationShutdown {
});
const afterEntity = await this.systemWebhooksRepository.findOneByOrFail({ id: beforeEntity.id });
this.globalEventService.publishInternalEvent('systemWebhookUpdated', afterEntity);
await this.internalEventService.emit('systemWebhookUpdated', { id: afterEntity.id });
this.moderationLogService
.log(updater, 'updateSystemWebhook', {
systemWebhookId: beforeEntity.id,
@ -179,7 +184,7 @@ export class SystemWebhookService implements OnApplicationShutdown {
const webhook = await this.systemWebhooksRepository.findOneByOrFail({ id });
await this.systemWebhooksRepository.delete(id);
this.globalEventService.publishInternalEvent('systemWebhookDeleted', webhook);
await this.internalEventService.emit('systemWebhookDeleted', { id: webhook.id });
this.moderationLogService
.log(updater, 'deleteSystemWebhook', {
systemWebhookId: webhook.id,
@ -211,45 +216,15 @@ export class SystemWebhookService implements OnApplicationShutdown {
}
@bindThis
private async onMessage(_: string, data: string): Promise<void> {
const obj = JSON.parse(data);
if (obj.channel !== 'internal') {
return;
}
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
switch (type) {
case 'systemWebhookCreated': {
if (body.isActive) {
this.activeSystemWebhooks.push(MiSystemWebhook.deserialize(body));
}
break;
}
case 'systemWebhookUpdated': {
if (body.isActive) {
const i = this.activeSystemWebhooks.findIndex(a => a.id === body.id);
if (i > -1) {
this.activeSystemWebhooks[i] = MiSystemWebhook.deserialize(body);
} else {
this.activeSystemWebhooks.push(MiSystemWebhook.deserialize(body));
}
} else {
this.activeSystemWebhooks = this.activeSystemWebhooks.filter(a => a.id !== body.id);
}
break;
}
case 'systemWebhookDeleted': {
this.activeSystemWebhooks = this.activeSystemWebhooks.filter(a => a.id !== body.id);
break;
}
default:
break;
}
private onWebhookEvent(): void {
this.activeSystemWebhooks.delete();
}
@bindThis
public dispose(): void {
this.redisForSub.off('message', this.onMessage);
this.internalEventService.off('systemWebhookCreated', this.onWebhookEvent);
this.internalEventService.off('systemWebhookUpdated', this.onWebhookEvent);
this.internalEventService.off('systemWebhookDeleted', this.onWebhookEvent);
}
@bindThis

View file

@ -9,9 +9,11 @@ import { MiUser, type WebhooksRepository } from '@/models/_.js';
import { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import { GlobalEvents } from '@/core/GlobalEventService.js';
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
import type { Packed } from '@/misc/json-schema.js';
import { QueueService } from '@/core/QueueService.js';
import { CacheManagementService, type ManagedMemorySingleCache } from '@/core/CacheManagementService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
export type UserWebhookPayload<T extends WebhookEventTypes> =
@ -27,8 +29,7 @@ export type UserWebhookPayload<T extends WebhookEventTypes> =
@Injectable()
export class UserWebhookService implements OnApplicationShutdown {
private activeWebhooksFetched = false;
private activeWebhooks: MiWebhook[] = [];
private readonly activeWebhooks: ManagedMemorySingleCache<MiWebhook[]>;
constructor(
@Inject(DI.redisForSub)
@ -36,20 +37,24 @@ export class UserWebhookService implements OnApplicationShutdown {
@Inject(DI.webhooksRepository)
private webhooksRepository: WebhooksRepository,
private queueService: QueueService,
private readonly internalEventService: InternalEventService,
cacheManagementService: CacheManagementService,
) {
this.redisForSub.on('message', this.onMessage);
this.activeWebhooks = cacheManagementService.createMemorySingleCache<MiWebhook[]>(1000 * 60 * 60 * 12); // 12h
this.internalEventService.on('webhookCreated', this.onWebhookEvent);
this.internalEventService.on('webhookUpdated', this.onWebhookEvent);
this.internalEventService.on('webhookDeleted', this.onWebhookEvent);
}
@bindThis
public async getActiveWebhooks() {
if (!this.activeWebhooksFetched) {
this.activeWebhooks = await this.webhooksRepository.findBy({
return await this.activeWebhooks.fetch(async () => {
return await this.webhooksRepository.findBy({
active: true,
});
this.activeWebhooksFetched = true;
}
return this.activeWebhooks;
});
}
/**
@ -97,57 +102,51 @@ export class UserWebhookService implements OnApplicationShutdown {
}
@bindThis
private async onMessage(_: string, data: string): Promise<void> {
const obj = JSON.parse(data);
if (obj.channel !== 'internal') {
private async onWebhookEvent<E extends 'webhookCreated' | 'webhookUpdated' | 'webhookDeleted'>(body: InternalEventTypes[E], type: E): Promise<void> {
const cache = this.activeWebhooks.get();
if (!cache) {
return;
}
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
switch (type) {
case 'webhookCreated': {
if (body.active) {
this.activeWebhooks.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null,
user: null, // joinなカラムは通常取ってこないので
});
// Add
const webhook = await this.webhooksRepository.findOneBy({ id: body.id });
if (webhook) {
cache.push(webhook);
}
break;
}
case 'webhookUpdated': {
if (body.active) {
const i = this.activeWebhooks.findIndex(a => a.id === body.id);
if (i > -1) {
this.activeWebhooks[i] = { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null,
user: null, // joinなカラムは通常取ってこないので
};
} else {
this.activeWebhooks.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
latestSentAt: body.latestSentAt ? new Date(body.latestSentAt) : null,
user: null, // joinなカラムは通常取ってこないので
});
}
} else {
this.activeWebhooks = this.activeWebhooks.filter(a => a.id !== body.id);
// Delete
const index = cache.findIndex(webhook => webhook.id === body.id);
if (index > -1) {
cache.splice(index, 1);
}
// Add
const webhook = await this.webhooksRepository.findOneBy({ id: body.id });
if (webhook) {
cache.push(webhook);
}
break;
}
case 'webhookDeleted': {
this.activeWebhooks = this.activeWebhooks.filter(a => a.id !== body.id);
// Delete
const index = cache.findIndex(webhook => webhook.id === body.id);
if (index > -1) {
cache.splice(index, 1);
}
break;
}
default:
break;
}
}
@bindThis
public dispose(): void {
this.redisForSub.off('message', this.onMessage);
this.internalEventService.off('webhookCreated', this.onWebhookEvent);
this.internalEventService.off('webhookUpdated', this.onWebhookEvent);
this.internalEventService.off('webhookDeleted', this.onWebhookEvent);
}
@bindThis