diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index ddcb0003b2..d7e7d5347f 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -14,7 +14,7 @@ import { bindThis } from '@/decorators.js'; import type { MiInstance } from '@/models/Instance.js'; import { InternalEventService } from '@/core/InternalEventService.js'; import { MiUser } from '@/models/User.js'; -import type { MiNote, UsersRepository } from '@/models/_.js'; +import type { MiNote, UsersRepository, NotesRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; export type UpdateInstanceJob = { @@ -28,12 +28,18 @@ export type UpdateUserJob = { notesCountDelta?: number, }; +export type UpdateNoteJob = { + deltaRepliesCount?: number; + deltaRenoteCount?: number; +}; + @Injectable() export class CollapsedQueueService implements OnApplicationShutdown { // Moved from InboxProcessorService public readonly updateInstanceQueue: CollapsedQueue; // Moved from NoteCreateService, NoteEditService, and NoteDeleteService public readonly updateUserQueue: CollapsedQueue; + public readonly updateNoteQueue: CollapsedQueue; private readonly logger: Logger; @@ -41,6 +47,9 @@ export class CollapsedQueueService implements OnApplicationShutdown { @Inject(DI.usersRepository) public readonly usersRepository: UsersRepository, + @Inject(DI.notesRepository) + public readonly notesRepository: NotesRepository, + private readonly federatedInstanceService: FederatedInstanceService, private readonly envService: EnvService, private readonly internalEventService: InternalEventService, @@ -85,7 +94,24 @@ export class CollapsedQueueService implements OnApplicationShutdown { }), { onError: this.onQueueError, - concurrency: 4, + concurrency: 4, // High concurrency - this queue gets a lot of activity + }, + ); + + this.updateNoteQueue = new CollapsedQueue( + 'updateNote', + oneMinuteInterval, + (oldJob, newJob) => ({ + deltaRepliesCount: (oldJob.deltaRepliesCount ?? 0) + (newJob.deltaRepliesCount ?? 0), + deltaRenoteCount: (oldJob.deltaRenoteCount ?? 0) + (newJob.deltaRenoteCount ?? 0), + }), + (id, job) => this.notesRepository.update({ id }, { + repliesCount: job.deltaRepliesCount ? () => `"repliesCount" + ${job.deltaRepliesCount}` : undefined, + renoteCount: job.deltaRenoteCount ? () => `"renoteCount" + ${job.deltaRenoteCount}` : undefined, + }), + { + onError: this.onQueueError, + concurrency: 4, // High concurrency - this queue gets a lot of activity }, ); @@ -99,6 +125,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { await this.performQueue(this.updateInstanceQueue); await this.performQueue(this.updateUserQueue); + await this.performQueue(this.updateNoteQueue); this.logger.info('Persistence complete.'); } @@ -130,6 +157,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { } async onApplicationShutdown() { + // TODO note/user delete events this.internalEventService.off('localUserUpdated', this.onUserUpdated); this.internalEventService.off('remoteUserUpdated', this.onUserUpdated); diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index e14bb56685..f5486a9b61 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -618,7 +618,7 @@ export class NoteCreateService implements OnApplicationShutdown { }, user); if (data.reply) { - await this.saveReply(data.reply, note); + this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { deltaRepliesCount: 1 }); } if (data.reply == null) { @@ -646,8 +646,10 @@ export class NoteCreateService implements OnApplicationShutdown { }); } + // TODO move these checks into incRenoteCount if (this.isPureRenote(data) && data.renote.userId !== user.id && !user.isBot) { - await this.incRenoteCount(data.renote, user); + this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { deltaRenoteCount: 1 }); + await this.incRenoteCount(data.renote, user) } if (data.poll && data.poll.expiresAt) { @@ -822,10 +824,9 @@ export class NoteCreateService implements OnApplicationShutdown { */ readonly isQuote = isQuote; + // Note: does not increment the count! used only for featured rankings. @bindThis private async incRenoteCount(renote: MiNote, user: MiUser) { - await this.notesRepository.increment({ id: renote.id }, 'renoteCount', 1); - // 30%の確率、3日以内に投稿されたノートの場合ハイライト用ランキング更新 if (user.isExplorable && Math.random() < 0.3 && (this.timeService.now - this.idService.parse(renote.id).date.getTime()) < 1000 * 60 * 60 * 24 * 3) { const policies = await this.roleService.getUserPolicies(user); @@ -878,11 +879,6 @@ export class NoteCreateService implements OnApplicationShutdown { } } - @bindThis - private async saveReply(reply: MiNote, note: MiNote) { - await this.notesRepository.increment({ id: reply.id }, 'repliesCount', 1); - } - @bindThis private async index(note: MiNote) { if (note.text == null && note.cw == null) return; diff --git a/packages/backend/src/core/NoteDeleteService.ts b/packages/backend/src/core/NoteDeleteService.ts index 5198340c4d..f340065d55 100644 --- a/packages/backend/src/core/NoteDeleteService.ts +++ b/packages/backend/src/core/NoteDeleteService.ts @@ -78,19 +78,17 @@ export class NoteDeleteService { const cascadingNotes = await this.findCascadingNotes(note); if (note.replyId) { - promises.push(this.notesRepository.decrement({ id: note.replyId }, 'repliesCount', 1)); + this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { deltaRepliesCount: -1 }); } else if (isPureRenote(note)) { - promises.push(this.notesRepository.decrement({ id: note.renoteId }, 'renoteCount', 1)); + this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { deltaRenoteCount: -1 }); } - const cascadeReplies = cascadingNotes.filter(cascade => cascade.replyId != null); - const cascadeRenotes = cascadingNotes.filter(cascade => cascade.renoteId != null); - - if (cascadeReplies.length > 0) { - promises.push(this.notesRepository.decrement({ id: In(cascadeReplies.map(cascade => cascade.replyId)) }, 'repliesCount', 1)); - } - if (cascadeRenotes.length > 0) { - promises.push(this.notesRepository.decrement({ id: In(cascadeRenotes.map(cascade => cascade.renoteId)) }, 'renoteCount', 1)); + for (const cascade of cascadingNotes) { + if (cascade.replyId) { + this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { deltaRepliesCount: -1 }); + } else if (isPureRenote(cascade)) { + this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { deltaRenoteCount: -1 }); + } } if (!quiet) {