add UpdateLatestNote, PostSuspend, and PostUnsuspend background tasks

This commit is contained in:
Hazelnoot 2025-06-19 13:38:03 -04:00
parent 8e75d6149e
commit a4440e43a6
12 changed files with 512 additions and 289 deletions

View file

@ -0,0 +1,75 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
export class MoreNoteEditColumns1750353421706 {
name = 'MoreNoteEditColumns1750353421706'
async up(queryRunner) {
// Update column types
await queryRunner.query(`ALTER TABLE "note_edit" ALTER COLUMN "cw" TYPE text USING "cw"::text`);
// Rename columns
await queryRunner.query(`ALTER TABLE "note_edit" RENAME COLUMN "oldText" TO "text"`);
await queryRunner.query(`ALTER TABLE "note_edit" RENAME COLUMN "cw" TO "newCw"`);
// Add new fields
await queryRunner.query(`ALTER TABLE "note_edit" ADD "userId" character varying(32)`);
await queryRunner.query(`COMMENT ON COLUMN "note_edit"."userId" IS 'The ID of author.'`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD CONSTRAINT "FK_7f1ded0f6e8a5bef701b7e698ab" FOREIGN KEY ("userId") REFERENCES "user"("id") ON DELETE CASCADE ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD "renoteId" character varying(32)`);
await queryRunner.query(`COMMENT ON COLUMN "note_edit"."renoteId" IS 'The ID of renote target. Will always be null for older edits'`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD CONSTRAINT "FK_d3003e5256bcbfad6c3588835c0" FOREIGN KEY ("renoteId") REFERENCES "note"("id") ON DELETE CASCADE ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD "replyId" character varying(32)`);
await queryRunner.query(`COMMENT ON COLUMN "note_edit"."replyId" IS 'The ID of reply target. Will always be null for older edits'`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD CONSTRAINT "FK_f34b53ab9b39774ca014972ad84" FOREIGN KEY ("replyId") REFERENCES "note"("id") ON DELETE CASCADE ON UPDATE NO ACTION`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD "visibility" "public"."note_visibility_enum"`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD "cw" text`);
await queryRunner.query(`COMMENT ON COLUMN "note_edit"."cw" IS 'Will always be null for older edits'`);
await queryRunner.query(`ALTER TABLE "note_edit" ADD "hasPoll" boolean NOT NULL DEFAULT false`);
await queryRunner.query(`COMMENT ON COLUMN "note_edit"."hasPoll" IS 'Whether this revision had a poll. Will always be false for older edits'`);
// Populate non-nullable fields
await queryRunner.query(`
UPDATE "note_edit" "e"
SET
"visibility" = "n"."visibility",
"userId" = "n"."userId"
FROM "note" "n"
WHERE "n"."id" = "e"."noteId"
`);
await queryRunner.query(`ALTER TABLE "note_edit" ALTER COLUMN "visibility" SET NOT NULL`);
await queryRunner.query(`ALTER TABLE "note_edit" ALTER COLUMN "userId" SET NOT NULL`);
}
async down(queryRunner) {
// Drop new columns
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "visibility"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "hasPoll"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "cw"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "userId"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP CONSTRAINT "FK_f34b53ab9b39774ca014972ad84"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "replyId"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP CONSTRAINT "FK_d3003e5256bcbfad6c3588835c0"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "renoteId"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP CONSTRAINT "FK_7f1ded0f6e8a5bef701b7e698ab"`);
await queryRunner.query(`ALTER TABLE "note_edit" DROP COLUMN "userId"`);
// Rename new columns
await queryRunner.query(`ALTER TABLE "note_edit" RENAME COLUMN "text" TO "oldText"`);
await queryRunner.query(`ALTER TABLE "note_edit" RENAME COLUMN "newCw" TO "cw"`);
// Revert column types
await queryRunner.query(`ALTER TABLE "note_edit" ALTER COLUMN "cw" TYPE varchar(512) USING "cw"::varchar(512)`);
}
}

View file

