diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 3edb9dcab9..6cc6008567 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -17,6 +17,7 @@ import { bindThis } from '@/decorators.js'; import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import { ApRequestCreator } from '@/core/activitypub/ApRequestService.js'; import { type SystemWebhookPayload } from '@/core/SystemWebhookService.js'; +import { MiNote } from '@/models/Note.js'; import { type UserWebhookPayload } from './UserWebhookService.js'; import type { DbJobData, @@ -40,7 +41,6 @@ import type { } from './QueueModule.js'; import type httpSignature from '@peertube/http-signature'; import type * as Bull from 'bullmq'; -import { MiNote } from '@/models/Note.js'; export const QUEUE_TYPES = [ 'system', @@ -231,6 +231,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: activity.id ? { + id: activity.id, + } : undefined, }); } @@ -247,6 +250,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -263,6 +269,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -273,6 +282,9 @@ export class QueueService { }, { removeOnComplete: true, removeOnFail: true, + deduplication: { + id: user.id, + }, }); } @@ -289,6 +301,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -305,6 +320,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -321,6 +339,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -339,6 +360,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -355,6 +379,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -371,6 +398,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -387,6 +417,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -403,6 +436,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -421,6 +457,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}_${withReplies ?? false}`, + }, }); } @@ -433,6 +472,9 @@ export class QueueService { }, { removeOnComplete: true, removeOnFail: true, + deduplication: { + id: `${user.id}_${fileId}_${type ?? null}`, + }, }); } @@ -492,6 +534,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}`, + }, }); } @@ -509,6 +554,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}`, + }, }); } @@ -554,6 +602,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}`, + }, }); } @@ -571,14 +622,18 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}`, + }, }); } @bindThis - public createImportAntennasJob(user: ThinUser, antenna: Antenna) { + public createImportAntennasJob(user: ThinUser, antenna: Antenna, fileId: MiDriveFile['id']) { return this.dbQueue.add('importAntennas', { user: { id: user.id }, antenna, + fileId, }, { removeOnComplete: { age: 3600 * 24 * 7, // keep up to 7 days @@ -588,6 +643,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${user.id}_${fileId}`, + }, }); } @@ -605,6 +663,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: user.id, + }, }); } @@ -663,6 +724,9 @@ export class QueueService { count: 100, }, ...opts, + deduplication: { + id: `${data.from.id}_${data.to.id}_${data.requestId ?? ''}_${data.silent ?? false}_${data.withReplies ?? false}`, + }, }, }; } @@ -680,6 +744,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: key, + }, }); } @@ -697,6 +764,9 @@ export class QueueService { age: 3600 * 24 * 7, // keep up to 7 days count: 100, }, + deduplication: { + id: `${olderThanSeconds}_${keepFilesInUse}`, + }, }); } diff --git a/packages/backend/src/core/activitypub/ApRendererService.ts b/packages/backend/src/core/activitypub/ApRendererService.ts index 345b8c6ddb..a13abc6369 100644 --- a/packages/backend/src/core/activitypub/ApRendererService.ts +++ b/packages/backend/src/core/activitypub/ApRendererService.ts @@ -750,9 +750,11 @@ export class ApRendererService { } @bindThis - public renderUpdate(object: string | IObject, user: { id: MiUser['id'] }): IUpdate { + public renderUpdate(object: IObject, user: { id: MiUser['id'] }): IUpdate { + // Deterministic activity IDs to allow de-duplication by remote instances + const updatedAt = object.updated ? new Date(object.updated).getTime() : Date.now(); return { - id: `${this.config.url}/users/${user.id}#updates/${new Date().getTime()}`, + id: `${this.config.url}/users/${user.id}#updates/${updatedAt}`, actor: this.userEntityService.genLocalUserUri(user.id), type: 'Update', to: ['https://www.w3.org/ns/activitystreams#Public'], diff --git a/packages/backend/src/core/activitypub/type.ts b/packages/backend/src/core/activitypub/type.ts index e86495c03e..21434f23e0 100644 --- a/packages/backend/src/core/activitypub/type.ts +++ b/packages/backend/src/core/activitypub/type.ts @@ -28,8 +28,9 @@ export interface IObject { inReplyTo?: any; replies?: ICollection | IOrderedCollection | string; content?: string | null; - startTime?: Date; - endTime?: Date; + startTime?: Date; // TODO these are wrong - should be string + endTime?: Date; // TODO these are wrong - should be string + updated?: string; icon?: any; image?: any; mediaType?: string; @@ -216,7 +217,6 @@ export interface IPost extends IObject { quoteUrl?: string; quoteUri?: string; quote?: string; - updated?: string; } export interface IQuestion extends IObject { diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index 35e31b9533..0fca613f29 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -11,8 +11,8 @@ import Logger from '@/logger.js'; import type { AntennasRepository, UsersRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; import { NotificationService } from '@/core/NotificationService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; import { DBAntennaImportJobData } from '../types.js'; import type * as Bull from 'bullmq'; @@ -79,7 +79,7 @@ export class ImportAntennasProcessorService { return; } - this.logger.debug(`Importing blocking of ${job.data.user.id} ...`); + this.logger.debug(`Importing antennas of ${job.data.user.id} ...`); const now = new Date(); try { diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 79ab68ab1d..6dc9f88034 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -110,6 +110,7 @@ export type DbNoteImportJobData = { export type DBAntennaImportJobData = { user: ThinUser, antenna: Antenna + fileId: MiDriveFile['id']; }; export type DbUserImportToDbJobData = { diff --git a/packages/backend/src/server/api/endpoints/i/import-antennas.ts b/packages/backend/src/server/api/endpoints/i/import-antennas.ts index 9b45923e92..8db4367ad9 100644 --- a/packages/backend/src/server/api/endpoints/i/import-antennas.ts +++ b/packages/backend/src/server/api/endpoints/i/import-antennas.ts @@ -83,7 +83,7 @@ export default class extends Endpoint { if (currentAntennasCount + antennas.length >= (await this.roleService.getUserPolicies(me.id)).antennaLimit) { throw new ApiError(meta.errors.tooManyAntennas); } - this.queueService.createImportAntennasJob(me, antennas); + await this.queueService.createImportAntennasJob(me, antennas, file.id); }); } }