implement updateAntennaQueue

This commit is contained in:
Hazelnoot 2025-06-25 20:15:48 -04:00
parent be2baf739e
commit 1cb21b0911
5 changed files with 92 additions and 39 deletions

View file

@ -18,13 +18,15 @@ import type { AntennasRepository, UserListMembershipsRepository } from '@/models
import type { MiAntenna } from '@/models/Antenna.js';
import type { MiNote } from '@/models/Note.js';
import type { MiUser } from '@/models/User.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { CacheService } from './CacheService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
@Injectable()
export class AntennaService implements OnApplicationShutdown {
// TODO implement QuantumSingleCache then replace this
private antennasFetched: boolean;
private antennas: MiAntenna[];
private antennas: Map<string, MiAntenna>;
constructor(
@Inject(DI.redisForTimelines)
@ -43,9 +45,10 @@ export class AntennaService implements OnApplicationShutdown {
private utilityService: UtilityService,
private globalEventService: GlobalEventService,
private fanoutTimelineService: FanoutTimelineService,
private readonly internalEventService: InternalEventService,
) {
this.antennasFetched = false;
this.antennas = [];
this.antennas = new Map();
this.redisForSub.on('message', this.onRedisMessage);
}
@ -58,35 +61,16 @@ export class AntennaService implements OnApplicationShutdown {
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
switch (type) {
case 'antennaCreated':
this.antennas.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
case 'antennaUpdated':
this.antennas.set(body.id, { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
lastUsedAt: new Date(body.lastUsedAt),
user: null, // joinなカラムは通常取ってこないので
userList: null, // joinなカラムは通常取ってこないので
});
break;
case 'antennaUpdated': {
const idx = this.antennas.findIndex(a => a.id === body.id);
if (idx >= 0) {
this.antennas[idx] = { // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
lastUsedAt: new Date(body.lastUsedAt),
user: null, // joinなカラムは通常取ってこないので
userList: null, // joinなカラムは通常取ってこないので
};
} else {
// サーバ起動時にactiveじゃなかった場合、リストに持っていないので追加する必要あり
this.antennas.push({ // TODO: このあたりのデシリアライズ処理は各modelファイル内に関数としてexportしたい
...body,
lastUsedAt: new Date(body.lastUsedAt),
user: null, // joinなカラムは通常取ってこないので
userList: null, // joinなカラムは通常取ってこないので
});
}
}
break;
case 'antennaDeleted':
this.antennas = this.antennas.filter(a => a.id !== body.id);
this.antennas.delete(body.id);
break;
default:
break;
@ -94,6 +78,20 @@ export class AntennaService implements OnApplicationShutdown {
}
}
@bindThis
public async updateAntenna(id: string, data: Partial<MiAntenna>) {
await this.antennasRepository.update({ id }, data);
const antenna = this.antennas.get(id) ?? await this.antennasRepository.findOneBy({ id });
if (antenna) {
// This will be handled above to save result
await this.internalEventService.emit('antennaUpdated', {
...antenna,
...data,
});
}
}
@bindThis
public async addNoteToAntennas(note: MiNote, noteUser: { id: MiUser['id']; username: string; host: string | null; isBot: boolean; }): Promise<void> {
const antennas = await this.getAntennas();
@ -212,13 +210,14 @@ export class AntennaService implements OnApplicationShutdown {
@bindThis
public async getAntennas() {
if (!this.antennasFetched) {
this.antennas = await this.antennasRepository.findBy({
const allAntennas = await this.antennasRepository.findBy({
isActive: true,
});
this.antennas = new Map(allAntennas.map(a => [a.id, a]));
this.antennasFetched = true;
}
return this.antennas;
return Array.from(this.antennas.values());
}
@bindThis

View file

@ -14,8 +14,9 @@ import { bindThis } from '@/decorators.js';
import type { MiInstance } from '@/models/Instance.js';
import { InternalEventService } from '@/core/InternalEventService.js';
import { MiUser } from '@/models/User.js';
import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository } from '@/models/_.js';
import type { MiNote, UsersRepository, NotesRepository, MiAccessToken, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js';
import { DI } from '@/di-symbols.js';
import { AntennaService } from '@/core/AntennaService.js';
export type UpdateInstanceJob = {
latestRequestReceivedAt?: Date,
@ -46,14 +47,30 @@ export type UpdateAccessTokenJob = {
lastUsedAt: Date;
};
export type UpdateAntennaJob = {
isActive: boolean,
lastUsedAt?: Date,
};
// TODO sync cross-process:
// 1. Emit internal events when scheduling timer, performing queue, and enqueuing data
// 2. On schedule, mark ID as deferred.
// 3. On perform, clear mark.
// 4. On performAll, skip deferred IDs.
// 5. On enqueue when ID is deferred, send data as event instead.
// 6. On delete when ID is deferred, clear mark
@Injectable()
export class CollapsedQueueService implements OnApplicationShutdown {
// Moved from InboxProcessorService
public readonly updateInstanceQueue: CollapsedQueue<MiInstance['id'], UpdateInstanceJob>;
// Moved from NoteCreateService, NoteEditService, and NoteDeleteService
public readonly updateUserQueue: CollapsedQueue<MiUser['id'], UpdateUserJob>;
public readonly updateNoteQueue: CollapsedQueue<MiNote['id'], UpdateNoteJob>;
public readonly updateAccessTokenQueue: CollapsedQueue<MiAccessToken['id'], UpdateAccessTokenJob>;
public readonly updateAntennaQueue: CollapsedQueue<MiAntenna['id'], UpdateAntennaJob>;
private readonly logger: Logger;
@ -67,9 +84,13 @@ export class CollapsedQueueService implements OnApplicationShutdown {
@Inject(DI.accessTokensRepository)
public readonly accessTokensRepository: AccessTokensRepository,
@Inject(DI.antennasRepository)
public readonly antennasRepository: AntennasRepository,
private readonly federatedInstanceService: FederatedInstanceService,
private readonly envService: EnvService,
private readonly internalEventService: InternalEventService,
private readonly antennaService: AntennaService,
loggerService: LoggerService,
) {
@ -136,15 +157,17 @@ export class CollapsedQueueService implements OnApplicationShutdown {
followingCountDelta: (oldJob.followingCountDelta ?? 0) + (newJob.followingCountDelta ?? 0),
followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0),
}),
(id, job) => this.usersRepository.update({ id }, {
updatedAt: job.updatedAt,
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
}),
async (id, job) => {
await this.usersRepository.update({ id }, {
updatedAt: job.updatedAt,
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
});
await this.internalEventService.emit('userUpdated', { id });
},
{
onError: this.onQueueError,
onPerform: (_, id) => this.internalEventService.emit('userUpdated', { id }),
concurrency: 4, // High concurrency - this queue gets a lot of activity
},
);
@ -183,7 +206,26 @@ export class CollapsedQueueService implements OnApplicationShutdown {
},
);
this.updateAntennaQueue = new CollapsedQueue(
'updateAntenna',
fiveMinuteInterval,
(oldJob, newJob) => ({
isActive: oldJob.isActive || newJob.isActive,
lastUsedAt: maxDate(oldJob.lastUsedAt, newJob.lastUsedAt),
}),
(id, job) => this.antennaService.updateAntenna(id, {
isActive: job.isActive,
lastUsedAt: job.lastUsedAt,
}),
{
onError: this.onQueueError,
concurrency: 4,
},
);
this.internalEventService.on('userChangeDeletedState', this.onUserDeleted);
this.internalEventService.on('antennaDeleted', this.onAntennaDeleted);
this.internalEventService.on('antennaUpdated', this.onAntennaDeleted);
}
@bindThis
@ -194,6 +236,7 @@ export class CollapsedQueueService implements OnApplicationShutdown {
await this.performQueue(this.updateUserQueue);
await this.performQueue(this.updateNoteQueue);
await this.performQueue(this.updateAccessTokenQueue);
await this.performQueue(this.updateAntennaQueue);
this.logger.info('Persistence complete.');
}
@ -226,8 +269,15 @@ export class CollapsedQueueService implements OnApplicationShutdown {
}
}
@bindThis
private onAntennaDeleted(data: MiAntenna) {
this.updateAntennaQueue.delete(data.id);
}
async onApplicationShutdown() {
this.internalEventService.off('userChangeDeletedState', this.onUserDeleted);
this.internalEventService.off('antennaDeleted', this.onAntennaDeleted);
this.internalEventService.off('antennaUpdated', this.onAntennaDeleted);
await this.performAllNow();
}

View file

@ -24,7 +24,6 @@ export class CollapsedQueue<K, V> {
private readonly perform: (key: K, value: V) => Promise<void | unknown>,
private readonly opts?: {
onError?: (queue: CollapsedQueue<K, V>, error: unknown) => void | Promise<void>,
onPerform?: (queue: CollapsedQueue<K, V>, key: K, value: V) => void | Promise<void>,
concurrency?: number,
},
) {
@ -74,7 +73,6 @@ export class CollapsedQueue<K, V> {
} else {
await this.perform(key, value);
}
await this.opts?.onPerform?.(this, key, value);
} catch (err) {
await this.opts?.onError?.(this, err);
throw err;

View file

@ -13,6 +13,7 @@ import { IdService } from '@/core/IdService.js';
import type { Config } from '@/config.js';
import { ReversiService } from '@/core/ReversiService.js';
import { TimeService } from '@/global/TimeService.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@ -37,6 +38,7 @@ export class CleanProcessorService {
private reversiService: ReversiService,
private idService: IdService,
private readonly timeService: TimeService,
private readonly collapsedQueueService: CollapsedQueueService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('clean');
}
@ -51,6 +53,7 @@ export class CleanProcessorService {
// 使われてないアンテナを停止
if (this.config.deactivateAntennaThreshold > 0) {
await this.collapsedQueueService.updateAntennaQueue.performAllNow();
await this.antennasRepository.update({
lastUsedAt: LessThan(new Date(this.timeService.now - this.config.deactivateAntennaThreshold)),
}, {

View file

@ -16,6 +16,7 @@ import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import ActiveUsersChart from '@/core/chart/charts/active-users.js';
import { CollapsedQueueService } from '@/core/CollapsedQueueService.js';
import { ApiError } from '../../error.js';
export const meta = {
@ -79,6 +80,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private globalEventService: GlobalEventService,
private readonly activeUsersChart: ActiveUsersChart,
private readonly timeService: TimeService,
private readonly collapsedQueueService: CollapsedQueueService,
) {
super(meta, paramDef, async (ps, me) => {
const untilId = ps.untilId ?? (ps.untilDate ? this.idService.gen(ps.untilDate!) : null);
@ -96,9 +98,10 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
// falseだった場合はアンテナの配信先が増えたことを通知したい
const needPublishEvent = !antenna.isActive;
antenna.isActive = true;
antenna.lastUsedAt = this.timeService.date;
trackPromise(this.antennasRepository.update(antenna.id, antenna));
this.collapsedQueueService.updateAntennaQueue.enqueue(antenna.id, {
isActive: true,
lastUsedAt: this.timeService.date,
});
if (needPublishEvent) {
this.globalEventService.publishInternalEvent('antennaUpdated', antenna);