From 41e50eeb0ee7a3a149db6544b1d1a35e52da1a2e Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 18 Jun 2025 01:53:59 -0400 Subject: [PATCH] add update-user-tags, update-note-tags, post-deliver, post-inbox, post-note, and check-hibernation background tasks --- ...1748990662839-fix-IDX_instance_host_key.js | 2 + ...991828473-create-IDX_note_for_timelines.js | 2 + ...017688-create-IDX_instance_host_filters.js | 2 + .../1748992128683-create-statistics.js | 2 + ...1749097536193-fix-IDX_note_for_timeline.js | 2 + ...016885-remove-IDX_instance_host_filters.js | 2 + packages/backend/src/core/HashtagService.ts | 12 +- .../backend/src/core/NoteCreateService.ts | 19 +- packages/backend/src/core/NoteEditService.ts | 13 +- packages/backend/src/core/QueueService.ts | 87 +++++++++- packages/backend/src/core/ReactionService.ts | 1 + .../src/core/activitypub/ApInboxService.ts | 28 +-- .../activitypub/models/ApPersonService.ts | 14 +- .../BackgroundTaskProcessorService.ts | 162 +++++++++++++++++- .../processors/DeliverProcessorService.ts | 57 +----- .../queue/processors/InboxProcessorService.ts | 26 +-- packages/backend/src/queue/types.ts | 41 ++++- .../src/server/api/endpoints/i/update.ts | 8 +- 18 files changed, 331 insertions(+), 149 deletions(-) diff --git a/packages/backend/migration/1748990662839-fix-IDX_instance_host_key.js b/packages/backend/migration/1748990662839-fix-IDX_instance_host_key.js index fc6d303743..e423ecd1b6 100644 --- a/packages/backend/migration/1748990662839-fix-IDX_instance_host_key.js +++ b/packages/backend/migration/1748990662839-fix-IDX_instance_host_key.js @@ -4,6 +4,8 @@ */ export class FixIDXInstanceHostKey1748990662839 { + name = 'FixIDXInstanceHostKey1748990662839'; + async up(queryRunner) { // must include host for index-only scans: https://www.postgresql.org/docs/current/indexes-index-only-scans.html await queryRunner.query(`DROP INDEX "public"."IDX_instance_host_key"`); diff --git a/packages/backend/migration/1748991828473-create-IDX_note_for_timelines.js b/packages/backend/migration/1748991828473-create-IDX_note_for_timelines.js index 2ea7fe95d2..54debcee27 100644 --- a/packages/backend/migration/1748991828473-create-IDX_note_for_timelines.js +++ b/packages/backend/migration/1748991828473-create-IDX_note_for_timelines.js @@ -4,6 +4,8 @@ */ export class CreateIDXNoteForTimelines1748991828473 { + name = 'CreateIDXNoteForTimelines1748991828473'; + async up(queryRunner) { await queryRunner.query(` create index "IDX_note_for_timelines" diff --git a/packages/backend/migration/1748992017688-create-IDX_instance_host_filters.js b/packages/backend/migration/1748992017688-create-IDX_instance_host_filters.js index 76cf16a6de..24b2b1894f 100644 --- a/packages/backend/migration/1748992017688-create-IDX_instance_host_filters.js +++ b/packages/backend/migration/1748992017688-create-IDX_instance_host_filters.js @@ -4,6 +4,8 @@ */ export class CreateIDXInstanceHostFilters1748992017688 { + name = 'CreateIDXInstanceHostFilters1748992017688'; + async up(queryRunner) { await queryRunner.query(` create index "IDX_instance_host_filters" diff --git a/packages/backend/migration/1748992128683-create-statistics.js b/packages/backend/migration/1748992128683-create-statistics.js index 5d08868536..daa50332ff 100644 --- a/packages/backend/migration/1748992128683-create-statistics.js +++ b/packages/backend/migration/1748992128683-create-statistics.js @@ -4,6 +4,8 @@ */ export class CreateStatistics1748992128683 { + name = 'CreateStatistics1748992128683'; + async up(queryRunner) { await queryRunner.query(`CREATE STATISTICS "STTS_instance_isBlocked_isBubbled" (mcv) ON "isBlocked", "isBubbled" FROM "instance"`); await queryRunner.query(`CREATE STATISTICS "STTS_instance_isBlocked_isSilenced" (mcv) ON "isBlocked", "isSilenced" FROM "instance"`); diff --git a/packages/backend/migration/1749097536193-fix-IDX_note_for_timeline.js b/packages/backend/migration/1749097536193-fix-IDX_note_for_timeline.js index 9a651e5871..57c5579110 100644 --- a/packages/backend/migration/1749097536193-fix-IDX_note_for_timeline.js +++ b/packages/backend/migration/1749097536193-fix-IDX_note_for_timeline.js @@ -4,6 +4,8 @@ */ export class FixIDXNoteForTimeline1749097536193 { + name = 'FixIDXNoteForTimeline1749097536193'; + async up(queryRunner) { await queryRunner.query('drop index "IDX_note_for_timelines"'); await queryRunner.query(` diff --git a/packages/backend/migration/1749267016885-remove-IDX_instance_host_filters.js b/packages/backend/migration/1749267016885-remove-IDX_instance_host_filters.js index d0a4e4f91e..4236399a6e 100644 --- a/packages/backend/migration/1749267016885-remove-IDX_instance_host_filters.js +++ b/packages/backend/migration/1749267016885-remove-IDX_instance_host_filters.js @@ -4,6 +4,8 @@ */ export class RemoveIDXInstanceHostFilters1749267016885 { + name = 'RemoveIDXInstanceHostFilters1749267016885'; + async up(queryRunner) { await queryRunner.query(`DROP INDEX IF EXISTS "IDX_instance_host_filters"`); } diff --git a/packages/backend/src/core/HashtagService.ts b/packages/backend/src/core/HashtagService.ts index 0035c4b0d5..b9945d34b3 100644 --- a/packages/backend/src/core/HashtagService.ts +++ b/packages/backend/src/core/HashtagService.ts @@ -59,7 +59,7 @@ export class HashtagService { tag = normalizeForSearch(tag); // TODO: サンプリング - this.updateHashtagsRanking(tag, user.id); + await this.updateHashtagsRanking(tag, user.id); const index = await this.hashtagsRepository.findOneBy({ name: tag }); @@ -119,11 +119,11 @@ export class HashtagService { if (Object.keys(set).length > 0) { q.set(set); - q.execute(); + await q.execute(); } } else { if (isUserAttached) { - this.hashtagsRepository.insert({ + await this.hashtagsRepository.insert({ id: this.idService.gen(), name: tag, mentionedUserIds: [], @@ -140,7 +140,7 @@ export class HashtagService { attachedRemoteUsersCount: isRemoteUser(user) ? 1 : 0, } as MiHashtag); } else { - this.hashtagsRepository.insert({ + await this.hashtagsRepository.insert({ id: this.idService.gen(), name: tag, mentionedUserIds: [user.id], @@ -174,7 +174,7 @@ export class HashtagService { const exist = await this.redisClient.sismember(`hashtagUsers:${hashtag}`, userId); if (exist === 1) return; - this.featuredService.updateHashtagsRanking(hashtag, 1); + await this.featuredService.updateHashtagsRanking(hashtag, 1); const redisPipeline = this.redisClient.pipeline(); @@ -193,7 +193,7 @@ export class HashtagService { 'NX', // "NX -- Set expiry only when the key has no expiry" = 有効期限がないときだけ設定 ); - redisPipeline.exec(); + await redisPipeline.exec(); } @bindThis diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index fd55c33bfb..484f7e01d0 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -458,10 +458,10 @@ export class NoteCreateService implements OnApplicationShutdown { const note = await this.insertNote(user, data, tags, emojis, mentionedUsers); - setImmediate('post created', { signal: this.#shutdownController.signal }).then( - () => this.postNoteCreated(note, user, data, silent, tags!, mentionedUsers!), - () => { /* aborted, ignore this */ }, - ); + // Update the Latest Note index / following feed + this.latestNoteService.handleCreatedNoteBG(note); + + await this.queueService.createPostNoteJob(note.id, silent, 'create'); return note; } @@ -577,7 +577,7 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - private async postNoteCreated(note: MiNote, user: MiUser & { + public async postNoteCreated(note: MiNote, user: MiUser & { id: MiUser['id']; username: MiUser['username']; host: MiUser['host']; @@ -606,7 +606,7 @@ export class NoteCreateService implements OnApplicationShutdown { // ハッシュタグ更新 if (data.visibility === 'public' || data.visibility === 'home') { if (!user.isBot || this.meta.enableBotTrending) { - this.hashtagService.updateHashtags(user, tags); + await this.queueService.createUpdateNoteTagsJob(note.id); } } @@ -807,9 +807,6 @@ export class NoteCreateService implements OnApplicationShutdown { }); } - // Update the Latest Note index / following feed - this.latestNoteService.handleCreatedNoteBG(note); - // Register to search database if (!user.noindex) this.index(note); } @@ -1100,8 +1097,8 @@ export class NoteCreateService implements OnApplicationShutdown { // Instance cannot quote if (user.host) { - const instance = await this.federatedInstanceService.fetch(user.host); - if (instance?.rejectQuotes) { + const instance = await this.federatedInstanceService.fetchOrRegister(user.host); + if (instance.rejectQuotes) { (data as Option).renote = null; (data.processErrors ??= []).push('quoteUnavailable'); } diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index 20afc4e63c..21991e2966 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -588,10 +588,10 @@ export class NoteEditService implements OnApplicationShutdown { // Re-fetch note to get the default values of null / unset fields. const edited = await this.notesRepository.findOneByOrFail({ id: note.id }); - setImmediate('post edited', { signal: this.#shutdownController.signal }).then( - () => this.postNoteEdited(edited, oldnote, user, data, silent, tags!, mentionedUsers!), - () => { /* aborted, ignore this */ }, - ); + // Update the Latest Note index / following feed + this.latestNoteService.handleUpdatedNoteBG(edited, oldnote); + + await this.queueService.createPostNoteJob(note.id, silent, 'edit'); return edited; } else { @@ -600,7 +600,7 @@ export class NoteEditService implements OnApplicationShutdown { } @bindThis - private async postNoteEdited(note: MiNote, oldNote: MiNote, user: MiUser & { + public async postNoteEdited(note: MiNote, user: MiUser & { id: MiUser['id']; username: MiUser['username']; host: MiUser['host']; @@ -754,9 +754,6 @@ export class NoteEditService implements OnApplicationShutdown { }); } - // Update the Latest Note index / following feed - this.latestNoteService.handleUpdatedNoteBG(oldNote, note); - // Register to search database if (!user.noindex) this.index(note); } diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 15c50e4d83..98b4d6cd8c 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -853,7 +853,6 @@ export class QueueService implements OnModuleInit { }, { id: `update-user:${userId}`, - // ttl: 1000 * 60 * 60 * 24, }, ); } @@ -868,7 +867,6 @@ export class QueueService implements OnModuleInit { }, { id: `update-featured:${userId}`, - // ttl: 1000 * 60 * 60 * 24, }, ); } @@ -883,12 +881,93 @@ export class QueueService implements OnModuleInit { }, { id: `update-instance:${host}`, - // ttl: 1000 * 60 * 60 * 24, }, ); } - private async createBackgroundTask(name: string, data: BackgroundTaskJobData, duplication: { id: string, ttl?: number }) { + @bindThis + public async createPostDeliverJob(host: string, result: 'success' | 'temp-fail' | 'perm-fail') { + return await this.createBackgroundTask( + 'post-deliver', + { + type: 'post-deliver', + host, + result, + }, + ); + } + + @bindThis + public async createPostInboxJob(host: string) { + return await this.createBackgroundTask( + 'post-inbox', + { + type: 'post-inbox', + host, + }, + ); + } + + @bindThis + public async createPostNoteJob(noteId: string, silent: boolean, type: 'create' | 'edit') { + return await this.createBackgroundTask( + 'post-note', + { + type: 'post-note', + noteId, + silent, + edit: type === 'edit', + }, + { + id: `post-note:${noteId}:${type}`, + }, + ); + } + + @bindThis + public async createCheckHibernationJob(userId: string) { + return await this.createBackgroundTask( + 'check-hibernation', + { + type: 'check-hibernation', + userId, + }, + { + id: `check-hibernation:${userId}`, + ttl: 1000 * 60 * 60 * 24, // This is a very heavy task, so only run once per day per user + }, + ); + } + + @bindThis + public async createUpdateUserTagsJob(userId: string) { + return await this.createBackgroundTask( + 'update-user-tags', + { + type: 'update-user-tags', + userId, + }, + { + id: `update-user-tags:${userId}`, + }, + ); + } + + @bindThis + public async createUpdateNoteTagsJob(noteId: string) { + return await this.createBackgroundTask( + 'update-note-tags', + { + type: 'update-note-tags', + noteId, + }, + { + id: `update-note-tags:${noteId}`, + }, + ); + } + + private async createBackgroundTask(name: string, data: BackgroundTaskJobData, duplication?: { id: string, ttl?: number }) { return await this.backgroundTaskQueue.add( name, data, diff --git a/packages/backend/src/core/ReactionService.ts b/packages/backend/src/core/ReactionService.ts index d3ab48e3ff..27a6d7b514 100644 --- a/packages/backend/src/core/ReactionService.ts +++ b/packages/backend/src/core/ReactionService.ts @@ -340,6 +340,7 @@ export class ReactionService implements OnModuleInit { .execute(); } + // TODO update caches this.usersRepository.update({ id: user.id }, { updatedAt: this.timeService.date }); this.globalEventService.publishNoteStream(note.id, 'unreacted', { diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index 309259120d..de37944344 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -33,10 +33,6 @@ import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { fromTuple } from '@/misc/from-tuple.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; -import InstanceChart from '@/core/chart/charts/instance.js'; -import FederationChart from '@/core/chart/charts/federation.js'; -import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; -import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { CacheService } from '@/core/CacheService.js'; import { NoteVisibilityService } from '@/core/NoteVisibilityService.js'; import { TimeService } from '@/global/TimeService.js'; @@ -97,10 +93,6 @@ export class ApInboxService { private queueService: QueueService, private globalEventService: GlobalEventService, private readonly federatedInstanceService: FederatedInstanceService, - private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, - private readonly instanceChart: InstanceChart, - private readonly federationChart: FederationChart, - private readonly updateInstanceQueue: UpdateInstanceQueue, private readonly cacheService: CacheService, private readonly noteVisibilityService: NoteVisibilityService, private readonly timeService: TimeService, @@ -423,25 +415,7 @@ export class ApInboxService { } // Update stats (adapted from InboxProcessorService) - this.federationChart.inbox(actor.host).then(); - process.nextTick(async () => { - const i = await (this.meta.enableStatsForFederatedInstances - ? this.federatedInstanceService.fetchOrRegister(actor.host) - : this.federatedInstanceService.fetch(actor.host)); - - if (i == null) return; - - this.updateInstanceQueue.enqueue(i.id, { - latestRequestReceivedAt: this.timeService.date, - shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', - }); - - if (this.meta.enableChartsForFederatedInstances) { - this.instanceChart.requestReceived(i.host).then(); - } - - await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); - }); + await this.queueService.createPostInboxJob(actor.host); // Process it! try { diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index 096480047f..f153d51c9d 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -585,9 +585,6 @@ export class ApPersonService implements OnModuleInit { this.usersChart.update(user, true); - // ハッシュタグ更新 - this.hashtagService.updateUsertags(user, tags); - //#region アバターとヘッダー画像をフェッチ try { const updates = await this.resolveAvatarAndBanner(user, person.icon, person.image, person.backgroundUrl); @@ -604,6 +601,9 @@ export class ApPersonService implements OnModuleInit { } //#endregion + // ハッシュタグ更新 + await this.queueService.createUpdateUserTagsJob(user.id); + await this.updateFeaturedLazy(user); return user; @@ -811,9 +811,6 @@ export class ApPersonService implements OnModuleInit { this.globalEventService.publishInternalEvent('remoteUserUpdated', { id: exist.id }); - // ハッシュタグ更新 - this.hashtagService.updateUsertags(exist, tags); - // 該当ユーザーが既にフォロワーになっていた場合はFollowingもアップデートする if (exist.inbox !== person.inbox || exist.sharedInbox !== (person.sharedInbox ?? person.endpoints?.sharedInbox)) { await this.followingsRepository.update( @@ -827,6 +824,9 @@ export class ApPersonService implements OnModuleInit { await this.cacheService.refreshFollowRelationsFor(exist.id); } + // ハッシュタグ更新 + await this.queueService.createUpdateUserTagsJob(exist.id); + await this.updateFeaturedLazy(exist); const updated = { ...exist, ...updates }; @@ -967,7 +967,7 @@ export class ApPersonService implements OnModuleInit { let td = 0; for (const note of featuredNotes.filter(x => x != null)) { td -= 1000; - transactionalEntityManager.insert(MiUserNotePining, { + await transactionalEntityManager.insert(MiUserNotePining, { id: this.idService.gen(this.timeService.now + td), userId: user.id, noteId: note.id, diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts index b237990a4c..341c54883e 100644 --- a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -5,30 +5,46 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Bull from 'bullmq'; -import { BackgroundTaskJobData, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserBackgroundTask } from '@/queue/types.js'; +import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask } from '@/queue/types.js'; import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; import Logger from '@/logger.js'; -import { isRetryableError } from '@/misc/is-retryable-error.js'; import { DI } from '@/di-symbols.js'; -import type { Config } from '@/config.js'; import { CacheService } from '@/core/CacheService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; -import { renderInlineError } from '@/misc/render-inline-error.js'; +import { MiMeta } from '@/models/Meta.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; +import { NoteCreateService } from '@/core/NoteCreateService.js'; +import type { NotesRepository } from '@/models/_.js'; +import { NoteEditService } from '@/core/NoteEditService.js'; +import { HashtagService } from '@/core/HashtagService.js'; @Injectable() export class BackgroundTaskProcessorService { private readonly logger: Logger; constructor( - @Inject(DI.config) - private readonly config: Config, + @Inject(DI.meta) + private readonly meta: MiMeta, + + @Inject(DI.notesRepository) + private readonly notesRepository: NotesRepository, private readonly apPersonService: ApPersonService, private readonly cacheService: CacheService, private readonly federatedInstanceService: FederatedInstanceService, private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, + private readonly instanceChart: InstanceChart, + private readonly apRequestChart: ApRequestChart, + private readonly federationChart: FederationChart, + private readonly updateInstanceQueue: UpdateInstanceQueue, + private readonly noteCreateService: NoteCreateService, + private readonly noteEditService: NoteEditService, + private readonly hashtagService: HashtagService, queueLoggerService: QueueLoggerService, ) { @@ -40,9 +56,21 @@ export class BackgroundTaskProcessorService { return await this.processUpdateUser(job.data); } else if (job.data.type === 'update-featured') { return await this.processUpdateFeatured(job.data); - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } else if (job.data.type === 'update-user-tags') { + return await this.processUpdateUserTags(job.data); + } else if (job.data.type === 'update-note-tags') { + return await this.processUpdateNoteTags(job.data); } else if (job.data.type === 'update-instance') { return await this.processUpdateInstance(job.data); + } else if (job.data.type === 'post-deliver') { + return await this.processPostDeliver(job.data); + } else if (job.data.type === 'post-inbox') { + return await this.processPostInbox(job.data); + } else if (job.data.type === 'post-note') { + return await this.processPostNote(job.data); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } else if (job.data.type === 'check-hibernation') { + return await this.processCheckHibernation(job.data); } else { this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data); throw new Error(`Unknown job type ${job.data}, see system logs for details`); @@ -78,10 +106,30 @@ export class BackgroundTaskProcessorService { return 'ok'; } + private async processUpdateUserTags(task: UpdateUserTagsBackgroundTask): Promise { + const user = await this.cacheService.findOptionalUserById(task.userId); + if (!user || user.isDeleted) return `Skipping update-user-tags task: user ${task.userId} has been deleted`; + if (user.isSuspended) return `Skipping update-user-tags task: user ${task.userId} is suspended`; + if (!user.uri) return `Skipping update-user-tags task: user ${task.userId} is local`; + + await this.hashtagService.updateUsertags(user, user.tags); + return 'ok'; + } + + private async processUpdateNoteTags(task: UpdateNoteTagsBackgroundTask): Promise { + const note = await this.notesRepository.findOneBy({ id: task.noteId }); + if (!note) return `Skipping update-note-tags task: note ${task.noteId} has been deleted`; + const user = await this.cacheService.findUserById(note.userId); + if (user.isSuspended) return `Skipping update-note-tags task: note ${task.noteId}'s user ${note.userId} is suspended`; + + await this.hashtagService.updateHashtags(user, note.tags); + return 'ok'; + } + private async processUpdateInstance(task: UpdateInstanceBackgroundTask): Promise { const instance = await this.federatedInstanceService.fetch(task.host); - if (!instance) return `Skipping update-instance task: instance ${task.host} has been deleted`; if (instance.isBlocked) return `Skipping update-instance task: instance ${task.host} is blocked`; + if (instance.suspensionState === 'goneSuspended') return `Skipping update-instance task: instance ${task.host} is gone`; if (instance.infoUpdatedAt && Date.now() - instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24) { return `Skipping update-instance task: instance ${task.host} was recently updated`; @@ -90,4 +138,102 @@ export class BackgroundTaskProcessorService { await this.fetchInstanceMetadataService.fetchInstanceMetadata(instance); return 'ok'; } + + private async processPostDeliver(task: PostDeliverBackgroundTask): Promise { + let instance = await this.federatedInstanceService.fetchOrRegister(task.host); + if (instance.isBlocked) return `Skipping post-deliver task: instance ${task.host} is blocked`; + + const success = task.result === 'success'; + + // isNotResponding should be the inverse of success, because: + // 1. We expect success (success=true) from a responding instance (isNotResponding=false). + // 2. We expect failure (success=false) from a non-responding instance (isNotResponding=true). + // If they are equal, then we need to update the cached state. + const updateNotResponding = success === instance.isNotResponding; + + // If we get a permanent failure, then we need to immediately suspend the instance + const updateGoneSuspended = task.result === 'perm-fail' && instance.suspensionState !== 'goneSuspended'; + + // Check if we need to auto-suspend the instance + const updateAutoSuspended = instance.isNotResponding && instance.notRespondingSince && instance.suspensionState === 'none' && instance.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7; + + // This is messy, but we need to minimize updates to space in Postgres blocks. + if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) { + instance = await this.federatedInstanceService.update(instance.id, { + isNotResponding: updateNotResponding ? !success : undefined, + notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined, + suspensionState: updateGoneSuspended + ? 'goneSuspended' + : updateAutoSuspended + ? 'autoSuspendedForNotResponding' + : undefined, + }); + } + + // Update instance metadata (deferred) + if (success && this.meta.enableStatsForFederatedInstances) { + await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance); + } + + // Update charts + if (this.meta.enableChartsForFederatedInstances) { + await this.instanceChart.requestSent(task.host, success); + } + if (success) { + await this.apRequestChart.deliverSucc(); + } else { + await this.apRequestChart.deliverFail(); + } + await this.federationChart.deliverd(task.host, success); + + return 'ok'; + } + + private async processPostInbox(task: PostInboxBackgroundTask): Promise { + const instance = await this.federatedInstanceService.fetchOrRegister(task.host); + if (instance.isBlocked) return `Skipping post-inbox task: instance ${task.host} is blocked`; + + // Update charts + if (this.meta.enableChartsForFederatedInstances) { + await this.instanceChart.requestReceived(task.host); + } + await this.apRequestChart.inbox(); + await this.federationChart.inbox(task.host); + + // Update instance metadata (deferred) + await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance); + + // Unsuspend instance (deferred) + this.updateInstanceQueue.enqueue(instance.id, { + latestRequestReceivedAt: new Date(), + shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding', + }); + + return 'ok'; + } + + private async processPostNote(task: PostNoteBackgroundTask): Promise { + const note = await this.notesRepository.findOneBy({ id: task.noteId }); + if (!note) return `Skipping post-note task: note ${task.noteId} has been deleted`; + const user = await this.cacheService.findUserById(note.userId); + if (user.isSuspended) return `Skipping post-note task: note ${task.noteId}'s user ${note.userId} is suspended`; + + const mentionedUsers = await this.cacheService.getUsers(note.mentions); + + if (task.edit) { + await this.noteEditService.postNoteEdited(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values())); + } else { + await this.noteCreateService.postNoteCreated(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values())); + } + + return 'ok'; + } + + private async processCheckHibernation(task: CheckHibernationBackgroundTask): Promise { + const followers = await this.cacheService.getNonHibernatedFollowers(task.userId); + if (followers.length < 1) return `Skipping check-hibernation task: user ${task.userId} has no non-hibernated followers`; + + await this.noteCreateService.checkHibernation(followers); + return 'ok'; + } } diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 792ec4b015..7e75a2105c 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -21,6 +21,7 @@ import { StatusError } from '@/misc/status-error.js'; import { UtilityService } from '@/core/UtilityService.js'; import { TimeService } from '@/global/TimeService.js'; import { bindThis } from '@/decorators.js'; +import { QueueService } from '@/core/QueueService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { DeliverJobData } from '../types.js'; @@ -44,13 +45,14 @@ export class DeliverProcessorService { private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, private readonly timeService: TimeService, + private readonly queueService: QueueService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('deliver'); } @bindThis public async process(job: Bull.Job): Promise { - const { host } = new URL(job.data.to); + const host = this.utilityService.extractDbHost(job.data.to); if (!this.utilityService.isFederationAllowedUri(job.data.to)) { return 'skip (blocked)'; @@ -72,66 +74,19 @@ export class DeliverProcessorService { try { await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest); - this.apRequestChart.deliverSucc(); - this.federationChart.deliverd(host, true); - // Update instance stats - process.nextTick(async () => { - if (i == null) return; - - if (i.isNotResponding) { - await this.federatedInstanceService.update(i.id, { - isNotResponding: false, - notRespondingSince: null, - }); - } - - if (this.meta.enableChartsForFederatedInstances) { - await this.instanceChart.requestSent(i.host, true); - } - }); + await this.queueService.createPostDeliverJob(host, 'success'); return 'Success'; } catch (res) { - await this.apRequestChart.deliverFail(); - await this.federationChart.deliverd(host, false); - // Update instance stats - this.federatedInstanceService.fetchOrRegister(host).then(i => { - if (!i.isNotResponding) { - this.federatedInstanceService.update(i.id, { - isNotResponding: true, - notRespondingSince: this.timeService.date, - }); - } else if (i.notRespondingSince) { - // 1週間以上不通ならサスペンド - if (i.suspensionState === 'none' && i.notRespondingSince.getTime() <= this.timeService.now - 1000 * 60 * 60 * 24 * 7) { - this.federatedInstanceService.update(i.id, { - suspensionState: 'autoSuspendedForNotResponding', - }); - } - } else { - // isNotRespondingがtrueでnotRespondingSinceがnullの場合はnotRespondingSinceをセット - // notRespondingSinceは新たな機能なので、それ以前のデータにはnotRespondingSinceがない場合がある - this.federatedInstanceService.update(i.id, { - notRespondingSince: this.timeService.date, - }); - } - - if (this.meta.enableChartsForFederatedInstances) { - this.instanceChart.requestSent(i.host, false); - } - }); + const isPerm = job.data.isSharedInbox && res instanceof StatusError && res.statusCode === 410; + await this.queueService.createPostDeliverJob(host, isPerm ? 'perm-fail' : 'temp-fail'); if (res instanceof StatusError && !res.isRetryable) { // 4xx // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { - this.federatedInstanceService.fetchOrRegister(host).then(i => { - this.federatedInstanceService.update(i.id, { - suspensionState: 'goneSuspended', - }); - }); throw new Bull.UnrecoverableError(`${host} is gone`); } throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 3b73e98e5c..1c7765fddf 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -30,10 +30,10 @@ import { DI } from '@/di-symbols.js'; import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; -import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { TimeService } from '@/global/TimeService.js'; import { isRetryableError } from '@/misc/is-retryable-error.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; +import { QueueService } from '@/core/QueueService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -66,8 +66,8 @@ export class InboxProcessorService implements OnApplicationShutdown { private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, private readonly apLogService: ApLogService, - private readonly updateInstanceQueue: UpdateInstanceQueue, private readonly timeService: TimeService, + private readonly queueService: QueueService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); } @@ -258,28 +258,8 @@ export class InboxProcessorService implements OnApplicationShutdown { log.authUserId = authUser.user.id; } - this.apRequestChart.inbox(); - this.federationChart.inbox(authUser.user.host); - // Update instance stats - process.nextTick(async () => { - const i = await (this.meta.enableStatsForFederatedInstances - ? this.federatedInstanceService.fetchOrRegister(authUser.user.host) - : this.federatedInstanceService.fetch(authUser.user.host)); - - if (i == null) return; - - this.updateInstanceQueue.enqueue(i.id, { - latestRequestReceivedAt: this.timeService.date, - shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', - }); - - if (this.meta.enableChartsForFederatedInstances) { - await this.instanceChart.requestReceived(i.host); - } - - await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); - }); + await this.queueService.createPostInboxJob(authUser.user.host); // アクティビティを処理 try { diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 0c3d790a97..64ae679033 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -172,7 +172,13 @@ export type ScheduleNotePostJobData = { export type BackgroundTaskJobData = UpdateUserBackgroundTask | UpdateFeaturedBackgroundTask | - UpdateInstanceBackgroundTask; + UpdateUserTagsBackgroundTask | + UpdateNoteTagsBackgroundTask | + UpdateInstanceBackgroundTask | + PostDeliverBackgroundTask | + PostInboxBackgroundTask | + PostNoteBackgroundTask | + CheckHibernationBackgroundTask; export type UpdateUserBackgroundTask = { type: 'update-user'; @@ -184,7 +190,40 @@ export type UpdateFeaturedBackgroundTask = { userId: string; }; +export type UpdateUserTagsBackgroundTask = { + type: 'update-user-tags'; + userId: string; +}; + +export type UpdateNoteTagsBackgroundTask = { + type: 'update-note-tags'; + noteId: string; +}; + export type UpdateInstanceBackgroundTask = { type: 'update-instance'; host: string; }; + +export type PostDeliverBackgroundTask = { + type: 'post-deliver'; + host: string; + result: 'success' | 'temp-fail' | 'perm-fail'; +}; + +export type PostInboxBackgroundTask = { + type: 'post-inbox'; + host: string; +}; + +export type PostNoteBackgroundTask = { + type: 'post-note'; + noteId: string; + silent: boolean; + edit: boolean; +}; + +export type CheckHibernationBackgroundTask = { + type: 'check-hibernation'; + userId: string; +}; diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index b2396d93eb..fe3f180aca 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -36,6 +36,7 @@ import { notificationRecieveConfig } from '@/models/json-schema/user.js'; import { userUnsignedFetchOptions } from '@/const.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; import { trackPromise } from '@/misc/promise-tracker.js'; +import { QueueService } from '@/core/QueueService.js'; import { ApiLoggerService } from '../../ApiLoggerService.js'; import { ApiError } from '../../error.js'; @@ -318,6 +319,7 @@ export default class extends Endpoint { // eslint- private httpRequestService: HttpRequestService, private avatarDecorationService: AvatarDecorationService, private utilityService: UtilityService, + private readonly queueService: QueueService, ) { super(meta, paramDef, async (ps, _user, token) => { const user = await this.usersRepository.findOneByOrFail({ id: _user.id }) as MiLocalUser; @@ -606,9 +608,6 @@ export default class extends Endpoint { // eslint- updates.emojis = emojis; updates.tags = tags; - - // ハッシュタグ更新 - this.hashtagService.updateUsertags(user, tags); //#endregion if (Object.keys(updates).length > 0) { @@ -639,6 +638,9 @@ export default class extends Endpoint { // eslint- // Publish meUpdated event this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj); + // ハッシュタグ更新 + await this.queueService.createUpdateUserTagsJob(user.id); + // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { await this.userFollowingService.acceptAllFollowRequests(user);