replace MarkUserUpdatedBackgroundTask with updateUserQueue

This commit is contained in:
Hazelnoot 2025-06-25 12:34:40 -04:00
parent 3a2c0fd9da
commit 45df03ab14
9 changed files with 67 additions and 73 deletions

View file

@ -3,7 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { LoggerService } from '@/core/LoggerService.js';
import type Logger from '@/logger.js';
import { CollapsedQueue } from '@/misc/collapsed-queue.js';
@ -12,55 +12,69 @@ import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { EnvService } from '@/core/EnvService.js';
import { bindThis } from '@/decorators.js';
import type { MiInstance } from '@/models/Instance.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { MiUser } from '@/models/User.js';
import type { UsersRepository } from '@/models/_.js';
import { DI } from '@/di-symbols.js';
export type UpdateInstanceJob = {
latestRequestReceivedAt: Date,
shouldUnsuspend: boolean,
};
export type UpdateUserJob = {
updatedAt: Date,
};
@Injectable()
export class CollapsedQueueService implements OnApplicationShutdown {
// Moved from InboxProcessorService to allow access from ApInboxService
public readonly updateInstanceQueue: CollapsedQueue<MiInstance['id'], UpdateInstanceJob>;
public readonly updateUserQueue: CollapsedQueue<MiUser['id'], UpdateUserJob>;
private readonly logger: Logger;
constructor(
@Inject(DI.usersRepository)
public readonly usersRepository: UsersRepository,
private readonly federatedInstanceService: FederatedInstanceService,
private readonly envService: EnvService,
private readonly internalEventService: InternalEventService,
loggerService: LoggerService,
) {
this.logger = loggerService.getLogger('collapsed-queue');
const fiveMinuteInterval = this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0;
this.updateInstanceQueue = new CollapsedQueue(
'updateInstance',
this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0,
(oldJob, newJob) => this.collapseUpdateInstance(oldJob, newJob),
(id, job) => this.performUpdateInstance(id, job),
fiveMinuteInterval,
(oldJob, newJob) => ({
latestRequestReceivedAt: new Date(Math.max(oldJob.latestRequestReceivedAt.getTime(), newJob.latestRequestReceivedAt.getTime())),
shouldUnsuspend: oldJob.shouldUnsuspend || newJob.shouldUnsuspend,
}),
(id, job) => this.federatedInstanceService.update(id, {
latestRequestReceivedAt: job.latestRequestReceivedAt,
isNotResponding: false,
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
}),
this.onQueueError,
);
}
@bindThis
private collapseUpdateInstance(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
? newJob.latestRequestReceivedAt
: oldJob.latestRequestReceivedAt;
const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
return {
latestRequestReceivedAt,
shouldUnsuspend,
};
}
this.updateUserQueue = new CollapsedQueue(
'updateUser',
fiveMinuteInterval,
(oldJob, newJob) => ({
updatedAt: new Date(Math.max(oldJob.updatedAt.getTime(), newJob.updatedAt.getTime())),
}),
(id, job) => this.usersRepository.update({ id }, { updatedAt: job.updatedAt }),
this.onQueueError,
);
@bindThis
private async performUpdateInstance(id: string, job: UpdateInstanceJob) {
await this.federatedInstanceService.update(id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
// もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
});
this.internalEventService.on('localUserUpdated', this.onUserUpdated);
this.internalEventService.on('remoteUserUpdated', this.onUserUpdated);
}
@bindThis
@ -68,6 +82,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
this.logger.info('Persisting all collapsed queues...');
await this.performQueue(this.updateInstanceQueue);
await this.performQueue(this.updateUserQueue);
this.logger.info('Persistence complete.');
}
@ -93,7 +108,15 @@ export class CollapsedQueueService implements OnApplicationShutdown {
this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`);
}
@bindThis
private onUserUpdated(data: { id: string }) {
this.updateUserQueue.enqueue(data.id, { updatedAt: new Date() });
}
async onApplicationShutdown() {
this.internalEventService.off('localUserUpdated', this.onUserUpdated);
this.internalEventService.off('remoteUserUpdated', this.onUserUpdated);
await this.performAllNow();
}
}

View file

@ -59,6 +59,7 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { CacheService } from '@/core/CacheService.js';
import { TimeService } from '@/global/TimeService.js';
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
@ -226,6 +227,7 @@ export class NoteCreateService implements OnApplicationShutdown {
private latestNoteService: LatestNoteService,
private readonly timeService: TimeService,
private readonly noteVisibilityService: NoteVisibilityService,
private readonly collapsedQueueService: CollapsedQueueService,
) {
this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount);
}
@ -604,10 +606,10 @@ export class NoteCreateService implements OnApplicationShutdown {
if (!this.isRenote(note) || this.isQuote(note)) {
// Increment notes count (user)
await this.incNotesCountOfUser(user);
} else {
await this.queueService.createMarkUserUpdatedJob(user.id);
}
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
await this.pushToTl(note, user);
await this.antennaService.addNoteToAntennas({

View file

@ -27,7 +27,7 @@ import { LatestNoteService } from '@/core/LatestNoteService.js';
import { ApLogService } from '@/core/ApLogService.js';
import { TimeService } from '@/global/TimeService.js';
import { trackTask } from '@/misc/promise-tracker.js';
import { QueueService } from '@/core/QueueService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
import { CacheService } from '@/core/CacheService.js';
@Injectable()
@ -61,7 +61,7 @@ export class NoteDeleteService {
private latestNoteService: LatestNoteService,
private readonly apLogService: ApLogService,
private readonly timeService: TimeService,
private readonly queueService: QueueService,
private readonly collapsedQueueService: CollapsedQueueService,
private readonly cacheService: CacheService,
) {}
@ -142,10 +142,10 @@ export class NoteDeleteService {
if (!isPureRenote(note)) {
// Decrement notes count (user)
promises.push(this.decNotesCountOfUser(user));
} else {
promises.push(this.queueService.createMarkUserUpdatedJob(user.id));
}
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
for (const cascade of cascadingNotes) {
if (!isPureRenote(cascade)) {
promises.push(this.decNotesCountOfUser(cascade.user));

View file

@ -55,6 +55,7 @@ import { NoteCreateService } from '@/core/NoteCreateService.js';
import { TimeService } from '@/global/TimeService.js';
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
import { isPureRenote } from '@/misc/is-renote.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention' | 'edited';
@ -224,6 +225,7 @@ export class NoteEditService implements OnApplicationShutdown {
private noteCreateService: NoteCreateService,
private readonly timeService: TimeService,
private readonly noteVisibilityService: NoteVisibilityService,
private readonly collapsedQueueService: CollapsedQueueService,
) {
this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount);
}
@ -627,7 +629,7 @@ export class NoteEditService implements OnApplicationShutdown {
}
}
await this.queueService.createMarkUserUpdatedJob(user.id);
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
// ハッシュタグ更新
await this.pushToTl(note, user);

View file

@ -935,11 +935,6 @@ export class QueueService implements OnModuleInit {
return await this.createBackgroundTask({ type: 'delete-ap-logs', dataType, data });
}
@bindThis
public async createMarkUserUpdatedJob(userId: string) {
return await this.createBackgroundTask({ type: 'mark-user-updated', userId }, userId);
}
private async createBackgroundTask<T extends BackgroundTaskJobData>(data: T, duplication?: string | { id: string, ttl?: number }) {
return await this.backgroundTaskQueue.add(
data.type,

View file

@ -33,7 +33,7 @@ import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js';
import { CacheService } from '@/core/CacheService.js';
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
import { TimeService } from '@/global/TimeService.js';
import { QueueService } from '@/core/QueueService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
import type { DataSource } from 'typeorm';
const FALLBACK = '\u2764';
@ -111,7 +111,7 @@ export class ReactionService implements OnModuleInit {
private readonly cacheService: CacheService,
private readonly noteVisibilityService: NoteVisibilityService,
private readonly timeService: TimeService,
private readonly queueService: QueueService,
private readonly collapsedQueueService: CollapsedQueueService,
) {
}
@ -226,7 +226,7 @@ export class ReactionService implements OnModuleInit {
.execute();
}
await this.queueService.createMarkUserUpdatedJob(user.id);
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
// 30%の確率、セルフではない、3日以内に投稿されたートの場合ハイライト用ランキング更新
if (
@ -342,7 +342,7 @@ export class ReactionService implements OnModuleInit {
.execute();
}
await this.queueService.createMarkUserUpdatedJob(user.id);
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
this.globalEventService.publishNoteStream(note.id, 'unreacted', {
reaction: this.decodeReaction(exist.reaction).reaction,

View file

@ -19,7 +19,7 @@ export class CollapsedQueue<K, V> {
protected readonly timeService: TimeService,
private readonly timeout: number,
private readonly collapse: (oldValue: V, newValue: V) => V,
private readonly perform: (key: K, value: V) => Promise<void>,
private readonly perform: (key: K, value: V) => Promise<void | unknown>,
private readonly onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void,
) {}

View file

@ -5,7 +5,7 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask, DeleteApLogsBackgroundTask, MarkUserUpdatedBackgroundTask } from '@/queue/types.js';
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask, DeleteApLogsBackgroundTask } from '@/queue/types.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
import Logger from '@/logger.js';
@ -18,7 +18,7 @@ import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { NoteCreateService } from '@/core/NoteCreateService.js';
import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js';
import type { DriveFilesRepository, NoteEditsRepository, NotesRepository, PollsRepository } from '@/models/_.js';
import { MiUser } from '@/models/_.js';
import { NoteEditService } from '@/core/NoteEditService.js';
import { HashtagService } from '@/core/HashtagService.js';
@ -27,7 +27,6 @@ import { LatestNoteService } from '@/core/LatestNoteService.js';
import { trackTask } from '@/misc/promise-tracker.js';
import { UserSuspendService } from '@/core/UserSuspendService.js';
import { ApLogService } from '@/core/ApLogService.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
@Injectable()
@ -47,9 +46,6 @@ export class BackgroundTaskProcessorService {
@Inject(DI.noteEditsRepository)
private readonly noteEditsRepository: NoteEditsRepository,
@Inject(DI.usersRepository)
private readonly usersRepository: UsersRepository,
@Inject(DI.pollsRepository)
private readonly pollsRepository: PollsRepository,
@ -68,7 +64,6 @@ export class BackgroundTaskProcessorService {
private readonly latestNoteService: LatestNoteService,
private readonly userSuspendService: UserSuspendService,
private readonly apLogService: ApLogService,
private readonly internalEventService: InternalEventService,
queueLoggerService: QueueLoggerService,
) {
@ -102,11 +97,9 @@ export class BackgroundTaskProcessorService {
return await this.processPostSuspend(job.data);
} else if (job.data.type === 'post-unsuspend') {
return await this.processPostUnsuspend(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'delete-ap-logs') {
return await this.processDeleteApLogs(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'mark-user-updated') {
return await this.processMarkUserUpdated(job.data);
} else {
this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data);
throw new Error(`Unknown job type ${job.data}, see system logs for details`);
@ -352,19 +345,4 @@ export class BackgroundTaskProcessorService {
return 'ok';
}
private async processMarkUserUpdated(task: MarkUserUpdatedBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping post-unsuspend task: user ${task.userId} has been deleted`;
await this.usersRepository.update({ id: user.id }, { updatedAt: new Date() });
if (user.host == null) {
await this.internalEventService.emit('localUserUpdated', { id: user.id });
} else {
await this.internalEventService.emit('remoteUserUpdated', { id: user.id });
}
return 'ok';
}
}

View file

@ -184,8 +184,7 @@ export type BackgroundTaskJobData =
UpdateLatestNoteBackgroundTask |
PostSuspendBackgroundTask |
PostUnsuspendBackgroundTask |
DeleteApLogsBackgroundTask |
MarkUserUpdatedBackgroundTask;
DeleteApLogsBackgroundTask;
export type UpdateUserBackgroundTask = {
type: 'update-user';
@ -262,8 +261,3 @@ export type DeleteApLogsBackgroundTask = {
dataType: 'inbox' | 'object';
data: string | string[];
};
export type MarkUserUpdatedBackgroundTask = {
type: 'mark-user-updated';
userId: string;
};