merge NoteCreateService.updateNotesCountQueue and NoteEditService.updateNotesCountQueue into CollapsedQueueService.updateInstanceQueue
This commit is contained in:
parent
e884e3f6a3
commit
28dff9aff9
3 changed files with 38 additions and 26 deletions
|
|
@ -18,8 +18,9 @@ import type { UsersRepository } from '@/models/_.js';
|
|||
import { DI } from '@/di-symbols.js';
|
||||
|
||||
export type UpdateInstanceJob = {
|
||||
latestRequestReceivedAt: Date,
|
||||
shouldUnsuspend: boolean,
|
||||
latestRequestReceivedAt?: Date,
|
||||
shouldUnsuspend?: boolean,
|
||||
additionalNotes?: number,
|
||||
};
|
||||
|
||||
export type UpdateUserJob = {
|
||||
|
|
@ -52,13 +53,15 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
'updateInstance',
|
||||
fiveMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
latestRequestReceivedAt: new Date(Math.max(oldJob.latestRequestReceivedAt.getTime(), newJob.latestRequestReceivedAt.getTime())),
|
||||
latestRequestReceivedAt: maxDate(oldJob.latestRequestReceivedAt, newJob.latestRequestReceivedAt),
|
||||
shouldUnsuspend: oldJob.shouldUnsuspend || newJob.shouldUnsuspend,
|
||||
additionalNotes: (oldJob.additionalNotes ?? 0) + (newJob.additionalNotes ?? 0),
|
||||
}),
|
||||
(id, job) => this.federatedInstanceService.update(id, {
|
||||
latestRequestReceivedAt: job.latestRequestReceivedAt,
|
||||
isNotResponding: false,
|
||||
isNotResponding: job.latestRequestReceivedAt ? false : undefined,
|
||||
suspensionState: job.shouldUnsuspend ? 'none' : undefined,
|
||||
notesCount: job.additionalNotes ? () => `"notesCount" + ${job.additionalNotes}` : undefined,
|
||||
}),
|
||||
{
|
||||
onError: this.onQueueError,
|
||||
|
|
@ -70,7 +73,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
'updateUser',
|
||||
fiveMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
updatedAt: new Date(Math.max(oldJob.updatedAt.getTime(), newJob.updatedAt.getTime())),
|
||||
updatedAt: maxDate(oldJob.updatedAt, newJob.updatedAt),
|
||||
}),
|
||||
(id, job) => this.usersRepository.update({ id }, { updatedAt: job.updatedAt }),
|
||||
{
|
||||
|
|
@ -126,3 +129,24 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
await this.performAllNow();
|
||||
}
|
||||
}
|
||||
|
||||
function maxDate(first: Date, second: Date | undefined): Date;
|
||||
function maxDate(first: Date | undefined, second: Date): Date;
|
||||
function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined;
|
||||
// eslint requires a space here -_-
|
||||
|
||||
function maxDate(first: Date | undefined, second: Date | undefined): Date | undefined {
|
||||
if (first && second) {
|
||||
if (first.getTime() > second.getTime()) {
|
||||
return first;
|
||||
} else {
|
||||
return second;
|
||||
}
|
||||
} else if (first) {
|
||||
return first;
|
||||
} else if (second) {
|
||||
return second;
|
||||
} else {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ export type PureRenoteOption = Option & { renote: MiNote } & ({ text?: null } |
|
|||
@Injectable()
|
||||
export class NoteCreateService implements OnApplicationShutdown {
|
||||
#shutdownController = new AbortController();
|
||||
private updateNotesCountQueue: CollapsedQueue<MiNote['id'], number>;
|
||||
// private updateNotesCountQueue: CollapsedQueue<MiNote['id'], number>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.config)
|
||||
|
|
@ -229,7 +229,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
private readonly noteVisibilityService: NoteVisibilityService,
|
||||
private readonly collapsedQueueService: CollapsedQueueService,
|
||||
) {
|
||||
this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount);
|
||||
//this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -587,7 +587,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
if (isRemoteUser(user)) {
|
||||
this.federatedInstanceService.fetchOrRegister(user.host).then(async i => {
|
||||
if (!this.isRenote(note) || this.isQuote(note)) {
|
||||
this.updateNotesCountQueue.enqueue(i.id, 1);
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { additionalNotes: 1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(i.host, note, true);
|
||||
|
|
@ -1050,6 +1050,8 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Moved to CollapsedQueueService
|
||||
/*
|
||||
@bindThis
|
||||
private collapseNotesCount(oldValue: number, newValue: number) {
|
||||
return oldValue + newValue;
|
||||
|
|
@ -1059,11 +1061,12 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) {
|
||||
await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy);
|
||||
}
|
||||
*/
|
||||
|
||||
@bindThis
|
||||
public async dispose(): Promise<void> {
|
||||
this.#shutdownController.abort();
|
||||
await this.updateNotesCountQueue.performAllNow();
|
||||
// await this.updateNotesCountQueue.performAllNow();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
|
|
@ -50,7 +50,6 @@ import { trackTask } from '@/misc/promise-tracker.js';
|
|||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
import { LatestNoteService } from '@/core/LatestNoteService.js';
|
||||
import { CollapsedQueue } from '@/misc/collapsed-queue.js';
|
||||
import { NoteCreateService } from '@/core/NoteCreateService.js';
|
||||
import { TimeService } from '@/global/TimeService.js';
|
||||
import { NoteVisibilityService } from '@/core/NoteVisibilityService.js';
|
||||
|
|
@ -151,7 +150,6 @@ export type Option = {
|
|||
@Injectable()
|
||||
export class NoteEditService implements OnApplicationShutdown {
|
||||
#shutdownController = new AbortController();
|
||||
private updateNotesCountQueue: CollapsedQueue<MiNote['id'], number>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.config)
|
||||
|
|
@ -226,9 +224,7 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
private readonly timeService: TimeService,
|
||||
private readonly noteVisibilityService: NoteVisibilityService,
|
||||
private readonly collapsedQueueService: CollapsedQueueService,
|
||||
) {
|
||||
this.updateNotesCountQueue = new CollapsedQueue(this.timeService, process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseNotesCount, this.performUpdateNotesCount);
|
||||
}
|
||||
) {}
|
||||
|
||||
@bindThis
|
||||
public async edit(user: MiUser, editid: MiNote['id'], data: Option, silent = false): Promise<MiNote> {
|
||||
|
|
@ -620,7 +616,7 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
if (isRemoteUser(user)) {
|
||||
this.federatedInstanceService.fetchOrRegister(user.host).then(async i => {
|
||||
if (note.renote && note.text || !note.renote) {
|
||||
this.updateNotesCountQueue.enqueue(i.id, 1);
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { additionalNotes: 1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(i.host, note, true);
|
||||
|
|
@ -925,20 +921,9 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
|
||||
// checkHibernation moved to HibernateUsersProcessorService
|
||||
|
||||
@bindThis
|
||||
private collapseNotesCount(oldValue: number, newValue: number) {
|
||||
return oldValue + newValue;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) {
|
||||
await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async dispose(): Promise<void> {
|
||||
this.#shutdownController.abort();
|
||||
await this.updateNotesCountQueue.performAllNow();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue