enhance: コントロールパネルでジョブキューをクリアできるように

This commit is contained in:
syuilo 2025-04-16 16:47:03 +09:00
parent f2aeafaddb
commit eda2f587a3
8 changed files with 71 additions and 17 deletions

View file

@ -38,6 +38,18 @@ import type {
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
export const QUEUE_TYPES = [
'system',
'endedPollNotification',
'deliver',
'inbox',
'db',
'relationship',
'objectStorage',
'userWebhookDeliver',
'systemWebhookDeliver',
] as const;
@Injectable()
export class QueueService {
constructor(
@ -529,15 +541,35 @@ export class QueueService {
}
@bindThis
public destroy() {
this.deliverQueue.once('cleaned', (jobs, status) => {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.deliverQueue.clean(0, 0, 'delayed');
private getQueue(type: typeof QUEUE_TYPES[number]) {
switch (type) {
case 'system': return this.systemQueue;
case 'endedPollNotification': return this.endedPollNotificationQueue;
case 'deliver': return this.deliverQueue;
case 'inbox': return this.inboxQueue;
case 'db': return this.dbQueue;
case 'relationship': return this.relationshipQueue;
case 'objectStorage': return this.objectStorageQueue;
case 'userWebhookDeliver': return this.userWebhookDeliverQueue;
case 'systemWebhookDeliver': return this.systemWebhookDeliverQueue;
default: throw new Error(`Unrecognized queue type: ${type}`);
}
}
this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.inboxQueue.clean(0, 0, 'delayed');
@bindThis
public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
const queue = this.getQueue(queueType);
if (state === '*') {
queue.clean(0, 0, 'completed');
queue.clean(0, 0, 'wait');
queue.clean(0, 0, 'active');
queue.clean(0, 0, 'paused');
queue.clean(0, 0, 'prioritized');
queue.clean(0, 0, 'delayed');
queue.clean(0, 0, 'failed');
} else {
queue.clean(0, 0, state);
}
}
}

View file

@ -6,7 +6,7 @@
import { Injectable } from '@nestjs/common';
import { Endpoint } from '@/server/api/endpoint-base.js';
import { ModerationLogService } from '@/core/ModerationLogService.js';
import { QueueService } from '@/core/QueueService.js';
import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
export const meta = {
tags: ['admin'],
@ -18,8 +18,11 @@ export const meta = {
export const paramDef = {
type: 'object',
properties: {},
required: [],
properties: {
type: { type: 'string', enum: QUEUE_TYPES },
state: { type: 'string', enum: ['*', 'wait', 'delayed'] },
},
required: ['type', 'state'],
} as const;
@Injectable()
@ -29,7 +32,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private queueService: QueueService,
) {
super(meta, paramDef, async (ps, me) => {
this.queueService.destroy();
this.queueService.clearQueue(ps.type, ps.state);
this.moderationLogService.log(me, 'clearQueue');
});