implement background queue charts
This commit is contained in:
parent
4b08e978ce
commit
8f436ef8ca
5 changed files with 60 additions and 3 deletions
|
|
@ -1003,6 +1003,7 @@ export class QueueService implements OnModuleInit {
|
|||
case 'userWebhookDeliver': return this.userWebhookDeliverQueue;
|
||||
case 'systemWebhookDeliver': return this.systemWebhookDeliverQueue;
|
||||
case 'scheduleNotePost': return this.ScheduleNotePostQueue;
|
||||
case 'backgroundTask': return this.backgroundTaskQueue;
|
||||
default: throw new Error(`Unrecognized queue type: ${type}`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ export class SponsorsService {
|
|||
}
|
||||
|
||||
try {
|
||||
// TODO use HTTP service
|
||||
const backers = await fetch(`${this.meta.donationUrl}/members/users.json`).then((response) => response.json() as Promise<Sponsor[]>);
|
||||
|
||||
// Merge both together into one array and make sure it only has Active subscriptions
|
||||
|
|
@ -76,6 +77,7 @@ export class SponsorsService {
|
|||
@bindThis
|
||||
private async fetchSharkeySponsors(): Promise<Sponsor[]> {
|
||||
try {
|
||||
// TODO use HTTP service
|
||||
const backers = await fetch('https://opencollective.com/sharkey/tiers/backer/all.json').then((response) => response.json() as Promise<Sponsor[]>);
|
||||
const sponsorsOC = await fetch('https://opencollective.com/sharkey/tiers/sponsor/all.json').then((response) => response.json() as Promise<Sponsor[]>);
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ export interface StatsEntry {
|
|||
export interface Stats {
|
||||
deliver: StatsEntry,
|
||||
inbox: StatsEntry,
|
||||
background: StatsEntry,
|
||||
}
|
||||
|
||||
const ev = new Xev();
|
||||
|
|
@ -35,9 +36,11 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
private intervalId?: TimerHandle;
|
||||
private activeDeliverJobs = 0;
|
||||
private activeInboxJobs = 0;
|
||||
private activeBackgroundJobs = 0;
|
||||
|
||||
private deliverQueueEvents?: Bull.QueueEvents;
|
||||
private inboxQueueEvents?: Bull.QueueEvents;
|
||||
private backgroundQueueEvents?: Bull.QueueEvents;
|
||||
|
||||
private log?: Stats[];
|
||||
|
||||
|
|
@ -60,6 +63,11 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
this.activeInboxJobs++;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onBackgroundActive() {
|
||||
this.activeBackgroundJobs++;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private onRequestQueueStatsLog(x: { id: string, length?: number }) {
|
||||
if (this.log) {
|
||||
|
|
@ -80,13 +88,16 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
|
||||
this.deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
|
||||
this.inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
|
||||
this.backgroundQueueEvents = new Bull.QueueEvents(QUEUE.BACKGROUND_TASK, baseQueueOptions(this.config, QUEUE.BACKGROUND_TASK));
|
||||
|
||||
this.deliverQueueEvents.on('active', this.onDeliverActive);
|
||||
this.inboxQueueEvents.on('active', this.onInboxActive);
|
||||
this.backgroundQueueEvents.on('active', this.onBackgroundActive);
|
||||
|
||||
const tick = async () => {
|
||||
const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts();
|
||||
const inboxJobCounts = await this.queueService.inboxQueue.getJobCounts();
|
||||
const backgroundJobCounts = await this.queueService.backgroundTaskQueue.getJobCounts();
|
||||
|
||||
const stats = {
|
||||
deliver: {
|
||||
|
|
@ -101,6 +112,12 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
waiting: inboxJobCounts.waiting,
|
||||
delayed: inboxJobCounts.delayed,
|
||||
},
|
||||
background: {
|
||||
activeSincePrevTick: this.activeBackgroundJobs,
|
||||
active: backgroundJobCounts.active,
|
||||
waiting: backgroundJobCounts.waiting,
|
||||
delayed: backgroundJobCounts.delayed,
|
||||
},
|
||||
};
|
||||
|
||||
ev.emit('queueStats', stats);
|
||||
|
|
@ -112,6 +129,7 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
|
||||
this.activeDeliverJobs = 0;
|
||||
this.activeInboxJobs = 0;
|
||||
this.activeBackgroundJobs = 0;
|
||||
};
|
||||
|
||||
tick();
|
||||
|
|
@ -120,7 +138,7 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public async stop() {
|
||||
public async stop(): void {
|
||||
if (this.intervalId) {
|
||||
this.timeService.stopTimer(this.intervalId);
|
||||
}
|
||||
|
|
@ -130,12 +148,20 @@ export class QueueStatsService implements OnApplicationShutdown {
|
|||
|
||||
this.deliverQueueEvents?.off('active', this.onDeliverActive);
|
||||
this.inboxQueueEvents?.off('active', this.onInboxActive);
|
||||
this.backgroundQueueEvents?.off('active', this.onBackgroundActive);
|
||||
|
||||
await this.deliverQueueEvents?.close();
|
||||
await this.inboxQueueEvents?.close();
|
||||
await this.backgroundQueueEvents?.close();
|
||||
|
||||
this.activeDeliverJobs = 0;
|
||||
this.activeInboxJobs = 0;
|
||||
this.activeBackgroundJobs = 0;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async dispose(): void {
|
||||
await this.stop();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue