diff --git a/packages/backend/src/boot/common.ts b/packages/backend/src/boot/common.ts index 8ba25c7c0b..34c8307dfc 100644 --- a/packages/backend/src/boot/common.ts +++ b/packages/backend/src/boot/common.ts @@ -13,7 +13,6 @@ import { ServerStatsService } from '@/daemons/ServerStatsService.js'; import { ServerService } from '@/server/ServerService.js'; import { MainModule } from '@/MainModule.js'; import { EnvService } from '@/global/EnvService.js'; -import { ApLogCleanupService } from '@/daemons/ApLogCleanupService.js'; export async function server() { const app = await NestFactory.createApplicationContext(MainModule, { @@ -32,7 +31,6 @@ export async function server() { if (!envService.options.noDaemons) { app.get(QueueStatsService).start(); app.get(ServerStatsService).start(); - app.get(ApLogCleanupService).start(); } return app; diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index e7cfbd136b..fd55c33bfb 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -1037,55 +1037,13 @@ export class NoteCreateService implements OnApplicationShutdown { } } - if (Math.random() < 0.1) { - process.nextTick(() => { - this.checkHibernation(followings); - }); - } + // checkHibernation moved to HibernateUsersProcessorService } r.exec(); } - @bindThis - public async checkHibernation(followings: MiFollowing[]) { - if (followings.length === 0) return; - - const shuffle = (array: MiFollowing[]) => { - for (let i = array.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [array[i], array[j]] = [array[j], array[i]]; - } - return array; - }; - - // ランダムに最大1000件サンプリング - const samples = shuffle(followings).slice(0, Math.min(followings.length, 1000)); - - const hibernatedUsers = await this.usersRepository.find({ - where: { - id: In(samples.map(x => x.followerId)), - lastActiveDate: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50))), - }, - select: ['id'], - }); - - if (hibernatedUsers.length > 0) { - await Promise.all([ - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }), - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }), - this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), - ]); - } - } + // checkHibernation moved to HibernateUsersProcessorService public checkProhibitedWordsContain(content: Parameters[0], prohibitedWords?: string[]) { if (prohibitedWords == null) { diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index 17daf386d6..20afc4e63c 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -909,55 +909,13 @@ export class NoteEditService implements OnApplicationShutdown { } } - if (Math.random() < 0.1) { - process.nextTick(() => { - this.checkHibernation(followings); - }); - } + // checkHibernation moved to HibernateUsersProcessorService } r.exec(); } - @bindThis - public async checkHibernation(followings: MiFollowing[]) { - if (followings.length === 0) return; - - const shuffle = (array: MiFollowing[]) => { - for (let i = array.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [array[i], array[j]] = [array[j], array[i]]; - } - return array; - }; - - // ランダムに最大1000件サンプリング - const samples = shuffle(followings).slice(0, Math.min(followings.length, 1000)); - - const hibernatedUsers = await this.usersRepository.find({ - where: { - id: In(samples.map(x => x.followerId)), - lastActiveDate: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50))), - }, - select: ['id'], - }); - - if (hibernatedUsers.length > 0) { - await Promise.all([ - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }), - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }), - this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), - ]); - } - } + // checkHibernation moved to HibernateUsersProcessorService @bindThis private collapseNotesCount(oldValue: number, newValue: number) { diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 5020614676..da5788f143 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -78,62 +78,119 @@ export class QueueService implements OnModuleInit { @bindThis public async onModuleInit() { - await this.systemQueue.add('tickCharts', { - }, { - repeat: { pattern: '55 * * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'tickCharts-scheduler', + { pattern: '0 * * * *' }, // every hour at :00 + { + name: 'tickCharts', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('resyncCharts', { - }, { - repeat: { pattern: '0 0 * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'resyncCharts-scheduler', + { pattern: '20 0 * * *' }, // every day at 00:20 (wait for tickCharts) + { + name: 'resyncCharts', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('cleanCharts', { - }, { - repeat: { pattern: '0 0 * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'cleanCharts-scheduler', + { pattern: '40 0 * * *' }, // every day at 00:40 (wait for resyncCharts) + { + name: 'cleanCharts', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('aggregateRetention', { - }, { - repeat: { pattern: '0 0 * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'aggregateRetention-scheduler', + { pattern: '0 1 * * *' }, // every day at 01:00 + { + name: 'aggregateRetention', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('clean', { - }, { - repeat: { pattern: '0 0 * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'clean-scheduler', + { pattern: '10 1 * * *' }, // every day at 01:10 (wait for aggregateRetention) + { + name: 'clean', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('checkExpiredMutings', { - }, { - repeat: { pattern: '*/5 * * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'checkExpiredMutings-scheduler', + { pattern: '*/5 * * * *' }, // every 5 minutes + { + name: 'checkExpiredMutings', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('bakeBufferedReactions', { - }, { - repeat: { pattern: '0 0 * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + await this.systemQueue.upsertJobScheduler( + 'bakeBufferedReactions-scheduler', + { pattern: '20 1 * * *' }, // every day at 01:40 (wait for clean) + { + name: 'bakeBufferedReactions', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); - await this.systemQueue.add('checkModeratorsActivity', { - }, { + await this.systemQueue.upsertJobScheduler( + 'checkModeratorsActivity-scheduler', // 毎時30分に起動 - repeat: { pattern: '30 * * * *' }, - removeOnComplete: 10, - removeOnFail: 30, - }); + { pattern: '30 * * * *' }, // every hour at :30 + { + name: 'checkModeratorsActivity', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); + + await this.systemQueue.upsertJobScheduler( + 'cleanupApLogs-scheduler', + { pattern: '*/10 * * *' }, // every 10 minutes + { + name: 'cleanupApLogs', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); + + await this.systemQueue.upsertJobScheduler( + 'hibernateUsers-scheduler', + { pattern: '30 1 * * *' }, // every day at 01:30 (avoid bakeBufferedReactions) + { + name: 'hibernateUsers', + opts: { + removeOnComplete: 10, + removeOnFail: 30, + }, + }); + + // Slot '40 1 * * *' is available for future work + // Slot '50 1 * * *' is available for future work } @bindThis diff --git a/packages/backend/src/daemons/ApLogCleanupService.ts b/packages/backend/src/daemons/ApLogCleanupService.ts deleted file mode 100644 index d3f09bf660..0000000000 --- a/packages/backend/src/daemons/ApLogCleanupService.ts +++ /dev/null @@ -1,65 +0,0 @@ -/* - * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { Injectable, type OnApplicationShutdown } from '@nestjs/common'; -import { bindThis } from '@/decorators.js'; -import { LoggerService } from '@/core/LoggerService.js'; -import Logger from '@/logger.js'; -import { ApLogService } from '@/core/ApLogService.js'; -import { TimeService, type TimerHandle } from '@/global/TimeService.js'; - -// 10 minutes -export const scanInterval = 1000 * 60 * 10; - -@Injectable() -export class ApLogCleanupService implements OnApplicationShutdown { - private readonly logger: Logger; - private scanTimer: TimerHandle | null = null; - - constructor( - private readonly apLogService: ApLogService, - private readonly timeService: TimeService, - - loggerService: LoggerService, - ) { - this.logger = loggerService.getLogger('activity-log-cleanup'); - } - - @bindThis - public async start(): Promise { - // Just in case start() gets called multiple times. - this.dispose(); - - // Prune at startup, in case the server was rebooted during the interval. - // noinspection ES6MissingAwait - this.tick(); - - // Prune on a regular interval for the lifetime of the server. - this.scanTimer = this.timeService.startTimer(this.tick, scanInterval, { repeated: true }); - } - - @bindThis - private async tick(): Promise { - try { - const affected = await this.apLogService.deleteExpiredLogs(); - this.logger.info(`Activity Log cleanup complete; removed ${affected} expired logs.`); - } catch (err) { - this.logger.error('Activity Log cleanup failed:', err as Error); - } - } - - @bindThis - public onApplicationShutdown(): void { - this.dispose(); - } - - @bindThis - public dispose(): void { - if (this.scanTimer) { - this.timeService.stopTimer(this.scanTimer); - this.scanTimer = null; - } - } -} diff --git a/packages/backend/src/daemons/DaemonModule.ts b/packages/backend/src/daemons/DaemonModule.ts index 286fba56f3..00c5e2c847 100644 --- a/packages/backend/src/daemons/DaemonModule.ts +++ b/packages/backend/src/daemons/DaemonModule.ts @@ -7,7 +7,6 @@ import { Module } from '@nestjs/common'; import { CoreModule } from '@/core/CoreModule.js'; import { QueueStatsService } from './QueueStatsService.js'; import { ServerStatsService } from './ServerStatsService.js'; -import { ApLogCleanupService } from './ApLogCleanupService.js'; @Module({ imports: [ @@ -16,12 +15,10 @@ import { ApLogCleanupService } from './ApLogCleanupService.js'; providers: [ QueueStatsService, ServerStatsService, - ApLogCleanupService, ], exports: [ QueueStatsService, ServerStatsService, - ApLogCleanupService, ], }) export class DaemonModule {} diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index cb26a06529..b6469229d2 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -44,6 +44,8 @@ import { AggregateRetentionProcessorService } from './processors/AggregateRetent import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js'; import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; +import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js'; +import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js'; @Module({ imports: [ @@ -89,6 +91,8 @@ import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostP CheckModeratorsActivityProcessorService, QueueProcessorService, ScheduleNotePostProcessorService, + CleanupApLogsProcessorService, + HibernateUsersProcessorService, ], exports: [ QueueProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 35dc812652..dd94fffb36 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -51,6 +51,8 @@ import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostP import { QueueLoggerService } from './QueueLoggerService.js'; import { QUEUE, baseWorkerOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; +import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js'; +import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 function httpRelatedBackoff(attemptsMade: number) { @@ -136,6 +138,8 @@ export class QueueProcessorService implements OnApplicationShutdown { private cleanProcessorService: CleanProcessorService, private scheduleNotePostProcessorService: ScheduleNotePostProcessorService, private readonly timeService: TimeService, + private readonly cleanupApLogsProcessorService: CleanupApLogsProcessorService, + private readonly hibernateUsersProcessorService: HibernateUsersProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -156,6 +160,8 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process(); case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process(); case 'clean': return this.cleanProcessorService.process(); + case 'cleanupApLogs': return this.cleanupApLogsProcessorService.process(); + case 'hibernateUsers': return this.hibernateUsersProcessorService.process(); default: throw new Error(`unrecognized job type ${job.name} for system`); } }; diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts index 6f71b42fa9..7d346a2f5d 100644 --- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts +++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts @@ -83,7 +83,7 @@ export class AggregateRetentionProcessorService { const data = deepClone(record.data); data[dateKey] = retention; - this.retentionAggregationsRepository.update(record.id, { + await this.retentionAggregationsRepository.update(record.id, { updatedAt: now, data, }); diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts index 149d72de6a..f678801e08 100644 --- a/packages/backend/src/queue/processors/CleanProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -45,13 +45,13 @@ export class CleanProcessorService { public async process(): Promise { this.logger.info('Cleaning...'); - this.userIpsRepository.delete({ + await this.userIpsRepository.delete({ createdAt: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 90))), }); // 使われてないアンテナを停止 if (this.config.deactivateAntennaThreshold > 0) { - this.antennasRepository.update({ + await this.antennasRepository.update({ lastUsedAt: LessThan(new Date(this.timeService.now - this.config.deactivateAntennaThreshold)), }, { isActive: false, @@ -69,7 +69,7 @@ export class CleanProcessorService { }); } - this.reversiService.cleanOutdatedGames(); + await this.reversiService.cleanOutdatedGames(); this.logger.info('Cleaned.'); } diff --git a/packages/backend/src/queue/processors/CleanupApLogsProcessorService.ts b/packages/backend/src/queue/processors/CleanupApLogsProcessorService.ts new file mode 100644 index 0000000000..1869d8bd79 --- /dev/null +++ b/packages/backend/src/queue/processors/CleanupApLogsProcessorService.ts @@ -0,0 +1,32 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; +import Logger from '@/logger.js'; +import { ApLogService } from '@/core/ApLogService.js'; + +@Injectable() +export class CleanupApLogsProcessorService { + private readonly logger: Logger; + + constructor( + private readonly apLogService: ApLogService, + queueLoggerService: QueueLoggerService, + ) { + this.logger = queueLoggerService.logger.createSubLogger('activity-log-cleanup'); + } + + @bindThis + public async process(): Promise { + try { + const affected = await this.apLogService.deleteExpiredLogs(); + this.logger.info(`Activity Log cleanup complete; removed ${affected} expired logs.`); + } catch (err) { + this.logger.error('Activity Log cleanup failed:', err as Error); + } + } +} diff --git a/packages/backend/src/queue/processors/HibernateUsersProcessorService.ts b/packages/backend/src/queue/processors/HibernateUsersProcessorService.ts new file mode 100644 index 0000000000..892c3128aa --- /dev/null +++ b/packages/backend/src/queue/processors/HibernateUsersProcessorService.ts @@ -0,0 +1,73 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import { In, LessThan } from 'typeorm'; +import { QueueLoggerService } from '@/queue/QueueLoggerService.js'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; +import { CacheService } from '@/core/CacheService.js'; +import { TimeService } from '@/global/TimeService.js'; +import type { FollowingsRepository, UsersRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; + +@Injectable() +export class HibernateUsersProcessorService { + private readonly logger: Logger; + + constructor( + @Inject(DI.usersRepository) + private readonly usersRepository: UsersRepository, + + @Inject(DI.followingsRepository) + private readonly followingsRepository: FollowingsRepository, + + private readonly cacheService: CacheService, + private readonly timeService: TimeService, + + queueLoggerService: QueueLoggerService, + ) { + this.logger = queueLoggerService.logger.createSubLogger('hibernate-users'); + } + + @bindThis + public async process() { + try { + let totalHibernated = 0; + + // Any users last active *before* this date should be hibernated + const hibernationThreshold = new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50)); + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + // Work in batches of 100 + const page = await this.usersRepository.find({ + where: { isHibernated: false, lastActiveDate: LessThan(hibernationThreshold) }, + select: { id: true }, + take: 100, + }) as { id: string }[]; + const ids = page.map(u => u.id); + + // Stop when we get them all + if (ids.length < 1) break; + + await this.usersRepository.update({ id: In(ids) }, { isHibernated: true }); + await this.followingsRepository.update({ followerId: In(ids) }, { isFollowerHibernated: true }); + await this.cacheService.hibernatedUserCache.refreshMany(ids); + + totalHibernated += ids.length; + } + + if (totalHibernated > 0) { + this.logger.info(`Hibernated ${totalHibernated} inactive users`); + } else { + this.logger.debug('Skipping hibernation: nothing to do'); + } + } catch (err) { + this.logger.error(`Error hibernating users: ${renderInlineError(err)}`); + } + } +}