implement DeleteApLogsBackgroundTask
This commit is contained in:
parent
ec6ae4c939
commit
4501ecb293
6 changed files with 57 additions and 19 deletions
|
|
@ -14,7 +14,9 @@ import { JsonValue } from '@/misc/json-value.js';
|
|||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { TimeService } from '@/global/TimeService.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { IActivity, IObject } from './activitypub/type.js';
|
||||
import { IActivity, IObject } from '@/core/activitypub/type.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { QueueService } from '@/core/QueueService.js';
|
||||
|
||||
@Injectable()
|
||||
export class ApLogService {
|
||||
|
|
@ -23,7 +25,7 @@ export class ApLogService {
|
|||
private readonly config: Config,
|
||||
|
||||
@Inject(DI.apContextsRepository)
|
||||
private apContextsRepository: ApContextsRepository,
|
||||
private readonly apContextsRepository: ApContextsRepository,
|
||||
|
||||
@Inject(DI.apInboxLogsRepository)
|
||||
private readonly apInboxLogsRepository: ApInboxLogsRepository,
|
||||
|
|
@ -34,6 +36,7 @@ export class ApLogService {
|
|||
private readonly utilityService: UtilityService,
|
||||
private readonly idService: IdService,
|
||||
private readonly timeService: TimeService,
|
||||
private readonly queueService: QueueService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
|
|
@ -123,6 +126,16 @@ export class ApLogService {
|
|||
.execute();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async deleteObjectLogsDeferred(objectUris: string | string[]): Promise<void> {
|
||||
await this.queueService.createDeleteApLogsJob('object', objectUris);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async deleteInboxLogsDeferred(userIds: string | string[]): Promise<void> {
|
||||
await this.queueService.createDeleteApLogsJob('inbox', userIds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all logged copies of an object or objects
|
||||
* @param objectUris URIs / AP IDs of the objects to delete
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ export class NoteDeleteService {
|
|||
// Update the Latest Note index / following feed
|
||||
this.latestNoteService.handleDeletedNoteDeferred(note);
|
||||
for (const cascadingNote of cascadingNotes) {
|
||||
this.latestNoteService.handleDeletedNote(cascadingNote);
|
||||
this.latestNoteService.handleDeletedNoteDeferred(cascadingNote);
|
||||
}
|
||||
|
||||
if (deleter && (note.userId !== deleter.id)) {
|
||||
|
|
@ -178,8 +178,7 @@ export class NoteDeleteService {
|
|||
.map(n => n.uri)
|
||||
.filter((u): u is string => u != null);
|
||||
if (deletedUris.length > 0) {
|
||||
trackPromise(this.apLogService.deleteObjectLogs(deletedUris)
|
||||
.catch(err => this.logger.error(err, `Failed to delete AP logs for note '${note.uri}'`)));
|
||||
await this.apLogService.deleteObjectLogsDeferred(deletedUris);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -930,6 +930,11 @@ export class QueueService implements OnModuleInit {
|
|||
return await this.createBackgroundTask({ type: 'post-unsuspend', userId }, userId);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async createDeleteApLogsJob(dataType: 'inbox' | 'object', data: string | string[]) {
|
||||
return await this.createBackgroundTask({ type: 'delete-ap-logs', dataType, data });
|
||||
}
|
||||
|
||||
private async createBackgroundTask<T extends BackgroundTaskJobData>(data: T, duplication?: string | { id: string, ttl?: number }) {
|
||||
return await this.backgroundTaskQueue.add(
|
||||
data.type,
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import * as Bull from 'bullmq';
|
||||
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask } from '@/queue/types.js';
|
||||
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask, DeleteApLogsBackgroundTask } from '@/queue/types.js';
|
||||
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
|
||||
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
||||
import Logger from '@/logger.js';
|
||||
|
|
@ -27,6 +27,7 @@ import { DriveService } from '@/core/DriveService.js';
|
|||
import { LatestNoteService } from '@/core/LatestNoteService.js';
|
||||
import { trackTask } from '@/misc/promise-tracker.js';
|
||||
import { UserSuspendService } from '@/core/UserSuspendService.js';
|
||||
import { ApLogService } from '@/core/ApLogService.js';
|
||||
|
||||
@Injectable()
|
||||
export class BackgroundTaskProcessorService {
|
||||
|
|
@ -59,6 +60,7 @@ export class BackgroundTaskProcessorService {
|
|||
private readonly driveService: DriveService,
|
||||
private readonly latestNoteService: LatestNoteService,
|
||||
private readonly userSuspendService: UserSuspendService,
|
||||
private readonly apLogService: ApLogService,
|
||||
|
||||
queueLoggerService: QueueLoggerService,
|
||||
) {
|
||||
|
|
@ -90,9 +92,11 @@ export class BackgroundTaskProcessorService {
|
|||
return await this.processUpdateLatestNote(job.data);
|
||||
} else if (job.data.type === 'post-suspend') {
|
||||
return await this.processPostSuspend(job.data);
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (job.data.type === 'post-unsuspend') {
|
||||
return await this.processPostUnsuspend(job.data);
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (job.data.type === 'delete-ap-logs') {
|
||||
return await this.processDeleteApLogs(job.data);
|
||||
} else {
|
||||
this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data);
|
||||
throw new Error(`Unknown job type ${job.data}, see system logs for details`);
|
||||
|
|
@ -199,14 +203,14 @@ export class BackgroundTaskProcessorService {
|
|||
|
||||
// Update charts
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
await this.instanceChart.requestSent(task.host, success);
|
||||
this.instanceChart.requestSent(task.host, success);
|
||||
}
|
||||
if (success) {
|
||||
await this.apRequestChart.deliverSucc();
|
||||
this.apRequestChart.deliverSucc();
|
||||
} else {
|
||||
await this.apRequestChart.deliverFail();
|
||||
this.apRequestChart.deliverFail();
|
||||
}
|
||||
await this.federationChart.deliverd(task.host, success);
|
||||
this.federationChart.deliverd(task.host, success);
|
||||
|
||||
return 'ok';
|
||||
}
|
||||
|
|
@ -215,7 +219,6 @@ export class BackgroundTaskProcessorService {
|
|||
const instance = await this.federatedInstanceService.fetchOrRegister(task.host);
|
||||
if (instance.isBlocked) return `Skipping post-inbox task: instance ${task.host} is blocked`;
|
||||
|
||||
// TODO move chart stuff out of background?
|
||||
// Update charts
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.requestReceived(task.host);
|
||||
|
|
@ -321,4 +324,18 @@ export class BackgroundTaskProcessorService {
|
|||
|
||||
return 'ok';
|
||||
}
|
||||
|
||||
private async processDeleteApLogs(task: DeleteApLogsBackgroundTask): Promise<string> {
|
||||
if (task.dataType === 'object') {
|
||||
await this.apLogService.deleteObjectLogs(task.data);
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (task.dataType === 'inbox') {
|
||||
await this.apLogService.deleteInboxLogs(task.data);
|
||||
} else {
|
||||
this.logger.warn(`Can't process unknown data type "${task.dataType}"; this is likely a bug. Full task data:`, task);
|
||||
throw new Error(`Unknown task type ${task.dataType}, see system logs for details`);
|
||||
}
|
||||
|
||||
return 'ok';
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -332,8 +332,7 @@ export class DeleteAccountProcessorService {
|
|||
// Delete note AP logs
|
||||
const noteUris = notes.map(n => n.uri).filter(u => !!u) as string[];
|
||||
if (noteUris.length > 0) {
|
||||
await this.apLogService.deleteObjectLogs(noteUris)
|
||||
.catch(err => this.logger.error(err, `Failed to delete AP logs for notes of user '${user.uri ?? user.id}'`));
|
||||
await this.apLogService.deleteObjectLogsDeferred(noteUris);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -371,12 +370,10 @@ export class DeleteAccountProcessorService {
|
|||
|
||||
{ // Delete actor logs
|
||||
if (user.uri) {
|
||||
await this.apLogService.deleteObjectLogs(user.uri)
|
||||
.catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
|
||||
await this.apLogService.deleteObjectLogsDeferred(user.uri);
|
||||
}
|
||||
|
||||
await this.apLogService.deleteInboxLogs(user.id)
|
||||
.catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
|
||||
await this.apLogService.deleteInboxLogsDeferred(user.id);
|
||||
|
||||
this.logger.info('All AP logs deleted');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -183,7 +183,8 @@ export type BackgroundTaskJobData =
|
|||
DeleteFileBackgroundTask |
|
||||
UpdateLatestNoteBackgroundTask |
|
||||
PostSuspendBackgroundTask |
|
||||
PostUnsuspendBackgroundTask;
|
||||
PostUnsuspendBackgroundTask |
|
||||
DeleteApLogsBackgroundTask;
|
||||
|
||||
export type UpdateUserBackgroundTask = {
|
||||
type: 'update-user';
|
||||
|
|
@ -254,3 +255,9 @@ export type PostUnsuspendBackgroundTask = {
|
|||
type: 'post-unsuspend';
|
||||
userId: string;
|
||||
};
|
||||
|
||||
export type DeleteApLogsBackgroundTask = {
|
||||
type: 'delete-ap-logs';
|
||||
dataType: 'inbox' | 'object';
|
||||
data: string | string[];
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue