synchronize collapsed queues across the cluster to avoid data races
This commit is contained in:
parent
1cb21b0911
commit
896a364de7
13 changed files with 174 additions and 95 deletions
|
|
@ -132,7 +132,7 @@ export class ClipService {
|
|||
lastClippedAt: this.timeService.date,
|
||||
});
|
||||
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: 1 });
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -157,6 +157,6 @@ export class ClipService {
|
|||
clipId: clip.id,
|
||||
});
|
||||
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(noteId, { clippedCountDelta: -1 });
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,10 +11,8 @@ import { renderInlineError } from '@/misc/render-inline-error.js';
|
|||
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
||||
import { EnvService } from '@/core/EnvService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { MiInstance } from '@/models/Instance.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import { MiUser } from '@/models/User.js';
|
||||
import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js';
|
||||
import type { UsersRepository, NotesRepository, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { AntennaService } from '@/core/AntennaService.js';
|
||||
|
||||
|
|
@ -52,25 +50,17 @@ export type UpdateAntennaJob = {
|
|||
lastUsedAt?: Date,
|
||||
};
|
||||
|
||||
// TODO sync cross-process:
|
||||
// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data
|
||||
// 2. On schedule, mark ID as deferred.
|
||||
// 3. On perform, clear mark.
|
||||
// 4. On performAll, skip deferred IDs.
|
||||
// 5. On enqueue when ID is deferred, send data as event instead.
|
||||
// 6. On delete when ID is deferred, clear mark
|
||||
|
||||
@Injectable()
|
||||
export class CollapsedQueueService implements OnApplicationShutdown {
|
||||
// Moved from InboxProcessorService
|
||||
public readonly updateInstanceQueue: CollapsedQueue<MiInstance['id'], UpdateInstanceJob>;
|
||||
public readonly updateInstanceQueue: CollapsedQueue<UpdateInstanceJob>;
|
||||
|
||||
// Moved from NoteCreateService, NoteEditService, and NoteDeleteService
|
||||
public readonly updateUserQueue: CollapsedQueue<MiUser['id'], UpdateUserJob>;
|
||||
public readonly updateUserQueue: CollapsedQueue<UpdateUserJob>;
|
||||
|
||||
public readonly updateNoteQueue: CollapsedQueue<MiNote['id'], UpdateNoteJob>;
|
||||
public readonly updateAccessTokenQueue: CollapsedQueue<MiAccessToken['id'], UpdateAccessTokenJob>;
|
||||
public readonly updateAntennaQueue: CollapsedQueue<MiAntenna['id'], UpdateAntennaJob>;
|
||||
public readonly updateNoteQueue: CollapsedQueue<UpdateNoteJob>;
|
||||
public readonly updateAccessTokenQueue: CollapsedQueue<UpdateAccessTokenJob>;
|
||||
public readonly updateAntennaQueue: CollapsedQueue<UpdateAntennaJob>;
|
||||
|
||||
private readonly logger: Logger;
|
||||
|
||||
|
|
@ -100,6 +90,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
const oneMinuteInterval = this.envService.env.NODE_ENV !== 'test' ? 60 * 1000 : 0;
|
||||
|
||||
this.updateInstanceQueue = new CollapsedQueue(
|
||||
this.internalEventService,
|
||||
'updateInstance',
|
||||
fiveMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
|
|
@ -149,6 +140,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
);
|
||||
|
||||
this.updateUserQueue = new CollapsedQueue(
|
||||
this.internalEventService,
|
||||
'updateUser',
|
||||
oneMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
|
|
@ -173,6 +165,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
);
|
||||
|
||||
this.updateNoteQueue = new CollapsedQueue(
|
||||
this.internalEventService,
|
||||
'updateNote',
|
||||
oneMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
|
|
@ -192,6 +185,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
);
|
||||
|
||||
this.updateAccessTokenQueue = new CollapsedQueue(
|
||||
this.internalEventService,
|
||||
'updateAccessToken',
|
||||
fiveMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
|
|
@ -207,6 +201,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
);
|
||||
|
||||
this.updateAntennaQueue = new CollapsedQueue(
|
||||
this.internalEventService,
|
||||
'updateAntenna',
|
||||
fiveMinuteInterval,
|
||||
(oldJob, newJob) => ({
|
||||
|
|
@ -229,20 +224,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
async performAllNow() {
|
||||
this.logger.info('Persisting all collapsed queues...');
|
||||
|
||||
await this.performQueue(this.updateInstanceQueue);
|
||||
await this.performQueue(this.updateUserQueue);
|
||||
await this.performQueue(this.updateNoteQueue);
|
||||
await this.performQueue(this.updateAccessTokenQueue);
|
||||
await this.performQueue(this.updateAntennaQueue);
|
||||
|
||||
this.logger.info('Persistence complete.');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async performQueue<K, V>(queue: CollapsedQueue<K, V>): Promise<void> {
|
||||
private async performQueue<V>(queue: CollapsedQueue<V>): Promise<void> {
|
||||
try {
|
||||
const results = await queue.performAllNow();
|
||||
|
||||
|
|
@ -258,7 +240,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
private onQueueError<K, V>(queue: CollapsedQueue<K, V>, error: unknown): void {
|
||||
private onQueueError<V>(queue: CollapsedQueue<V>, error: unknown): void {
|
||||
this.logger.error(`Error persisting ${queue.name}: ${renderInlineError(error)}`);
|
||||
}
|
||||
|
||||
|
|
@ -274,12 +256,25 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
this.updateAntennaQueue.delete(data.id);
|
||||
}
|
||||
|
||||
async onApplicationShutdown() {
|
||||
@bindThis
|
||||
async dispose() {
|
||||
this.internalEventService.off('userChangeDeletedState', this.onUserDeleted);
|
||||
this.internalEventService.off('antennaDeleted', this.onAntennaDeleted);
|
||||
this.internalEventService.off('antennaUpdated', this.onAntennaDeleted);
|
||||
|
||||
await this.performAllNow();
|
||||
this.logger.info('Persisting all collapsed queues...');
|
||||
|
||||
await this.performQueue(this.updateInstanceQueue);
|
||||
await this.performQueue(this.updateUserQueue);
|
||||
await this.performQueue(this.updateNoteQueue);
|
||||
await this.performQueue(this.updateAccessTokenQueue);
|
||||
await this.performQueue(this.updateAntennaQueue);
|
||||
|
||||
this.logger.info('Persistence complete.');
|
||||
}
|
||||
|
||||
async onApplicationShutdown() {
|
||||
await this.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -277,6 +277,8 @@ export interface InternalEventTypes {
|
|||
userListMemberBulkRemoved: { userListIds: MiUserList['id'][]; memberId: MiUser['id']; };
|
||||
quantumCacheUpdated: { name: string, keys: string[] };
|
||||
quantumCacheReset: { name: string };
|
||||
collapsedQueueDefer: { name: string, key: string, deferred: boolean };
|
||||
collapsedQueueEnqueue: { name: string, key: string, value: unknown };
|
||||
}
|
||||
|
||||
type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>;
|
||||
|
|
|
|||
|
|
@ -587,7 +587,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
if (isRemoteUser(user)) {
|
||||
this.federatedInstanceService.fetchOrRegister(user.host).then(async i => {
|
||||
if (!this.isRenote(note) || this.isQuote(note)) {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(i.host, note, true);
|
||||
|
|
@ -605,10 +605,10 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
|
||||
if (!this.isRenote(note) || this.isQuote(note)) {
|
||||
// Increment notes count (user)
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: 1 });
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
|
||||
await this.pushToTl(note, user);
|
||||
|
||||
|
|
@ -618,7 +618,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
}, user);
|
||||
|
||||
if (data.reply) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { repliesCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(data.reply.id, { repliesCountDelta: 1 });
|
||||
}
|
||||
|
||||
if (data.reply == null) {
|
||||
|
|
@ -647,7 +647,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
if (this.isPureRenote(data)) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { renoteCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(data.renote.id, { renoteCountDelta: 1 });
|
||||
await this.incRenoteCount(data.renote, user);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -78,16 +78,16 @@ export class NoteDeleteService {
|
|||
const cascadingNotes = await this.findCascadingNotes(note);
|
||||
|
||||
if (note.replyId) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { repliesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(note.replyId, { repliesCountDelta: -1 });
|
||||
} else if (isPureRenote(note)) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { renoteCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(note.renoteId, { renoteCountDelta: -1 });
|
||||
}
|
||||
|
||||
for (const cascade of cascadingNotes) {
|
||||
if (cascade.replyId) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { repliesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(cascade.replyId, { repliesCountDelta: -1 });
|
||||
} else if (isPureRenote(cascade)) {
|
||||
this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { renoteCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateNoteQueue.enqueue(cascade.renoteId, { renoteCountDelta: -1 });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -139,14 +139,14 @@ export class NoteDeleteService {
|
|||
|
||||
if (!isPureRenote(note)) {
|
||||
// Decrement notes count (user)
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { notesCountDelta: -1 });
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
|
||||
for (const cascade of cascadingNotes) {
|
||||
if (!isPureRenote(cascade)) {
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(cascade.user.id, { notesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(cascade.user.id, { notesCountDelta: -1 });
|
||||
}
|
||||
// Don't mark cascaded user as updated (active)
|
||||
}
|
||||
|
|
@ -155,7 +155,7 @@ export class NoteDeleteService {
|
|||
if (isRemoteUser(user)) {
|
||||
if (!isPureRenote(note)) {
|
||||
const i = await this.federatedInstanceService.fetchOrRegister(user.host);
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(user.host, note, false);
|
||||
|
|
@ -166,7 +166,7 @@ export class NoteDeleteService {
|
|||
if (this.userEntityService.isRemoteUser(cascade.user)) {
|
||||
if (!isPureRenote(cascade)) {
|
||||
const i = await this.federatedInstanceService.fetchOrRegister(cascade.user.host);
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: -1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(cascade.user.host, cascade, false);
|
||||
|
|
|
|||
|
|
@ -616,7 +616,7 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
if (isRemoteUser(user)) {
|
||||
this.federatedInstanceService.fetchOrRegister(user.host).then(async i => {
|
||||
if (note.renote && note.text || !note.renote) {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { notesCountDelta: 1 });
|
||||
}
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateNote(i.host, note, true);
|
||||
|
|
@ -625,7 +625,7 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
}
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
|
||||
// ハッシュタグ更新
|
||||
await this.pushToTl(note, user);
|
||||
|
|
|
|||
|
|
@ -226,7 +226,7 @@ export class ReactionService implements OnModuleInit {
|
|||
.execute();
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
|
||||
// 30%の確率、セルフではない、3日以内に投稿されたノートの場合ハイライト用ランキング更新
|
||||
if (
|
||||
|
|
@ -342,7 +342,7 @@ export class ReactionService implements OnModuleInit {
|
|||
.execute();
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { updatedAt: new Date() });
|
||||
|
||||
this.globalEventService.publishNoteStream(note.id, 'unreacted', {
|
||||
reaction: this.decodeReaction(exist.reaction).reaction,
|
||||
|
|
|
|||
|
|
@ -288,22 +288,22 @@ export class UserFollowingService implements OnModuleInit {
|
|||
// Neither followee nor follower has moved.
|
||||
if (!followeeUser.movedToUri && !followerUser.movedToUri) {
|
||||
//#region Increment counts
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: 1 });
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: 1 });
|
||||
//#endregion
|
||||
|
||||
//#region Update instance stats
|
||||
if (this.meta.enableStatsForFederatedInstances) {
|
||||
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
|
||||
this.federatedInstanceService.fetchOrRegister(follower.host).then(async i => {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: 1 });
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateFollowing(i.host, true);
|
||||
}
|
||||
});
|
||||
} else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) {
|
||||
this.federatedInstanceService.fetchOrRegister(followee.host).then(async i => {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: 1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: 1 });
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateFollowers(i.host, true);
|
||||
}
|
||||
|
|
@ -398,22 +398,22 @@ export class UserFollowingService implements OnModuleInit {
|
|||
// Neither followee nor follower has moved.
|
||||
if (!follower.movedToUri && !followee.movedToUri) {
|
||||
//#region Decrement following / followers counts
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: -1 });
|
||||
this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(follower.id, { followingCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateUserQueue.enqueue(followee.id, { followersCountDelta: -1 });
|
||||
//#endregion
|
||||
|
||||
//#region Update instance stats
|
||||
if (this.meta.enableStatsForFederatedInstances) {
|
||||
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
|
||||
this.federatedInstanceService.fetchOrRegister(follower.host).then(async i => {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followingCountDelta: -1 });
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateFollowing(i.host, false);
|
||||
}
|
||||
});
|
||||
} else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) {
|
||||
this.federatedInstanceService.fetchOrRegister(followee.host).then(async i => {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: -1 });
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { followersCountDelta: -1 });
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.updateFollowers(i.host, false);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -577,12 +577,12 @@ export class ApPersonService implements OnModuleInit {
|
|||
|
||||
// Register host
|
||||
if (this.meta.enableStatsForFederatedInstances) {
|
||||
this.federatedInstanceService.fetchOrRegister(host).then(i => {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { usersCountDelta: 1 });
|
||||
this.federatedInstanceService.fetchOrRegister(host).then(async i => {
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(i.id, { usersCountDelta: 1 });
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.newUser(i.host);
|
||||
}
|
||||
this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i);
|
||||
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,67 +5,105 @@
|
|||
|
||||
import { TimeService, type TimerHandle } from '@/global/TimeService.js';
|
||||
import promiseLimit from 'promise-limit';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
|
||||
type Job<V> = {
|
||||
value: V;
|
||||
timer: TimerHandle;
|
||||
};
|
||||
|
||||
// TODO: redis使えるようにする
|
||||
export class CollapsedQueue<K, V> {
|
||||
// TODO document IPC sync process
|
||||
|
||||
// sync cross-process:
|
||||
// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data
|
||||
// 2. On enqueue, mark ID as deferred.
|
||||
// 3. On perform, clear mark.
|
||||
// 4. On performAll, skip deferred IDs.
|
||||
// 5. On enqueue when ID is deferred, send data as event instead.
|
||||
// 6. On delete, clear mark.
|
||||
// 7. On delete when ID is deferred, do nothing.
|
||||
|
||||
export class CollapsedQueue<V> {
|
||||
private readonly limiter?: ReturnType<typeof promiseLimit<void>>;
|
||||
private jobs: Map<K, Job<V>> = new Map();
|
||||
private readonly jobs: Map<string, Job<V>> = new Map();
|
||||
private readonly deferredKeys = new Set<string>();
|
||||
|
||||
constructor(
|
||||
private readonly internalEventService: InternalEventervice,
|
||||
private readonly timeService: TimeService,
|
||||
public readonly name: string,
|
||||
protected readonly timeService: TimeService,
|
||||
private readonly timeout: number,
|
||||
private readonly collapse: (oldValue: V, newValue: V) => V,
|
||||
private readonly perform: (key: K, value: V) => Promise<void | unknown>,
|
||||
private readonly perform: (key: string, value: V) => Promise<void | unknown>,
|
||||
private readonly opts?: {
|
||||
onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void | Promise<void>,
|
||||
onError?: (queue: CollapsedQueue<V>, error: unknown) => void | Promise<void>,
|
||||
concurrency?: number,
|
||||
},
|
||||
) {
|
||||
if (opts?.concurrency) {
|
||||
this.limiter = promiseLimit<void>(opts.concurrency);
|
||||
}
|
||||
|
||||
this.internalEventService.on('collapsedQueueDefer', this.onDefer, { ignoreLocal: true });
|
||||
this.internalEventService.on('collapsedQueueEnqueue', this.onEnqueue, { ignoreLocal: true });
|
||||
}
|
||||
|
||||
enqueue(key: K, value: V) {
|
||||
if (this.jobs.has(key)) {
|
||||
const old = this.jobs.get(key)!;
|
||||
const merged = this.collapse(old.value, value);
|
||||
this.jobs.set(key, { ...old, value: merged });
|
||||
} else {
|
||||
const timer = this.timeService.startTimer(() => {
|
||||
const job = this.jobs.get(key)!;
|
||||
this.jobs.delete(key);
|
||||
this._perform(key, job.value);
|
||||
}, this.timeout);
|
||||
this.jobs.set(key, { value, timer });
|
||||
@bindThis
|
||||
async enqueue(key: string, value: V) {
|
||||
// If deferred, then send it out to the owning process
|
||||
if (this.deferredKeys.has(key)) {
|
||||
await this.internalEventService.emit('collapsedQueueEnqueue', { name: this.name, key, value });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
delete(key: K) {
|
||||
// If already queued, then merge
|
||||
const job = this.jobs.get(key);
|
||||
if (job) {
|
||||
clearTimeout(job.timer);
|
||||
this.jobs.delete(key);
|
||||
job.value = this.collapse(job.value, value);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise, create a new job
|
||||
const timer = this.timeService.startTimer(async () => {
|
||||
const job = this.jobs.get(key);
|
||||
if (!job) return;
|
||||
|
||||
this.jobs.delete(key);
|
||||
await this._perform(key, job.value);
|
||||
}, this.timeout);
|
||||
this.jobs.set(key, { value, timer });
|
||||
|
||||
// Mark as deferred so other processes will forward their state to us
|
||||
await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: true });
|
||||
}
|
||||
|
||||
@bindThis
|
||||
async delete(key: string) {
|
||||
const job = this.jobs.get(key);
|
||||
if (!job) return;
|
||||
|
||||
clearTimeout(job.timer);
|
||||
this.jobs.delete(key);
|
||||
await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: false });
|
||||
}
|
||||
|
||||
@bindThis
|
||||
async performAllNow() {
|
||||
const entries = [...this.jobs.entries()];
|
||||
this.jobs.clear();
|
||||
for (const [_key, job] of entries) {
|
||||
this.timeService.stopTimer(job.timer);
|
||||
for (const job of this.jobs.values()) {
|
||||
clearTimeout(job.timer);
|
||||
}
|
||||
|
||||
const entries = Array.from(this.jobs.entries());
|
||||
this.jobs.clear();
|
||||
|
||||
return await Promise.allSettled(entries.map(([key, job]) => this._perform(key, job.value)));
|
||||
}
|
||||
|
||||
private async _perform(key: K, value: V) {
|
||||
private async _perform(key: string, value: V) {
|
||||
try {
|
||||
await this.internalEventService.emit('collapsedQueueDefer', { name: this.name, key, deferred: false });
|
||||
|
||||
if (this.limiter) {
|
||||
await this.limiter(async () => {
|
||||
await this.perform(key, value);
|
||||
|
|
@ -78,4 +116,48 @@ export class CollapsedQueue<K, V> {
|
|||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
//#region Events from other processes
|
||||
@bindThis
|
||||
private async onDefer(data: { name: string, key: string, deferred: boolean }) {
|
||||
if (data.name !== this.name) return;
|
||||
|
||||
// Check for and recover from de-sync conditions where multiple processes try to "own" the same job.
|
||||
const job = this.jobs.get(data.key);
|
||||
if (job) {
|
||||
if (data.deferred) {
|
||||
// If another process tries to claim our job, then give it to them and queue our latest state.
|
||||
this.timeService.stopTimer(job.timer);
|
||||
this.jobs.delete(data.key);
|
||||
await this.internalEventService.emit('collapsedQueueEnqueue', { name: this.name, key: data.key, value: job.value });
|
||||
} else {
|
||||
// If another process tries to release our job, then just continue.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (data.deferred) {
|
||||
this.deferredKeys.add(data.key);
|
||||
} else {
|
||||
this.deferredKeys.delete(data.key);
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onEnqueue(data: { name: string, key: string, value: unknown }) {
|
||||
if (data.name !== this.name) return;
|
||||
|
||||
// Only enqueue if not deferred
|
||||
if (!this.deferredKeys.has(data.key)) {
|
||||
await this.enqueue(data.key, data.value as V);
|
||||
}
|
||||
}
|
||||
//#endregion
|
||||
|
||||
async dispose() {
|
||||
this.internalEventService.off('collapsedQueueDefer', this.onDefer);
|
||||
this.internalEventService.off('collapsedQueueEnqueue', this.onEnqueue);
|
||||
|
||||
return await this.performAllNow();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ export class BackgroundTaskProcessorService {
|
|||
|
||||
// This is messy, but we need to minimize updates to space in Postgres blocks.
|
||||
if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) {
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
|
||||
notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined,
|
||||
shouldSuspendGone: updateGoneSuspended || undefined,
|
||||
shouldSuspendNotResponding: updateAutoSuspended || undefined,
|
||||
|
|
@ -229,7 +229,7 @@ export class BackgroundTaskProcessorService {
|
|||
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance);
|
||||
|
||||
// Unsuspend instance (deferred)
|
||||
this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
|
||||
await this.collapsedQueueService.updateInstanceQueue.enqueue(instance.id, {
|
||||
latestRequestReceivedAt: new Date(),
|
||||
shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding',
|
||||
});
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ export class AuthenticateService {
|
|||
throw new AuthenticationError('invalid signature');
|
||||
}
|
||||
|
||||
this.collapsedQueueService.updateAccessTokenQueue.enqueue(accessToken.id, {
|
||||
await this.collapsedQueueService.updateAccessTokenQueue.enqueue(accessToken.id, {
|
||||
lastUsedAt: this.timeService.date,
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
// falseだった場合はアンテナの配信先が増えたことを通知したい
|
||||
const needPublishEvent = !antenna.isActive;
|
||||
|
||||
this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, {
|
||||
await this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, {
|
||||
isActive: true,
|
||||
lastUsedAt: this.timeService.date,
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue