add update-user-tags, update-note-tags, post-deliver, post-inbox, post-note, and check-hibernation background tasks

This commit is contained in:
Hazelnoot 2025-06-18 01:53:59 -04:00
parent ce8c8e9851
commit 41e50eeb0e
18 changed files with 331 additions and 149 deletions

View file

@ -5,30 +5,46 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { BackgroundTaskJobData, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserBackgroundTask } from '@/queue/types.js';
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask } from '@/queue/types.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
import Logger from '@/logger.js';
import { isRetryableError } from '@/misc/is-retryable-error.js';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { CacheService } from '@/core/CacheService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { MiMeta } from '@/models/Meta.js';
import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js';
import { NoteCreateService } from '@/core/NoteCreateService.js';
import type { NotesRepository } from '@/models/_.js';
import { NoteEditService } from '@/core/NoteEditService.js';
import { HashtagService } from '@/core/HashtagService.js';
@Injectable()
export class BackgroundTaskProcessorService {
private readonly logger: Logger;
constructor(
@Inject(DI.config)
private readonly config: Config,
@Inject(DI.meta)
private readonly meta: MiMeta,
@Inject(DI.notesRepository)
private readonly notesRepository: NotesRepository,
private readonly apPersonService: ApPersonService,
private readonly cacheService: CacheService,
private readonly federatedInstanceService: FederatedInstanceService,
private readonly fetchInstanceMetadataService: FetchInstanceMetadataService,
private readonly instanceChart: InstanceChart,
private readonly apRequestChart: ApRequestChart,
private readonly federationChart: FederationChart,
private readonly updateInstanceQueue: UpdateInstanceQueue,
private readonly noteCreateService: NoteCreateService,
private readonly noteEditService: NoteEditService,
private readonly hashtagService: HashtagService,
queueLoggerService: QueueLoggerService,
) {
@ -40,9 +56,21 @@ export class BackgroundTaskProcessorService {
return await this.processUpdateUser(job.data);
} else if (job.data.type === 'update-featured') {
return await this.processUpdateFeatured(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'update-user-tags') {
return await this.processUpdateUserTags(job.data);
} else if (job.data.type === 'update-note-tags') {
return await this.processUpdateNoteTags(job.data);
} else if (job.data.type === 'update-instance') {
return await this.processUpdateInstance(job.data);
} else if (job.data.type === 'post-deliver') {
return await this.processPostDeliver(job.data);
} else if (job.data.type === 'post-inbox') {
return await this.processPostInbox(job.data);
} else if (job.data.type === 'post-note') {
return await this.processPostNote(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'check-hibernation') {
return await this.processCheckHibernation(job.data);
} else {
this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data);
throw new Error(`Unknown job type ${job.data}, see system logs for details`);
@ -78,10 +106,30 @@ export class BackgroundTaskProcessorService {
return 'ok';
}
private async processUpdateUserTags(task: UpdateUserTagsBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping update-user-tags task: user ${task.userId} has been deleted`;
if (user.isSuspended) return `Skipping update-user-tags task: user ${task.userId} is suspended`;
if (!user.uri) return `Skipping update-user-tags task: user ${task.userId} is local`;
await this.hashtagService.updateUsertags(user, user.tags);
return 'ok';
}
private async processUpdateNoteTags(task: UpdateNoteTagsBackgroundTask): Promise<string> {
const note = await this.notesRepository.findOneBy({ id: task.noteId });
if (!note) return `Skipping update-note-tags task: note ${task.noteId} has been deleted`;
const user = await this.cacheService.findUserById(note.userId);
if (user.isSuspended) return `Skipping update-note-tags task: note ${task.noteId}'s user ${note.userId} is suspended`;
await this.hashtagService.updateHashtags(user, note.tags);
return 'ok';
}
private async processUpdateInstance(task: UpdateInstanceBackgroundTask): Promise<string> {
const instance = await this.federatedInstanceService.fetch(task.host);
if (!instance) return `Skipping update-instance task: instance ${task.host} has been deleted`;
if (instance.isBlocked) return `Skipping update-instance task: instance ${task.host} is blocked`;
if (instance.suspensionState === 'goneSuspended') return `Skipping update-instance task: instance ${task.host} is gone`;
if (instance.infoUpdatedAt && Date.now() - instance.infoUpdatedAt.getTime() < 1000 * 60 * 60 * 24) {
return `Skipping update-instance task: instance ${task.host} was recently updated`;
@ -90,4 +138,102 @@ export class BackgroundTaskProcessorService {
await this.fetchInstanceMetadataService.fetchInstanceMetadata(instance);
return 'ok';
}
private async processPostDeliver(task: PostDeliverBackgroundTask): Promise<string> {
let instance = await this.federatedInstanceService.fetchOrRegister(task.host);
if (instance.isBlocked) return `Skipping post-deliver task: instance ${task.host} is blocked`;
const success = task.result === 'success';
// isNotResponding should be the inverse of success, because:
// 1. We expect success (success=true) from a responding instance (isNotResponding=false).
// 2. We expect failure (success=false) from a non-responding instance (isNotResponding=true).
// If they are equal, then we need to update the cached state.
const updateNotResponding = success === instance.isNotResponding;
// If we get a permanent failure, then we need to immediately suspend the instance
const updateGoneSuspended = task.result === 'perm-fail' && instance.suspensionState !== 'goneSuspended';
// Check if we need to auto-suspend the instance
const updateAutoSuspended = instance.isNotResponding && instance.notRespondingSince && instance.suspensionState === 'none' && instance.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7;
// This is messy, but we need to minimize updates to space in Postgres blocks.
if (updateNotResponding || updateGoneSuspended || updateAutoSuspended) {
instance = await this.federatedInstanceService.update(instance.id, {
isNotResponding: updateNotResponding ? !success : undefined,
notRespondingSince: updateNotResponding ? (success ? null : new Date()) : undefined,
suspensionState: updateGoneSuspended
? 'goneSuspended'
: updateAutoSuspended
? 'autoSuspendedForNotResponding'
: undefined,
});
}
// Update instance metadata (deferred)
if (success && this.meta.enableStatsForFederatedInstances) {
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance);
}
// Update charts
if (this.meta.enableChartsForFederatedInstances) {
await this.instanceChart.requestSent(task.host, success);
}
if (success) {
await this.apRequestChart.deliverSucc();
} else {
await this.apRequestChart.deliverFail();
}
await this.federationChart.deliverd(task.host, success);
return 'ok';
}
private async processPostInbox(task: PostInboxBackgroundTask): Promise<string> {
const instance = await this.federatedInstanceService.fetchOrRegister(task.host);
if (instance.isBlocked) return `Skipping post-inbox task: instance ${task.host} is blocked`;
// Update charts
if (this.meta.enableChartsForFederatedInstances) {
await this.instanceChart.requestReceived(task.host);
}
await this.apRequestChart.inbox();
await this.federationChart.inbox(task.host);
// Update instance metadata (deferred)
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance);
// Unsuspend instance (deferred)
this.updateInstanceQueue.enqueue(instance.id, {
latestRequestReceivedAt: new Date(),
shouldUnsuspend: instance.suspensionState === 'autoSuspendedForNotResponding',
});
return 'ok';
}
private async processPostNote(task: PostNoteBackgroundTask): Promise<string> {
const note = await this.notesRepository.findOneBy({ id: task.noteId });
if (!note) return `Skipping post-note task: note ${task.noteId} has been deleted`;
const user = await this.cacheService.findUserById(note.userId);
if (user.isSuspended) return `Skipping post-note task: note ${task.noteId}'s user ${note.userId} is suspended`;
const mentionedUsers = await this.cacheService.getUsers(note.mentions);
if (task.edit) {
await this.noteEditService.postNoteEdited(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values()));
} else {
await this.noteCreateService.postNoteCreated(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values()));
}
return 'ok';
}
private async processCheckHibernation(task: CheckHibernationBackgroundTask): Promise<string> {
const followers = await this.cacheService.getNonHibernatedFollowers(task.userId);
if (followers.length < 1) return `Skipping check-hibernation task: user ${task.userId} has no non-hibernated followers`;
await this.noteCreateService.checkHibernation(followers);
return 'ok';
}
}