From c9f2554b2f0dd6c9656c44ad0ffd862c9faaa027 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 19 Jun 2025 13:40:56 -0400 Subject: [PATCH] track federation-related promises to avoid data loss during restart --- .../backend/src/core/NotePiningService.ts | 5 ++-- .../backend/src/core/UserFollowingService.ts | 28 +++++++++---------- .../src/core/activitypub/ApResolverService.ts | 5 ++-- .../core/activitypub/models/ApNoteService.ts | 4 +-- .../queue/processors/InboxProcessorService.ts | 5 ++-- .../src/server/api/endpoints/i/update.ts | 3 +- .../server/api/endpoints/notes/polls/vote.ts | 3 +- 7 files changed, 29 insertions(+), 24 deletions(-) diff --git a/packages/backend/src/core/NotePiningService.ts b/packages/backend/src/core/NotePiningService.ts index 836db52e48..a510676ef9 100644 --- a/packages/backend/src/core/NotePiningService.ts +++ b/packages/backend/src/core/NotePiningService.ts @@ -18,6 +18,7 @@ import { ApDeliverManagerService } from '@/core/activitypub/ApDeliverManagerServ import { ApRendererService } from '@/core/activitypub/ApRendererService.js'; import { bindThis } from '@/decorators.js'; import { RoleService } from '@/core/RoleService.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; import type { DataSource } from 'typeorm'; @Injectable() @@ -84,7 +85,7 @@ export class NotePiningService { // Deliver to remote followers if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) { - await this.deliverPinnedChange(user, note.id, true); + trackPromise(this.deliverPinnedChange(user, note.id, true)); } } @@ -112,7 +113,7 @@ export class NotePiningService { // Deliver to remote followers if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) { - await this.deliverPinnedChange(user, noteId, false); + trackPromise(this.deliverPinnedChange(user, noteId, false)); } } diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index 833ea97193..1d3eada189 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -31,6 +31,7 @@ import type { ThinUser } from '@/queue/types.js'; import { LoggerService } from '@/core/LoggerService.js'; import { InternalEventService } from '@/global/InternalEventService.js'; import type Logger from '../logger.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; type Local = MiLocalUser | { id: MiLocalUser['id']; @@ -102,7 +103,7 @@ export class UserFollowingService implements OnModuleInit { @bindThis public async deliverAccept(follower: MiRemoteUser, followee: MiPartialLocalUser, requestId?: string) { const content = this.apRendererService.addContext(this.apRendererService.renderAccept(this.apRendererService.renderFollow(follower, followee, requestId), followee)); - this.queueService.deliver(followee, content, follower.inbox, false); + await this.queueService.deliver(followee, content, follower.inbox, false); } @bindThis @@ -152,7 +153,7 @@ export class UserFollowingService implements OnModuleInit { // すでにフォロー関係が存在している場合 if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { // リモート → ローカル: acceptを送り返しておしまい - this.deliverAccept(follower, followee, requestId); + trackPromise(this.deliverAccept(follower, followee, requestId)); return; } if (this.userEntityService.isLocalUser(follower)) { @@ -206,7 +207,7 @@ export class UserFollowingService implements OnModuleInit { await this.insertFollowingDoc(followee, follower, silent, withReplies); if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { - this.deliverAccept(follower, followee, requestId); + trackPromise(this.deliverAccept(follower, followee, requestId)); } } @@ -581,7 +582,7 @@ export class UserFollowingService implements OnModuleInit { await this.insertFollowingDoc(followee, follower, false, request.withReplies); if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { - this.deliverAccept(follower, followee as MiPartialLocalUser, request.requestId ?? undefined); + trackPromise(this.deliverAccept(follower, followee as MiPartialLocalUser, request.requestId ?? undefined)); } this.userEntityService.pack(followee.id, followee, { @@ -595,14 +596,13 @@ export class UserFollowingService implements OnModuleInit { id: MiUser['id']; host: MiUser['host']; uri: MiUser['host']; inbox: MiUser['inbox']; sharedInbox: MiUser['sharedInbox']; }, ): Promise { - const requests = await this.followRequestsRepository.findBy({ + const requests = await this.followRequestsRepository.find({ where: { followeeId: user.id, - }); + }, relations: { + follower: true, + } }); - for (const request of requests) { - const follower = await this.usersRepository.findOneByOrFail({ id: request.followerId }); - this.acceptFollowRequest(user, follower); - } + await Promise.all(requests.map(request => this.acceptFollowRequest(user, request.follower as MiUser))); } /** @@ -611,7 +611,7 @@ export class UserFollowingService implements OnModuleInit { @bindThis public async rejectFollowRequest(user: Local, follower: Both): Promise { if (this.userEntityService.isRemoteUser(follower)) { - this.deliverReject(user, follower); + trackPromise(this.deliverReject(user, follower)); } await this.removeFollowRequest(user, follower); @@ -627,7 +627,7 @@ export class UserFollowingService implements OnModuleInit { @bindThis public async rejectFollow(user: Local, follower: Both): Promise { if (this.userEntityService.isRemoteUser(follower)) { - this.deliverReject(user, follower); + trackPromise(this.deliverReject(user, follower)); } await this.removeFollow(user, follower); @@ -696,7 +696,7 @@ export class UserFollowingService implements OnModuleInit { }); const content = this.apRendererService.addContext(this.apRendererService.renderReject(this.apRendererService.renderFollow(follower, followee, request?.requestId ?? undefined), followee)); - this.queueService.deliver(followee, content, follower.inbox, false); + await this.queueService.deliver(followee, content, follower.inbox, false); } /** @@ -720,7 +720,7 @@ export class UserFollowingService implements OnModuleInit { @bindThis public async isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { - return this.cacheService.isFollowing(followerId, followeeId); + return await this.cacheService.isFollowing(followerId, followeeId); } @bindThis diff --git a/packages/backend/src/core/activitypub/ApResolverService.ts b/packages/backend/src/core/activitypub/ApResolverService.ts index f6178ace1a..506686cb46 100644 --- a/packages/backend/src/core/activitypub/ApResolverService.ts +++ b/packages/backend/src/core/activitypub/ApResolverService.ts @@ -23,6 +23,7 @@ import { IdentifiableError } from '@/misc/identifiable-error.js'; import { toArray } from '@/misc/prelude/array.js'; import { isPureRenote } from '@/misc/is-renote.js'; import { CacheService } from '@/core/CacheService.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; import { AnyCollection, getApId, getNullableApId, IObjectWithId, isCollection, isCollectionOrOrderedCollection, isCollectionPage, isOrderedCollection, isOrderedCollectionPage } from './type.js'; import { ApDbResolverService } from './ApDbResolverService.js'; import { ApRendererService } from './ApRendererService.js'; @@ -269,8 +270,8 @@ export class Resolver { log.duration = calculateDurationSince(startTime); // Save or finalize asynchronously - this.apLogService.saveFetchLog(log) - .catch(err => this.logger.error('Failed to record AP object fetch:', err)); + trackPromise(this.apLogService.saveFetchLog(log) + .catch(err => this.logger.error('Failed to record AP object fetch:', err))); } } diff --git a/packages/backend/src/core/activitypub/models/ApNoteService.ts b/packages/backend/src/core/activitypub/models/ApNoteService.ts index 0eaf26de54..9b717af32d 100644 --- a/packages/backend/src/core/activitypub/models/ApNoteService.ts +++ b/packages/backend/src/core/activitypub/models/ApNoteService.ts @@ -277,7 +277,7 @@ export class ApNoteService implements OnModuleInit { return x; }) - .catch(async err => { + .catch(err => { this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`); throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to create note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err); }) @@ -456,7 +456,7 @@ export class ApNoteService implements OnModuleInit { return x; }) - .catch(async err => { + .catch(err => { this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`); throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to update note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err); }) diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 1c7765fddf..1a2389dac4 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -34,6 +34,7 @@ import { TimeService } from '@/global/TimeService.js'; import { isRetryableError } from '@/misc/is-retryable-error.js'; import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueService } from '@/core/QueueService.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -103,8 +104,8 @@ export class InboxProcessorService implements OnApplicationShutdown { log.duration = calculateDurationSince(startTime); // Save or finalize asynchronously - this.apLogService.saveInboxLog(log) - .catch(err => this.logger.error('Failed to record AP activity:', err)); + trackPromise(this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err))); } } diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index fe3f180aca..90b83443e4 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -39,6 +39,7 @@ import { trackPromise } from '@/misc/promise-tracker.js'; import { QueueService } from '@/core/QueueService.js'; import { ApiLoggerService } from '../../ApiLoggerService.js'; import { ApiError } from '../../error.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; export const meta = { tags: ['account'], @@ -643,7 +644,7 @@ export default class extends Endpoint { // eslint- // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { - await this.userFollowingService.acceptAllFollowRequests(user); + trackPromise(this.userFollowingService.acceptAllFollowRequests(user)); } // フォロワーにUpdateを配信 diff --git a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts index f7a5db8739..5bc104aad3 100644 --- a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts +++ b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts @@ -16,6 +16,7 @@ import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; import { UserBlockingService } from '@/core/UserBlockingService.js'; import { TimeService } from '@/global/TimeService.js'; +import { trackPromise } from '@/misc/promise-tracker.js'; import { ApiError } from '../../../error.js'; export const meta = { @@ -176,7 +177,7 @@ export default class extends Endpoint { // eslint- } // リモートフォロワーにUpdate配信 - await this.pollService.deliverQuestionUpdate(note); + trackPromise(this.pollService.deliverQuestionUpdate(note)); }); } }