implement background task queue

This commit is contained in:
Hazelnoot 2025-06-17 14:06:15 -04:00
parent 911f90f95a
commit 4b08e978ce
18 changed files with 482 additions and 63 deletions

View file

@ -253,21 +253,42 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1
# +-------------------------+
# | Job concurrency options |
# +-------------------------+
#
### Available options:
# [type]JobConcurrency - limits the number jobs that can run at the same time.
# Sharkey will allow this many jobs of this type *per worker process*.
# [type]JobPerSec - limits the total number of jobs that may complete within a single second.
# If this limit is exceeded, then Sharkey will pause this type of job until the next second.
# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped.
# If this limit is exceeded, then the job is considered "failed" and recorded for debugging.
#
### Job types:
# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances.
# All inbound activities are queued and processed in chronological order by this job.
# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances.
# All outbound activities are queued and processed in chronological order by this job.
# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs.
# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates.
# Job concurrency per worker
#deliverJobConcurrency: 128
#inboxJobConcurrency: 16
# relashionshipJobConcurrency: 16
# What's relashionshipJob?:
# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations.
#relationshipJobConcurrency: 16
#backgroundJobConcurrency: 32
# Job rate limiter
#deliverJobPerSec: 128
#inboxJobPerSec: 32
# relashionshipJobPerSec: 64
#relationshipJobPerSec: 64
#backgroundJobPerSec: 256
# Job attempts
#deliverJobMaxAttempts: 12
#inboxJobMaxAttempts: 8
#backgroundJobMaxAttempts: 8
# Local address used for outgoing requests
#outgoingAddress: 127.0.0.1

View file

@ -223,17 +223,42 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1
# +-------------------------+
# | Job concurrency options |
# +-------------------------+
#
### Available options:
# [type]JobConcurrency - limits the number jobs that can run at the same time.
# Sharkey will allow this many jobs of this type *per worker process*.
# [type]JobPerSec - limits the total number of jobs that may complete within a single second.
# If this limit is exceeded, then Sharkey will pause this type of job until the next second.
# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped.
# If this limit is exceeded, then the job is considered "failed" and recorded for debugging.
#
### Job types:
# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances.
# All inbound activities are queued and processed in chronological order by this job.
# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances.
# All outbound activities are queued and processed in chronological order by this job.
# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs.
# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates.
# Job concurrency per worker
#deliverJobConcurrency: 128
#inboxJobConcurrency: 16
#relationshipJobConcurrency: 16
#backgroundJobConcurrency: 32
# Job rate limiter
#deliverJobPerSec: 128
#inboxJobPerSec: 32
#relationshipJobPerSec: 64
#backgroundJobPerSec: 256
# Job attempts
#deliverJobMaxAttempts: 12
#inboxJobMaxAttempts: 8
#backgroundJobMaxAttempts: 8
# IP address family used for outgoing request (ipv4, ipv6 or dual)
#outgoingAddressFamily: ipv4

View file

@ -307,21 +307,42 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1
# +-------------------------+
# | Job concurrency options |
# +-------------------------+
#
### Available options:
# [type]JobConcurrency - limits the number jobs that can run at the same time.
# Sharkey will allow this many jobs of this type *per worker process*.
# [type]JobPerSec - limits the total number of jobs that may complete within a single second.
# If this limit is exceeded, then Sharkey will pause this type of job until the next second.
# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped.
# If this limit is exceeded, then the job is considered "failed" and recorded for debugging.
#
### Job types:
# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances.
# All inbound activities are queued and processed in chronological order by this job.
# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances.
# All outbound activities are queued and processed in chronological order by this job.
# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs.
# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates.
# Job concurrency per worker
#deliverJobConcurrency: 128
#inboxJobConcurrency: 16
#relationshipJobConcurrency: 16
# What's relationshipJob?:
# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations.
#backgroundJobConcurrency: 32
# Job rate limiter
#deliverJobPerSec: 128
#inboxJobPerSec: 32
#relationshipJobPerSec: 64
#backgroundJobPerSec: 256
# Job attempts
#deliverJobMaxAttempts: 12
#inboxJobMaxAttempts: 8
#backgroundJobMaxAttempts: 8
# Local address used for outgoing requests
#outgoingAddress: 127.0.0.1

View file

@ -310,21 +310,42 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1
# +-------------------------+
# | Job concurrency options |
# +-------------------------+
#
### Available options:
# [type]JobConcurrency - limits the number jobs that can run at the same time.
# Sharkey will allow this many jobs of this type *per worker process*.
# [type]JobPerSec - limits the total number of jobs that may complete within a single second.
# If this limit is exceeded, then Sharkey will pause this type of job until the next second.
# [type]JobMaxAttempts - limits the number of times that a job is allowed to fail and re-try before it's permanently stopped.
# If this limit is exceeded, then the job is considered "failed" and recorded for debugging.
#
### Job types:
# inbox - processes ActivityPub messages (AKA "Activities") received from remote instances.
# All inbound activities are queued and processed in chronological order by this job.
# deliver - processes ActivityPub messages (AKA "Activities") being set to remote instances.
# All outbound activities are queued and processed in chronological order by this job.
# relationship - processes user-to-user tasks including follow/unfollow, block/unblock, account migrations, and all follow import jobs.
# background - processes background synchronization tasks that need to happen soon (but not immediately), such as remote user updates and instance metadata updates.
# Job concurrency per worker
#deliverJobConcurrency: 128
#inboxJobConcurrency: 16
#relationshipJobConcurrency: 16
# What's relationshipJob?:
# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations.
#backgroundJobConcurrency: 32
# Job rate limiter
#deliverJobPerSec: 128
#inboxJobPerSec: 32
#relationshipJobPerSec: 64
#backgroundJobPerSec: 256
# Job attempts
#deliverJobMaxAttempts: 12
#inboxJobMaxAttempts: 8
#backgroundJobMaxAttempts: 8
# Local address used for outgoing requests
#outgoingAddress: 127.0.0.1

View file

@ -111,11 +111,14 @@ type Source = {
deliverJobConcurrency?: number;
inboxJobConcurrency?: number;
relationshipJobConcurrency?: number;
backgroundJobConcurrency?: number;
deliverJobPerSec?: number;
inboxJobPerSec?: number;
relationshipJobPerSec?: number;
backgroundJobPerSec?: number;
deliverJobMaxAttempts?: number;
inboxJobMaxAttempts?: number;
backgroundJobMaxAttempts?: number;
mediaDirectory?: string;
mediaProxy?: string;
@ -272,11 +275,14 @@ export type Config = {
deliverJobConcurrency: number | undefined;
inboxJobConcurrency: number | undefined;
relationshipJobConcurrency: number | undefined;
backgroundJobConcurrency: number | undefined;
deliverJobPerSec: number | undefined;
inboxJobPerSec: number | undefined;
relationshipJobPerSec: number | undefined;
backgroundJobPerSec: number | undefined;
deliverJobMaxAttempts: number | undefined;
inboxJobMaxAttempts: number | undefined;
backgroundJobMaxAttempts: number | undefined;
proxyRemoteFiles: boolean | undefined;
customMOTD: string[] | undefined;
signToActivityPubGet: boolean;
@ -475,11 +481,14 @@ export function loadConfig(loggerService: LoggerService): Config {
deliverJobConcurrency: config.deliverJobConcurrency,
inboxJobConcurrency: config.inboxJobConcurrency,
relationshipJobConcurrency: config.relationshipJobConcurrency,
backgroundJobConcurrency: config.backgroundJobConcurrency,
deliverJobPerSec: config.deliverJobPerSec,
inboxJobPerSec: config.inboxJobPerSec,
relationshipJobPerSec: config.relationshipJobPerSec,
backgroundJobPerSec: config.backgroundJobPerSec,
deliverJobMaxAttempts: config.deliverJobMaxAttempts,
inboxJobMaxAttempts: config.inboxJobMaxAttempts,
backgroundJobMaxAttempts: config.backgroundJobMaxAttempts,
proxyRemoteFiles: config.proxyRemoteFiles,
customMOTD: config.customMOTD,
signToActivityPubGet: config.signToActivityPubGet ?? true,

View file

@ -17,6 +17,7 @@ import { bindThis } from '@/decorators.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { TimeService } from '@/global/TimeService.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { QueueService } from '@/core/QueueService.js';
import type { CheerioAPI } from 'cheerio/slim';
type NodeInfo = {
@ -50,6 +51,7 @@ export class FetchInstanceMetadataService {
private redisClient: Redis.Redis,
private readonly timeService: TimeService,
private readonly queueService: QueueService,
) {
this.logger = this.loggerService.getLogger('metadata', 'cyan');
}
@ -73,8 +75,21 @@ export class FetchInstanceMetadataService {
return this.redisClient.del(`fetchInstanceMetadata:mutex:v2:${host}`);
}
/**
* Schedules a deferred update on the background task worker.
* Duplicate updates are automatically skipped.
*/
@bindThis
public async fetchInstanceMetadataLazy(instance: MiInstance): Promise<void> {
if (!instance.isBlocked) {
await this.queueService.createUpdateInstanceJob(instance.host);
}
}
@bindThis
public async fetchInstanceMetadata(instance: MiInstance, force = false): Promise<void> {
if (instance.isBlocked) return;
const host = instance.host;
// finallyでunlockされてしまうのでtry内でロックチェックをしない

View file

@ -20,6 +20,7 @@ import {
UserWebhookDeliverJobData,
SystemWebhookDeliverJobData,
ScheduleNotePostJobData,
BackgroundTaskJobData,
} from '../queue/types.js';
import type { Provider } from '@nestjs/common';
@ -33,6 +34,7 @@ export type ObjectStorageQueue = Bull.Queue;
export type UserWebhookDeliverQueue = Bull.Queue<UserWebhookDeliverJobData>;
export type SystemWebhookDeliverQueue = Bull.Queue<SystemWebhookDeliverJobData>;
export type ScheduleNotePostQueue = Bull.Queue<ScheduleNotePostJobData>;
export type BackgroundTaskQueue = Bull.Queue<BackgroundTaskJobData>;
const $system: Provider = {
provide: 'queue:system',
@ -94,6 +96,12 @@ const $scheduleNotePost: Provider = {
inject: [DI.config],
};
const $backgroundTask: Provider = {
provide: 'queue:backgroundTask',
useFactory: (config: Config) => new Bull.Queue(QUEUE.BACKGROUND_TASK, baseQueueOptions(config, QUEUE.BACKGROUND_TASK)),
inject: [DI.config],
};
@Module({
imports: [
],
@ -108,6 +116,7 @@ const $scheduleNotePost: Provider = {
$userWebhookDeliver,
$systemWebhookDeliver,
$scheduleNotePost,
$backgroundTask,
],
exports: [
$system,
@ -120,6 +129,7 @@ const $scheduleNotePost: Provider = {
$userWebhookDeliver,
$systemWebhookDeliver,
$scheduleNotePost,
$backgroundTask,
],
})
export class QueueModule implements OnApplicationShutdown {
@ -136,6 +146,7 @@ export class QueueModule implements OnApplicationShutdown {
@Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue,
@Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue,
@Inject('queue:scheduleNotePost') public scheduleNotePostQueue: ScheduleNotePostQueue,
@Inject('queue:backgroundTask') public readonly backgroundTaskQueue: BackgroundTaskQueue,
) {}
public async dispose(): Promise<void> {
@ -155,6 +166,7 @@ export class QueueModule implements OnApplicationShutdown {
this.userWebhookDeliverQueue.close(),
this.systemWebhookDeliverQueue.close(),
this.scheduleNotePostQueue.close(),
this.backgroundTaskQueue.close(),
]).then(res => {
for (const result of res) {
if (result.status === 'rejected') {

View file

@ -21,6 +21,7 @@ import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js';
import type { MiNote } from '@/models/Note.js';
import { type UserWebhookPayload } from './UserWebhookService.js';
import type {
BackgroundTaskJobData,
DbJobData,
DeliverJobData,
RelationshipJobData,
@ -39,6 +40,7 @@ import type {
SystemWebhookDeliverQueue,
UserWebhookDeliverQueue,
ScheduleNotePostQueue,
BackgroundTaskQueue,
} from './QueueModule.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
@ -54,6 +56,7 @@ export const QUEUE_TYPES = [
'userWebhookDeliver',
'systemWebhookDeliver',
'scheduleNotePost',
'backgroundTask',
] as const;
@Injectable()
@ -72,6 +75,7 @@ export class QueueService implements OnModuleInit {
@Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue,
@Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue,
@Inject('queue:scheduleNotePost') public ScheduleNotePostQueue: ScheduleNotePostQueue,
@Inject('queue:backgroundTask') public readonly backgroundTaskQueue: BackgroundTaskQueue,
private readonly timeService: TimeService,
) {}
@ -839,6 +843,78 @@ export class QueueService implements OnModuleInit {
});
}
@bindThis
public async createUpdateUserJob(userId: string) {
return await this.createBackgroundTask(
'update-user',
{
type: 'update-user',
userId,
},
{
id: `update-user:${userId}`,
// ttl: 1000 * 60 * 60 * 24,
},
);
}
@bindThis
public async createUpdateFeaturedJob(userId: string) {
return await this.createBackgroundTask(
'update-featured',
{
type: 'update-featured',
userId,
},
{
id: `update-featured:${userId}`,
// ttl: 1000 * 60 * 60 * 24,
},
);
}
@bindThis
public async createUpdateInstanceJob(host: string) {
return await this.createBackgroundTask(
'update-instance',
{
type: 'update-instance',
host,
},
{
id: `update-instance:${host}`,
// ttl: 1000 * 60 * 60 * 24,
},
);
}
private async createBackgroundTask(name: string, data: BackgroundTaskJobData, duplication: { id: string, ttl?: number }) {
return await this.backgroundTaskQueue.add(
name,
data,
{
removeOnComplete: {
age: 3600 * 24 * 7, // keep up to 7 days
count: 30,
},
removeOnFail: {
age: 3600 * 24 * 7, // keep up to 7 days
count: 100,
},
// https://docs.bullmq.io/guide/retrying-failing-jobs#custom-back-off-strategies
attempts: this.config.backgroundJobMaxAttempts ?? 8,
backoff: {
// Resolves to QueueProcessorService::HttpRelatedBackoff()
type: 'custom',
},
// https://docs.bullmq.io/guide/jobs/deduplication
deduplication: duplication,
},
);
};
/**
* @see UserWebhookDeliverJobData
* @see UserWebhookDeliverProcessorService

View file

@ -153,11 +153,10 @@ export class ApInboxService {
// ついでにリモートユーザーの情報が古かったら更新しておく
if (actor.uri) {
if (actor.lastFetchedAt == null || this.timeService.now - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) {
setImmediate(() => {
{
// 同一ユーザーの情報を再度処理するので、使用済みのresolverを再利用してはいけない
this.apPersonService.updatePerson(actor.uri)
.catch(err => this.logger.error(`Failed to update person: ${renderInlineError(err)}`));
});
await this.apPersonService.updatePersonLazy(actor);
}
}
}
return result;
@ -441,25 +440,15 @@ export class ApInboxService {
this.instanceChart.requestReceived(i.host).then();
}
this.fetchInstanceMetadataService.fetchInstanceMetadata(i).then();
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i);
});
// Process it!
return await this.performOneActivity(actor, activity, resolver)
.finally(() => {
// Update user (adapted from performActivity)
if (actor.lastFetchedAt == null || this.timeService.now - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) {
setImmediate(() => {
// Don't re-use the resolver, or it may throw recursion errors.
// Instead, create a new resolver with an appropriately-reduced recursion limit.
const subResolver = this.apResolverService.createResolver({
recursionLimit: resolver.getRecursionLimit() - resolver.getHistory().length,
});
this.apPersonService.updatePerson(actor.uri, subResolver)
.catch(err => this.logger.error(`Failed to update person: ${renderInlineError(err)}`));
});
try {
return await this.performOneActivity(actor, activity, resolver);
} finally {
await this.apPersonService.updatePersonLazy(actor);
}
});
}
@bindThis

View file

@ -579,7 +579,7 @@ export class ApPersonService implements OnModuleInit {
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.newUser(i.host);
}
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i);
});
}
@ -604,16 +604,26 @@ export class ApPersonService implements OnModuleInit {
}
//#endregion
await this.updateFeatured(user.id, resolver).catch(err => {
// Permanent error implies hidden or inaccessible, which is a normal thing.
if (isRetryableError(err)) {
this.logger.error(`Error updating featured notes: ${renderInlineError(err)}`);
}
});
await this.updateFeaturedLazy(user);
return user;
}
/**
* Schedules a deferred update on the background task worker.
* Duplicate updates are automatically skipped.
*/
@bindThis
public async updatePersonLazy(uriOrUser: string | MiUser): Promise<void> {
const user = typeof(uriOrUser) === 'string'
? await this.fetchPerson(uriOrUser)
: uriOrUser;
if (user && user.host != null) {
await this.queueService.createUpdateUserJob(user.id);
}
}
/**
* Personの情報を更新します
* Misskeyに対象のPersonが登録されていなければ無視します
@ -817,12 +827,7 @@ export class ApPersonService implements OnModuleInit {
await this.cacheService.refreshFollowRelationsFor(exist.id);
}
await this.updateFeatured(exist.id, resolver).catch(err => {
// Permanent error implies hidden or inaccessible, which is a normal thing.
if (isRetryableError(err)) {
this.logger.error(`Error updating featured notes: ${renderInlineError(err)}`);
}
});
await this.updateFeaturedLazy(exist);
const updated = { ...exist, ...updates };
@ -902,9 +907,24 @@ export class ApPersonService implements OnModuleInit {
return fields;
}
/**
* Schedules a deferred update on the background task worker.
* Duplicate updates are automatically skipped.
*/
@bindThis
public async updateFeatured(userId: MiUser['id'], resolver?: Resolver): Promise<void> {
const user = await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false });
public async updateFeaturedLazy(userOrId: MiUser | MiUser['id']): Promise<void> {
const userId = typeof(userOrId) === 'object' ? userOrId.id : userOrId;
const user = typeof(userOrId) === 'object' ? userOrId : await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false });
if (isRemoteUser(user) && user.featured) {
await this.queueService.createUpdateFeaturedJob(userId);
}
}
@bindThis
public async updateFeatured(userOrId: MiUser | MiUser['id'], resolver?: Resolver): Promise<void> {
const userId = typeof(userOrId) === 'object' ? userOrId.id : userOrId;
const user = typeof(userOrId) === 'object' ? userOrId : await this.usersRepository.findOneByOrFail({ id: userId, isDeleted: false });
if (!isRemoteUser(user)) return;
if (!user.featured) return;

View file

@ -46,7 +46,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js';
import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js';
import { BackgroundTaskProcessorService } from './processors/BackgroundTaskProcessorService.js';
@Module({
imports: [
CoreModule,
@ -93,6 +93,7 @@ import { HibernateUsersProcessorService } from './processors/HibernateUsersProce
ScheduleNotePostProcessorService,
CleanupApLogsProcessorService,
HibernateUsersProcessorService,
BackgroundTaskProcessorService,
],
exports: [
QueueProcessorService,

View file

@ -14,6 +14,7 @@ import { CheckModeratorsActivityProcessorService } from '@/queue/processors/Chec
import { TimeService } from '@/global/TimeService.js';
import { renderFullError } from '@/misc/render-full-error.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { isRetryableError } from '@/misc/is-retryable-error.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
@ -53,9 +54,16 @@ import { QUEUE, baseWorkerOptions } from './const.js';
import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js';
import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js';
import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js';
import { BackgroundTaskProcessorService } from './processors/BackgroundTaskProcessorService.js';
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function httpRelatedBackoff(attemptsMade: number) {
function httpRelatedBackoff(attemptsMade: number, type?: string, error?: Error) {
// Don't retry permanent errors
// https://docs.bullmq.io/guide/retrying-failing-jobs#custom-back-off-strategies
if (error && !isRetryableError(error)) {
return -1;
}
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
@ -95,6 +103,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private objectStorageQueueWorker: Bull.Worker;
private endedPollNotificationQueueWorker: Bull.Worker;
private schedulerNotePostQueueWorker: Bull.Worker;
private readonly backgroundTaskWorker: Bull.Worker;
constructor(
@Inject(DI.config)
@ -140,6 +149,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private readonly timeService: TimeService,
private readonly cleanupApLogsProcessorService: CleanupApLogsProcessorService,
private readonly hibernateUsersProcessorService: HibernateUsersProcessorService,
private readonly backgroundTaskProcessorService: BackgroundTaskProcessorService,
) {
this.logger = this.queueLoggerService.logger;
@ -565,6 +575,39 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region background tasks
{
const logger = this.logger.createSubLogger('backgroundTask');
this.backgroundTaskWorker = new Bull.Worker(QUEUE.BACKGROUND_TASK, (job) => this.backgroundTaskProcessorService.process(job), {
...baseWorkerOptions(this.config, QUEUE.BACKGROUND_TASK),
autorun: false,
concurrency: this.config.backgroundJobConcurrency ?? 32,
limiter: {
max: this.config.backgroundJobPerSec ?? 256,
duration: 1000,
},
settings: {
backoffStrategy: httpRelatedBackoff,
},
});
this.backgroundTaskWorker
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: ${QUEUE.BACKGROUND_TASK}: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
}
private logError(logger: Logger, err: unknown, job?: Bull.Job | null): void {
@ -606,6 +649,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.objectStorageQueueWorker.run(),
this.endedPollNotificationQueueWorker.run(),
this.schedulerNotePostQueueWorker.run(),
this.backgroundTaskWorker.run(),
]);
}
@ -622,6 +666,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.objectStorageQueueWorker.close(),
this.endedPollNotificationQueueWorker.close(),
this.schedulerNotePostQueueWorker.close(),
this.backgroundTaskWorker.close(),
]).then(res => {
for (const result of res) {
if (result.status === 'rejected') {

View file

@ -18,6 +18,7 @@ export const QUEUE = {
USER_WEBHOOK_DELIVER: 'userWebhookDeliver',
SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver',
SCHEDULE_NOTE_POST: 'scheduleNotePost',
BACKGROUND_TASK: 'backgroundTask',
};
export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {

View file

@ -0,0 +1,93 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { BackgroundTaskJobData, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserBackgroundTask } from '@/queue/types.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
import Logger from '@/logger.js';
import { isRetryableError } from '@/misc/is-retryable-error.js';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { CacheService } from '@/core/CacheService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
@Injectable()
export class BackgroundTaskProcessorService {
private readonly logger: Logger;
constructor(
@Inject(DI.config)
private readonly config: Config,
private readonly apPersonService: ApPersonService,
private readonly cacheService: CacheService,
private readonly federatedInstanceService: FederatedInstanceService,
private readonly fetchInstanceMetadataService: FetchInstanceMetadataService,
queueLoggerService: QueueLoggerService,
) {
this.logger = queueLoggerService.logger.createSubLogger('background-task');
}
public async process(job: Bull.Job<BackgroundTaskJobData>): Promise<string> {
if (job.data.type === 'update-user') {
return await this.processUpdateUser(job.data);
} else if (job.data.type === 'update-featured') {
return await this.processUpdateFeatured(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'update-instance') {
return await this.processUpdateInstance(job.data);
} else {
this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data);
throw new Error(`Unknown job type ${job.data}, see system logs for details`);
}
}
private async processUpdateUser(task: UpdateUserBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping update-user task: user ${task.userId} has been deleted`;
if (user.isSuspended) return `Skipping update-user task: user ${task.userId} is suspended`;
if (!user.uri) return `Skipping update-user task: user ${task.userId} is local`;
if (user.lastFetchedAt && Date.now() - user.lastFetchedAt.getTime() < 1000 * 60 * 60 * 24) {
return `Skipping update-user task: user ${task.userId} was recently updated`;
}
await this.apPersonService.updatePerson(user.uri);
return 'ok';
}
private async processUpdateFeatured(task: UpdateFeaturedBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping update-featured task: user ${task.userId} has been deleted`;
if (user.isSuspended) return `Skipping update-featured task: user ${task.userId} is suspended`;
if (!user.uri) return `Skipping update-featured task: user ${task.userId} is local`;
if (!user.featured) return `Skipping update-featured task: user ${task.userId} has no featured collection`;
if (user.lastFetchedAt && Date.now() - user.lastFetchedAt.getTime() < 1000 * 60 * 60 * 24) {
return `Skipping update-featured task: user ${task.userId} was recently updated`;
}
await this.apPersonService.updateFeatured(user);
return 'ok';
}
private async processUpdateInstance(task: UpdateInstanceBackgroundTask): Promise<string> {
const instance = await this.federatedInstanceService.fetch(task.host);
if (!instance) return `Skipping update-instance task: instance ${task.host} has been deleted`;
if (instance.isBlocked) return `Skipping update-instance task: instance ${task.host} is blocked`;
if (instance.infoUpdatedAt && Date.now() - instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24) {
return `Skipping update-instance task: instance ${task.host} was recently updated`;
}
await this.fetchInstanceMetadataService.fetchInstanceMetadata(instance);
return 'ok';
}
}

View file

@ -80,21 +80,21 @@ export class DeliverProcessorService {
if (i == null) return;
if (i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
await this.federatedInstanceService.update(i.id, {
isNotResponding: false,
notRespondingSince: null,
});
}
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, true);
await this.instanceChart.requestSent(i.host, true);
}
});
return 'Success';
} catch (res) {
this.apRequestChart.deliverFail();
this.federationChart.deliverd(host, false);
await this.apRequestChart.deliverFail();
await this.federationChart.deliverd(host, false);
// Update instance stats
this.federatedInstanceService.fetchOrRegister(host).then(i => {

View file

@ -275,10 +275,10 @@ export class InboxProcessorService implements OnApplicationShutdown {
});
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestReceived(i.host);
await this.instanceChart.requestReceived(i.host);
}
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(i);
});
// アクティビティを処理

View file

@ -168,3 +168,23 @@ export type ThinUser = {
export type ScheduleNotePostJobData = {
scheduleNoteId: MiNote['id'];
};
export type BackgroundTaskJobData =
UpdateUserBackgroundTask |
UpdateFeaturedBackgroundTask |
UpdateInstanceBackgroundTask;
export type UpdateUserBackgroundTask = {
type: 'update-user';
userId: string;
};
export type UpdateFeaturedBackgroundTask = {
type: 'update-featured';
userId: string;
};
export type UpdateInstanceBackgroundTask = {
type: 'update-instance';
host: string;
};

View file

@ -0,0 +1,50 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import type { IObject } from '@/core/activitypub/type.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { MiRemoteUser, MiUser } from '@/models/User.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import { MiInstance } from '@/models/Instance.js';
import { Resolver } from '@/core/activitypub/ApResolverService.js';
import { bindThis } from '@/decorators.js';
export class ImmediateApPersonService extends ApPersonService {
@bindThis
async createPerson(uri: string, resolver?: Resolver): Promise<MiRemoteUser> {
const user = await super.createPerson(uri, resolver);
await this.updateFeatured(user, resolver);
return user;
}
@bindThis
async updatePerson(uri: string, resolver?: Resolver | null, hint?: IObject, movePreventUris: string[] = []): Promise<string | void> {
const result = await super.updatePerson(uri, resolver, hint, movePreventUris);
const user = await this.fetchPerson(uri);
if (user == null) throw new Error('updated user is null, did you forget to mock out caches?');
await this.updateFeatured(user, resolver ?? undefined);
return result;
}
@bindThis
async updatePersonLazy(uriOrUser: string | MiUser): Promise<void> {
const userId = typeof(uriOrUser) === 'object' ? uriOrUser.id : uriOrUser;
await this.updatePerson(userId);
}
@bindThis
async updateFeaturedLazy(userOrId: string | MiUser): Promise<void> {
await this.updateFeatured(userOrId);
}
}
export class ImmediateFetchInstanceMetadataService extends FetchInstanceMetadataService {
@bindThis
async fetchInstanceMetadataLazy(instance: MiInstance): Promise<void> {
return await this.fetchInstanceMetadata(instance);
}
}