track federation-related promises to avoid data loss during restart

This commit is contained in:
Hazelnoot 2025-06-19 13:40:56 -04:00
parent a4440e43a6
commit c9f2554b2f
7 changed files with 29 additions and 24 deletions

View file

@ -18,6 +18,7 @@ import { ApDeliverManagerService } from '@/core/activitypub/ApDeliverManagerServ
import { ApRendererService } from '@/core/activitypub/ApRendererService.js'; import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { RoleService } from '@/core/RoleService.js'; import { RoleService } from '@/core/RoleService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import type { DataSource } from 'typeorm'; import type { DataSource } from 'typeorm';
@Injectable() @Injectable()
@ -84,7 +85,7 @@ export class NotePiningService {
// Deliver to remote followers // Deliver to remote followers
if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) { if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) {
await this.deliverPinnedChange(user, note.id, true); trackPromise(this.deliverPinnedChange(user, note.id, true));
} }
} }
@ -112,7 +113,7 @@ export class NotePiningService {
// Deliver to remote followers // Deliver to remote followers
if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) { if (this.userEntityService.isLocalUser(user) && !note.localOnly && ['public', 'home'].includes(note.visibility)) {
await this.deliverPinnedChange(user, noteId, false); trackPromise(this.deliverPinnedChange(user, noteId, false));
} }
} }

View file

