From 4b08e978ce7a17f644f2b391d94605619448e400 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Tue, 17 Jun 2025 14:06:15 -0400 Subject: [PATCH] implement background task queue --- .config/ci.yml | 41 ++++++-- .config/cypress-devcontainer.yml | 37 ++++++-- .config/docker_example.yml | 25 ++++- .config/example.yml | 25 ++++- packages/backend/src/config.ts | 9 ++ .../src/core/FetchInstanceMetadataService.ts | 15 +++ packages/backend/src/core/QueueModule.ts | 12 +++ packages/backend/src/core/QueueService.ts | 76 +++++++++++++++ .../src/core/activitypub/ApInboxService.ts | 29 ++---- .../activitypub/models/ApPersonService.ts | 50 +++++++--- .../backend/src/queue/QueueProcessorModule.ts | 3 +- .../src/queue/QueueProcessorService.ts | 47 +++++++++- packages/backend/src/queue/const.ts | 1 + .../BackgroundTaskProcessorService.ts | 93 +++++++++++++++++++ .../processors/DeliverProcessorService.ts | 8 +- .../queue/processors/InboxProcessorService.ts | 4 +- packages/backend/src/queue/types.ts | 20 ++++ .../test/misc/immediateBackgroundTasks.ts | 50 ++++++++++ 18 files changed, 482 insertions(+), 63 deletions(-) create mode 100644 packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts create mode 100644 packages/backend/test/misc/immediateBackgroundTasks.ts diff --git a/.config/ci.yml b/.config/ci.yml index 8543205b17..d5f288fc05 100644 --- a/.config/ci.yml +++ b/.config/ci.yml @@ -253,21 +253,42 @@ id: 'aidx' # Number of worker processes #clusterLimit: 1 +# +-------------------------+ +# | Job concurrency options | +# +-------------------------+ +# +### Available options: +# [type]JobConcurrency - limits the number jobs that can run at the same time. +# Sharkey will allow this many jobs of this type *per worker process*. +# [type]JobPerSec - limits the total number of jobs that may complete within a single second. +# If this limit is exceeded, then Sharkey will pause this type of job until the next second. +# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped. +# If this limit is exceeded, then the job is considered "failed" and recorded for debugging. +# +### Job types: +# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances. +# All inbound activities are queued and processed in chronological order by this job. +# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances. +# All outbound activities are queued and processed in chronological order by this job. +# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs. +# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates. + # Job concurrency per worker -# deliverJobConcurrency: 128 -# inboxJobConcurrency: 16 -# relashionshipJobConcurrency: 16 -# What's relashionshipJob?: -# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations. +#deliverJobConcurrency: 128 +#inboxJobConcurrency: 16 +#relationshipJobConcurrency: 16 +#backgroundJobConcurrency: 32 # Job rate limiter -# deliverJobPerSec: 128 -# inboxJobPerSec: 32 -# relashionshipJobPerSec: 64 +#deliverJobPerSec: 128 +#inboxJobPerSec: 32 +#relationshipJobPerSec: 64 +#backgroundJobPerSec: 256 # Job attempts -# deliverJobMaxAttempts: 12 -# inboxJobMaxAttempts: 8 +#deliverJobMaxAttempts: 12 +#inboxJobMaxAttempts: 8 +#backgroundJobMaxAttempts: 8 # Local address used for outgoing requests #outgoingAddress: 127.0.0.1 diff --git a/.config/cypress-devcontainer.yml b/.config/cypress-devcontainer.yml index f705d06d45..ba6d51959c 100644 --- a/.config/cypress-devcontainer.yml +++ b/.config/cypress-devcontainer.yml @@ -223,17 +223,42 @@ id: 'aidx' # Number of worker processes #clusterLimit: 1 +# +-------------------------+ +# | Job concurrency options | +# +-------------------------+ +# +### Available options: +# [type]JobConcurrency - limits the number jobs that can run at the same time. +# Sharkey will allow this many jobs of this type *per worker process*. +# [type]JobPerSec - limits the total number of jobs that may complete within a single second. +# If this limit is exceeded, then Sharkey will pause this type of job until the next second. +# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped. +# If this limit is exceeded, then the job is considered "failed" and recorded for debugging. +# +### Job types: +# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances. +# All inbound activities are queued and processed in chronological order by this job. +# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances. +# All outbound activities are queued and processed in chronological order by this job. +# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs. +# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates. + # Job concurrency per worker -# deliverJobConcurrency: 128 -# inboxJobConcurrency: 16 +#deliverJobConcurrency: 128 +#inboxJobConcurrency: 16 +#relationshipJobConcurrency: 16 +#backgroundJobConcurrency: 32 # Job rate limiter -# deliverJobPerSec: 128 -# inboxJobPerSec: 32 +#deliverJobPerSec: 128 +#inboxJobPerSec: 32 +#relationshipJobPerSec: 64 +#backgroundJobPerSec: 256 # Job attempts -# deliverJobMaxAttempts: 12 -# inboxJobMaxAttempts: 8 +#deliverJobMaxAttempts: 12 +#inboxJobMaxAttempts: 8 +#backgroundJobMaxAttempts: 8 # IP address family used for outgoing request (ipv4, ipv6 or dual) #outgoingAddressFamily: ipv4 diff --git a/.config/docker_example.yml b/.config/docker_example.yml index 5905e3deed..8ca8d7ff50 100644 --- a/.config/docker_example.yml +++ b/.config/docker_example.yml @@ -307,21 +307,42 @@ id: 'aidx' # Number of worker processes #clusterLimit: 1 +# +-------------------------+ +# | Job concurrency options | +# +-------------------------+ +# +### Available options: +# [type]JobConcurrency - limits the number jobs that can run at the same time. +# Sharkey will allow this many jobs of this type *per worker process*. +# [type]JobPerSec - limits the total number of jobs that may complete within a single second. +# If this limit is exceeded, then Sharkey will pause this type of job until the next second. +# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped. +# If this limit is exceeded, then the job is considered "failed" and recorded for debugging. +# +### Job types: +# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances. +# All inbound activities are queued and processed in chronological order by this job. +# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances. +# All outbound activities are queued and processed in chronological order by this job. +# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs. +# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates. + # Job concurrency per worker #deliverJobConcurrency: 128 #inboxJobConcurrency: 16 #relationshipJobConcurrency: 16 -# What's relationshipJob?: -# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations. +#backgroundJobConcurrency: 32 # Job rate limiter #deliverJobPerSec: 128 #inboxJobPerSec: 32 #relationshipJobPerSec: 64 +#backgroundJobPerSec: 256 # Job attempts #deliverJobMaxAttempts: 12 #inboxJobMaxAttempts: 8 +#backgroundJobMaxAttempts: 8 # Local address used for outgoing requests #outgoingAddress: 127.0.0.1 diff --git a/.config/example.yml b/.config/example.yml index cffc333d14..6fa3f02026 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -310,21 +310,42 @@ id: 'aidx' # Number of worker processes #clusterLimit: 1 +# +-------------------------+ +# | Job concurrency options | +# +-------------------------+ +# +### Available options: +# [type]JobConcurrency - limits the number jobs that can run at the same time. +# Sharkey will allow this many jobs of this type *per worker process*. +# [type]JobPerSec - limits the total number of jobs that may complete within a single second. +# If this limit is exceeded, then Sharkey will pause this type of job until the next second. +# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped. +# If this limit is exceeded, then the job is considered "failed" and recorded for debugging. +# +### Job types: +# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances. +# All inbound activities are queued and processed in chronological order by this job. +# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances. +# All outbound activities are queued and processed in chronological order by this job. +# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs. +# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates. + # Job concurrency per worker #deliverJobConcurrency: 128 #inboxJobConcurrency: 16 #relationshipJobConcurrency: 16 -# What's relationshipJob?: -# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations. +#backgroundJobConcurrency: 32 # Job rate limiter #deliverJobPerSec: 128 #inboxJobPerSec: 32 #relationshipJobPerSec: 64 +#backgroundJobPerSec: 256 # Job attempts #deliverJobMaxAttempts: 12 #inboxJobMaxAttempts: 8 +#backgroundJobMaxAttempts: 8 # Local address used for outgoing requests #outgoingAddress: 127.0.0.1 diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index 5607f50eb7..be3892e942 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -111,11 +111,14 @@ type Source = { deliverJobConcurrency?: number; inboxJobConcurrency?: number; relationshipJobConcurrency?: number; + backgroundJobConcurrency?: number; deliverJobPerSec?: number; inboxJobPerSec?: number; relationshipJobPerSec?: number; + backgroundJobPerSec?: number; deliverJobMaxAttempts?: number; inboxJobMaxAttempts?: number; + backgroundJobMaxAttempts?: number; mediaDirectory?: string; mediaProxy?: string; @@ -272,11 +275,14 @@ export type Config = { deliverJobConcurrency: number | undefined; inboxJobConcurrency: number | undefined; relationshipJobConcurrency: number | undefined; + backgroundJobConcurrency: number | undefined; deliverJobPerSec: number | undefined; inboxJobPerSec: number | undefined; relationshipJobPerSec: number | undefined; + backgroundJobPerSec: number | undefined; deliverJobMaxAttempts: number | undefined; inboxJobMaxAttempts: number | undefined; + backgroundJobMaxAttempts: number | undefined; proxyRemoteFiles: boolean | undefined; customMOTD: string[] | undefined; signToActivityPubGet: boolean; @@ -475,11 +481,14 @@ export function loadConfig(loggerService: LoggerService): Config { deliverJobConcurrency: config.deliverJobConcurrency, inboxJobConcurrency: config.inboxJobConcurrency, relationshipJobConcurrency: config.relationshipJobConcurrency, + backgroundJobConcurrency: config.backgroundJobConcurrency, deliverJobPerSec: config.deliverJobPerSec, inboxJobPerSec: config.inboxJobPerSec, relationshipJobPerSec: config.relationshipJobPerSec, + backgroundJobPerSec: config.backgroundJobPerSec, deliverJobMaxAttempts: config.deliverJobMaxAttempts, inboxJobMaxAttempts: config.inboxJobMaxAttempts, + backgroundJobMaxAttempts: config.backgroundJobMaxAttempts, proxyRemoteFiles: config.proxyRemoteFiles, customMOTD: config.customMOTD, signToActivityPubGet: config.signToActivityPubGet ?? true, diff --git a/packages/backend/src/core/FetchInstanceMetadataService.ts b/packages/backend/src/core/FetchInstanceMetadataService.ts index d288c5d231..0ba61ce4f9 100644 --- a/packages/backend/src/core/FetchInstanceMetadataService.ts +++ b/packages/backend/src/core/FetchInstanceMetadataService.ts @@ -17,6 +17,7 @@ import { bindThis } from '@/decorators.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { TimeService } from '@/global/TimeService.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; +import { QueueService } from '@/core/QueueService.js'; import type { CheerioAPI } from 'cheerio/slim'; type NodeInfo = { @@ -50,6 +51,7 @@ export class FetchInstanceMetadataService { private redisClient: Redis.Redis, private readonly timeService: TimeService, + private readonly queueService: QueueService, ) { this.logger = this.loggerService.getLogger('metadata', 'cyan'); } @@ -73,8 +75,21 @@ export class FetchInstanceMetadataService { return this.redisClient.del(`fetchInstanceMetadata:mutex:v2:${host}`); } + /** + * Schedules a deferred update on the background task worker. + * Duplicate updates are automatically skipped. + */ + @bindThis + public async fetchInstanceMetadataLazy(instance: MiInstance): Promise { + if (!instance.isBlocked) { + await this.queueService.createUpdateInstanceJob(instance.host); + } + } + @bindThis public async fetchInstanceMetadata(instance: MiInstance, force = false): Promise { + if (instance.isBlocked) return; + const host = instance.host; // finallyでunlockされてしまうのでtry内でロックチェックをしない diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 2f594394a6..078c6002e8 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -20,6 +20,7 @@ import { UserWebhookDeliverJobData, SystemWebhookDeliverJobData, ScheduleNotePostJobData, + BackgroundTaskJobData, } from '../queue/types.js'; import type { Provider } from '@nestjs/common'; @@ -33,6 +34,7 @@ export type ObjectStorageQueue = Bull.Queue; export type UserWebhookDeliverQueue = Bull.Queue; export type SystemWebhookDeliverQueue = Bull.Queue; export type ScheduleNotePostQueue = Bull.Queue; +export type BackgroundTaskQueue = Bull.Queue; const $system: Provider = { provide: 'queue:system', @@ -94,6 +96,12 @@ const $scheduleNotePost: Provider = { inject: [DI.config], }; +const $backgroundTask: Provider = { + provide: 'queue:backgroundTask', + useFactory: (config: Config) => new Bull.Queue(QUEUE.BACKGROUND_TASK, baseQueueOptions(config, QUEUE.BACKGROUND_TASK)), + inject: [DI.config], +}; + @Module({ imports: [ ], @@ -108,6 +116,7 @@ const $scheduleNotePost: Provider = { $userWebhookDeliver, $systemWebhookDeliver, $scheduleNotePost, + $backgroundTask, ], exports: [ $system, @@ -120,6 +129,7 @@ const $scheduleNotePost: Provider = { $userWebhookDeliver, $systemWebhookDeliver, $scheduleNotePost, + $backgroundTask, ], }) export class QueueModule implements OnApplicationShutdown { @@ -136,6 +146,7 @@ export class QueueModule implements OnApplicationShutdown { @Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue, @Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue, @Inject('queue:scheduleNotePost') public scheduleNotePostQueue: ScheduleNotePostQueue, + @Inject('queue:backgroundTask') public readonly backgroundTaskQueue: BackgroundTaskQueue, ) {} public async dispose(): Promise { @@ -155,6 +166,7 @@ export class QueueModule implements OnApplicationShutdown { this.userWebhookDeliverQueue.close(), this.systemWebhookDeliverQueue.close(), this.scheduleNotePostQueue.close(), + this.backgroundTaskQueue.close(), ]).then(res => { for (const result of res) { if (result.status === 'rejected') { diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 9fd646e655..71716f0f6b 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -21,6 +21,7 @@ import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js'; import type { MiNote } from '@/models/Note.js'; import { type UserWebhookPayload } from './UserWebhookService.js'; import type { + BackgroundTaskJobData, DbJobData, DeliverJobData, RelationshipJobData, @@ -39,6 +40,7 @@ import type { SystemWebhookDeliverQueue, UserWebhookDeliverQueue, ScheduleNotePostQueue, + BackgroundTaskQueue, } from './QueueModule.js'; import type httpSignature from '@peertube/http-signature'; import type * as Bull from 'bullmq'; @@ -54,6 +56,7 @@ export const QUEUE_TYPES = [ 'userWebhookDeliver', 'systemWebhookDeliver', 'scheduleNotePost', + 'backgroundTask', ] as const; @Injectable() @@ -72,6 +75,7 @@ export class QueueService implements OnModuleInit { @Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue, @Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue, @Inject('queue:scheduleNotePost') public ScheduleNotePostQueue: ScheduleNotePostQueue, + @Inject('queue:backgroundTask') public readonly backgroundTaskQueue: BackgroundTaskQueue, private readonly timeService: TimeService, ) {} @@ -839,6 +843,78 @@ export class QueueService implements OnModuleInit { }); } + @bindThis + public async createUpdateUserJob(userId: string) { + return await this.createBackgroundTask( + 'update-user', + { + type: 'update-user', + userId, + }, + { + id: `update-user:${userId}`, + // ttl: 1000 * 60 * 60 * 24, + }, + ); + } + + @bindThis + public async createUpdateFeaturedJob(userId: string) { + return await this.createBackgroundTask( + 'update-featured', + { + type: 'update-featured', + userId, + }, + { + id: `update-featured:${userId}`, + // ttl: 1000 * 60 * 60 * 24, + }, + ); + } + + @bindThis + public async createUpdateInstanceJob(host: string) { + return await this.createBackgroundTask( + 'update-instance', + { + type: 'update-instance', + host, + }, + { + id: `update-instance:${host}`, + // ttl: 1000 * 60 * 60 * 24, + }, + ); + } + + private async createBackgroundTask(name: string, data: BackgroundTaskJobData, duplication: { id: string, ttl?: number }) { + return await this.backgroundTaskQueue.add( + name, + data, + { + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, + + // https://docs.bullmq.io/guide/retrying-failing-jobs#custom-back-off-strategies + attempts: this.config.backgroundJobMaxAttempts ?? 8, + backoff: { + // Resolves to QueueProcessorService::HttpRelatedBackoff() + type: 'custom', + }, + + // https://docs.bullmq.io/guide/jobs/deduplication + deduplication: duplication, + }, + ); + }; + /** * @see UserWebhookDeliverJobData * @see UserWebhookDeliverProcessorService diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index 91eba793d6..309259120d 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -153,11 +153,10 @@ export class ApInboxService { // ついでにリモートユーザーの情報が古かったら更新しておく if (actor.uri) { if (actor.lastFetchedAt == null || this.timeService.now - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) { - setImmediate(() => { + { // 同一ユーザーの情報を再度処理するので、使用済みのresolverを再利用してはいけない - this.apPersonService.updatePerson(actor.uri) - .catch(err => this.logger.error(`Failed to update person: ${renderInlineError(err)}`)); - }); + await this.apPersonService.updatePersonLazy(actor); + } } } return result; @@ -441,25 +440,15 @@ export class ApInboxService { this.instanceChart.requestReceived(i.host).then(); } - this.fetchInstanceMetadataService.fetchInstanceMetadata(i).then(); + await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); }); // Process it! - return await this.performOneActivity(actor, activity, resolver) - .finally(() => { - // Update user (adapted from performActivity) - if (actor.lastFetchedAt == null || this.timeService.now - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) { - setImmediate(() => { - // Don't re-use the resolver, or it may throw recursion errors. - // Instead, create a new resolver with an appropriately-reduced recursion limit. - const subResolver = this.apResolverService.createResolver({ - recursionLimit: resolver.getRecursionLimit() - resolver.getHistory().length, - }); - this.apPersonService.updatePerson(actor.uri, subResolver) - .catch(err => this.logger.error(`Failed to update person: ${renderInlineError(err)}`)); - }); - } - }); + try { + return await this.performOneActivity(actor, activity, resolver); + } finally { + await this.apPersonService.updatePersonLazy(actor); + } } @bindThis diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index 4ce0e8db3f..096480047f 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -579,7 +579,7 @@ export class ApPersonService implements OnModuleInit { if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.newUser(i.host); } - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); + this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); }); } @@ -604,16 +604,26 @@ export class ApPersonService implements OnModuleInit { } //#endregion - await this.updateFeatured(user.id, resolver).catch(err => { - // Permanent error implies hidden or inaccessible, which is a normal thing. - if (isRetryableError(err)) { - this.logger.error(`Error updating featured notes: ${renderInlineError(err)}`); - } - }); + await this.updateFeaturedLazy(user); return user; } + /** + * Schedules a deferred update on the background task worker. + * Duplicate updates are automatically skipped. + */ + @bindThis + public async updatePersonLazy(uriOrUser: string | MiUser): Promise { + const user = typeof(uriOrUser) === 'string' + ? await this.fetchPerson(uriOrUser) + : uriOrUser; + + if (user && user.host != null) { + await this.queueService.createUpdateUserJob(user.id); + } + } + /** * Personの情報を更新します。 * Misskeyに対象のPersonが登録されていなければ無視します。 @@ -817,12 +827,7 @@ export class ApPersonService implements OnModuleInit { await this.cacheService.refreshFollowRelationsFor(exist.id); } - await this.updateFeatured(exist.id, resolver).catch(err => { - // Permanent error implies hidden or inaccessible, which is a normal thing. - if (isRetryableError(err)) { - this.logger.error(`Error updating featured notes: ${renderInlineError(err)}`); - } - }); + await this.updateFeaturedLazy(exist); const updated = { ...exist, ...updates }; @@ -902,9 +907,24 @@ export class ApPersonService implements OnModuleInit { return fields; } + /** + * Schedules a deferred update on the background task worker. + * Duplicate updates are automatically skipped. + */ @bindThis - public async updateFeatured(userId: MiUser['id'], resolver?: Resolver): Promise { - const user = await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false }); + public async updateFeaturedLazy(userOrId: MiUser | MiUser['id']): Promise { + const userId = typeof(userOrId) === 'object' ? userOrId.id : userOrId; + const user = typeof(userOrId) === 'object' ? userOrId : await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false }); + + if (isRemoteUser(user) && user.featured) { + await this.queueService.createUpdateFeaturedJob(userId); + } + } + + @bindThis + public async updateFeatured(userOrId: MiUser | MiUser['id'], resolver?: Resolver): Promise { + const userId = typeof(userOrId) === 'object' ? userOrId.id : userOrId; + const user = typeof(userOrId) === 'object' ? userOrId : await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false }); if (!isRemoteUser(user)) return; if (!user.featured) return; diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index b6469229d2..d76b29340d 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -46,7 +46,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js'; import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js'; - +import { BackgroundTaskProcessorService } from './processors/BackgroundTaskProcessorService.js'; @Module({ imports: [ CoreModule, @@ -93,6 +93,7 @@ import { HibernateUsersProcessorService } from './processors/HibernateUsersProce ScheduleNotePostProcessorService, CleanupApLogsProcessorService, HibernateUsersProcessorService, + BackgroundTaskProcessorService, ], exports: [ QueueProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index cd85db3122..b04d19618f 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -14,6 +14,7 @@ import { CheckModeratorsActivityProcessorService } from '@/queue/processors/Chec import { TimeService } from '@/global/TimeService.js'; import { renderFullError } from '@/misc/render-full-error.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; +import { isRetryableError } from '@/misc/is-retryable-error.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; @@ -53,9 +54,16 @@ import { QUEUE, baseWorkerOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js'; import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js'; +import { BackgroundTaskProcessorService } from './processors/BackgroundTaskProcessorService.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 -function httpRelatedBackoff(attemptsMade: number) { +function httpRelatedBackoff(attemptsMade: number, type?: string, error?: Error) { + // Don't retry permanent errors + // https://docs.bullmq.io/guide/retrying-failing-jobs#custom-back-off-strategies + if (error && !isRetryableError(error)) { + return -1; + } + const baseDelay = 60 * 1000; // 1min const maxBackoff = 8 * 60 * 60 * 1000; // 8hours let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; @@ -95,6 +103,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; private schedulerNotePostQueueWorker: Bull.Worker; + private readonly backgroundTaskWorker: Bull.Worker; constructor( @Inject(DI.config) @@ -140,6 +149,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private readonly timeService: TimeService, private readonly cleanupApLogsProcessorService: CleanupApLogsProcessorService, private readonly hibernateUsersProcessorService: HibernateUsersProcessorService, + private readonly backgroundTaskProcessorService: BackgroundTaskProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -565,6 +575,39 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion + + //#region background tasks + { + const logger = this.logger.createSubLogger('backgroundTask'); + + this.backgroundTaskWorker = new Bull.Worker(QUEUE.BACKGROUND_TASK, (job) => this.backgroundTaskProcessorService.process(job), { + ...baseWorkerOptions(this.config, QUEUE.BACKGROUND_TASK), + autorun: false, + concurrency: this.config.backgroundJobConcurrency ?? 32, + limiter: { + max: this.config.backgroundJobPerSec ?? 256, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); + this.backgroundTaskWorker + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => { + this.logError(logger, err, job); + if (config.sentryForBackend) { + Sentry.captureMessage(`Queue: ${QUEUE.BACKGROUND_TASK}: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { + level: 'error', + extra: { job, err }, + }); + } + }) + .on('error', (err: Error) => this.logError(logger, err)) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); + } + //#endregion } private logError(logger: Logger, err: unknown, job?: Bull.Job | null): void { @@ -606,6 +649,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), this.schedulerNotePostQueueWorker.run(), + this.backgroundTaskWorker.run(), ]); } @@ -622,6 +666,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), this.schedulerNotePostQueueWorker.close(), + this.backgroundTaskWorker.close(), ]).then(res => { for (const result of res) { if (result.status === 'rejected') { diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 17c6b81736..44192c280e 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -18,6 +18,7 @@ export const QUEUE = { USER_WEBHOOK_DELIVER: 'userWebhookDeliver', SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver', SCHEDULE_NOTE_POST: 'scheduleNotePost', + BACKGROUND_TASK: 'backgroundTask', }; export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts new file mode 100644 index 0000000000..b237990a4c --- /dev/null +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -0,0 +1,93 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; +import { BackgroundTaskJobData, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserBackgroundTask } from '@/queue/types.js'; +import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; +import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; +import Logger from '@/logger.js'; +import { isRetryableError } from '@/misc/is-retryable-error.js'; +import { DI } from '@/di-symbols.js'; +import type { Config } from '@/config.js'; +import { CacheService } from '@/core/CacheService.js'; +import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; + +@Injectable() +export class BackgroundTaskProcessorService { + private readonly logger: Logger; + + constructor( + @Inject(DI.config) + private readonly config: Config, + + private readonly apPersonService: ApPersonService, + private readonly cacheService: CacheService, + private readonly federatedInstanceService: FederatedInstanceService, + private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, + + queueLoggerService: QueueLoggerService, + ) { + this.logger = queueLoggerService.logger.createSubLogger('background-task'); + } + + public async process(job: Bull.Job): Promise { + if (job.data.type === 'update-user') { + return await this.processUpdateUser(job.data); + } else if (job.data.type === 'update-featured') { + return await this.processUpdateFeatured(job.data); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } else if (job.data.type === 'update-instance') { + return await this.processUpdateInstance(job.data); + } else { + this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data); + throw new Error(`Unknown job type ${job.data}, see system logs for details`); + } + } + + private async processUpdateUser(task: UpdateUserBackgroundTask): Promise { + const user = await this.cacheService.findOptionalUserById(task.userId); + if (!user || user.isDeleted) return `Skipping update-user task: user ${task.userId} has been deleted`; + if (user.isSuspended) return `Skipping update-user task: user ${task.userId} is suspended`; + if (!user.uri) return `Skipping update-user task: user ${task.userId} is local`; + + if (user.lastFetchedAt && Date.now() - user.lastFetchedAt.getTime() < 1000 * 60 * 60 * 24) { + return `Skipping update-user task: user ${task.userId} was recently updated`; + } + + await this.apPersonService.updatePerson(user.uri); + return 'ok'; + } + + private async processUpdateFeatured(task: UpdateFeaturedBackgroundTask): Promise { + const user = await this.cacheService.findOptionalUserById(task.userId); + if (!user || user.isDeleted) return `Skipping update-featured task: user ${task.userId} has been deleted`; + if (user.isSuspended) return `Skipping update-featured task: user ${task.userId} is suspended`; + if (!user.uri) return `Skipping update-featured task: user ${task.userId} is local`; + if (!user.featured) return `Skipping update-featured task: user ${task.userId} has no featured collection`; + + if (user.lastFetchedAt && Date.now() - user.lastFetchedAt.getTime() < 1000 * 60 * 60 * 24) { + return `Skipping update-featured task: user ${task.userId} was recently updated`; + } + + await this.apPersonService.updateFeatured(user); + return 'ok'; + } + + private async processUpdateInstance(task: UpdateInstanceBackgroundTask): Promise { + const instance = await this.federatedInstanceService.fetch(task.host); + if (!instance) return `Skipping update-instance task: instance ${task.host} has been deleted`; + if (instance.isBlocked) return `Skipping update-instance task: instance ${task.host} is blocked`; + + if (instance.infoUpdatedAt && Date.now() - instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24) { + return `Skipping update-instance task: instance ${task.host} was recently updated`; + } + + await this.fetchInstanceMetadataService.fetchInstanceMetadata(instance); + return 'ok'; + } +} diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 0b1ef03a7a..792ec4b015 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -80,21 +80,21 @@ export class DeliverProcessorService { if (i == null) return; if (i.isNotResponding) { - this.federatedInstanceService.update(i.id, { + await this.federatedInstanceService.update(i.id, { isNotResponding: false, notRespondingSince: null, }); } if (this.meta.enableChartsForFederatedInstances) { - this.instanceChart.requestSent(i.host, true); + await this.instanceChart.requestSent(i.host, true); } }); return 'Success'; } catch (res) { - this.apRequestChart.deliverFail(); - this.federationChart.deliverd(host, false); + await this.apRequestChart.deliverFail(); + await this.federationChart.deliverd(host, false); // Update instance stats this.federatedInstanceService.fetchOrRegister(host).then(i => { diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 13b2885263..3b73e98e5c 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -275,10 +275,10 @@ export class InboxProcessorService implements OnApplicationShutdown { }); if (this.meta.enableChartsForFederatedInstances) { - this.instanceChart.requestReceived(i.host); + await this.instanceChart.requestReceived(i.host); } - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); + await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); }); // アクティビティを処理 diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 6dc9f88034..0c3d790a97 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -168,3 +168,23 @@ export type ThinUser = { export type ScheduleNotePostJobData = { scheduleNoteId: MiNote['id']; }; + +export type BackgroundTaskJobData = + UpdateUserBackgroundTask | + UpdateFeaturedBackgroundTask | + UpdateInstanceBackgroundTask; + +export type UpdateUserBackgroundTask = { + type: 'update-user'; + userId: string; +}; + +export type UpdateFeaturedBackgroundTask = { + type: 'update-featured'; + userId: string; +}; + +export type UpdateInstanceBackgroundTask = { + type: 'update-instance'; + host: string; +}; diff --git a/packages/backend/test/misc/immediateBackgroundTasks.ts b/packages/backend/test/misc/immediateBackgroundTasks.ts new file mode 100644 index 0000000000..fd059e3f10 --- /dev/null +++ b/packages/backend/test/misc/immediateBackgroundTasks.ts @@ -0,0 +1,50 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import type { IObject } from '@/core/activitypub/type.js'; +import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; +import { MiRemoteUser, MiUser } from '@/models/User.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { MiInstance } from '@/models/Instance.js'; +import { Resolver } from '@/core/activitypub/ApResolverService.js'; +import { bindThis } from '@/decorators.js'; + +export class ImmediateApPersonService extends ApPersonService { + @bindThis + async createPerson(uri: string, resolver?: Resolver): Promise { + const user = await super.createPerson(uri, resolver); + await this.updateFeatured(user, resolver); + return user; + } + + @bindThis + async updatePerson(uri: string, resolver?: Resolver | null, hint?: IObject, movePreventUris: string[] = []): Promise { + const result = await super.updatePerson(uri, resolver, hint, movePreventUris); + + const user = await this.fetchPerson(uri); + if (user == null) throw new Error('updated user is null, did you forget to mock out caches?'); + await this.updateFeatured(user, resolver ?? undefined); + + return result; + } + + @bindThis + async updatePersonLazy(uriOrUser: string | MiUser): Promise { + const userId = typeof(uriOrUser) === 'object' ? uriOrUser.id : uriOrUser; + await this.updatePerson(userId); + } + + @bindThis + async updateFeaturedLazy(userOrId: string | MiUser): Promise { + await this.updateFeatured(userOrId); + } +} + +export class ImmediateFetchInstanceMetadataService extends FetchInstanceMetadataService { + @bindThis + async fetchInstanceMetadataLazy(instance: MiInstance): Promise { + return await this.fetchInstanceMetadata(instance); + } +}