process account migrations on the relationship queue

This commit is contained in:
Hazelnoot 2025-09-14 09:52:47 -04:00
parent 8089a67b4e
commit 3fb2b38c92
5 changed files with 25 additions and 5 deletions

View file

@ -128,8 +128,7 @@ export class AccountMoveService {
to: { id: followeeId },
})), process.env.NODE_ENV === 'test' ? 10000 : 1000 * 60 * 60 * 24);
// TODO add this to relationship queue
await this.postMoveProcess(src, dst);
await this.queueService.createMoveJob(src, dst);
return iObj;
}

View file

@ -706,7 +706,13 @@ export class QueueService implements OnModuleInit {
}
@bindThis
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobsOptions = {}): {
public createMoveJob(from: ThinUser, to: ThinUser) {
const job = this.generateRelationshipJobData('move', { from, to });
return this.relationshipQueue.add(job.name, job.data, job.opts);
}
@bindThis
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock' | 'move', data: RelationshipJobData, opts: Bull.JobsOptions = {}): {
name: string,
data: RelationshipJobData,
opts: Bull.JobsOptions,

View file

@ -988,7 +988,7 @@ export class ApPersonService implements OnModuleInit {
return 'skip: alsoKnownAs does not include from.uri';
}
await this.accountMoveService.postMoveProcess(src, dst);
await this.queueService.createMoveJob(src, dst);
return 'ok';
}

View file

@ -423,6 +423,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
case 'block': return this.relationshipProcessorService.processBlock(job);
case 'unblock': return this.relationshipProcessorService.processUnblock(job);
case 'move': return this.relationshipProcessorService.processMove(job);
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
}
};

View file

@ -7,6 +7,8 @@ import { Inject, Injectable } from '@nestjs/common';
import { UserFollowingService } from '@/core/UserFollowingService.js';
import { UserBlockingService } from '@/core/UserBlockingService.js';
import { CacheService } from '@/core/CacheService.js';
import { AccountMoveService } from '@/core/AccountMoveService.js';
import { bindThis } from '@/decorators.js';
import type Logger from '@/logger.js';
@ -28,8 +30,10 @@ export class RelationshipProcessorService {
private queueLoggerService: QueueLoggerService,
private userFollowingService: UserFollowingService,
private userBlockingService: UserBlockingService,
private readonly cacheService: CacheService,
private readonly accountMoveService: AccountMoveService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('follow-block');
this.logger = this.queueLoggerService.logger.createSubLogger('relationship');
}
@bindThis
@ -75,4 +79,14 @@ export class RelationshipProcessorService {
await this.userBlockingService.unblock(blockee, blocker);
return 'ok';
}
public async processMove(job: Bull.Job<RelationshipJobData>): Promise<string> {
this.logger.info(`${job.data.from.id} is trying to migrate to ${job.data.to.id}`);
const [src, dst] = await Promise.all([
this.cacheService.findUserById(job.data.from.id),
this.cacheService.findUserById(job.data.to.id),
]);
await this.accountMoveService.postMoveProcess(src, dst);
return 'ok';
}
}