diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 71716f0f6b..15c50e4d83 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -1003,6 +1003,7 @@ export class QueueService implements OnModuleInit { case 'userWebhookDeliver': return this.userWebhookDeliverQueue; case 'systemWebhookDeliver': return this.systemWebhookDeliverQueue; case 'scheduleNotePost': return this.ScheduleNotePostQueue; + case 'backgroundTask': return this.backgroundTaskQueue; default: throw new Error(`Unrecognized queue type: ${type}`); } } diff --git a/packages/backend/src/core/SponsorsService.ts b/packages/backend/src/core/SponsorsService.ts index 551768bdc9..23994b5761 100644 --- a/packages/backend/src/core/SponsorsService.ts +++ b/packages/backend/src/core/SponsorsService.ts @@ -61,6 +61,7 @@ export class SponsorsService { } try { + // TODO use HTTP service const backers = await fetch(`${this.meta.donationUrl}/members/users.json`).then((response) => response.json() as Promise); // Merge both together into one array and make sure it only has Active subscriptions @@ -76,6 +77,7 @@ export class SponsorsService { @bindThis private async fetchSharkeySponsors(): Promise { try { + // TODO use HTTP service const backers = await fetch('https://opencollective.com/sharkey/tiers/backer/all.json').then((response) => response.json() as Promise); const sponsorsOC = await fetch('https://opencollective.com/sharkey/tiers/sponsor/all.json').then((response) => response.json() as Promise); diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index 3779172517..9d2266e7e7 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -24,6 +24,7 @@ export interface StatsEntry { export interface Stats { deliver: StatsEntry, inbox: StatsEntry, + background: StatsEntry, } const ev = new Xev(); @@ -35,9 +36,11 @@ export class QueueStatsService implements OnApplicationShutdown { private intervalId?: TimerHandle; private activeDeliverJobs = 0; private activeInboxJobs = 0; + private activeBackgroundJobs = 0; private deliverQueueEvents?: Bull.QueueEvents; private inboxQueueEvents?: Bull.QueueEvents; + private backgroundQueueEvents?: Bull.QueueEvents; private log?: Stats[]; @@ -60,6 +63,11 @@ export class QueueStatsService implements OnApplicationShutdown { this.activeInboxJobs++; } + @bindThis + private onBackgroundActive() { + this.activeBackgroundJobs++; + } + @bindThis private onRequestQueueStatsLog(x: { id: string, length?: number }) { if (this.log) { @@ -80,13 +88,16 @@ export class QueueStatsService implements OnApplicationShutdown { this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER)); this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX)); + this.backgroundQueueEvents = new Bull.QueueEvents(QUEUE.BACKGROUND_TASK, baseQueueOptions(this.config, QUEUE.BACKGROUND_TASK)); this.deliverQueueEvents.on('active', this.onDeliverActive); this.inboxQueueEvents.on('active', this.onInboxActive); + this.backgroundQueueEvents.on('active', this.onBackgroundActive); const tick = async () => { const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts(); const inboxJobCounts = await this.queueService.inboxQueue.getJobCounts(); + const backgroundJobCounts = await this.queueService.backgroundTaskQueue.getJobCounts(); const stats = { deliver: { @@ -101,6 +112,12 @@ export class QueueStatsService implements OnApplicationShutdown { waiting: inboxJobCounts.waiting, delayed: inboxJobCounts.delayed, }, + background: { + activeSincePrevTick: this.activeBackgroundJobs, + active: backgroundJobCounts.active, + waiting: backgroundJobCounts.waiting, + delayed: backgroundJobCounts.delayed, + }, }; ev.emit('queueStats', stats); @@ -112,6 +129,7 @@ export class QueueStatsService implements OnApplicationShutdown { this.activeDeliverJobs = 0; this.activeInboxJobs = 0; + this.activeBackgroundJobs = 0; }; tick(); @@ -120,7 +138,7 @@ export class QueueStatsService implements OnApplicationShutdown { } @bindThis - public async stop() { + public async stop(): void { if (this.intervalId) { this.timeService.stopTimer(this.intervalId); } @@ -130,12 +148,20 @@ export class QueueStatsService implements OnApplicationShutdown { this.deliverQueueEvents?.off('active', this.onDeliverActive); this.inboxQueueEvents?.off('active', this.onInboxActive); + this.backgroundQueueEvents?.off('active', this.onBackgroundActive); await this.deliverQueueEvents?.close(); await this.inboxQueueEvents?.close(); + await this.backgroundQueueEvents?.close(); this.activeDeliverJobs = 0; this.activeInboxJobs = 0; + this.activeBackgroundJobs = 0; + } + + @bindThis + public async dispose(): void { + await this.stop(); } @bindThis diff --git a/packages/frontend/src/pages/admin/job-queue.vue b/packages/frontend/src/pages/admin/job-queue.vue index 155277c976..a37c8f872b 100644 --- a/packages/frontend/src/pages/admin/job-queue.vue +++ b/packages/frontend/src/pages/admin/job-queue.vue @@ -204,6 +204,7 @@ const QUEUE_TYPES = [ 'userWebhookDeliver', 'systemWebhookDeliver', 'scheduleNotePost', + 'backgroundTask', ] as const; const tab: Ref = ref('-'); diff --git a/packages/frontend/src/widgets/WidgetJobQueue.vue b/packages/frontend/src/widgets/WidgetJobQueue.vue index 485e532d51..7ecc663f45 100644 --- a/packages/frontend/src/widgets/WidgetJobQueue.vue +++ b/packages/frontend/src/widgets/WidgetJobQueue.vue @@ -47,6 +47,27 @@ SPDX-License-Identifier: AGPL-3.0-only +
+
Background queue
+
+
+
Process
+
{{ kmg(current.background.activeSincePrevTick, 2) }}
+
+
+
Active
+
{{ kmg(current.background.active, 2) }}
+
+
+
Delayed
+
{{ kmg(current.background.delayed, 2) }}
+
+
+
Waiting
+
{{ kmg(current.background.waiting, 2) }}
+
+
+
@@ -99,6 +120,12 @@ const current = reactive({ waiting: 0, delayed: 0, }, + background: { + activeSincePrevTick: 0, + active: 0, + waiting: 0, + delayed: 0, + }, }); const prev = reactive({} as typeof current); const jammedAudioBuffer = ref(null); @@ -111,12 +138,12 @@ if (prefer.s['sound.masterVolume']) { }); } -for (const domain of ['inbox', 'deliver']) { +for (const domain of ['inbox', 'deliver', 'background']) { prev[domain] = deepClone(current[domain]); } const onStats = (stats) => { - for (const domain of ['inbox', 'deliver']) { + for (const domain of ['inbox', 'deliver', 'background']) { prev[domain] = deepClone(current[domain]); current[domain].activeSincePrevTick = stats[domain].activeSincePrevTick; current[domain].active = stats[domain].active;