merge: Move user hibernation to scheduled system task (!1227)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1227 Approved-by: dakkar <dakkar@thenautilus.net> Approved-by: Marie <github@yuugi.dev>
This commit is contained in:
commit
68a3b4293c
12 changed files with 228 additions and 210 deletions
|
|
@ -13,7 +13,6 @@ import { ServerStatsService } from '@/daemons/ServerStatsService.js';
|
|||
import { ServerService } from '@/server/ServerService.js';
|
||||
import { MainModule } from '@/MainModule.js';
|
||||
import { EnvService } from '@/global/EnvService.js';
|
||||
import { ApLogCleanupService } from '@/daemons/ApLogCleanupService.js';
|
||||
|
||||
export async function server() {
|
||||
const app = await NestFactory.createApplicationContext(MainModule, {
|
||||
|
|
@ -32,7 +31,6 @@ export async function server() {
|
|||
if (!envService.options.noDaemons) {
|
||||
app.get(QueueStatsService).start();
|
||||
app.get(ServerStatsService).start();
|
||||
app.get(ApLogCleanupService).start();
|
||||
}
|
||||
|
||||
return app;
|
||||
|
|
|
|||
|
|
@ -1037,55 +1037,13 @@ export class NoteCreateService implements OnApplicationShutdown {
|
|||
}
|
||||
}
|
||||
|
||||
if (Math.random() < 0.1) {
|
||||
process.nextTick(() => {
|
||||
this.checkHibernation(followings);
|
||||
});
|
||||
}
|
||||
// checkHibernation moved to HibernateUsersProcessorService
|
||||
}
|
||||
|
||||
r.exec();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async checkHibernation(followings: MiFollowing[]) {
|
||||
if (followings.length === 0) return;
|
||||
|
||||
const shuffle = (array: MiFollowing[]) => {
|
||||
for (let i = array.length - 1; i > 0; i--) {
|
||||
const j = Math.floor(Math.random() * (i + 1));
|
||||
[array[i], array[j]] = [array[j], array[i]];
|
||||
}
|
||||
return array;
|
||||
};
|
||||
|
||||
// ランダムに最大1000件サンプリング
|
||||
const samples = shuffle(followings).slice(0, Math.min(followings.length, 1000));
|
||||
|
||||
const hibernatedUsers = await this.usersRepository.find({
|
||||
where: {
|
||||
id: In(samples.map(x => x.followerId)),
|
||||
lastActiveDate: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50))),
|
||||
},
|
||||
select: ['id'],
|
||||
});
|
||||
|
||||
if (hibernatedUsers.length > 0) {
|
||||
await Promise.all([
|
||||
this.usersRepository.update({
|
||||
id: In(hibernatedUsers.map(x => x.id)),
|
||||
}, {
|
||||
isHibernated: true,
|
||||
}),
|
||||
this.followingsRepository.update({
|
||||
followerId: In(hibernatedUsers.map(x => x.id)),
|
||||
}, {
|
||||
isFollowerHibernated: true,
|
||||
}),
|
||||
this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])),
|
||||
]);
|
||||
}
|
||||
}
|
||||
// checkHibernation moved to HibernateUsersProcessorService
|
||||
|
||||
public checkProhibitedWordsContain(content: Parameters<UtilityService['concatNoteContentsForKeyWordCheck']>[0], prohibitedWords?: string[]) {
|
||||
if (prohibitedWords == null) {
|
||||
|
|
|
|||
|
|
@ -909,55 +909,13 @@ export class NoteEditService implements OnApplicationShutdown {
|
|||
}
|
||||
}
|
||||
|
||||
if (Math.random() < 0.1) {
|
||||
process.nextTick(() => {
|
||||
this.checkHibernation(followings);
|
||||
});
|
||||
}
|
||||
// checkHibernation moved to HibernateUsersProcessorService
|
||||
}
|
||||
|
||||
r.exec();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async checkHibernation(followings: MiFollowing[]) {
|
||||
if (followings.length === 0) return;
|
||||
|
||||
const shuffle = (array: MiFollowing[]) => {
|
||||
for (let i = array.length - 1; i > 0; i--) {
|
||||
const j = Math.floor(Math.random() * (i + 1));
|
||||
[array[i], array[j]] = [array[j], array[i]];
|
||||
}
|
||||
return array;
|
||||
};
|
||||
|
||||
// ランダムに最大1000件サンプリング
|
||||
const samples = shuffle(followings).slice(0, Math.min(followings.length, 1000));
|
||||
|
||||
const hibernatedUsers = await this.usersRepository.find({
|
||||
where: {
|
||||
id: In(samples.map(x => x.followerId)),
|
||||
lastActiveDate: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50))),
|
||||
},
|
||||
select: ['id'],
|
||||
});
|
||||
|
||||
if (hibernatedUsers.length > 0) {
|
||||
await Promise.all([
|
||||
this.usersRepository.update({
|
||||
id: In(hibernatedUsers.map(x => x.id)),
|
||||
}, {
|
||||
isHibernated: true,
|
||||
}),
|
||||
this.followingsRepository.update({
|
||||
followerId: In(hibernatedUsers.map(x => x.id)),
|
||||
}, {
|
||||
isFollowerHibernated: true,
|
||||
}),
|
||||
this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])),
|
||||
]);
|
||||
}
|
||||
}
|
||||
// checkHibernation moved to HibernateUsersProcessorService
|
||||
|
||||
@bindThis
|
||||
private collapseNotesCount(oldValue: number, newValue: number) {
|
||||
|
|
|
|||
|
|
@ -78,62 +78,119 @@ export class QueueService implements OnModuleInit {
|
|||
|
||||
@bindThis
|
||||
public async onModuleInit() {
|
||||
await this.systemQueue.add('tickCharts', {
|
||||
}, {
|
||||
repeat: { pattern: '55 * * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'tickCharts-scheduler',
|
||||
{ pattern: '0 * * * *' }, // every hour at :00
|
||||
{
|
||||
name: 'tickCharts',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('resyncCharts', {
|
||||
}, {
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'resyncCharts-scheduler',
|
||||
{ pattern: '20 0 * * *' }, // every day at 00:20 (wait for tickCharts)
|
||||
{
|
||||
name: 'resyncCharts',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('cleanCharts', {
|
||||
}, {
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'cleanCharts-scheduler',
|
||||
{ pattern: '40 0 * * *' }, // every day at 00:40 (wait for resyncCharts)
|
||||
{
|
||||
name: 'cleanCharts',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('aggregateRetention', {
|
||||
}, {
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'aggregateRetention-scheduler',
|
||||
{ pattern: '0 1 * * *' }, // every day at 01:00
|
||||
{
|
||||
name: 'aggregateRetention',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('clean', {
|
||||
}, {
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'clean-scheduler',
|
||||
{ pattern: '10 1 * * *' }, // every day at 01:10 (wait for aggregateRetention)
|
||||
{
|
||||
name: 'clean',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('checkExpiredMutings', {
|
||||
}, {
|
||||
repeat: { pattern: '*/5 * * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'checkExpiredMutings-scheduler',
|
||||
{ pattern: '*/5 * * * *' }, // every 5 minutes
|
||||
{
|
||||
name: 'checkExpiredMutings',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('bakeBufferedReactions', {
|
||||
}, {
|
||||
repeat: { pattern: '0 0 * * *' },
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'bakeBufferedReactions-scheduler',
|
||||
{ pattern: '20 1 * * *' }, // every day at 01:40 (wait for clean)
|
||||
{
|
||||
name: 'bakeBufferedReactions',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.add('checkModeratorsActivity', {
|
||||
}, {
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'checkModeratorsActivity-scheduler',
|
||||
// 毎時30分に起動
|
||||
repeat: { pattern: '30 * * * *' },
|
||||
{ pattern: '30 * * * *' }, // every hour at :30
|
||||
{
|
||||
name: 'checkModeratorsActivity',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'cleanupApLogs-scheduler',
|
||||
{ pattern: '*/10 * * *' }, // every 10 minutes
|
||||
{
|
||||
name: 'cleanupApLogs',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
await this.systemQueue.upsertJobScheduler(
|
||||
'hibernateUsers-scheduler',
|
||||
{ pattern: '30 1 * * *' }, // every day at 01:30 (avoid bakeBufferedReactions)
|
||||
{
|
||||
name: 'hibernateUsers',
|
||||
opts: {
|
||||
removeOnComplete: 10,
|
||||
removeOnFail: 30,
|
||||
},
|
||||
});
|
||||
|
||||
// Slot '40 1 * * *' is available for future work
|
||||
// Slot '50 1 * * *' is available for future work
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
|
|
@ -1,65 +0,0 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Injectable, type OnApplicationShutdown } from '@nestjs/common';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { LoggerService } from '@/core/LoggerService.js';
|
||||
import Logger from '@/logger.js';
|
||||
import { ApLogService } from '@/core/ApLogService.js';
|
||||
import { TimeService, type TimerHandle } from '@/global/TimeService.js';
|
||||
|
||||
// 10 minutes
|
||||
export const scanInterval = 1000 * 60 * 10;
|
||||
|
||||
@Injectable()
|
||||
export class ApLogCleanupService implements OnApplicationShutdown {
|
||||
private readonly logger: Logger;
|
||||
private scanTimer: TimerHandle | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly apLogService: ApLogService,
|
||||
private readonly timeService: TimeService,
|
||||
|
||||
loggerService: LoggerService,
|
||||
) {
|
||||
this.logger = loggerService.getLogger('activity-log-cleanup');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async start(): Promise<void> {
|
||||
// Just in case start() gets called multiple times.
|
||||
this.dispose();
|
||||
|
||||
// Prune at startup, in case the server was rebooted during the interval.
|
||||
// noinspection ES6MissingAwait
|
||||
this.tick();
|
||||
|
||||
// Prune on a regular interval for the lifetime of the server.
|
||||
this.scanTimer = this.timeService.startTimer(this.tick, scanInterval, { repeated: true });
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async tick(): Promise<void> {
|
||||
try {
|
||||
const affected = await this.apLogService.deleteExpiredLogs();
|
||||
this.logger.info(`Activity Log cleanup complete; removed ${affected} expired logs.`);
|
||||
} catch (err) {
|
||||
this.logger.error('Activity Log cleanup failed:', err as Error);
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public onApplicationShutdown(): void {
|
||||
this.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
if (this.scanTimer) {
|
||||
this.timeService.stopTimer(this.scanTimer);
|
||||
this.scanTimer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -7,7 +7,6 @@ import { Module } from '@nestjs/common';
|
|||
import { CoreModule } from '@/core/CoreModule.js';
|
||||
import { QueueStatsService } from './QueueStatsService.js';
|
||||
import { ServerStatsService } from './ServerStatsService.js';
|
||||
import { ApLogCleanupService } from './ApLogCleanupService.js';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
|
|
@ -16,12 +15,10 @@ import { ApLogCleanupService } from './ApLogCleanupService.js';
|
|||
providers: [
|
||||
QueueStatsService,
|
||||
ServerStatsService,
|
||||
ApLogCleanupService,
|
||||
],
|
||||
exports: [
|
||||
QueueStatsService,
|
||||
ServerStatsService,
|
||||
ApLogCleanupService,
|
||||
],
|
||||
})
|
||||
export class DaemonModule {}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ import { AggregateRetentionProcessorService } from './processors/AggregateRetent
|
|||
import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js';
|
||||
import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js';
|
||||
import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
|
||||
import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js';
|
||||
import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
|
|
@ -89,6 +91,8 @@ import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostP
|
|||
CheckModeratorsActivityProcessorService,
|
||||
QueueProcessorService,
|
||||
ScheduleNotePostProcessorService,
|
||||
CleanupApLogsProcessorService,
|
||||
HibernateUsersProcessorService,
|
||||
],
|
||||
exports: [
|
||||
QueueProcessorService,
|
||||
|
|
|
|||
|
|
@ -51,6 +51,8 @@ import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostP
|
|||
import { QueueLoggerService } from './QueueLoggerService.js';
|
||||
import { QUEUE, baseWorkerOptions } from './const.js';
|
||||
import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js';
|
||||
import { CleanupApLogsProcessorService } from './processors/CleanupApLogsProcessorService.js';
|
||||
import { HibernateUsersProcessorService } from './processors/HibernateUsersProcessorService.js';
|
||||
|
||||
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
|
||||
function httpRelatedBackoff(attemptsMade: number) {
|
||||
|
|
@ -136,6 +138,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
private cleanProcessorService: CleanProcessorService,
|
||||
private scheduleNotePostProcessorService: ScheduleNotePostProcessorService,
|
||||
private readonly timeService: TimeService,
|
||||
private readonly cleanupApLogsProcessorService: CleanupApLogsProcessorService,
|
||||
private readonly hibernateUsersProcessorService: HibernateUsersProcessorService,
|
||||
) {
|
||||
this.logger = this.queueLoggerService.logger;
|
||||
|
||||
|
|
@ -156,6 +160,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process();
|
||||
case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process();
|
||||
case 'clean': return this.cleanProcessorService.process();
|
||||
case 'cleanupApLogs': return this.cleanupApLogsProcessorService.process();
|
||||
case 'hibernateUsers': return this.hibernateUsersProcessorService.process();
|
||||
default: throw new Error(`unrecognized job type ${job.name} for system`);
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ export class AggregateRetentionProcessorService {
|
|||
const data = deepClone(record.data);
|
||||
data[dateKey] = retention;
|
||||
|
||||
this.retentionAggregationsRepository.update(record.id, {
|
||||
await this.retentionAggregationsRepository.update(record.id, {
|
||||
updatedAt: now,
|
||||
data,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -45,13 +45,13 @@ export class CleanProcessorService {
|
|||
public async process(): Promise<void> {
|
||||
this.logger.info('Cleaning...');
|
||||
|
||||
this.userIpsRepository.delete({
|
||||
await this.userIpsRepository.delete({
|
||||
createdAt: LessThan(new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 90))),
|
||||
});
|
||||
|
||||
// 使われてないアンテナを停止
|
||||
if (this.config.deactivateAntennaThreshold > 0) {
|
||||
this.antennasRepository.update({
|
||||
await this.antennasRepository.update({
|
||||
lastUsedAt: LessThan(new Date(this.timeService.now - this.config.deactivateAntennaThreshold)),
|
||||
}, {
|
||||
isActive: false,
|
||||
|
|
@ -69,7 +69,7 @@ export class CleanProcessorService {
|
|||
});
|
||||
}
|
||||
|
||||
this.reversiService.cleanOutdatedGames();
|
||||
await this.reversiService.cleanOutdatedGames();
|
||||
|
||||
this.logger.info('Cleaned.');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
||||
import Logger from '@/logger.js';
|
||||
import { ApLogService } from '@/core/ApLogService.js';
|
||||
|
||||
@Injectable()
|
||||
export class CleanupApLogsProcessorService {
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(
|
||||
private readonly apLogService: ApLogService,
|
||||
queueLoggerService: QueueLoggerService,
|
||||
) {
|
||||
this.logger = queueLoggerService.logger.createSubLogger('activity-log-cleanup');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async process(): Promise<void> {
|
||||
try {
|
||||
const affected = await this.apLogService.deleteExpiredLogs();
|
||||
this.logger.info(`Activity Log cleanup complete; removed ${affected} expired logs.`);
|
||||
} catch (err) {
|
||||
this.logger.error('Activity Log cleanup failed:', err as Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { In, LessThan } from 'typeorm';
|
||||
import { QueueLoggerService } from '@/queue/QueueLoggerService.js';
|
||||
import type Logger from '@/logger.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { CacheService } from '@/core/CacheService.js';
|
||||
import { TimeService } from '@/global/TimeService.js';
|
||||
import type { FollowingsRepository, UsersRepository } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
|
||||
@Injectable()
|
||||
export class HibernateUsersProcessorService {
|
||||
private readonly logger: Logger;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.usersRepository)
|
||||
private readonly usersRepository: UsersRepository,
|
||||
|
||||
@Inject(DI.followingsRepository)
|
||||
private readonly followingsRepository: FollowingsRepository,
|
||||
|
||||
private readonly cacheService: CacheService,
|
||||
private readonly timeService: TimeService,
|
||||
|
||||
queueLoggerService: QueueLoggerService,
|
||||
) {
|
||||
this.logger = queueLoggerService.logger.createSubLogger('hibernate-users');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async process() {
|
||||
try {
|
||||
let totalHibernated = 0;
|
||||
|
||||
// Any users last active *before* this date should be hibernated
|
||||
const hibernationThreshold = new Date(this.timeService.now - (1000 * 60 * 60 * 24 * 50));
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
while (true) {
|
||||
// Work in batches of 100
|
||||
const page = await this.usersRepository.find({
|
||||
where: { isHibernated: false, lastActiveDate: LessThan(hibernationThreshold) },
|
||||
select: { id: true },
|
||||
take: 100,
|
||||
}) as { id: string }[];
|
||||
const ids = page.map(u => u.id);
|
||||
|
||||
// Stop when we get them all
|
||||
if (ids.length < 1) break;
|
||||
|
||||
await this.usersRepository.update({ id: In(ids) }, { isHibernated: true });
|
||||
await this.followingsRepository.update({ followerId: In(ids) }, { isFollowerHibernated: true });
|
||||
await this.cacheService.hibernatedUserCache.refreshMany(ids);
|
||||
|
||||
totalHibernated += ids.length;
|
||||
}
|
||||
|
||||
if (totalHibernated > 0) {
|
||||
this.logger.info(`Hibernated ${totalHibernated} inactive users`);
|
||||
} else {
|
||||
this.logger.debug('Skipping hibernation: nothing to do');
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.error(`Error hibernating users: ${renderInlineError(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue