diff --git a/packages/backend/src/core/ClipService.ts b/packages/backend/src/core/ClipService.ts index 575440abd6..6bb6b78eea 100644 --- a/packages/backend/src/core/ClipService.ts +++ b/packages/backend/src/core/ClipService.ts @@ -132,7 +132,7 @@ export class ClipService { lastClippedAt: this.timeService.date, }); - this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: 1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: 1 }); } @bindThis @@ -157,6 +157,6 @@ export class ClipService { clipId: clip.id, }); - this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: -1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: -1 }); } } diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index 1a1def3291..c6628dad7f 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -11,10 +11,8 @@ import { renderInlineError } from '@/misc/render-inline-error.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { EnvService } from '@/core/EnvService.js'; import { bindThis } from '@/decorators.js'; -import type { MiInstance } from '@/models/Instance.js'; import { InternalEventService } from '@/core/InternalEventService.js'; -import { MiUser } from '@/models/User.js'; -import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js'; +import type { UsersRepository, NotesRepository, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; import { AntennaService } from '@/core/AntennaService.js'; @@ -52,25 +50,17 @@ export type UpdateAntennaJob = { lastUsedAt?: Date, }; -// TODO sync cross-process: -// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data -// 2. On schedule, mark ID as deferred. -// 3. On perform, clear mark. -// 4. On performAll, skip deferred IDs. -// 5. On enqueue when ID is deferred, send data as event instead. -// 6. On delete when ID is deferred, clear mark - @Injectable() export class CollapsedQueueService implements OnApplicationShutdown { // Moved from InboxProcessorService - public readonly updateInstanceQueue: CollapsedQueue; + public readonly updateInstanceQueue: CollapsedQueue; // Moved from NoteCreateService, NoteEditService, and NoteDeleteService - public readonly updateUserQueue: CollapsedQueue; + public readonly updateUserQueue: CollapsedQueue; - public readonly updateNoteQueue: CollapsedQueue; - public readonly updateAccessTokenQueue: CollapsedQueue; - public readonly updateAntennaQueue: CollapsedQueue; + public readonly updateNoteQueue: CollapsedQueue; + public readonly updateAccessTokenQueue: CollapsedQueue; + public readonly updateAntennaQueue: CollapsedQueue; private readonly logger: Logger; @@ -100,6 +90,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { const oneMinuteInterval = this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 : 0; this.updateInstanceQueue = new CollapsedQueue( + this.internalEventService, 'updateInstance', fiveMinuteInterval, (oldJob, newJob) => ({ @@ -149,6 +140,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { ); this.updateUserQueue = new CollapsedQueue( + this.internalEventService, 'updateUser', oneMinuteInterval, (oldJob, newJob) => ({ @@ -173,6 +165,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { ); this.updateNoteQueue = new CollapsedQueue( + this.internalEventService, 'updateNote', oneMinuteInterval, (oldJob, newJob) => ({ @@ -192,6 +185,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { ); this.updateAccessTokenQueue = new CollapsedQueue( + this.internalEventService, 'updateAccessToken', fiveMinuteInterval, (oldJob, newJob) => ({ @@ -207,6 +201,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { ); this.updateAntennaQueue = new CollapsedQueue( + this.internalEventService, 'updateAntenna', fiveMinuteInterval, (oldJob, newJob) => ({ @@ -229,20 +224,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { } @bindThis - async performAllNow() { - this.logger.info('Persisting all collapsed queues...'); - - await this.performQueue(this.updateInstanceQueue); - await this.performQueue(this.updateUserQueue); - await this.performQueue(this.updateNoteQueue); - await this.performQueue(this.updateAccessTokenQueue); - await this.performQueue(this.updateAntennaQueue); - - this.logger.info('Persistence complete.'); - } - - @bindThis - private async performQueue(queue: CollapsedQueue): Promise { + private async performQueue(queue: CollapsedQueue): Promise { try { const results = await queue.performAllNow(); @@ -258,7 +240,7 @@ export class CollapsedQueueService implements OnApplicationShutdown { } @bindThis - private onQueueError(queue: CollapsedQueue, error: unknown): void { + private onQueueError(queue: CollapsedQueue, error: unknown): void { this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`); } @@ -274,12 +256,25 @@ export class CollapsedQueueService implements OnApplicationShutdown { this.updateAntennaQueue.delete(data.id); } - async onApplicationShutdown() { + @bindThis + async dispose() { this.internalEventService.off('userChangeDeletedState', this.onUserDeleted); this.internalEventService.off('antennaDeleted', this.onAntennaDeleted); this.internalEventService.off('antennaUpdated', this.onAntennaDeleted); - await this.performAllNow(); + this.logger.info('Persisting all collapsed queues...'); + + await this.performQueue(this.updateInstanceQueue); + await this.performQueue(this.updateUserQueue); + await this.performQueue(this.updateNoteQueue); + await this.performQueue(this.updateAccessTokenQueue); + await this.performQueue(this.updateAntennaQueue); + + this.logger.info('Persistence complete.'); + } + + async onApplicationShutdown() { + await this.dispose(); } } diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index 2be5a580da..eeba5d6bae 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -277,6 +277,8 @@ export interface InternalEventTypes { userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; }; quantumCacheUpdated: { name: string, keys: string[] }; quantumCacheReset: { name: string }; + collapsedQueueDefer: { name: string, key: string, deferred: boolean }; + collapsedQueueEnqueue: { name: string, key: string, value: unknown }; } type EventTypesToEventPayload = EventUnionFromDictionary>>; diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 2c7c8add66..c1c48d1708 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -587,7 +587,7 @@ export class NoteCreateService implements OnApplicationShutdown { if (isRemoteUser(user)) { this.federatedInstanceService.fetchOrRegister(user.host).then(async i => { if (!this.isRenote(note) || this.isQuote(note)) { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 }); } if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); @@ -605,10 +605,10 @@ export class NoteCreateService implements OnApplicationShutdown { if (!this.isRenote(note) || this.isQuote(note)) { // Increment notes count (user) - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: 1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: 1 }); } - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); await this.pushToTl(note, user); @@ -618,7 +618,7 @@ export class NoteCreateService implements OnApplicationShutdown { }, user); if (data.reply) { - this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { repliesCountDelta: 1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { repliesCountDelta: 1 }); } if (data.reply == null) { @@ -647,7 +647,7 @@ export class NoteCreateService implements OnApplicationShutdown { } if (this.isPureRenote(data)) { - this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { renoteCountDelta: 1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { renoteCountDelta: 1 }); await this.incRenoteCount(data.renote, user); } diff --git a/packages/backend/src/core/NoteDeleteService.ts b/packages/backend/src/core/NoteDeleteService.ts index 8f8308679e..23d6dc5753 100644 --- a/packages/backend/src/core/NoteDeleteService.ts +++ b/packages/backend/src/core/NoteDeleteService.ts @@ -78,16 +78,16 @@ export class NoteDeleteService { const cascadingNotes = await this.findCascadingNotes(note); if (note.replyId) { - this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { repliesCountDelta: -1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { repliesCountDelta: -1 }); } else if (isPureRenote(note)) { - this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { renoteCountDelta: -1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { renoteCountDelta: -1 }); } for (const cascade of cascadingNotes) { if (cascade.replyId) { - this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { repliesCountDelta: -1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { repliesCountDelta: -1 }); } else if (isPureRenote(cascade)) { - this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { renoteCountDelta: -1 }); + await this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { renoteCountDelta: -1 }); } } @@ -139,14 +139,14 @@ export class NoteDeleteService { if (!isPureRenote(note)) { // Decrement notes count (user) - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: -1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: -1 }); } - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); for (const cascade of cascadingNotes) { if (!isPureRenote(cascade)) { - this.collapsedQueueService.updateUserQueue.enqueue(cascade.user.id, { notesCountDelta: -1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(cascade.user.id, { notesCountDelta: -1 }); } // Don't mark cascaded user as updated (active) } @@ -155,7 +155,7 @@ export class NoteDeleteService { if (isRemoteUser(user)) { if (!isPureRenote(note)) { const i = await this.federatedInstanceService.fetchOrRegister(user.host); - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 }); } if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(user.host, note, false); @@ -166,7 +166,7 @@ export class NoteDeleteService { if (this.userEntityService.isRemoteUser(cascade.user)) { if (!isPureRenote(cascade)) { const i = await this.federatedInstanceService.fetchOrRegister(cascade.user.host); - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 }); } if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(cascade.user.host, cascade, false); diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index adff0cbb5b..df98283930 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -616,7 +616,7 @@ export class NoteEditService implements OnApplicationShutdown { if (isRemoteUser(user)) { this.federatedInstanceService.fetchOrRegister(user.host).then(async i => { if (note.renote && note.text || !note.renote) { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 }); } if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); @@ -625,7 +625,7 @@ export class NoteEditService implements OnApplicationShutdown { } } - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); // ハッシュタグ更新 await this.pushToTl(note, user); diff --git a/packages/backend/src/core/ReactionService.ts b/packages/backend/src/core/ReactionService.ts index 1e2e30f219..070083784f 100644 --- a/packages/backend/src/core/ReactionService.ts +++ b/packages/backend/src/core/ReactionService.ts @@ -226,7 +226,7 @@ export class ReactionService implements OnModuleInit { .execute(); } - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); // 30%の確率、セルフではない、3日以内に投稿されたノートの場合ハイライト用ランキング更新 if ( @@ -342,7 +342,7 @@ export class ReactionService implements OnModuleInit { .execute(); } - this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() }); this.globalEventService.publishNoteStream(note.id, 'unreacted', { reaction: this.decodeReaction(exist.reaction).reaction, diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index 9a58d2d1fe..a17006f57c 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -288,22 +288,22 @@ export class UserFollowingService implements OnModuleInit { // Neither followee nor follower has moved. if (!followeeUser.movedToUri && !followerUser.movedToUri) { //#region Increment counts - this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: 1 }); - this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: 1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: 1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: 1 }); //#endregion //#region Update instance stats if (this.meta.enableStatsForFederatedInstances) { if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { this.federatedInstanceService.fetchOrRegister(follower.host).then(async i => { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: 1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: 1 }); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateFollowing(i.host, true); } }); } else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) { this.federatedInstanceService.fetchOrRegister(followee.host).then(async i => { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: 1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: 1 }); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateFollowers(i.host, true); } @@ -398,22 +398,22 @@ export class UserFollowingService implements OnModuleInit { // Neither followee nor follower has moved. if (!follower.movedToUri && !followee.movedToUri) { //#region Decrement following / followers counts - this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: -1 }); - this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: -1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: -1 }); + await this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: -1 }); //#endregion //#region Update instance stats if (this.meta.enableStatsForFederatedInstances) { if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { this.federatedInstanceService.fetchOrRegister(follower.host).then(async i => { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: -1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: -1 }); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateFollowing(i.host, false); } }); } else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) { this.federatedInstanceService.fetchOrRegister(followee.host).then(async i => { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: -1 }); + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: -1 }); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateFollowers(i.host, false); } diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index e3442f76d1..692b04f1c8 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -577,12 +577,12 @@ export class ApPersonService implements OnModuleInit { // Register host if (this.meta.enableStatsForFederatedInstances) { - this.federatedInstanceService.fetchOrRegister(host).then(i => { - this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { usersCountDelta: 1 }); + this.federatedInstanceService.fetchOrRegister(host).then(async i => { + await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { usersCountDelta: 1 }); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.newUser(i.host); } - this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); + await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i); }); } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts index 280373db79..38582e61f0 100644 --- a/packages/backend/src/misc/collapsed-queue.ts +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -5,67 +5,105 @@ import { TimeService, type TimerHandle } from '@/global/TimeService.js'; import promiseLimit from 'promise-limit'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { bindThis } from '@/decorators.js'; type Job = { value: V; timer: TimerHandle; }; -// TODO: redis使えるようにする -export class CollapsedQueue { +// TODO document IPC sync process + +// sync cross-process: +// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data +// 2. On enqueue, mark ID as deferred. +// 3. On perform, clear mark. +// 4. On performAll, skip deferred IDs. +// 5. On enqueue when ID is deferred, send data as event instead. +// 6. On delete, clear mark. +// 7. On delete when ID is deferred, do nothing. + +export class CollapsedQueue { private readonly limiter?: ReturnType>; - private jobs: Map> = new Map(); + private readonly jobs: Map> = new Map(); + private readonly deferredKeys = new Set(); constructor( + private readonly internalEventService: InternalEventervice, + private readonly timeService: TimeService, public readonly name: string, - protected readonly timeService: TimeService, private readonly timeout: number, private readonly collapse: (oldValue: V, newValue: V) => V, - private readonly perform: (key: K, value: V) => Promise, + private readonly perform: (key: string, value: V) => Promise, private readonly opts?: { - onError?: (queue: CollapsedQueue, error: unknown) => void | Promise, + onError?: (queue: CollapsedQueue, error: unknown) => void | Promise, concurrency?: number, }, ) { if (opts?.concurrency) { this.limiter = promiseLimit(opts.concurrency); } + + this.internalEventService.on('collapsedQueueDefer', this.onDefer, { ignoreLocal: true }); + this.internalEventService.on('collapsedQueueEnqueue', this.onEnqueue, { ignoreLocal: true }); } - enqueue(key: K, value: V) { - if (this.jobs.has(key)) { - const old = this.jobs.get(key)!; - const merged = this.collapse(old.value, value); - this.jobs.set(key, { ...old, value: merged }); - } else { - const timer = this.timeService.startTimer(() => { - const job = this.jobs.get(key)!; - this.jobs.delete(key); - this._perform(key, job.value); - }, this.timeout); - this.jobs.set(key, { value, timer }); + @bindThis + async enqueue(key: string, value: V) { + // If deferred, then send it out to the owning process + if (this.deferredKeys.has(key)) { + await this.internalEventService.emit('collapsedQueueEnqueue', { name: this.name, key, value }); + return; } - } - delete(key: K) { + // If already queued, then merge const job = this.jobs.get(key); if (job) { - clearTimeout(job.timer); - this.jobs.delete(key); + job.value = this.collapse(job.value, value); + return; } + + // Otherwise, create a new job + const timer = this.timeService.startTimer(async () => { + const job = this.jobs.get(key); + if (!job) return; + + this.jobs.delete(key); + await this._perform(key, job.value); + }, this.timeout); + this.jobs.set(key, { value, timer }); + + // Mark as deferred so other processes will forward their state to us + await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: true }); } + @bindThis + async delete(key: string) { + const job = this.jobs.get(key); + if (!job) return; + + clearTimeout(job.timer); + this.jobs.delete(key); + await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: false }); + } + + @bindThis async performAllNow() { - const entries = [...this.jobs.entries()]; - this.jobs.clear(); - for (const [_key, job] of entries) { - this.timeService.stopTimer(job.timer); + for (const job of this.jobs.values()) { + clearTimeout(job.timer); } + + const entries = Array.from(this.jobs.entries()); + this.jobs.clear(); + return await Promise.allSettled(entries.map(([key, job]) => this._perform(key, job.value))); } - private async _perform(key: K, value: V) { + private async _perform(key: string, value: V) { try { + await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: false }); + if (this.limiter) { await this.limiter(async () => { await this.perform(key, value); @@ -78,4 +116,48 @@ export class CollapsedQueue { throw err; } } + + //#region Events from other processes + @bindThis + private async onDefer(data: { name: string, key: string, deferred: boolean }) { + if (data.name !== this.name) return; + + // Check for and recover from de-sync conditions where multiple processes try to "own" the same job. + const job = this.jobs.get(data.key); + if (job) { + if (data.deferred) { + // If another process tries to claim our job, then give it to them and queue our latest state. + this.timeService.stopTimer(job.timer); + this.jobs.delete(data.key); + await this.internalEventService.emit('collapsedQueueEnqueue', { name: this.name, key: data.key, value: job.value }); + } else { + // If another process tries to release our job, then just continue. + return; + } + } + + if (data.deferred) { + this.deferredKeys.add(data.key); + } else { + this.deferredKeys.delete(data.key); + } + } + + @bindThis + private async onEnqueue(data: { name: string, key: string, value: unknown }) { + if (data.name !== this.name) return; + + // Only enqueue if not deferred + if (!this.deferredKeys.has(data.key)) { + await this.enqueue(data.key, data.value as V); + } + } + //#endregion + + async dispose() { + this.internalEventService.off('collapsedQueueDefer', this.onDefer); + this.internalEventService.off('collapsedQueueEnqueue', this.onEnqueue); + + return await this.performAllNow(); + } } diff --git a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts index bacfed3473..b1b38ff788 100644 --- a/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts +++ b/packages/backend/src/queue/processors/BackgroundTaskProcessorService.ts @@ -188,7 +188,7 @@ export class BackgroundTaskProcessorService { // This is messy, but we need to minimize updates to space in Postgres blocks. if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) { - this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { + await this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined, shouldSuspendGone: updateGoneSuspended || undefined, shouldSuspendNotResponding: updateAutoSuspended || undefined, @@ -229,7 +229,7 @@ export class BackgroundTaskProcessorService { await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance); // Unsuspend instance (deferred) - this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { + await this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, { latestRequestReceivedAt: new Date(), shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding', }); diff --git a/packages/backend/src/server/api/AuthenticateService.ts b/packages/backend/src/server/api/AuthenticateService.ts index 3f95c4b6c1..23ba7a98b9 100644 --- a/packages/backend/src/server/api/AuthenticateService.ts +++ b/packages/backend/src/server/api/AuthenticateService.ts @@ -81,7 +81,7 @@ export class AuthenticateService { throw new AuthenticationError('invalid signature'); } - this.collapsedQueueService.updateAccessTokenQueue.enqueue(accessToken.id, { + await this.collapsedQueueService.updateAccessTokenQueue.enqueue(accessToken.id, { lastUsedAt: this.timeService.date, }); diff --git a/packages/backend/src/server/api/endpoints/antennas/notes.ts b/packages/backend/src/server/api/endpoints/antennas/notes.ts index 81c0fe5169..5eddfe96fa 100644 --- a/packages/backend/src/server/api/endpoints/antennas/notes.ts +++ b/packages/backend/src/server/api/endpoints/antennas/notes.ts @@ -98,7 +98,7 @@ export default class extends Endpoint { // eslint- // falseだった場合はアンテナの配信先が増えたことを通知したい const needPublishEvent = !antenna.isActive; - this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, { + await this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, { isActive: true, lastUsedAt: this.timeService.date, });