@ -1,18 +1,14 @@
import { Inject, Injectable } from '@nestjs/common';
import { Not } from 'typeorm';
import { MiNote } from '@/models/Note.js';
import { isPureRenote } from '@/misc/is-renote.js';
import { isPureRenote, MinimalNote } from '@/misc/is-renote.js';
import { SkLatestNote } from '@/models/LatestNote.js';
import { DI } from '@/di-symbols.js';
import type { LatestNotesRepository, NotesRepository } from '@/models/_.js';
import { LoggerService } from '@/core/LoggerService.js';
import Logger from '@/logger.js';
import { QueryService } from './QueryService.js';
import type { LatestNotesRepository, MiNote, NotesRepository } from '@/models/_.js';
import { QueryService } from '@/core/QueryService.js';
import { QueueService } from '@/core/QueueService.js';
@Injectable()
export class LatestNoteService {
private readonly logger: Logger;
constructor(
@Inject(DI.notesRepository)
private readonly notesRepository: NotesRepository,
@ -21,19 +17,23 @@ export class LatestNoteService {
private readonly latestNotesRepository: LatestNotesRepository,
private readonly queryService: QueryService,
loggerService: LoggerService,
) {
this.logger = loggerService.getLogger('LatestNoteService');
private readonly queueService: QueueService,
) {}
async handleUpdatedNoteDeferred(note: MiNote): Promise<void> {
await this.queueService.createUpdateLatestNoteJob(note);
}
handleUpdatedNoteBG(before: MiNote, after: MiNote): void {
this
.handleUpdatedNote(before, after)
.catch(err => this.logger.error('Unhandled exception while updating latest_note (after update):', err));
async handleCreatedNoteDeferred(note: MiNote): Promise<void> {
await this.queueService.createUpdateLatestNoteJob(note);
}
async handleUpdatedNote(before: MiNote, after: MiNote): Promise<void> {
// If the key didn't change, then there's nothing to update
async handleDeletedNoteDeferred(note: MiNote): Promise<void> {
await this.queueService.createUpdateLatestNoteJob(note);
}
async handleUpdatedNote(before: MinimalNote, after: MinimalNote): Promise<void> {
// If the key didn't change, then there's nothing to update.
if (SkLatestNote.areEquivalent(before, after)) return;
// Simulate update as delete + create
@ -41,13 +41,7 @@ export class LatestNoteService {
await this.handleCreatedNote(after);
}
handleCreatedNoteBG(note: MiNote): void {
this
.handleCreatedNote(note)
.catch(err => this.logger.error('Unhandled exception while updating latest_note (after create):', err));
}
async handleCreatedNote(note: MiNote): Promise<void> {
async handleCreatedNote(note: MinimalNote): Promise<void> {
// Ignore DMs.
// Followers-only posts are *included*, as this table is used to back the "following" feed.
if (note.visibility === 'specified') return;
@ -71,13 +65,7 @@ export class LatestNoteService {
await this.latestNotesRepository.upsert(latestNote, ['userId', 'isPublic', 'isReply', 'isQuote']);
}
handleDeletedNoteBG(note: MiNote): void {
this
.handleDeletedNote(note)
.catch(err => this.logger.error('Unhandled exception while updating latest_note (after delete):', err));
}
async handleDeletedNote(note: MiNote): Promise<void> {
async handleDeletedNote(note: MinimalNote): Promise<void> {
// If it's a DM, then it can't possibly be the latest note so we can safely skip this.
if (note.visibility === 'specified') return;

View file

@ -458,9 +458,6 @@ export class NoteCreateService implements OnApplicationShutdown {
const note = await this.insertNote(user, data, tags, emojis, mentionedUsers);
// Update the Latest Note index / following feed
this.latestNoteService.handleCreatedNoteBG(note);
await this.queueService.createPostNoteJob(note.id, silent, 'create');
return note;
@ -474,7 +471,7 @@ export class NoteCreateService implements OnApplicationShutdown {
isBot: MiUser['isBot'];
noindex: MiUser['noindex'];
}, data: Option): Promise<MiNote> {
return this.create(user, data, true);
return await this.create(user, data, true);
}
@bindThis
@ -583,7 +580,7 @@ export class NoteCreateService implements OnApplicationShutdown {
host: MiUser['host'];
isBot: MiUser['isBot'];
noindex: MiUser['noindex'];
}, data: Option, silent: boolean, tags: string[], mentionedUsers: MinimumUser[]) {
}, data: Option, silent: boolean, mentionedUsers: MinimumUser[]) {
this.notesChart.update(note, true);
if (note.visibility !== 'specified' && (this.meta.enableChartsForRemoteUser || (user.host == null))) {
this.perUserNotesChart.update(user, note, true);
@ -612,20 +609,20 @@ export class NoteCreateService implements OnApplicationShutdown {
if (!this.isRenote(note) || this.isQuote(note)) {
// Increment notes count (user)
this.incNotesCountOfUser(user);
await this.incNotesCountOfUser(user);
} else {
this.usersRepository.update({ id: user.id }, { updatedAt: this.timeService.date });
await this.usersRepository.update({ id: user.id }, { updatedAt: this.timeService.date });
}
this.pushToTl(note, user);
await this.pushToTl(note, user);
this.antennaService.addNoteToAntennas({
await this.antennaService.addNoteToAntennas({
...note,
channel: data.channel ?? null,
}, user);
if (data.reply) {
this.saveReply(data.reply, note);
await this.saveReply(data.reply, note);
}
if (data.reply == null) {
@ -654,12 +651,12 @@ export class NoteCreateService implements OnApplicationShutdown {
}
if (this.isRenote(data) && !this.isQuote(data) && data.renote.userId !== user.id && !user.isBot) {
this.incRenoteCount(data.renote, user);
await this.incRenoteCount(data.renote, user);
}
if (data.poll && data.poll.expiresAt) {
const delay = data.poll.expiresAt.getTime() - this.timeService.now;
this.queueService.endedPollNotificationQueue.add(note.id, {
await this.queueService.endedPollNotificationQueue.add(note.id, {
noteId: note.id,
}, {
jobId: `pollEnd_${note.id}`,
@ -683,9 +680,9 @@ export class NoteCreateService implements OnApplicationShutdown {
this.globalEventService.publishNotesStream(noteObj);
this.roleService.addNoteToRoleTimeline(noteObj);
await this.roleService.addNoteToRoleTimeline(noteObj);
this.webhookService.enqueueUserWebhook(user.id, 'note', { note: noteObj });
await this.webhookService.enqueueUserWebhook(user.id, 'note', { note: noteObj });
const nm = new NotificationManager(this.mutingsRepository, this.notificationService, user, note);
@ -714,7 +711,7 @@ export class NoteCreateService implements OnApplicationShutdown {
if (!isThreadMuted && !muted) {
nm.push(data.reply.userId, 'reply');
this.globalEventService.publishMainStream(data.reply.userId, 'reply', noteObj);
this.webhookService.enqueueUserWebhook(data.reply.userId, 'reply', { note: noteObj });
await this.webhookService.enqueueUserWebhook(data.reply.userId, 'reply', { note: noteObj });
}
}
}
@ -745,15 +742,15 @@ export class NoteCreateService implements OnApplicationShutdown {
// Publish event
if ((user.id !== data.renote.userId) && data.renote.userHost === null) {
this.globalEventService.publishMainStream(data.renote.userId, 'renote', noteObj);
this.webhookService.enqueueUserWebhook(data.renote.userId, 'renote', { note: noteObj });
await this.webhookService.enqueueUserWebhook(data.renote.userId, 'renote', { note: noteObj });
}
}
nm.notify();
await nm.notify();
//#region AP deliver
if (!data.localOnly && isLocalUser(user)) {
trackTask(async () => {
await trackTask(async () => {
const noteActivity = await this.apRendererService.renderNoteOrRenoteActivity(note, user, { renote: data.renote });
const dm = this.apDeliverManagerService.createDeliverManager(user, noteActivity);
@ -790,12 +787,12 @@ export class NoteCreateService implements OnApplicationShutdown {
}
if (data.channel) {
this.channelsRepository.increment({ id: data.channel.id }, 'notesCount', 1);
this.channelsRepository.update(data.channel.id, {
await this.channelsRepository.increment({ id: data.channel.id }, 'notesCount', 1);
await this.channelsRepository.update(data.channel.id, {
lastNotedAt: this.timeService.date,
});
this.notesRepository.countBy({
await this.notesRepository.countBy({
userId: user.id,
channelId: data.channel.id,
}).then(count => {
@ -807,8 +804,11 @@ export class NoteCreateService implements OnApplicationShutdown {
});
}
// Update the Latest Note index / following feed
await this.latestNoteService.handleCreatedNoteDeferred(note);
// Register to search database
if (!user.noindex) this.index(note);
if (!user.noindex) await this.index(note);
}
/**
@ -841,12 +841,12 @@ export class NoteCreateService implements OnApplicationShutdown {
if (policies.canTrend) {
if (renote.channelId != null) {
if (renote.replyId == null) {
this.featuredService.updateInChannelNotesRanking(renote.channelId, renote, 5);
await this.featuredService.updateInChannelNotesRanking(renote.channelId, renote, 5);
}
} else {
if (renote.visibility === 'public' && renote.userHost == null && renote.replyId == null) {
this.featuredService.updateGlobalNotesRanking(renote, 5);
this.featuredService.updatePerUserNotesRanking(renote.userId, renote, 5);
await this.featuredService.updateGlobalNotesRanking(renote, 5);
await this.featuredService.updatePerUserNotesRanking(renote.userId, renote, 5);
}
}
}
@ -880,7 +880,7 @@ export class NoteCreateService implements OnApplicationShutdown {
});
this.globalEventService.publishMainStream(u.id, 'mention', detailPackedNote);
this.webhookService.enqueueUserWebhook(u.id, 'mention', { note: detailPackedNote });
await this.webhookService.enqueueUserWebhook(u.id, 'mention', { note: detailPackedNote });
// Create notification
nm.push(u.id, 'mention');
@ -888,20 +888,20 @@ export class NoteCreateService implements OnApplicationShutdown {
}
@bindThis
private saveReply(reply: MiNote, note: MiNote) {
this.notesRepository.increment({ id: reply.id }, 'repliesCount', 1);
private async saveReply(reply: MiNote, note: MiNote) {
await this.notesRepository.increment({ id: reply.id }, 'repliesCount', 1);
}
@bindThis
private index(note: MiNote) {
private async index(note: MiNote) {
if (note.text == null && note.cw == null) return;
this.searchService.indexNote(note);
await this.searchService.indexNote(note);
}
@bindThis
private incNotesCountOfUser(user: { id: MiUser['id']; }) {
this.usersRepository.createQueryBuilder().update()
private async incNotesCountOfUser(user: { id: MiUser['id']; }) {
await this.usersRepository.createQueryBuilder().update()
.set({
updatedAt: this.timeService.date,
notesCount: () => '"notesCount" + 1',
@ -1037,7 +1037,7 @@ export class NoteCreateService implements OnApplicationShutdown {
// checkHibernation moved to HibernateUsersProcessorService
}
r.exec();
await r.exec();
}
// checkHibernation moved to HibernateUsersProcessorService

View file

@ -158,7 +158,11 @@ export class NoteDeleteService {
userId: user.id,
});
this.latestNoteService.handleDeletedNoteBG(note);
// Update the Latest Note index / following feed
this.latestNoteService.handleDeletedNoteDeferred(note);
for (const cascadingNote of cascadingNotes) {
this.latestNoteService.handleDeletedNote(cascadingNote);
}
if (deleter && (note.userId !== deleter.id)) {
const user = await this.usersRepository.findOneByOrFail({ id: note.userId });
@ -174,14 +178,14 @@ export class NoteDeleteService {
.map(n => n.uri)
.filter((u): u is string => u != null);
if (deletedUris.length > 0) {
this.apLogService.deleteObjectLogs(deletedUris)
.catch(err => this.logger.error(err, `Failed to delete AP logs for note '${note.uri}'`));
trackPromise(this.apLogService.deleteObjectLogs(deletedUris)
.catch(err => this.logger.error(err, `Failed to delete AP logs for note '${note.uri}'`)));
}
}
@bindThis
private decNotesCountOfUser(user: { id: MiUser['id']; }) {
this.usersRepository.createQueryBuilder().update()
private async decNotesCountOfUser(user: { id: MiUser['id']; }) {
await this.usersRepository.createQueryBuilder().update()
.set({
updatedAt: this.timeService.date,
notesCount: () => '"notesCount" - 1',

View file

@ -14,7 +14,7 @@ import { extractCustomEmojisFromMfm } from '@/misc/extract-custom-emojis-from-mf
import { extractHashtags } from '@/misc/extract-hashtags.js';
import type { IMentionedRemoteUsers } from '@/models/Note.js';
import { MiNote } from '@/models/Note.js';
import type { NoteEditRepository, ChannelFollowingsRepository, ChannelsRepository, FollowingsRepository, InstancesRepository, MiFollowing, MiMeta, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserListMembershipsRepository, UserProfilesRepository, UsersRepository, PollsRepository } from '@/models/_.js';
import type { NoteEditsRepository, ChannelFollowingsRepository, ChannelsRepository, FollowingsRepository, InstancesRepository, MiFollowing, MiMeta, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserListMembershipsRepository, UserProfilesRepository, UsersRepository, PollsRepository } from '@/models/_.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
import type { MiApp } from '@/models/App.js';
import { concat } from '@/misc/prelude/array.js';
@ -195,8 +195,8 @@ export class NoteEditService implements OnApplicationShutdown {
@Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository,
@Inject(DI.noteEditRepository)
private noteEditRepository: NoteEditRepository,
@Inject(DI.noteEditsRepository)
private noteEditsRepository: NoteEditsRepository,
@Inject(DI.pollsRepository)
private pollsRepository: PollsRepository,
@ -234,29 +234,29 @@ export class NoteEditService implements OnApplicationShutdown {
throw new UnrecoverableError('edit failed: missing editid');
}
const oldnote = await this.notesRepository.findOneBy({
const oldNote = await this.notesRepository.findOneBy({
id: editid,
});
if (oldnote == null) {
if (oldNote == null) {
throw new UnrecoverableError(`edit failed for ${editid}: missing oldnote`);
}
if (oldnote.userId !== user.id) {
if (oldNote.userId !== user.id) {
throw new UnrecoverableError(`edit failed for ${editid}: user is not the note author`);
}
// we never want to change the replyId, so fetch the original "parent"
if (oldnote.replyId) {
data.reply = await this.notesRepository.findOneBy({ id: oldnote.replyId });
if (oldNote.replyId) {
data.reply = await this.notesRepository.findOneBy({ id: oldNote.replyId });
} else {
data.reply = undefined;
}
// changing visibility on an edit is ill-defined, let's try to
// keep the same visibility as the original note
data.visibility = oldnote.visibility;
data.localOnly = oldnote.localOnly;
data.visibility = oldNote.visibility;
data.localOnly = oldNote.localOnly;
// チャンネル外にリプライしたら対象のスコープに合わせる
// (クライアントサイドでやっても良い処理だと思うけどとりあえずサーバーサイドで)
@ -463,46 +463,55 @@ export class NoteEditService implements OnApplicationShutdown {
}
const update: Partial<MiNote> = {};
if (data.text !== undefined && data.text !== oldnote.text) {
if (data.text !== undefined && data.text !== oldNote.text) {
update.text = data.text;
}
if (data.cw !== undefined && data.cw !== oldnote.cw) {
if (data.cw !== undefined && data.cw !== oldNote.cw) {
update.cw = data.cw;
}
if (data.poll !== undefined && oldnote.hasPoll !== !!data.poll) {
if (data.poll !== undefined && oldNote.hasPoll !== !!data.poll) {
update.hasPoll = !!data.poll;
}
if (data.mandatoryCW !== undefined && oldnote.mandatoryCW !== data.mandatoryCW) {
if (data.mandatoryCW !== undefined && oldNote.mandatoryCW !== data.mandatoryCW) {
update.mandatoryCW = data.mandatoryCW;
}
// TODO deep-compare files
const filesChanged = oldnote.fileIds.length || data.files?.length;
const filesChanged = oldNote.fileIds.length || data.files?.length;
const poll = await this.pollsRepository.findOneBy({ noteId: oldnote.id });
const oldPoll = await this.pollsRepository.findOneBy({ noteId: oldNote.id });
const oldPoll = poll ? { choices: poll.choices, multiple: poll.multiple, expiresAt: poll.expiresAt } : null;
const pollChanged = data.poll != null && JSON.stringify(data.poll) !== JSON.stringify(oldPoll);
const oldPollData = oldPoll ? { choices: oldPoll.choices, multiple: oldPoll.multiple, expiresAt: oldPoll.expiresAt } : null;
const pollChanged =
(data.poll == null && oldPoll != null) ||
(data.poll != null && oldPoll == null) ||
(data.poll != null && oldPoll != null && JSON.stringify(data.poll) !== JSON.stringify(oldPollData));
if (Object.keys(update).length > 0 || filesChanged || pollChanged) {
const exists = await this.noteEditRepository.findOneBy({ noteId: oldnote.id });
const exists = await this.noteEditsRepository.findOneBy({ noteId: oldNote.id });
await this.noteEditRepository.insert({
await this.noteEditsRepository.insert({
id: this.idService.gen(),
noteId: oldnote.id,
oldText: oldnote.text || undefined,
userId: oldNote.userId,
noteId: oldNote.id,
renoteId: oldNote.renoteId,
replyId: oldNote.replyId,
visibility: oldNote.visibility,
text: oldNote.text || undefined,
newText: update.text || undefined,
cw: update.cw || undefined,
fileIds: undefined,
oldDate: exists ? oldnote.updatedAt as Date : this.idService.parse(oldnote.id).date,
cw: oldNote.cw || undefined,
newCw: update.cw || undefined,
fileIds: oldNote.fileIds,
oldDate: exists ? oldNote.updatedAt as Date : this.idService.parse(oldNote.id).date,
updatedAt: this.timeService.date,
hasPoll: oldPoll != null,
});
const note = new MiNote({
id: oldnote.id,
id: oldNote.id,
updatedAt: data.updatedAt ? data.updatedAt : this.timeService.date,
fileIds: data.files ? data.files.map(file => file.id) : [],
replyId: oldnote.replyId,
replyId: oldNote.replyId,
renoteId: data.renote ? data.renote.id : null,
channelId: data.channel ? data.channel.id : null,
threadId: data.reply
@ -516,7 +525,7 @@ export class NoteEditService implements OnApplicationShutdown {
cw: data.cw ?? null,
tags: tags.map(tag => normalizeForSearch(tag)),
emojis,
reactions: oldnote.reactions,
reactions: oldNote.reactions,
userId: user.id,
localOnly: data.localOnly!,
reactionAcceptance: data.reactionAcceptance,
@ -535,7 +544,7 @@ export class NoteEditService implements OnApplicationShutdown {
renoteUserId: data.renote ? data.renote.userId : null,
renoteUserHost: data.renote ? data.renote.userHost : null,
userHost: user.host,
reactionAndUserPairCache: oldnote.reactionAndUserPairCache,
reactionAndUserPairCache: oldNote.reactionAndUserPairCache,
mandatoryCW: data.mandatoryCW,
});
@ -561,41 +570,44 @@ export class NoteEditService implements OnApplicationShutdown {
if (pollChanged) {
// Start transaction
await this.db.transaction(async transactionalEntityManager => {
await transactionalEntityManager.update(MiNote, oldnote.id, note);
await transactionalEntityManager.update(MiNote, oldNote.id, note);
const poll = new MiPoll({
noteId: note.id,
choices: data.poll!.choices,
expiresAt: data.poll!.expiresAt,
multiple: data.poll!.multiple,
votes: new Array(data.poll!.choices.length).fill(0),
noteVisibility: note.visibility,
userId: user.id,
userHost: user.host,
channelId: data.channel?.id ?? null,
});
// Insert or update poll
if (data.poll) {
const poll = new MiPoll({
noteId: note.id,
choices: data.poll.choices,
expiresAt: data.poll.expiresAt,
multiple: data.poll.multiple,
votes: new Array(data.poll.choices.length).fill(0),
noteVisibility: note.visibility,
userId: user.id,
userHost: user.host,
channelId: data.channel?.id ?? null,
});
if (!oldnote.hasPoll) {
await transactionalEntityManager.insert(MiPoll, poll);
} else {
await transactionalEntityManager.update(MiPoll, oldnote.id, poll);
if (oldPoll) {
await transactionalEntityManager.update(MiPoll, { pollId: oldPoll.noteId }, poll);
} else {
await transactionalEntityManager.insert(MiPoll, poll);
}
// Delete poll
} else if (oldPoll) {
await transactionalEntityManager.delete(MiPoll, { pollId: oldPoll.noteId });
}
});
} else {
await this.notesRepository.update(oldnote.id, note);
await this.notesRepository.update(oldNote.id, note);
}
// Re-fetch note to get the default values of null / unset fields.
const edited = await this.notesRepository.findOneByOrFail({ id: note.id });
// Update the Latest Note index / following feed
this.latestNoteService.handleUpdatedNoteBG(edited, oldnote);
await this.queueService.createPostNoteJob(note.id, silent, 'edit');
return edited;
} else {
return oldnote;
return oldNote;
}
}
@ -606,7 +618,7 @@ export class NoteEditService implements OnApplicationShutdown {
host: MiUser['host'];
isBot: MiUser['isBot'];
noindex: MiUser['noindex'];
}, data: Option, silent: boolean, tags: string[], mentionedUsers: MinimumUser[]) {
}, data: Option, silent: boolean, mentionedUsers: MinimumUser[]) {
// Register host
if (this.meta.enableStatsForFederatedInstances) {
if (isRemoteUser(user)) {
@ -621,15 +633,15 @@ export class NoteEditService implements OnApplicationShutdown {
}
}
this.usersRepository.update({ id: user.id }, { updatedAt: this.timeService.date });
await this.usersRepository.update({ id: user.id }, { updatedAt: this.timeService.date });
// ハッシュタグ更新
this.pushToTl(note, user);
await this.pushToTl(note, user);
if (data.poll && data.poll.expiresAt) {
const delay = data.poll.expiresAt.getTime() - this.timeService.now;
this.queueService.endedPollNotificationQueue.remove(`pollEnd:${note.id}`);
this.queueService.endedPollNotificationQueue.add(note.id, {
await this.queueService.endedPollNotificationQueue.remove(`pollEnd:${note.id}`);
await this.queueService.endedPollNotificationQueue.add(note.id, {
noteId: note.id,
}, {
jobId: `pollEnd_${note.id}`,
@ -648,9 +660,9 @@ export class NoteEditService implements OnApplicationShutdown {
text: note.text ?? '',
});
this.roleService.addNoteToRoleTimeline(noteObj);
await this.roleService.addNoteToRoleTimeline(noteObj);
this.webhookService.enqueueUserWebhook(user.id, 'note', { note: noteObj });
await this.webhookService.enqueueUserWebhook(user.id, 'note', { note: noteObj });
const nm = new NotificationManager(this.mutingsRepository, this.notificationService, user, note);
@ -673,16 +685,16 @@ export class NoteEditService implements OnApplicationShutdown {
if (!isThreadMuted && !muted) {
nm.push(data.reply.userId, 'edited');
this.globalEventService.publishMainStream(data.reply.userId, 'edited', noteObj);
this.webhookService.enqueueUserWebhook(data.reply.userId, 'reply', { note: noteObj });
await this.webhookService.enqueueUserWebhook(data.reply.userId, 'reply', { note: noteObj });
}
}
}
nm.notify();
await nm.notify();
//#region AP deliver
if (!data.localOnly && isLocalUser(user)) {
trackTask(async () => {
await trackTask(async () => {
const noteActivity = await this.apRendererService.renderNoteOrRenoteActivity(note, user, { renote: data.renote });
const dm = this.apDeliverManagerService.createDeliverManager(user, noteActivity);
@ -737,8 +749,8 @@ export class NoteEditService implements OnApplicationShutdown {
}
if (data.channel) {
this.channelsRepository.increment({ id: data.channel.id }, 'notesCount', 1);
this.channelsRepository.update(data.channel.id, {
await this.channelsRepository.increment({ id: data.channel.id }, 'notesCount', 1);
await this.channelsRepository.update(data.channel.id, {
lastNotedAt: this.timeService.date,
});
@ -754,8 +766,11 @@ export class NoteEditService implements OnApplicationShutdown {
});
}
// Update the Latest Note index / following feed
await this.latestNoteService.handleUpdatedNoteDeferred(note);
// Register to search database
if (!user.noindex) this.index(note);
if (!user.noindex) await this.index(note);
}
@bindThis
@ -776,10 +791,10 @@ export class NoteEditService implements OnApplicationShutdown {
}
@bindThis
private index(note: MiNote) {
private async index(note: MiNote) {
if (note.text == null && note.cw == null) return;
this.searchService.indexNote(note);
await this.searchService.indexNote(note);
}
@bindThis
@ -909,7 +924,7 @@ export class NoteEditService implements OnApplicationShutdown {
// checkHibernation moved to HibernateUsersProcessorService
}
r.exec();
await r.exec();
}
// checkHibernation moved to HibernateUsersProcessorService

View file

@ -19,6 +19,7 @@ import { ApRequestCreator } from '@/core/activitypub/ApRequestService.js';
import { TimeService } from '@/global/TimeService.js';
import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js';
import type { MiNote } from '@/models/Note.js';
import type { MinimalNote } from '@/misc/is-renote.js';
import { type UserWebhookPayload } from './UserWebhookService.js';
import type {
BackgroundTaskJobData,
@ -44,7 +45,6 @@ import type {
} from './QueueModule.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
import type { MiUser } from '@/models/User.js';
export const QUEUE_TYPES = [
'system',
@ -846,147 +846,93 @@ export class QueueService implements OnModuleInit {
@bindThis
public async createUpdateUserJob(userId: string) {
return await this.createBackgroundTask(
'update-user',
{
type: 'update-user',
userId,
},
{
id: `update-user:${userId}`,
},
);
return await this.createBackgroundTask({ type: 'update-user', userId }, userId);
}
@bindThis
public async createUpdateFeaturedJob(userId: string) {
return await this.createBackgroundTask(
'update-featured',
{
type: 'update-featured',
userId,
},
{
id: `update-featured:${userId}`,
},
);
return await this.createBackgroundTask({ type: 'update-featured', userId }, userId);
}
@bindThis
public async createUpdateInstanceJob(host: string) {
return await this.createBackgroundTask(
'update-instance',
{
type: 'update-instance',
host,
},
{
id: `update-instance:${host}`,
},
);
return await this.createBackgroundTask({ type: 'update-instance', host }, host);
}
@bindThis
public async createPostDeliverJob(host: string, result: 'success' | 'temp-fail' | 'perm-fail') {
return await this.createBackgroundTask(
'post-deliver',
{
type: 'post-deliver',
host,
result,
},
);
return await this.createBackgroundTask({ type: 'post-deliver', host, result });
}
@bindThis
public async createPostInboxJob(host: string) {
return await this.createBackgroundTask(
'post-inbox',
{
type: 'post-inbox',
host,
},
);
return await this.createBackgroundTask({ type: 'post-inbox', host });
}
@bindThis
public async createPostNoteJob(noteId: string, silent: boolean, type: 'create' | 'edit') {
return await this.createBackgroundTask(
'post-note',
{
type: 'post-note',
noteId,
silent,
edit: type === 'edit',
},
{
id: `post-note:${noteId}:${type}`,
},
);
const edit = type === 'edit';
const duplication = `${noteId}:${type}`;
return await this.createBackgroundTask({ type: 'post-note', noteId, silent, edit }, duplication);
}
@bindThis
public async createCheckHibernationJob(userId: string) {
return await this.createBackgroundTask(
'check-hibernation',
{
type: 'check-hibernation',
userId,
},
{
id: `check-hibernation:${userId}`,
ttl: 1000 * 60 * 60 * 24, // This is a very heavy task, so only run once per day per user
},
{ type: 'check-hibernation', userId },
// This is a very heavy task, so only run once per day per user
{ id: `check-hibernation:${userId}`, ttl: 1000 * 60 * 60 * 24 },
);
}
@bindThis
public async createUpdateUserTagsJob(userId: string) {
return await this.createBackgroundTask(
'update-user-tags',
{
type: 'update-user-tags',
userId,
},
{
id: `update-user-tags:${userId}`,
},
);
return await this.createBackgroundTask({ type: 'update-user-tags', userId }, userId);
}
@bindThis
public async createUpdateNoteTagsJob(noteId: string) {
return await this.createBackgroundTask(
'update-note-tags',
{
type: 'update-note-tags',
noteId,
},
{
id: `update-note-tags:${noteId}`,
},
);
return await this.createBackgroundTask({ type: 'update-note-tags', noteId }, noteId);
}
@bindThis
public async createDeleteFileJob(fileId: string, isExpired?: boolean, deleterId?: string) {
return await this.createBackgroundTask(
'delete-file',
{
type: 'delete-file',
fileId,
isExpired,
deleterId,
},
{
id: `delete-file:${fileId}`,
},
);
return await this.createBackgroundTask({ type: 'delete-file', fileId, isExpired, deleterId }, fileId);
}
private async createBackgroundTask(name: string, data: BackgroundTaskJobData, duplication?: { id: string, ttl?: number }) {
@bindThis
public async createUpdateLatestNoteJob(note: MinimalNote) {
// Compact the note to avoid storing the entire thing in Redis, when all we need is minimal data for categorization
const packedNote: MinimalNote = {
id: note.id,
visibility: note.visibility,
userId: note.userId,
replyId: note.replyId,
renoteId: note.renoteId,
hasPoll: note.hasPoll,
text: note.text ? '1' : null,
cw: note.text ? '1' : null,
fileIds: note.fileIds.length > 0 ? ['1'] : [],
};
return await this.createBackgroundTask({ type: 'update-latest-note', note: packedNote }, note.id);
}
@bindThis
public async createPostSuspendJob(userId: string) {
return await this.createBackgroundTask({ type: 'post-suspend', userId }, userId);
}
@bindThis
public async createPostUnsuspendJob(userId: string) {
return await this.createBackgroundTask({ type: 'post-unsuspend', userId }, userId);
}
private async createBackgroundTask<T extends BackgroundTaskJobData>(data: T, duplication?: string | { id: string, ttl?: number }) {
return await this.backgroundTaskQueue.add(
name,
data.type,
data,
{
removeOnComplete: {
@ -1006,7 +952,9 @@ export class QueueService implements OnModuleInit {
},
// https://docs.bullmq.io/guide/jobs/deduplication
deduplication: duplication,
deduplication: typeof(duplication) === 'string'
? { id: `${data.type}:${duplication}` }
: duplication,
},
);
};

View file

@ -17,16 +17,10 @@ import { RelationshipJobData } from '@/queue/types.js';
import { ModerationLogService } from '@/core/ModerationLogService.js';
import { isSystemAccount } from '@/misc/is-system-account.js';
import { CacheService } from '@/core/CacheService.js';
import { LoggerService } from '@/core/LoggerService.js';
import type Logger from '@/logger.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { InternalEventService } from '@/global/InternalEventService.js';
@Injectable()
export class UserSuspendService {
private readonly logger: Logger;
constructor(
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@ -47,11 +41,7 @@ export class UserSuspendService {
private moderationLogService: ModerationLogService,
private readonly cacheService: CacheService,
private readonly internalEventService: InternalEventService,
loggerService: LoggerService,
) {
this.logger = loggerService.getLogger('user-suspend');
}
) {}
@bindThis
public async suspend(user: MiUser, moderator: MiUser): Promise<void> {
@ -69,10 +59,7 @@ export class UserSuspendService {
userHost: user.host,
});
trackPromise((async () => {
await this.postSuspend(user);
await this.freezeAll(user);
})().catch(e => this.logger.error(`Error suspending user ${user.id}: ${renderInlineError(e)}`)));
await this.queueService.createPostSuspendJob(user.id);
}
@bindThis
@ -89,14 +76,11 @@ export class UserSuspendService {
userHost: user.host,
});
trackPromise((async () => {
await this.postUnsuspend(user);
await this.unFreezeAll(user);
})().catch(e => this.logger.error(`Error un-suspending for user ${user.id}: ${renderInlineError(e)}`)));
await this.queueService.createPostUnsuspendJob(user.id);
}
@bindThis
private async postSuspend(user: { id: MiUser['id']; host: MiUser['host'] }): Promise<void> {
public async postSuspend(user: MiUser): Promise<void> {
this.globalEventService.publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true });
/*
@ -132,10 +116,12 @@ export class UserSuspendService {
await this.queueService.deliverMany(user, content, queue);
}
await this.freezeAll(user);
}
@bindThis
private async postUnsuspend(user: MiUser): Promise<void> {
public async postUnsuspend(user: MiUser): Promise<void> {
this.globalEventService.publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: false });
if (this.userEntityService.isLocalUser(user)) {
@ -162,6 +148,8 @@ export class UserSuspendService {
await this.queueService.deliverMany(user, content, queue);
}
await this.unFreezeAll(user);
}
@bindThis

View file

@ -5,42 +5,51 @@
import type { MiNote } from '@/models/Note.js';
import type { Packed } from '@/misc/json-schema.js';
import type { NoteEdit } from '@/models/NoteEdit.js';
// NoteEntityService.isPureRenote とよしなにリンク
type Renote =
export type Renote =
MiNote & {
renoteId: NonNullable<MiNote['renoteId']>
};
type Quote =
export type Quote =
Renote & ({
text: NonNullable<MiNote['text']>
} | {
cw: NonNullable<MiNote['cw']>
} | {
replyId: NonNullable<MiNote['replyId']>
reply: NonNullable<MiNote['reply']>
reply: NonNullable<MiNote['reply']> // TODO this is wrong
} | {
hasPoll: true
} | {
fileIds: [string, ...string[]]
});
type PureRenote =
export type PureRenote =
Renote & {
text: null,
cw: null,
replyId: null,
hasPoll: false,
fileIds: {
length: 0,
},
fileIds: [],
};
export function isRenote(note: MiNote): note is Renote {
export function isRenote(note: MiNote): note is Renote;
export function isRenote(note: NoteEdit): note is RenoteEdit;
export function isRenote(note: MinimalNote): note is MinimalRenote;
export function isRenote(note: MiNote | NoteEdit | MinimalNote): note is Renote | RenoteEdit | MinimalRenote;
export function isRenote(note: MiNote | NoteEdit | MinimalNote): note is Renote | RenoteEdit | MinimalRenote {
return note.renoteId != null;
}
export function isQuote(note: Renote): note is Quote {
export function isQuote(note: Renote): note is Quote;
export function isQuote(note: RenoteEdit): note is QuoteEdit;
export function isQuote(note: MinimalNote): note is MinimalQuote;
export function isQuote(note: Renote | RenoteEdit | MinimalNote): note is Quote | QuoteEdit | MinimalQuote;
export function isQuote(note: Renote | RenoteEdit | MinimalNote): note is Quote | QuoteEdit | MinimalQuote {
// NOTE: SYNC WITH NoteCreateService.isQuote
return note.text != null ||
note.cw != null ||
@ -49,7 +58,11 @@ export function isQuote(note: Renote): note is Quote {
note.fileIds.length > 0;
}
export function isPureRenote(note: MiNote): note is PureRenote {
export function isPureRenote(note: MiNote): note is PureRenote;
export function isPureRenote(note: NoteEdit): note is PureRenoteEdit;
export function isPureRenote(note: MinimalNote): note is MinimalPureRenote;
export function isPureRenote(note: MiNote | NoteEdit | MinimalNote): note is PureRenote | PureRenoteEdit | MinimalPureRenote;
export function isPureRenote(note: MiNote | NoteEdit | MinimalNote): note is PureRenote | PureRenoteEdit | MinimalPureRenote {
return isRenote(note) && !isQuote(note);
}
@ -68,15 +81,16 @@ type PackedQuote =
} | {
poll: NonNullable<Packed<'Note'>['poll']>
} | {
fileIds: NonNullable<Packed<'Note'>['fileIds']>
fileIds: [string, ...string[]]
});
type PackedPureRenote = PackedRenote & {
text: NonNullable<Packed<'Note'>['text']>;
cw: NonNullable<Packed<'Note'>['cw']>;
replyId: NonNullable<Packed<'Note'>['replyId']>;
poll: NonNullable<Packed<'Note'>['poll']>;
fileIds: NonNullable<Packed<'Note'>['fileIds']>;
text: null;
cw: null;
replyId: null;
reply: null;
poll: null;
fileIds: [];
};
export function isRenotePacked(note: Packed<'Note'>): note is PackedRenote {
@ -94,3 +108,58 @@ export function isQuotePacked(note: PackedRenote): note is PackedQuote {
export function isPackedPureRenote(note: Packed<'Note'>): note is PackedPureRenote {
return isRenotePacked(note) && !isQuotePacked(note);
}
export type RenoteEdit =
NoteEdit & {
renoteId: NonNullable<NoteEdit['renoteId']>
};
export type QuoteEdit =
RenoteEdit & ({
text: NonNullable<NoteEdit['text']>
} | {
cw: NonNullable<NoteEdit['cw']>
} | {
replyId: NonNullable<NoteEdit['replyId']>
} | {
hasPoll: true
} | {
fileIds: [string, ...string[]],
});
export type PureRenoteEdit =
RenoteEdit & {
text: null,
cw: null,
replyId: null,
reply: null,
hasPoll: false,
fileIds: [],
};
export type MinimalNote = Pick<MiNote, 'id' | 'visibility' | 'userId' | 'replyId' | 'renoteId' | 'text' | 'cw' | 'hasPoll' | 'fileIds'>;
export type MinimalRenote = MinimalNote & {
renoteId: string;
};
export type MinimalQuote = MinimalRenote & ({
text: NonNullable<MinimalNote['text']>
} | {
cw: NonNullable<MinimalNote['cw']>
} | {
replyId: NonNullable<MinimalNote['replyId']>
} | {
hasPoll: true
} | {
fileIds: [string, ...string[]],
});
export type MinimalPureRenote = MinimalRenote & {
text: null,
cw: null,
replyId: null,
reply: null,
hasPoll: false,
fileIds: [],
};

View file

@ -6,7 +6,7 @@
import { PrimaryColumn, Entity, JoinColumn, Column, ManyToOne } from 'typeorm';
import { MiUser } from '@/models/User.js';
import { MiNote } from '@/models/Note.js';
import { isQuote, isRenote } from '@/misc/is-renote.js';
import { isQuote, isRenote, MinimalNote } from '@/misc/is-renote.js';
/**
* Maps a user to the most recent post by that user.
@ -76,7 +76,7 @@ export class SkLatestNote {
/**
* Generates a compound key matching a provided note.
*/
static keyFor(note: MiNote) {
static keyFor(note: MinimalNote) {
return {
userId: note.userId,
isPublic: note.visibility === 'public',
@ -88,7 +88,7 @@ export class SkLatestNote {
/**
* Checks if two notes would produce equivalent compound keys.
*/
static areEquivalent(first: MiNote, second: MiNote): boolean {
static areEquivalent(first: MinimalNote, second: MinimalNote): boolean {
const firstKey = SkLatestNote.keyFor(first);
const secondKey = SkLatestNote.keyFor(second);

View file

@ -7,6 +7,8 @@ import { Entity, JoinColumn, Column, ManyToOne, PrimaryColumn, Index } from 'typ
import { id } from './util/id.js';
import { MiNote } from './Note.js';
import type { MiDriveFile } from './DriveFile.js';
import { MiUser } from '@/models/User.js';
import { noteVisibilities } from '@/types.js';
@Entity()
export class NoteEdit {
@ -26,17 +28,64 @@ export class NoteEdit {
@JoinColumn()
public note: MiNote | null;
// TODO data migration
@Column({
...id(),
comment: 'The ID of author.',
})
public userId: MiUser['id'];
@ManyToOne(type => MiUser, {
onDelete: 'CASCADE',
})
@JoinColumn()
public user: MiUser | null;
@Column({
...id(),
nullable: true,
comment: 'The ID of renote target. Will always be null for older edits',
})
public renoteId: MiNote['id'] | null;
@ManyToOne(() => MiNote, {
onDelete: 'CASCADE',
})
@JoinColumn()
public renote: MiNote | null;
@Column({
...id(),
nullable: true,
comment: 'The ID of reply target. Will always be null for older edits',
})
public replyId: MiNote['id'] | null;
@ManyToOne(() => MiNote, {
onDelete: 'CASCADE',
})
@JoinColumn()
public reply: MiNote | null;
@Column('enum', { enum: noteVisibilities })
public visibility: typeof noteVisibilities[number];
@Column('text', {
nullable: true,
})
public newText: string | null;
@Column('varchar', {
length: 512,
@Column('text', {
nullable: true,
comment: 'Will always be null for older edits',
})
public cw: string | null;
@Column('text', {
nullable: true,
})
public newCw: string | null;
@Column({
...id(),
array: true,
@ -49,14 +98,21 @@ export class NoteEdit {
})
public updatedAt: Date;
// TODO rename migration
@Column('text', {
nullable: true,
})
public oldText: string | null;
public text: string | null;
@Column('timestamp with time zone', {
comment: 'The old date from before the edit',
nullable: true,
})
public oldDate: Date | null;
@Column('boolean', {
default: false,
comment: 'Whether this revision had a poll. Will always be false for older edits',
})
public hasPoll: boolean;
}

View file

@ -5,7 +5,7 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask } from '@/queue/types.js';
import { BackgroundTaskJobData, CheckHibernationBackgroundTask, PostDeliverBackgroundTask, PostInboxBackgroundTask, PostNoteBackgroundTask, UpdateFeaturedBackgroundTask, UpdateInstanceBackgroundTask, UpdateUserTagsBackgroundTask, UpdateUserBackgroundTask, UpdateNoteTagsBackgroundTask, DeleteFileBackgroundTask, UpdateLatestNoteBackgroundTask, PostSuspendBackgroundTask, PostUnsuspendBackgroundTask } from '@/queue/types.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
import Logger from '@/logger.js';
@ -19,11 +19,14 @@ import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js';
import { NoteCreateService } from '@/core/NoteCreateService.js';
import type { DriveFilesRepository, NotesRepository } from '@/models/_.js';
import type { DriveFilesRepository, NoteEditsRepository, NotesRepository } from '@/models/_.js';
import { MiUser } from '@/models/_.js';
import { NoteEditService } from '@/core/NoteEditService.js';
import { HashtagService } from '@/core/HashtagService.js';
import { DriveService } from '@/core/DriveService.js';
import { LatestNoteService } from '@/core/LatestNoteService.js';
import { trackTask } from '@/misc/promise-tracker.js';
import { UserSuspendService } from '@/core/UserSuspendService.js';
@Injectable()
export class BackgroundTaskProcessorService {
@ -39,6 +42,9 @@ export class BackgroundTaskProcessorService {
@Inject(DI.driveFilesRepository)
private readonly driveFilesRepository: DriveFilesRepository,
@Inject(DI.noteEditsRepository)
private readonly noteEditsRepository: NoteEditsRepository,
private readonly apPersonService: ApPersonService,
private readonly cacheService: CacheService,
private readonly federatedInstanceService: FederatedInstanceService,
@ -51,6 +57,8 @@ export class BackgroundTaskProcessorService {
private readonly noteEditService: NoteEditService,
private readonly hashtagService: HashtagService,
private readonly driveService: DriveService,
private readonly latestNoteService: LatestNoteService,
private readonly userSuspendService: UserSuspendService,
queueLoggerService: QueueLoggerService,
) {
@ -76,9 +84,15 @@ export class BackgroundTaskProcessorService {
return await this.processPostNote(job.data);
} else if (job.data.type === 'check-hibernation') {
return await this.processCheckHibernation(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'delete-file') {
return await this.processDeleteFile(job.data);
} else if (job.data.type === 'update-latest-note') {
return await this.processUpdateLatestNote(job.data);
} else if (job.data.type === 'post-suspend') {
return await this.processPostSuspend(job.data);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (job.data.type === 'post-unsuspend') {
return await this.processPostUnsuspend(job.data);
} else {
this.logger.warn(`Can't process unknown job type "${job.data}"; this is likely a bug. Full job data:`, job.data);
throw new Error(`Unknown job type ${job.data}, see system logs for details`);
@ -201,12 +215,13 @@ export class BackgroundTaskProcessorService {
const instance = await this.federatedInstanceService.fetchOrRegister(task.host);
if (instance.isBlocked) return `Skipping post-inbox task: instance ${task.host} is blocked`;
// TODO move chart stuff out of background?
// Update charts
if (this.meta.enableChartsForFederatedInstances) {
await this.instanceChart.requestReceived(task.host);
this.instanceChart.requestReceived(task.host);
}
await this.apRequestChart.inbox();
await this.federationChart.inbox(task.host);
this.apRequestChart.inbox();
this.federationChart.inbox(task.host);
// Update instance metadata (deferred)
await this.fetchInstanceMetadataService.fetchInstanceMetadataLazy(instance);
@ -229,9 +244,9 @@ export class BackgroundTaskProcessorService {
const mentionedUsers = await this.cacheService.getUsers(note.mentions);
if (task.edit) {
await this.noteEditService.postNoteEdited(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values()));
await this.noteEditService.postNoteEdited(note, user, note, task.silent, Array.from(mentionedUsers.values()));
} else {
await this.noteCreateService.postNoteCreated(note, user, note, task.silent, note.tags, Array.from(mentionedUsers.values()));
await this.noteCreateService.postNoteCreated(note, user, note, task.silent, Array.from(mentionedUsers.values()));
}
return 'ok';
@ -260,4 +275,50 @@ export class BackgroundTaskProcessorService {
await this.driveService.deleteFileSync(file, task.isExpired, deleter);
return 'ok';
}
private async processUpdateLatestNote(task: UpdateLatestNoteBackgroundTask): Promise<string> {
const note = await this.notesRepository.findOneBy({ id: task.note.id });
if (note) {
const lastEdit = await this.noteEditsRepository.findOne({
where: { noteId: task.note.id },
order: { id: 'desc' },
});
if (lastEdit) {
// Update
await this.latestNoteService.handleUpdatedNote(lastEdit, note);
} else {
// Create
await this.latestNoteService.handleDeletedNote(note);
}
} else {
// Delete
await this.latestNoteService.handleDeletedNote(task.note);
}
return 'ok';
}
private async processPostSuspend(task: PostSuspendBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping post-suspend task: user ${task.userId} has been deleted`;
await trackTask(async () => {
await this.userSuspendService.postSuspend(user);
});
return 'ok';
}
private async processPostUnsuspend(task: PostUnsuspendBackgroundTask): Promise<string> {
const user = await this.cacheService.findOptionalUserById(task.userId);
if (!user || user.isDeleted) return `Skipping post-unsuspend task: user ${task.userId} has been deleted`;
await trackTask(async () => {
await this.userSuspendService.postUnsuspend(user);
});
return 'ok';
}
}

View file

@ -12,6 +12,7 @@ import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js';
import type { IActivity } from '@/core/activitypub/type.js';
import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js';
import type { UserWebhookPayload } from '@/core/UserWebhookService.js';
import type { MinimalNote } from '@/misc/is-renote.js';
import type httpSignature from '@peertube/http-signature';
export type DeliverJobData = {
@ -179,7 +180,10 @@ export type BackgroundTaskJobData =
PostInboxBackgroundTask |
PostNoteBackgroundTask |
CheckHibernationBackgroundTask |
DeleteFileBackgroundTask;
DeleteFileBackgroundTask |
UpdateLatestNoteBackgroundTask |
PostSuspendBackgroundTask |
PostUnsuspendBackgroundTask;
export type UpdateUserBackgroundTask = {
type: 'update-user';
@ -235,3 +239,18 @@ export type DeleteFileBackgroundTask = {
isExpired?: boolean;
deleterId?: string;
};
export type UpdateLatestNoteBackgroundTask = {
type: 'update-latest-note';
note: MinimalNote;
};
export type PostSuspendBackgroundTask = {
type: 'post-suspend';
userId: string;
};
export type PostUnsuspendBackgroundTask = {
type: 'post-unsuspend';
userId: string;
};