implement AP fetch logs

This commit is contained in:
Hazelnoot 2025-01-30 22:36:19 -05:00
parent cc2edae7ab
commit 81944b3bdf
11 changed files with 395 additions and 95 deletions

View file

@ -4,7 +4,6 @@
*/
import { URL } from 'node:url';
import { createHash } from 'crypto';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
@ -30,11 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
import { IdService } from '@/core/IdService.js';
import { JsonValue } from '@/misc/json-value.js';
import { SkApInboxLog, SkApContext } from '@/models/_.js';
import type { ApInboxLogsRepository, ApContextsRepository } from '@/models/_.js';
import { SkApInboxLog } from '@/models/_.js';
import type { Config } from '@/config.js';
import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@ -66,13 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
private idService: IdService,
@Inject(DI.apContextsRepository)
private apContextsRepository: ApContextsRepository,
@Inject(DI.apInboxLogsRepository)
private apInboxLogsRepository: ApInboxLogsRepository,
private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@ -89,14 +80,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
const startTime = process.hrtime.bigint();
const payload = job.data.activity;
const activity = job.data.activity;
const keyId = job.data.signature.keyId;
const log = this.createLog(payload, keyId);
// Pre-save the activity in case it leads to a hard-crash.
if (this.config.activityLogging.preSave) {
await this.recordLog(log);
}
const log = await this.apLogService.createInboxLog({ activity, keyId });
try {
const result = await this._process(job, log);
@ -111,24 +97,18 @@ export class InboxProcessorService implements OnApplicationShutdown {
throw err;
} finally {
// Calculate the activity processing time with correct rounding and decimals.
// 1. Truncate nanoseconds to microseconds
// 2. Scale to 1/10 millisecond ticks.
// 3. Round to nearest tick.
// 4. Sale to milliseconds
// Example: 123,456,789 ns -> 123,456 us -> 12,345.6 ticks -> 12,346 ticks -> 123.46 ms
const endTime = process.hrtime.bigint();
const duration = Math.round(Number((endTime - startTime) / 1000n) / 10) / 100;
log.duration = duration;
const duration = log.duration = calculateDurationSince(startTime);
// TODO remove this
// Activities should time out after roughly 5 seconds.
// A runtime longer than 10 seconds could indicate a problem or attack.
if (duration > 10000) {
this.logger.warn(`Activity ${JSON.stringify(payload.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`);
this.logger.warn(`Activity ${JSON.stringify(activity.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`);
}
// Save or finalize asynchronously
this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err));
this.apLogService.saveInboxLog(log)
.catch(err => this.logger.error('Failed to record AP activity:', err));
}
}
@ -368,46 +348,4 @@ export class InboxProcessorService implements OnApplicationShutdown {
async onApplicationShutdown(signal?: string) {
await this.dispose();
}
private createLog(payload: IActivity, keyId: string): SkApInboxLog {
const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue;
const host = this.utilityService.extractDbHost(keyId);
const log = new SkApInboxLog({
id: this.idService.gen(),
at: new Date(),
verified: false,
accepted: false,
activity,
keyId,
host,
});
const context = payload['@context'];
if (context) {
const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64');
log.contextHash = md5;
log.context = new SkApContext({
md5,
json: context,
});
}
return log;
}
private async recordLog(log: SkApInboxLog): Promise<void> {
if (log.context) {
// https://stackoverflow.com/a/47064558
await this.apContextsRepository
.createQueryBuilder('activity_context')
.insert()
.into(SkApContext)
.values(log.context)
.orIgnore('md5')
.execute();
}
await this.apInboxLogsRepository.upsert(log, ['id']);
}
}