diff --git a/packages/backend/src/core/CollapsedQueueService.ts b/packages/backend/src/core/CollapsedQueueService.ts index 406f92f103..f142e98e35 100644 --- a/packages/backend/src/core/CollapsedQueueService.ts +++ b/packages/backend/src/core/CollapsedQueueService.ts @@ -12,9 +12,10 @@ import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { EnvService } from '@/core/EnvService.js'; import { bindThis } from '@/decorators.js'; import { InternalEventService } from '@/core/InternalEventService.js'; -import type { UsersRepository, NotesRepository, AccessTokensRepository, MiAntenna, AntennasRepository } from '@/models/_.js'; +import type { UsersRepository, NotesRepository, AccessTokensRepository, MiAntenna, FollowingsRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; import { AntennaService } from '@/core/AntennaService.js'; +import { CacheService } from '@/core/CacheService.js'; export type UpdateInstanceJob = { latestRequestReceivedAt?: Date, @@ -30,6 +31,7 @@ export type UpdateInstanceJob = { export type UpdateUserJob = { updatedAt?: Date, + lastActiveDate?: Date, notesCountDelta?: number, followingCountDelta?: number, followersCountDelta?: number, @@ -66,21 +68,22 @@ export class CollapsedQueueService implements OnApplicationShutdown { constructor( @Inject(DI.usersRepository) - public readonly usersRepository: UsersRepository, + private readonly usersRepository: UsersRepository, @Inject(DI.notesRepository) - public readonly notesRepository: NotesRepository, + private readonly notesRepository: NotesRepository, @Inject(DI.accessTokensRepository) - public readonly accessTokensRepository: AccessTokensRepository, + private readonly accessTokensRepository: AccessTokensRepository, - @Inject(DI.antennasRepository) - public readonly antennasRepository: AntennasRepository, + @Inject(DI.followingsRepository) + private readonly followingsRepository: FollowingsRepository, private readonly federatedInstanceService: FederatedInstanceService, private readonly envService: EnvService, private readonly internalEventService: InternalEventService, private readonly antennaService: AntennaService, + private readonly cacheService: CacheService, loggerService: LoggerService, ) { @@ -171,20 +174,34 @@ export class CollapsedQueueService implements OnApplicationShutdown { oneMinuteInterval, (oldJob, newJob) => ({ updatedAt: maxDate(oldJob.updatedAt, newJob.updatedAt), + lastActiveDate: maxDate(oldJob.lastActiveDate, newJob.lastActiveDate), notesCountDelta: (oldJob.notesCountDelta ?? 0) + (newJob.notesCountDelta ?? 0), followingCountDelta: (oldJob.followingCountDelta ?? 0) + (newJob.followingCountDelta ?? 0), followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0), }), async (id, job) => { // Have to check this because all properties are optional - if (job.updatedAt || job.notesCountDelta || job.followingCountDelta || job.followersCountDelta) { + if (job.updatedAt || job.lastActiveDate || job.notesCountDelta || job.followingCountDelta || job.followersCountDelta) { + // Updating the user should implicitly mark them as active + const lastActiveDate = job.lastActiveDate ?? job.updatedAt; + const isWakingUp = lastActiveDate && (await this.cacheService.findUserById(id)).isHibernated; + + // Update user before the hibernation cache, because the latter may refresh from DB await this.usersRepository.update({ id }, { updatedAt: job.updatedAt, + lastActiveDate, + isHibernated: isWakingUp ? false : undefined, notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined, followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined, followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined, }); await this.internalEventService.emit('userUpdated', { id }); + + // Wake up hibernated users + if (isWakingUp) { + await this.followingsRepository.update({ followerId: id }, { isFollowerHibernated: false }); + await this.cacheService.hibernatedUserCache.set(id, false); + } } }, { @@ -195,6 +212,9 @@ export class CollapsedQueueService implements OnApplicationShutdown { updatedAt: data.updatedAt != null ? new Date(data.updatedAt) : data.updatedAt, + lastActiveDate: data.lastActiveDate != null + ? new Date(data.lastActiveDate) + : data.lastActiveDate, }), }, ); diff --git a/packages/backend/src/core/UserService.ts b/packages/backend/src/core/UserService.ts index 69efb3a230..845c83effc 100644 --- a/packages/backend/src/core/UserService.ts +++ b/packages/backend/src/core/UserService.ts @@ -10,7 +10,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { SystemWebhookService } from '@/core/SystemWebhookService.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; -import { CacheService } from '@/core/CacheService.js'; +import { CollapsedQueueService } from '@/global/CollapsedQueueService.js'; import { TimeService } from '@/global/TimeService.js'; @Injectable() @@ -22,43 +22,14 @@ export class UserService { private followingsRepository: FollowingsRepository, private systemWebhookService: SystemWebhookService, private userEntityService: UserEntityService, - private readonly cacheService: CacheService, + private readonly collapsedQueueService: CollapsedQueueService, private readonly timeService: TimeService, ) { } @bindThis public async updateLastActiveDate(user: MiUser): Promise { - if (user.isHibernated) { - const result = await this.usersRepository.createQueryBuilder().update() - .set({ - lastActiveDate: this.timeService.date, - }) - .where('id = :id', { id: user.id }) - .returning('*') - .execute() - .then((response) => { - return response.raw[0]; - }); - const wokeUp = result.isHibernated; - if (wokeUp) { - await Promise.all([ - this.usersRepository.update(user.id, { - isHibernated: false, - }), - this.followingsRepository.update({ - followerId: user.id, - }, { - isFollowerHibernated: false, - }), - this.cacheService.hibernatedUserCache.set(user.id, false), - ]); - } - } else { - this.usersRepository.update(user.id, { - lastActiveDate: this.timeService.date, - }); - } + await this.collapsedQueueService.updateUserQueue.enqueue(user.id, { lastActiveDate: this.timeService.date }); } /** diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 20d8fc4ca2..88e6574263 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -271,11 +271,11 @@ export class StreamingApiServerService implements OnApplicationShutdown { this.#connections.set(connection, this.timeService.now); // TODO use collapsed queue - const userUpdateIntervalId = user ? this.timeService.startTimer(() => { - this.usersService.updateLastActiveDate(user); + const userUpdateIntervalId = user ? this.timeService.startTimer(async () => { + await this.usersService.updateLastActiveDate(user); }, 1000 * 60 * 5, { repeated: true }) : null; if (user) { - this.usersService.updateLastActiveDate(user); + await this.usersService.updateLastActiveDate(user); } const pong = () => { this.#connections.set(connection, this.timeService.now);