implement updateNoteQueue

This commit is contained in:
Hazelnoot 2025-06-25 13:46:09 -04:00
parent 2fd44d68c5
commit d89b292eee
3 changed files with 43 additions and 21 deletions

View file

@ -14,7 +14,7 @@ import { bindThis } from '@/decorators.js';
import type { MiInstance } from '@/models/Instance.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { MiUser } from '@/models/User.js';
import type { MiNote, UsersRepository } from '@/models/_.js';
import type { MiNote, UsersRepository, NotesRepository } from '@/models/_.js';
import { DI } from '@/di-symbols.js';
export type UpdateInstanceJob = {
@ -28,12 +28,18 @@ export type UpdateUserJob = {
notesCountDelta?: number,
};
export type UpdateNoteJob = {
deltaRepliesCount?: number;
deltaRenoteCount?: number;
};
@Injectable()
export class CollapsedQueueService implements OnApplicationShutdown {
// Moved from InboxProcessorService
public readonly updateInstanceQueue: CollapsedQueue<MiInstance['id'], UpdateInstanceJob>;
// Moved from NoteCreateService, NoteEditService, and NoteDeleteService
public readonly updateUserQueue: CollapsedQueue<MiUser['id'], UpdateUserJob>;
public readonly updateNoteQueue: CollapsedQueue<MiNote['id'], UpdateNoteJob>;
private readonly logger: Logger;
@ -41,6 +47,9 @@ export class CollapsedQueueService implements OnApplicationShutdown {
@Inject(DI.usersRepository)
public readonly usersRepository: UsersRepository,
@Inject(DI.notesRepository)
public readonly notesRepository: NotesRepository,
private readonly federatedInstanceService: FederatedInstanceService,
private readonly envService: EnvService,
private readonly internalEventService: InternalEventService,
@ -85,7 +94,24 @@ export class CollapsedQueueService implements OnApplicationShutdown {
}),
{
onError: this.onQueueError,
concurrency: 4,
concurrency: 4, // High concurrency - this queue gets a lot of activity
},
);
this.updateNoteQueue = new CollapsedQueue(
'updateNote',
oneMinuteInterval,
(oldJob, newJob) => ({
deltaRepliesCount: (oldJob.deltaRepliesCount ?? 0) + (newJob.deltaRepliesCount ?? 0),
deltaRenoteCount: (oldJob.deltaRenoteCount ?? 0) + (newJob.deltaRenoteCount ?? 0),
}),
(id, job) => this.notesRepository.update({ id }, {
repliesCount: job.deltaRepliesCount ? () => `"repliesCount" + ${job.deltaRepliesCount}` : undefined,
renoteCount: job.deltaRenoteCount ? () => `"renoteCount" + ${job.deltaRenoteCount}` : undefined,
}),
{
onError: this.onQueueError,
concurrency: 4, // High concurrency - this queue gets a lot of activity
},
);
@ -99,6 +125,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
await this.performQueue(this.updateInstanceQueue);
await this.performQueue(this.updateUserQueue);
await this.performQueue(this.updateNoteQueue);
this.logger.info('Persistence complete.');
}
@ -130,6 +157,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
}
async onApplicationShutdown() {
// TODO note/user delete events
this.internalEventService.off('localUserUpdated', this.onUserUpdated);
this.internalEventService.off('remoteUserUpdated', this.onUserUpdated);

View file

@ -618,7 +618,7 @@ export class NoteCreateService implements OnApplicationShutdown {
}, user);
if (data.reply) {
await this.saveReply(data.reply, note);
this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { deltaRepliesCount: 1 });
}
if (data.reply == null) {
@ -646,8 +646,10 @@ export class NoteCreateService implements OnApplicationShutdown {
});
}
// TODO move these checks into incRenoteCount
if (this.isPureRenote(data) && data.renote.userId !== user.id && !user.isBot) {
await this.incRenoteCount(data.renote, user);
this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { deltaRenoteCount: 1 });
await this.incRenoteCount(data.renote, user)
}
if (data.poll && data.poll.expiresAt) {
@ -822,10 +824,9 @@ export class NoteCreateService implements OnApplicationShutdown {
*/
readonly isQuote = isQuote;
// Note: does not increment the count! used only for featured rankings.
@bindThis
private async incRenoteCount(renote: MiNote, user: MiUser) {
await this.notesRepository.increment({ id: renote.id }, 'renoteCount', 1);
// 30%の確率、3日以内に投稿されたートの場合ハイライト用ランキング更新
if (user.isExplorable && Math.random() < 0.3 && (this.timeService.now - this.idService.parse(renote.id).date.getTime()) < 1000 * 60 * 60 * 24 * 3) {
const policies = await this.roleService.getUserPolicies(user);
@ -878,11 +879,6 @@ export class NoteCreateService implements OnApplicationShutdown {
}
}
@bindThis
private async saveReply(reply: MiNote, note: MiNote) {
await this.notesRepository.increment({ id: reply.id }, 'repliesCount', 1);
}
@bindThis
private async index(note: MiNote) {
if (note.text == null && note.cw == null) return;

View file

@ -78,19 +78,17 @@ export class NoteDeleteService {
const cascadingNotes = await this.findCascadingNotes(note);
if (note.replyId) {
promises.push(this.notesRepository.decrement({ id: note.replyId }, 'repliesCount', 1));
this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { deltaRepliesCount: -1 });
} else if (isPureRenote(note)) {
promises.push(this.notesRepository.decrement({ id: note.renoteId }, 'renoteCount', 1));
this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { deltaRenoteCount: -1 });
}
const cascadeReplies = cascadingNotes.filter(cascade => cascade.replyId != null);
const cascadeRenotes = cascadingNotes.filter(cascade => cascade.renoteId != null);
if (cascadeReplies.length > 0) {
promises.push(this.notesRepository.decrement({ id: In(cascadeReplies.map(cascade => cascade.replyId)) }, 'repliesCount', 1));
}
if (cascadeRenotes.length > 0) {
promises.push(this.notesRepository.decrement({ id: In(cascadeRenotes.map(cascade => cascade.renoteId)) }, 'renoteCount', 1));
for (const cascade of cascadingNotes) {
if (cascade.replyId) {
this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { deltaRepliesCount: -1 });
} else if (isPureRenote(cascade)) {
this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { deltaRenoteCount: -1 });
}
}
if (!quiet) {