add inbound activity logger for debugging

This commit is contained in:
Hazelnoot 2024-11-14 12:11:37 -05:00
parent 2d7918a9b7
commit b65b4ecadc
16 changed files with 414 additions and 10 deletions

View file

@ -4,6 +4,7 @@
*/
import { URL } from 'node:url';
import { createHash } from 'crypto';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
@ -29,6 +30,11 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
import { IdService } from '@/core/IdService.js';
import { JsonValue } from '@/misc/json-value.js';
import { SkActivityLog, SkActivityContext } from '@/models/_.js';
import type { ActivityLogsRepository, ActivityContextRepository } from '@/models/_.js';
import type { Config } from '@/config.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@ -46,6 +52,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
@Inject(DI.meta)
private meta: MiMeta,
@Inject(DI.config)
private config: Config,
private utilityService: UtilityService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
@ -57,6 +66,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
private idService: IdService,
@Inject(DI.activityContextRepository)
private activityContextRepository: ActivityContextRepository,
@Inject(DI.activityLogsRepository)
private activityLogsRepository: ActivityLogsRepository,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@ -64,6 +80,42 @@ export class InboxProcessorService implements OnApplicationShutdown {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
if (this.config.activityLogging.enabled) {
return await this._processLogged(job);
} else {
return await this._process(job);
}
}
private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
const payload = job.data.activity;
const keyId = job.data.signature.keyId;
const log = this.createLog(payload, keyId);
// Pre-save the activity in case it leads to a hard-crash.
if (this.config.activityLogging.preSave) {
await this.recordLog(log);
}
try {
const result = await this._process(job, log);
log.accepted = result.startsWith('ok');
log.result = result;
return result;
} catch (err) {
log.accepted = false;
log.result = String(err);
throw err;
} finally {
// Save or finalize asynchronously
this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err));
}
}
private async _process(job: Bull.Job<InboxJobData>, log?: SkActivityLog): Promise<string> {
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;
@ -197,6 +249,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
delete activity.id;
}
// Attach log to verified user
if (log) {
log.verified = true;
log.authUser = authUser.user;
log.authUserId = authUser.user.id;
}
this.apRequestChart.inbox();
this.federationChart.inbox(authUser.user.host);
@ -292,4 +351,47 @@ export class InboxProcessorService implements OnApplicationShutdown {
async onApplicationShutdown(signal?: string) {
await this.dispose();
}
private createLog(payload: IActivity, keyId: string): SkActivityLog {
const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue;
const host = this.utilityService.extractDbHost(keyId);
const log = new SkActivityLog({
id: this.idService.gen(),
at: new Date(),
verified: false,
accepted: false,
result: 'not processed',
activity,
keyId,
host,
});
const context = payload['@context'];
if (context) {
const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64');
log.contextHash = md5;
log.context = new SkActivityContext({
md5,
json: context,
});
}
return log;
}
private async recordLog(log: SkActivityLog): Promise<void> {
if (log.context) {
// https://stackoverflow.com/a/47064558
await this.activityContextRepository
.createQueryBuilder('context_body')
.insert()
.into(SkActivityContext)
.values(log.context)
.orIgnore('md5')
.execute();
}
await this.activityLogsRepository.upsert(log, ['id']);
}
}