fix rate limits under multi-node environments

This commit is contained in:
Hazelnoot 2024-12-09 19:04:06 -05:00
parent 92ffd2a5fc
commit 9daafca155
2 changed files with 457 additions and 274 deletions

View file

@ -5,16 +5,13 @@
import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis';
import { LoggerService } from '@/core/LoggerService.js';
import { TimeService } from '@/core/TimeService.js';
import { EnvService } from '@/core/EnvService.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed } from '@/misc/rate-limit-utils.js';
@Injectable()
export class SkRateLimiterService {
private readonly logger: Logger;
private readonly disabled: boolean;
constructor(
@ -24,14 +21,10 @@ export class SkRateLimiterService {
@Inject(DI.redis)
private readonly redisClient: Redis.Redis,
@Inject(LoggerService)
loggerService: LoggerService,
@Inject(EnvService)
envService: EnvService,
) {
this.logger = loggerService.getLogger('limiter');
this.disabled = envService.env.NODE_ENV !== 'production'; // TODO disable in TEST *only*
this.disabled = envService.env.NODE_ENV !== 'production';
}
public async limit(limit: Keyed<RateLimit>, actor: string, factor = 1): Promise<LimitInfo> {
@ -50,10 +43,25 @@ export class SkRateLimiterService {
throw new Error(`Rate limit factor is zero or negative: ${factor}`);
}
if (isLegacyRateLimit(limit)) {
return await this.limitLegacy(limit, actor, factor);
} else {
return await this.limitBucket(limit, actor, factor);
return await this.tryLimit(limit, actor, factor);
}
/**
 * Applies a rate limit, repeating the attempt when redis reports a transaction conflict.
 *
 * Legacy limits are delegated to limitLegacy and all other limits to limitBucket.
 * @param limit Rate limit definition to apply.
 * @param actor Identifier of the client being limited.
 * @param factor Scaling factor forwarded to the underlying limiter.
 * @param retry Number of the first attempt; no further attempt is made once attempt 3 has failed.
 * @returns The limit status produced by the underlying limiter.
 * @throws The last TransactionError once attempts are exhausted, or any other error immediately.
 */
private async tryLimit(limit: Keyed<RateLimit>, actor: string, factor: number, retry = 1): Promise<LimitInfo> {
	for (let attempt = retry; ; attempt++) {
		try {
			return isLegacyRateLimit(limit)
				? await this.limitLegacy(limit, actor, factor)
				: await this.limitBucket(limit, actor, factor);
		} catch (err) {
			// Optimistic locking can produce collision errors; these are expected,
			// so the call is repeated a few times before the error is surfaced.
			// https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set
			const canRetry = err instanceof TransactionError && attempt < 3;
			if (!canRetry) {
				throw err;
			}
		}
	}
}
@ -94,36 +102,30 @@ export class SkRateLimiterService {
if (limit.minInterval === 0) return null;
if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
const counter = await this.getLimitCounter(limit, actor, 'min');
const minInterval = Math.max(Math.ceil(limit.minInterval * factor), 0);
const expirationSec = Math.max(Math.ceil(minInterval / 1000), 1);
// Update expiration
if (counter.c > 0) {
const isCleared = this.timeService.now - counter.t >= minInterval;
// Check for window clear
const counter = await this.getLimitCounter(limit, actor, 'min');
if (counter.counter > 0) {
const isCleared = this.timeService.now - counter.timestamp >= minInterval;
if (isCleared) {
counter.c = 0;
counter.counter = 0;
}
}
const blocked = counter.c > 0;
// Increment the limit, then synchronize with redis
const blocked = counter.counter > 0;
if (!blocked) {
counter.c++;
counter.t = this.timeService.now;
counter.counter++;
counter.timestamp = this.timeService.now;
await this.updateLimitCounter(limit, actor, 'min', expirationSec, counter);
}
// Calculate limit status
const resetMs = Math.max(Math.ceil(minInterval - (this.timeService.now - counter.t)), 0);
const resetMs = Math.max(minInterval - (this.timeService.now - counter.timestamp), 0);
const resetSec = Math.ceil(resetMs / 1000);
const limitInfo: LimitInfo = { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs };
// Update the limit counter, but not if blocked
if (!blocked) {
// Don't await, or we will slow down the API.
this.setLimitCounter(limit, actor, counter, resetSec, 'min')
.catch(err => this.logger.error(`Failed to update limit ${limit.key}:min for ${actor}:`, err));
}
return limitInfo;
return { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs };
}
private async limitBucket(limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
@ -131,68 +133,113 @@ export class SkRateLimiterService {
if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
const counter = await this.getLimitCounter(limit, actor, 'bucket');
const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
const dripRate = Math.ceil(limit.dripRate ?? 1000);
const dripSize = Math.ceil(limit.dripSize ?? 1);
const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1);
// Update drips
if (counter.c > 0) {
const dripsSinceLastTick = Math.floor((this.timeService.now - counter.t) / dripRate) * dripSize;
counter.c = Math.max(counter.c - dripsSinceLastTick, 0);
// Simulate bucket drips
const counter = await this.getLimitCounter(limit, actor, 'bucket');
if (counter.counter > 0) {
const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize;
counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0);
}
const blocked = counter.c >= bucketSize;
// Increment the limit, then synchronize with redis
const blocked = counter.counter >= bucketSize;
if (!blocked) {
counter.c++;
counter.t = this.timeService.now;
counter.counter++;
counter.timestamp = this.timeService.now;
await this.updateLimitCounter(limit, actor, 'bucket', expirationSec, counter);
}
// Calculate how much time is needed to free up a bucket slot
const overflow = Math.max((counter.counter + 1) - bucketSize, 0);
const dripsNeeded = Math.ceil(overflow / dripSize);
const timeNeeded = Math.max((dripRate * dripsNeeded) - (this.timeService.now - counter.timestamp), 0);
// Calculate limit status
const remaining = Math.max(bucketSize - counter.c, 0);
const resetMs = remaining > 0 ? 0 : Math.max(dripRate - (this.timeService.now - counter.t), 0);
const remaining = Math.max(bucketSize - counter.counter, 0);
const resetMs = timeNeeded;
const resetSec = Math.ceil(resetMs / 1000);
const fullResetMs = Math.ceil(counter.c / dripSize) * dripRate;
const fullResetMs = Math.ceil(counter.counter / dripSize) * dripRate;
const fullResetSec = Math.ceil(fullResetMs / 1000);
const limitInfo: LimitInfo = { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
// Update the limit counter, but not if blocked
if (!blocked) {
// Don't await, or we will slow down the API.
this.setLimitCounter(limit, actor, counter, fullResetSec, 'bucket')
.catch(err => this.logger.error(`Failed to update limit ${limit.key} for ${actor}:`, err));
}
return limitInfo;
return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
}
/**
 * Loads the stored timestamp and counter of a rate limit for one actor.
 *
 * Both values are fetched in a single executeRedis batch, which is also given the two keys to watch.
 * A value that is missing (or empty) in redis is reported as 0.
 * @param limit Rate limit definition; its key is part of the redis key.
 * @param actor Identifier of the client being limited.
 * @param subject Part of the redis key; the visible callers pass 'min' or 'bucket'.
 * @returns The parsed timestamp and counter.
 */
private async getLimitCounter(limit: Keyed<RateLimit>, actor: string, subject: string): Promise<LimitCounter> {
const key = createLimitKey(limit, actor, subject);
const timestampKey = createLimitKey(limit, actor, subject, 't');
const counterKey = createLimitKey(limit, actor, subject, 'c');
const value = await this.redisClient.get(key);
if (value == null) {
return { t: 0, c: 0 };
const [timestamp, counter] = await this.executeRedis(
[
['get', timestampKey],
['get', counterKey],
],
[
timestampKey,
counterKey,
],
);
return {
timestamp: timestamp ? parseInt(timestamp) : 0,
counter: counter ? parseInt(counter) : 0,
};
}
/**
 * Persists the state of a rate limit for one actor.
 *
 * The timestamp and the counter are written to separate keys (suffixes 't' and 'c'),
 * both with the same expiration, in a single executeRedis batch that is also given the two keys to watch.
 * @param limit Rate limit definition; its key is part of the redis key.
 * @param actor Identifier of the client being limited.
 * @param subject Part of the redis key that distinguishes the kind of limit.
 * @param expirationSec Value passed to the 'EX' option of both writes.
 * @param counter State to store.
 */
private async updateLimitCounter(limit: Keyed<RateLimit>, actor: string, subject: string, expirationSec: number, counter: LimitCounter): Promise<void> {
	const keyOf = (suffix: string): string => createLimitKey(limit, actor, subject, suffix);
	const tsKey = keyOf('t');
	const countKey = keyOf('c');

	const tsValue = counter.timestamp.toString();
	const countValue = counter.counter.toString();

	await this.executeRedis(
		[
			['set', tsKey, tsValue, 'EX', expirationSec],
			['set', countKey, countValue, 'EX', expirationSec],
		],
		[tsKey, countKey],
	);
}
/**
 * Runs a batch of redis commands through `multi(...)`, `watch(...)` and `exec()`, and returns one reply per command.
 * @param batch Commands to run; each entry is a command name followed by its arguments.
 * @param watch Keys passed to `watch`.
 * @returns The reply value of every command.
 * @throws TransactionError when `exec()` resolves to a falsy value.
 * @throws Error when the number of replies differs from the number of commands.
 * @throws AggregateError when at least one command reports an error.
 */
// NOTE(review): `watch` is chained after `multi`, so WATCH appears to be queued inside the transaction.
// Redis documents WATCH as a command to issue before MULTI (and before the watched values are read) -
// confirm that this ordering really provides optimistic locking across the separate read and write batches.
private async executeRedis<Num extends number>(batch: RedisBatch<Num>, watch: string[]): Promise<RedisResults<Num>> {
const results = await this.redisClient
.multi(batch)
.watch(watch)
.exec();
// Transaction error
if (!results) {
throw new TransactionError('Redis error: transaction conflict');
}
return JSON.parse(value);
}
// The entire call failed
if (results.length !== batch.length) {
throw new Error('Redis error: failed to execute batch');
}
private async setLimitCounter(limit: Keyed<RateLimit>, actor: string, counter: LimitCounter, expiration: number, subject: string): Promise<void> {
const key = createLimitKey(limit, actor, subject);
const value = JSON.stringify(counter);
const expirationSec = Math.max(expiration, 1);
await this.redisClient.set(key, value, 'EX', expirationSec);
// A particular command failed
const errors = results.map(r => r[0]).filter(e => e != null);
if (errors.length > 0) {
throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`);
}
return results.map(r => r[1]) as RedisResults<Num>;
}
}
function createLimitKey(limit: Keyed<RateLimit>, actor: string, subject: string): string {
return `rl_${actor}_${limit.key}_${subject}`;
/** A list of exactly `Num` redis commands; each entry is the command name followed by its arguments. */
type RedisBatch<Num extends number> = [string, ...unknown[]][] & { length: Num };
/** The replies of a batch of `Num` commands: one string or null per command. */
type RedisResults<Num extends number> = (string | null)[] & { length: Num };
/**
 * Builds the redis key under which one value of a rate limit is stored.
 * The key has the form `rl_<actor>_<limit key>_<subject>_<value>`.
 * @param limit Rate limit definition; only its key is used.
 * @param actor Identifier of the client being limited.
 * @param subject Kind of limit the key belongs to.
 * @param value Name of the stored value.
 * @returns The assembled key.
 */
function createLimitKey(limit: Keyed<RateLimit>, actor: string, subject: string, value: string): string {
	const { key } = limit;
	const scope = `rl_${actor}_${key}`;
	return `${scope}_${subject}_${value}`;
}
export interface LimitCounter {
/** Timestamp */
t: number;
/**
 * Signals that a redis transaction produced no result, which is treated as an optimistic-locking conflict.
 * Callers can match on this type to tell a retryable conflict apart from other failures.
 */
class TransactionError extends Error {
	// Without an explicit name, logs and stack traces would label this as a plain "Error".
	public override name = 'TransactionError';
}
/** Counter */
c: number;
/** State of one rate limit for one actor, as read from and written to redis. */
interface LimitCounter {
/** Value of the time service's `now` when the counter was last incremented (presumably milliseconds - confirm); 0 when nothing is stored. */
timestamp: number;
/** Number of requests currently counted against the limit; 0 when nothing is stored. */
counter: number;
}