@ -31,6 +31,7 @@ import type { ThinUser } from '@/queue/types.js';
import { LoggerService } from '@/core/LoggerService.js'; import { LoggerService } from '@/core/LoggerService.js';
import { InternalEventService } from '@/global/InternalEventService.js'; import { InternalEventService } from '@/global/InternalEventService.js';
import type Logger from '../logger.js'; import type Logger from '../logger.js';
import { trackPromise } from '@/misc/promise-tracker.js';
type Local = MiLocalUser | { type Local = MiLocalUser | {
id: MiLocalUser['id']; id: MiLocalUser['id'];
@ -102,7 +103,7 @@ export class UserFollowingService implements OnModuleInit {
@bindThis @bindThis
public async deliverAccept(follower: MiRemoteUser, followee: MiPartialLocalUser, requestId?: string) { public async deliverAccept(follower: MiRemoteUser, followee: MiPartialLocalUser, requestId?: string) {
const content = this.apRendererService.addContext(this.apRendererService.renderAccept(this.apRendererService.renderFollow(follower, followee, requestId), followee)); const content = this.apRendererService.addContext(this.apRendererService.renderAccept(this.apRendererService.renderFollow(follower, followee, requestId), followee));
this.queueService.deliver(followee, content, follower.inbox, false); await this.queueService.deliver(followee, content, follower.inbox, false);
} }
@bindThis @bindThis
@ -152,7 +153,7 @@ export class UserFollowingService implements OnModuleInit {
// すでにフォロー関係が存在している場合 // すでにフォロー関係が存在している場合
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
// リモート → ローカル: acceptを送り返しておしまい // リモート → ローカル: acceptを送り返しておしまい
this.deliverAccept(follower, followee, requestId); trackPromise(this.deliverAccept(follower, followee, requestId));
return; return;
} }
if (this.userEntityService.isLocalUser(follower)) { if (this.userEntityService.isLocalUser(follower)) {
@ -206,7 +207,7 @@ export class UserFollowingService implements OnModuleInit {
await this.insertFollowingDoc(followee, follower, silent, withReplies); await this.insertFollowingDoc(followee, follower, silent, withReplies);
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
this.deliverAccept(follower, followee, requestId); trackPromise(this.deliverAccept(follower, followee, requestId));
} }
} }
@ -581,7 +582,7 @@ export class UserFollowingService implements OnModuleInit {
await this.insertFollowingDoc(followee, follower, false, request.withReplies); await this.insertFollowingDoc(followee, follower, false, request.withReplies);
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
this.deliverAccept(follower, followee as MiPartialLocalUser, request.requestId ?? undefined); trackPromise(this.deliverAccept(follower, followee as MiPartialLocalUser, request.requestId ?? undefined));
} }
this.userEntityService.pack(followee.id, followee, { this.userEntityService.pack(followee.id, followee, {
@ -595,14 +596,13 @@ export class UserFollowingService implements OnModuleInit {
id: MiUser['id']; host: MiUser['host']; uri: MiUser['host']; inbox: MiUser['inbox']; sharedInbox: MiUser['sharedInbox']; id: MiUser['id']; host: MiUser['host']; uri: MiUser['host']; inbox: MiUser['inbox']; sharedInbox: MiUser['sharedInbox'];
}, },
): Promise<void> { ): Promise<void> {
const requests = await this.followRequestsRepository.findBy({ const requests = await this.followRequestsRepository.find({ where: {
followeeId: user.id, followeeId: user.id,
}); }, relations: {
follower: true,
} });
for (const request of requests) { await Promise.all(requests.map(request => this.acceptFollowRequest(user, request.follower as MiUser)));
const follower = await this.usersRepository.findOneByOrFail({ id: request.followerId });
this.acceptFollowRequest(user, follower);
}
} }
/** /**
@ -611,7 +611,7 @@ export class UserFollowingService implements OnModuleInit {
@bindThis @bindThis
public async rejectFollowRequest(user: Local, follower: Both): Promise<void> { public async rejectFollowRequest(user: Local, follower: Both): Promise<void> {
if (this.userEntityService.isRemoteUser(follower)) { if (this.userEntityService.isRemoteUser(follower)) {
this.deliverReject(user, follower); trackPromise(this.deliverReject(user, follower));
} }
await this.removeFollowRequest(user, follower); await this.removeFollowRequest(user, follower);
@ -627,7 +627,7 @@ export class UserFollowingService implements OnModuleInit {
@bindThis @bindThis
public async rejectFollow(user: Local, follower: Both): Promise<void> { public async rejectFollow(user: Local, follower: Both): Promise<void> {
if (this.userEntityService.isRemoteUser(follower)) { if (this.userEntityService.isRemoteUser(follower)) {
this.deliverReject(user, follower); trackPromise(this.deliverReject(user, follower));
} }
await this.removeFollow(user, follower); await this.removeFollow(user, follower);
@ -696,7 +696,7 @@ export class UserFollowingService implements OnModuleInit {
}); });
const content = this.apRendererService.addContext(this.apRendererService.renderReject(this.apRendererService.renderFollow(follower, followee, request?.requestId ?? undefined), followee)); const content = this.apRendererService.addContext(this.apRendererService.renderReject(this.apRendererService.renderFollow(follower, followee, request?.requestId ?? undefined), followee));
this.queueService.deliver(followee, content, follower.inbox, false); await this.queueService.deliver(followee, content, follower.inbox, false);
} }
/** /**
@ -720,7 +720,7 @@ export class UserFollowingService implements OnModuleInit {
@bindThis @bindThis
public async isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { public async isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) {
return this.cacheService.isFollowing(followerId, followeeId); return await this.cacheService.isFollowing(followerId, followeeId);
} }
@bindThis @bindThis

View file

@ -23,6 +23,7 @@ import { IdentifiableError } from '@/misc/identifiable-error.js';
import { toArray } from '@/misc/prelude/array.js'; import { toArray } from '@/misc/prelude/array.js';
import { isPureRenote } from '@/misc/is-renote.js'; import { isPureRenote } from '@/misc/is-renote.js';
import { CacheService } from '@/core/CacheService.js'; import { CacheService } from '@/core/CacheService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { AnyCollection, getApId, getNullableApId, IObjectWithId, isCollection, isCollectionOrOrderedCollection, isCollectionPage, isOrderedCollection, isOrderedCollectionPage } from './type.js'; import { AnyCollection, getApId, getNullableApId, IObjectWithId, isCollection, isCollectionOrOrderedCollection, isCollectionPage, isOrderedCollection, isOrderedCollectionPage } from './type.js';
import { ApDbResolverService } from './ApDbResolverService.js'; import { ApDbResolverService } from './ApDbResolverService.js';
import { ApRendererService } from './ApRendererService.js'; import { ApRendererService } from './ApRendererService.js';
@ -269,8 +270,8 @@ export class Resolver {
log.duration = calculateDurationSince(startTime); log.duration = calculateDurationSince(startTime);
// Save or finalize asynchronously // Save or finalize asynchronously
this.apLogService.saveFetchLog(log) trackPromise(this.apLogService.saveFetchLog(log)
.catch(err => this.logger.error('Failed to record AP object fetch:', err)); .catch(err => this.logger.error('Failed to record AP object fetch:', err)));
} }
} }

View file

@ -277,7 +277,7 @@ export class ApNoteService implements OnModuleInit {
return x; return x;
}) })
.catch(async err => { .catch(err => {
this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`); this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`);
throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to create note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err); throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to create note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err);
}) })
@ -456,7 +456,7 @@ export class ApNoteService implements OnModuleInit {
return x; return x;
}) })
.catch(async err => { .catch(err => {
this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`); this.logger.warn(`error ${renderInlineError(err)} fetching inReplyTo ${note.inReplyTo} for note ${entryUri}`);
throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to update note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err); throw new IdentifiableError('1ebf0a96-2769-4973-a6c2-3dcbad409dff', `failed to update note ${entryUri}: could not fetch inReplyTo ${note.inReplyTo}`, true, err);
}) })

View file

@ -34,6 +34,7 @@ import { TimeService } from '@/global/TimeService.js';
import { isRetryableError } from '@/misc/is-retryable-error.js'; import { isRetryableError } from '@/misc/is-retryable-error.js';
import { renderInlineError } from '@/misc/render-inline-error.js'; import { renderInlineError } from '@/misc/render-inline-error.js';
import { QueueService } from '@/core/QueueService.js'; import { QueueService } from '@/core/QueueService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js'; import type { InboxJobData } from '../types.js';
@ -103,8 +104,8 @@ export class InboxProcessorService implements OnApplicationShutdown {
log.duration = calculateDurationSince(startTime); log.duration = calculateDurationSince(startTime);
// Save or finalize asynchronously // Save or finalize asynchronously
this.apLogService.saveInboxLog(log) trackPromise(this.apLogService.saveInboxLog(log)
.catch(err => this.logger.error('Failed to record AP activity:', err)); .catch(err => this.logger.error('Failed to record AP activity:', err)));
} }
} }

View file

@ -39,6 +39,7 @@ import { trackPromise } from '@/misc/promise-tracker.js';
import { QueueService } from '@/core/QueueService.js'; import { QueueService } from '@/core/QueueService.js';
import { ApiLoggerService } from '../../ApiLoggerService.js'; import { ApiLoggerService } from '../../ApiLoggerService.js';
import { ApiError } from '../../error.js'; import { ApiError } from '../../error.js';
import { trackPromise } from '@/misc/promise-tracker.js';
export const meta = { export const meta = {
tags: ['account'], tags: ['account'],
@ -643,7 +644,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
// 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認
if (user.isLocked && ps.isLocked === false) { if (user.isLocked && ps.isLocked === false) {
await this.userFollowingService.acceptAllFollowRequests(user); trackPromise(this.userFollowingService.acceptAllFollowRequests(user));
} }
// フォロワーにUpdateを配信 // フォロワーにUpdateを配信

View file

@ -16,6 +16,7 @@ import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import { UserBlockingService } from '@/core/UserBlockingService.js'; import { UserBlockingService } from '@/core/UserBlockingService.js';
import { TimeService } from '@/global/TimeService.js'; import { TimeService } from '@/global/TimeService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { ApiError } from '../../../error.js'; import { ApiError } from '../../../error.js';
export const meta = { export const meta = {
@ -176,7 +177,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
} }
// リモートフォロワーにUpdate配信 // リモートフォロワーにUpdate配信
await this.pollService.deliverQuestionUpdate(note); trackPromise(this.pollService.deliverQuestionUpdate(note));
}); });
} }
} }