make sure S3 clients are properly disposed
This commit is contained in:
parent
db4661397b
commit
4d9317ad2b
4 changed files with 112 additions and 39 deletions
|
|
@ -7,54 +7,58 @@ import * as https from 'node:https';
|
|||
import * as fs from 'node:fs';
|
||||
import { Readable } from 'node:stream';
|
||||
import { finished } from 'node:stream/promises';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import type { MiMeta } from '@/models/Meta.js';
|
||||
import { HttpRequestService } from '@/core/HttpRequestService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
import Logger from '@/logger.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
|
||||
@Injectable()
|
||||
export class BunnyService {
|
||||
private bunnyCdnLogger: Logger;
|
||||
private readonly bunnyCdnLogger: Logger;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.meta)
|
||||
private readonly meta: MiMeta,
|
||||
|
||||
private httpRequestService: HttpRequestService,
|
||||
) {
|
||||
this.bunnyCdnLogger = new Logger('bunnycdn', 'blue');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public getBunnyInfo(meta: MiMeta) {
|
||||
if (!meta.objectStorageEndpoint || !meta.objectStorageBucket || !meta.objectStorageSecretKey) {
|
||||
public getBunnyInfo() {
|
||||
if (!this.meta.objectStorageEndpoint || !this.meta.objectStorageBucket || !this.meta.objectStorageSecretKey) {
|
||||
throw new IdentifiableError('689ee33f-f97c-479a-ac49-1b9f8140bf90', 'Failed to use BunnyCDN, One of the required fields is missing.');
|
||||
}
|
||||
|
||||
return {
|
||||
endpoint: meta.objectStorageEndpoint,
|
||||
endpoint: this.meta.objectStorageEndpoint,
|
||||
/*
|
||||
The way S3 works is that the Secret Key is essentially the password for the API but Bunny calls their password AccessKey so we call it accessKey here.
|
||||
Bunny also doesn't specify a username/s3 access key when doing HTTP API requests so we end up not using our Access Key field from the form.
|
||||
*/
|
||||
accessKey: meta.objectStorageSecretKey,
|
||||
zone: meta.objectStorageBucket,
|
||||
fullUrl: `https://${meta.objectStorageEndpoint}/${meta.objectStorageBucket}`,
|
||||
accessKey: this.meta.objectStorageSecretKey,
|
||||
zone: this.meta.objectStorageBucket,
|
||||
fullUrl: `https://${this.meta.objectStorageEndpoint}/${this.meta.objectStorageBucket}`,
|
||||
};
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public usingBunnyCDN(meta: MiMeta) {
|
||||
return meta.objectStorageEndpoint && meta.objectStorageEndpoint.endsWith('bunnycdn.com');
|
||||
public usingBunnyCDN() {
|
||||
return this.meta.objectStorageEndpoint && this.meta.objectStorageEndpoint.endsWith('bunnycdn.com');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async upload(meta: MiMeta, path: string, input: fs.ReadStream | Buffer) {
|
||||
const client = this.getBunnyInfo(meta);
|
||||
public async upload(path: string, input: fs.ReadStream | Buffer) {
|
||||
const client = this.getBunnyInfo();
|
||||
|
||||
// Required to convert the buffer from webpublic and thumbnail to a ReadableStream for PUT
|
||||
const data = Buffer.isBuffer(input) ? Readable.from(input) : input;
|
||||
|
||||
const agent = this.httpRequestService.getAgentByUrl(new URL(`${client.fullUrl}/${path}`), !meta.objectStorageUseProxy, true);
|
||||
const agent = this.httpRequestService.getAgentByUrl(new URL(`${client.fullUrl}/${path}`), !this.meta.objectStorageUseProxy, true);
|
||||
|
||||
// Seperation of path and host/domain is required here
|
||||
const options = {
|
||||
|
|
@ -94,8 +98,8 @@ export class BunnyService {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public delete(meta: MiMeta, file: string) {
|
||||
const client = this.getBunnyInfo(meta);
|
||||
public delete(file: string) {
|
||||
const client = this.getBunnyInfo();
|
||||
return this.httpRequestService.send(`${client.fullUrl}/${file}`, { method: 'DELETE', headers: { AccessKey: client.accessKey } });
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -424,10 +424,10 @@ export class DriveService {
|
|||
if (this.meta.objectStorageSetPublicRead) params.ACL = 'public-read';
|
||||
|
||||
try {
|
||||
if (this.bunnyService.usingBunnyCDN(this.meta)) {
|
||||
await this.bunnyService.upload(this.meta, key, stream);
|
||||
if (this.bunnyService.usingBunnyCDN()) {
|
||||
await this.bunnyService.upload(key, stream);
|
||||
} else {
|
||||
const result = await this.s3Service.upload(this.meta, params);
|
||||
const result = await this.s3Service.upload(params);
|
||||
if ('Bucket' in result) { // CompleteMultipartUploadCommandOutput
|
||||
this.registerLogger.debug(`Uploaded: ${result.Bucket}/${result.Key} => ${result.Location}`);
|
||||
} else { // AbortMultipartUploadCommandOutput
|
||||
|
|
@ -843,15 +843,17 @@ export class DriveService {
|
|||
@bindThis
|
||||
public async deleteObjectStorageFile(key: string) {
|
||||
try {
|
||||
if (this.bunnyService.usingBunnyCDN()) {
|
||||
await this.bunnyService.delete(key);
|
||||
return;
|
||||
}
|
||||
|
||||
const param = {
|
||||
Bucket: this.meta.objectStorageBucket,
|
||||
Key: key,
|
||||
} as DeleteObjectCommandInput;
|
||||
if (this.bunnyService.usingBunnyCDN(this.meta)) {
|
||||
await this.bunnyService.delete(this.meta, key);
|
||||
} else {
|
||||
await this.s3Service.delete(this.meta, param);
|
||||
}
|
||||
|
||||
await this.s3Service.delete(param);
|
||||
} catch (err: any) {
|
||||
if (err.name === 'NoSuchKey') {
|
||||
this.deleteLogger.warn(`The object storage had no such key to delete: ${key}. Skipping this.`);
|
||||
|
|
|
|||
|
|
@ -6,24 +6,61 @@
|
|||
import { URL } from 'node:url';
|
||||
import * as http from 'node:http';
|
||||
import * as https from 'node:https';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import { DeleteObjectCommand, S3Client } from '@aws-sdk/client-s3';
|
||||
import { Upload } from '@aws-sdk/lib-storage';
|
||||
import { NodeHttpHandler, NodeHttpHandlerOptions } from '@smithy/node-http-handler';
|
||||
import type { MiMeta } from '@/models/Meta.js';
|
||||
import { HttpRequestService } from '@/core/HttpRequestService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import type { InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
import type { DeleteObjectCommandInput, PutObjectCommandInput } from '@aws-sdk/client-s3';
|
||||
|
||||
@Injectable()
|
||||
export class S3Service {
|
||||
export class S3Service implements OnApplicationShutdown {
|
||||
private client?: S3Client;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.meta)
|
||||
private readonly meta: MiMeta,
|
||||
|
||||
private httpRequestService: HttpRequestService,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
) {
|
||||
this.internalEventService.on('metaUpdated', this.onMetaUpdated);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public getS3Client(meta: MiMeta): S3Client {
|
||||
private onMetaUpdated(body: InternalEventTypes['metaUpdated']): void {
|
||||
if (this.needsChange(body.before, body.after)) {
|
||||
this.disposeClient();
|
||||
this.client = this.createS3Client(body.after);
|
||||
}
|
||||
}
|
||||
|
||||
private needsChange(before: MiMeta | undefined, after: MiMeta): boolean {
|
||||
if (before == null) return true;
|
||||
if (before.objectStorageEndpoint !== after.objectStorageEndpoint) return true;
|
||||
if (before.objectStorageUseSSL !== after.objectStorageUseSSL) return true;
|
||||
if (before.objectStorageUseProxy !== after.objectStorageUseProxy) return true;
|
||||
if (before.objectStorageAccessKey !== after.objectStorageAccessKey) return true;
|
||||
if (before.objectStorageSecretKey !== after.objectStorageSecretKey) return true;
|
||||
if (before.objectStorageRegion !== after.objectStorageRegion) return true;
|
||||
if (before.objectStorageUseSSL !== after.objectStorageUseSSL) return true;
|
||||
if (before.objectStorageS3ForcePathStyle !== after.objectStorageS3ForcePathStyle) return true;
|
||||
if (before.objectStorageRegion !== after.objectStorageRegion) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private getS3Client(): S3Client {
|
||||
return this.client ??= this.createS3Client(this.meta);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private createS3Client(meta: MiMeta): S3Client {
|
||||
const u = meta.objectStorageEndpoint
|
||||
? `${meta.objectStorageUseSSL ? 'https' : 'http'}://${meta.objectStorageEndpoint}`
|
||||
: `${meta.objectStorageUseSSL ? 'https' : 'http'}://example.net`; // dummy url to select http(s) agent
|
||||
|
|
@ -52,8 +89,8 @@ export class S3Service {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public async upload(meta: MiMeta, input: PutObjectCommandInput) {
|
||||
const client = this.getS3Client(meta);
|
||||
public async upload(input: PutObjectCommandInput) {
|
||||
const client = this.getS3Client();
|
||||
return new Upload({
|
||||
client,
|
||||
params: input,
|
||||
|
|
@ -64,8 +101,27 @@ export class S3Service {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public delete(meta: MiMeta, input: DeleteObjectCommandInput) {
|
||||
const client = this.getS3Client(meta);
|
||||
public delete(input: DeleteObjectCommandInput) {
|
||||
const client = this.getS3Client();
|
||||
return client.send(new DeleteObjectCommand(input));
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private disposeClient(): void {
|
||||
if (this.client) {
|
||||
this.client.destroy();
|
||||
this.client = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private dispose(): void {
|
||||
this.disposeClient();
|
||||
this.internalEventService.off('metaUpdated', this.onMetaUpdated);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
onApplicationShutdown() {
|
||||
this.dispose();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,28 +14,37 @@ import {
|
|||
UploadPartCommand,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { mockClient } from 'aws-sdk-client-mock';
|
||||
import { FakeInternalEventService } from '../misc/FakeInternalEventService.js';
|
||||
import type { TestingModule } from '@nestjs/testing';
|
||||
import { GlobalModule } from '@/GlobalModule.js';
|
||||
import { CoreModule } from '@/core/CoreModule.js';
|
||||
import { S3Service } from '@/core/S3Service.js';
|
||||
import { MiMeta } from '@/models/_.js';
|
||||
import type { TestingModule } from '@nestjs/testing';
|
||||
import { HttpRequestService } from '@/core/HttpRequestService.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
|
||||
describe('S3Service', () => {
|
||||
let app: TestingModule;
|
||||
let s3Service: S3Service;
|
||||
let fakeMeta: MiMeta;
|
||||
const s3Mock = mockClient(S3Client);
|
||||
|
||||
beforeAll(async () => {
|
||||
app = await Test.createTestingModule({
|
||||
imports: [GlobalModule, CoreModule],
|
||||
providers: [S3Service],
|
||||
}).compile();
|
||||
})
|
||||
.overrideProvider(InternalEventService).useClass(FakeInternalEventService)
|
||||
.compile();
|
||||
app.enableShutdownHooks();
|
||||
s3Service = app.get<S3Service>(S3Service);
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
s3Mock.reset();
|
||||
|
||||
fakeMeta = Object.create(app.get<MiMeta>(DI.meta));
|
||||
s3Service = new S3Service(fakeMeta, app.get(HttpRequestService), app.get(InternalEventService));
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
|
|
@ -45,8 +54,9 @@ describe('S3Service', () => {
|
|||
describe('upload', () => {
|
||||
test('upload a file', async () => {
|
||||
s3Mock.on(PutObjectCommand).resolves({});
|
||||
fakeMeta.objectStorageRegion = 'us-east-1';
|
||||
|
||||
await s3Service.upload({ objectStorageRegion: 'us-east-1' } as MiMeta, {
|
||||
await s3Service.upload({
|
||||
Bucket: 'fake',
|
||||
Key: 'fake',
|
||||
Body: 'x',
|
||||
|
|
@ -58,7 +68,7 @@ describe('S3Service', () => {
|
|||
s3Mock.on(UploadPartCommand).resolves({ ETag: '1' });
|
||||
s3Mock.on(CompleteMultipartUploadCommand).resolves({ Bucket: 'fake', Key: 'fake' });
|
||||
|
||||
await s3Service.upload({} as MiMeta, {
|
||||
await s3Service.upload({
|
||||
Bucket: 'fake',
|
||||
Key: 'fake',
|
||||
Body: 'x'.repeat(8 * 1024 * 1024 + 1), // デフォルトpartSizeにしている 8 * 1024 * 1024 を越えるサイズ
|
||||
|
|
@ -67,22 +77,23 @@ describe('S3Service', () => {
|
|||
|
||||
test('upload a file error', async () => {
|
||||
s3Mock.on(PutObjectCommand).rejects({ name: 'Fake Error' });
|
||||
fakeMeta.objectStorageRegion = 'us-east-1';
|
||||
|
||||
await expect(s3Service.upload({ objectStorageRegion: 'us-east-1' } as MiMeta, {
|
||||
await expect(s3Service.upload({
|
||||
Bucket: 'fake',
|
||||
Key: 'fake',
|
||||
Body: 'x',
|
||||
})).rejects.toThrowError(Error);
|
||||
})).rejects.toThrow();
|
||||
});
|
||||
|
||||
test('upload a large file error', async () => {
|
||||
s3Mock.on(UploadPartCommand).rejects();
|
||||
|
||||
await expect(s3Service.upload({} as MiMeta, {
|
||||
await expect(s3Service.upload({
|
||||
Bucket: 'fake',
|
||||
Key: 'fake',
|
||||
Body: 'x'.repeat(8 * 1024 * 1024 + 1), // デフォルトpartSizeにしている 8 * 1024 * 1024 を越えるサイズ
|
||||
})).rejects.toThrowError(Error);
|
||||
})).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue