diff --git a/packages/backend/src/core/FederatedInstanceService.ts b/packages/backend/src/core/FederatedInstanceService.ts index 34df10f0ff..1b97820541 100644 --- a/packages/backend/src/core/FederatedInstanceService.ts +++ b/packages/backend/src/core/FederatedInstanceService.ts @@ -5,37 +5,72 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Redis from 'ioredis'; +import { In } from 'typeorm'; import type { InstancesRepository, MiMeta } from '@/models/_.js'; import type { MiInstance } from '@/models/Instance.js'; -import { MemoryKVCache } from '@/misc/cache.js'; import { IdService } from '@/core/IdService.js'; import { DI } from '@/di-symbols.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; -import type { GlobalEvents } from '@/core/GlobalEventService.js'; -import { Serialized } from '@/types.js'; -import { diffArrays, diffArraysSimple } from '@/misc/diff-arrays.js'; +import { diffArraysSimple } from '@/misc/diff-arrays.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; @Injectable() export class FederatedInstanceService implements OnApplicationShutdown { - private readonly federatedInstanceCache: MemoryKVCache; + public readonly federatedInstanceCache: QuantumKVCache; constructor( - @Inject(DI.redisForSub) - private redisForSub: Redis.Redis, - @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, + @Inject(DI.meta) + private readonly meta: MiMeta, + private utilityService: UtilityService, private idService: IdService, + private readonly internalEventService: InternalEventService, ) { - this.federatedInstanceCache = new MemoryKVCache(1000 * 60 * 3); // 3m - this.redisForSub.on('message', this.onMessage); + this.federatedInstanceCache = new QuantumKVCache(this.internalEventService, 'federatedInstance', { + lifetime: 1000 * 60 * 3, // 3 minutes + fetcher: async key => { + const host = this.utilityService.toPuny(key); + let instance = await this.instancesRepository.findOneBy({ host }); + if (instance == null) { + await this.instancesRepository.createQueryBuilder('instance') + .insert() + .values({ + id: this.idService.gen(), + host, + firstRetrievedAt: new Date(), + isBlocked: this.utilityService.isBlockedHost(host), + isSilenced: this.utilityService.isSilencedHost(host), + isMediaSilenced: this.utilityService.isMediaSilencedHost(host), + isAllowListed: this.utilityService.isAllowListedHost(host), + isBubbled: this.utilityService.isBubbledHost(host), + }) + .orIgnore() + .execute(); + + instance = await this.instancesRepository.findOneByOrFail({ host }); + } + return instance; + }, + bulkFetcher: async keys => { + const hosts = keys.map(key => this.utilityService.toPuny(key)); + const instances = await this.instancesRepository.findBy({ host: In(hosts) }); + return new Map(instances.map(i => [i.host, i])); + }, + }); + + this.internalEventService.on('metaUpdated', this.onMetaUpdated); } @bindThis public async fetchOrRegister(host: string): Promise { + return this.federatedInstanceCache.fetch(host); + /* host = this.utilityService.toPuny(host); const cached = this.federatedInstanceCache.get(host); @@ -61,12 +96,15 @@ export class FederatedInstanceService implements OnApplicationShutdown { index = await this.instancesRepository.findOneByOrFail({ host }); } - this.federatedInstanceCache.set(host, index); + await this.federatedInstanceCache.set(host, index); return index; + */ } @bindThis - public async fetch(host: string): Promise { + public async fetch(host: string): Promise { + return this.federatedInstanceCache.fetch(host); + /* host = this.utilityService.toPuny(host); const cached = this.federatedInstanceCache.get(host); @@ -75,29 +113,54 @@ export class FederatedInstanceService implements OnApplicationShutdown { const index = await this.instancesRepository.findOneBy({ host }); if (index == null) { - this.federatedInstanceCache.set(host, null); + await this.federatedInstanceCache.set(host, null); return null; } else { - this.federatedInstanceCache.set(host, index); + await this.federatedInstanceCache.set(host, index); return index; } + */ } @bindThis - public async update(id: MiInstance['id'], data: Partial): Promise { + public async update(id: MiInstance['id'], data: QueryDeepPartialEntity): Promise { const result = await this.instancesRepository.createQueryBuilder().update() .set(data) .where('id = :id', { id }) .returning('*') .execute() .then((response) => { - return response.raw[0]; + return response.raw[0] as MiInstance; }); - this.federatedInstanceCache.set(result.host, result); + await this.federatedInstanceCache.set(result.host, result); + + return result; } - private syncCache(before: Serialized, after: Serialized): void { + /** + * Gets all instances in the allowlist (meta.federationHosts). + */ + @bindThis + public async getAllowList(): Promise { + const allowedHosts = new Set(this.meta.federationHosts); + this.meta.blockedHosts.forEach(h => allowedHosts.delete(h)); + + const instances = await this.federatedInstanceCache.fetchMany(this.meta.federationHosts); + return instances.map(i => i[1]); + } + + /** + * Gets all instances in the denylist (meta.blockedHosts). + */ + @bindThis + public async getDenyList(): Promise { + const instances = await this.federatedInstanceCache.fetchMany(this.meta.blockedHosts); + return instances.map(i => i[1]); + } + + // This gets fired *in each process* so don't do anything to trigger cache notifications! + private syncCache(before: MiMeta | undefined, after: MiMeta): void { const changed = diffArraysSimple(before?.blockedHosts, after.blockedHosts) || diffArraysSimple(before?.silencedHosts, after.silencedHosts) || @@ -112,20 +175,13 @@ export class FederatedInstanceService implements OnApplicationShutdown { } @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; - if (type === 'metaUpdated') { - this.syncCache(body.before, body.after); - } - } + private async onMetaUpdated(body: { before?: MiMeta; after: MiMeta; }) { + this.syncCache(body.before, body.after); } @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('metaUpdated', this.onMetaUpdated); this.federatedInstanceCache.dispose(); }