Merge branch 'develop' into upstream/2025.5.0
This commit is contained in:
commit
3ebf9c4a71
317 changed files with 6144 additions and 2603 deletions
|
|
@ -11,7 +11,7 @@ import { DI } from '@/di-symbols.js';
|
|||
import type Logger from '@/logger.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js';
|
||||
import { StatusError } from '@/misc/status-error.js';
|
||||
import { renderFullError } from '@/misc/render-full-error.js';
|
||||
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
|
||||
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
|
||||
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
|
||||
|
|
@ -73,7 +73,9 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string {
|
|||
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
|
||||
const maxAttempts = job.opts.attempts ?? 0;
|
||||
|
||||
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
|
||||
return job.name
|
||||
? `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated} name=${job.name}`
|
||||
: `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
|
|
@ -134,35 +136,6 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
) {
|
||||
this.logger = this.queueLoggerService.logger;
|
||||
|
||||
function renderError(e?: Error) {
|
||||
// 何故かeがundefinedで来ることがある
|
||||
if (!e) return '?';
|
||||
|
||||
if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError' || e instanceof StatusError) {
|
||||
return `${e.name}: ${e.message}`;
|
||||
}
|
||||
|
||||
return {
|
||||
stack: e.stack,
|
||||
message: e.message,
|
||||
name: e.name,
|
||||
};
|
||||
}
|
||||
|
||||
function renderJob(job?: Bull.Job) {
|
||||
if (!job) return '?';
|
||||
|
||||
const info: Record<string, string> = {
|
||||
info: getJobInfo(job),
|
||||
data: job.data,
|
||||
};
|
||||
|
||||
if (job.name) info.name = job.name;
|
||||
if (job.failedReason) info.failedReason = job.failedReason;
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
//#region system
|
||||
{
|
||||
const processer = (job: Bull.Job) => {
|
||||
|
|
@ -196,7 +169,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err: Error) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -204,7 +177,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -261,7 +234,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -269,7 +242,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -301,7 +274,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -309,7 +282,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -341,7 +314,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -349,7 +322,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error('inbox error:', renderError(err)))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -381,7 +354,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -389,7 +362,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -421,7 +394,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -429,7 +402,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -468,7 +441,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -476,7 +449,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
|
@ -509,7 +482,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => {
|
||||
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
|
|
@ -517,13 +490,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
||||
//#region ended poll notification
|
||||
{
|
||||
const logger = this.logger.createSubLogger('endedPollNotification');
|
||||
|
||||
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => {
|
||||
if (this.config.sentryForBackend) {
|
||||
return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job));
|
||||
|
|
@ -534,19 +509,75 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
|
||||
autorun: false,
|
||||
});
|
||||
this.endedPollNotificationQueueWorker
|
||||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => {
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: EndedPollNotification: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
extra: { job, err },
|
||||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
|
||||
//#region schedule note post
|
||||
{
|
||||
const logger = this.logger.createSubLogger('scheduleNotePost');
|
||||
|
||||
this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
|
||||
...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
|
||||
autorun: false,
|
||||
});
|
||||
this.schedulerNotePostQueueWorker
|
||||
.on('active', (job) => logger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => {
|
||||
this.logError(logger, err, job);
|
||||
if (config.sentryForBackend) {
|
||||
Sentry.captureMessage(`Queue: ${QUEUE.SCHEDULE_NOTE_POST}: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
|
||||
level: 'error',
|
||||
extra: { job, err },
|
||||
});
|
||||
}
|
||||
})
|
||||
.on('error', (err: Error) => this.logError(logger, err))
|
||||
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
|
||||
}
|
||||
//#endregion
|
||||
}
|
||||
|
||||
private logError(logger: Logger, err: unknown, job?: Bull.Job | null): void {
|
||||
const parts: string[] = [];
|
||||
|
||||
// Render job
|
||||
if (job) {
|
||||
parts.push('job [');
|
||||
parts.push(getJobInfo(job));
|
||||
parts.push('] failed: ');
|
||||
} else {
|
||||
parts.push('job failed: ');
|
||||
}
|
||||
|
||||
// Render error
|
||||
const fullError = renderFullError(err);
|
||||
const errorText = typeof(fullError) === 'string' ? fullError : undefined;
|
||||
if (errorText) {
|
||||
parts.push(errorText);
|
||||
} else if (job?.failedReason) {
|
||||
parts.push(job.failedReason);
|
||||
}
|
||||
|
||||
const message = parts.join('');
|
||||
const data = typeof(fullError) !== 'string' ? { err: fullError } : undefined;
|
||||
logger.error(message, data);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async start(): Promise<void> {
|
||||
await Promise.all([
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ export class AggregateRetentionProcessorService {
|
|||
});
|
||||
} catch (err) {
|
||||
if (isDuplicateKeyValueError(err)) {
|
||||
this.logger.succ('Skip because it has already been processed by another worker.');
|
||||
this.logger.debug('Skip because it has already been processed by another worker.');
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
|
|
@ -87,6 +87,6 @@ export class AggregateRetentionProcessorService {
|
|||
});
|
||||
}
|
||||
|
||||
this.logger.succ('Retention aggregated.');
|
||||
this.logger.info('Retention aggregated.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,6 @@ export class BakeBufferedReactionsProcessorService {
|
|||
|
||||
await this.reactionsBufferingService.bake();
|
||||
|
||||
this.logger.succ('All buffered reactions baked.');
|
||||
this.logger.info('All buffered reactions baked.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,6 +41,6 @@ export class CheckExpiredMutingsProcessorService {
|
|||
await this.userMutingService.unmute(expired);
|
||||
}
|
||||
|
||||
this.logger.succ('All expired mutings checked.');
|
||||
this.logger.info('All expired mutings checked.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -98,16 +98,16 @@ export class CheckModeratorsActivityProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(): Promise<void> {
|
||||
this.logger.info('start.');
|
||||
this.logger.debug('start.');
|
||||
|
||||
const meta = await this.metaService.fetch(false);
|
||||
if (!meta.disableRegistration) {
|
||||
await this.processImpl();
|
||||
} else {
|
||||
this.logger.info('is already invitation only.');
|
||||
this.logger.debug('is already invitation only.');
|
||||
}
|
||||
|
||||
this.logger.succ('finish.');
|
||||
this.logger.debug('finish.');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
|||
|
|
@ -62,6 +62,6 @@ export class CleanChartsProcessorService {
|
|||
await this.perUserDriveChart.clean();
|
||||
await this.apRequestChart.clean();
|
||||
|
||||
this.logger.succ('All charts successfully cleaned.');
|
||||
this.logger.info('All charts successfully cleaned.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,6 +69,6 @@ export class CleanProcessorService {
|
|||
|
||||
this.reversiService.cleanOutdatedGames();
|
||||
|
||||
this.logger.succ('Cleaned.');
|
||||
this.logger.info('Cleaned.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,6 +75,6 @@ export class CleanRemoteFilesProcessorService {
|
|||
await job.updateProgress(100 / total * deletedCount);
|
||||
}
|
||||
|
||||
this.logger.succ(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`);
|
||||
this.logger.info(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ export class DeleteAccountProcessorService {
|
|||
userId: user.id,
|
||||
});
|
||||
|
||||
this.logger.succ('All clips have been deleted.');
|
||||
this.logger.info('All clips have been deleted.');
|
||||
}
|
||||
|
||||
{ // Delete favorites
|
||||
|
|
@ -136,7 +136,7 @@ export class DeleteAccountProcessorService {
|
|||
userId: user.id,
|
||||
});
|
||||
|
||||
this.logger.succ('All favorites have been deleted.');
|
||||
this.logger.info('All favorites have been deleted.');
|
||||
}
|
||||
|
||||
{ // Delete user relations
|
||||
|
|
@ -172,7 +172,7 @@ export class DeleteAccountProcessorService {
|
|||
muteeId: user.id,
|
||||
});
|
||||
|
||||
this.logger.succ('All user relations have been deleted.');
|
||||
this.logger.info('All user relations have been deleted.');
|
||||
}
|
||||
|
||||
{ // Delete reactions
|
||||
|
|
@ -206,7 +206,7 @@ export class DeleteAccountProcessorService {
|
|||
}
|
||||
}
|
||||
|
||||
this.logger.succ('All reactions have been deleted');
|
||||
this.logger.info('All reactions have been deleted');
|
||||
}
|
||||
|
||||
{ // Poll votes
|
||||
|
|
@ -238,7 +238,7 @@ export class DeleteAccountProcessorService {
|
|||
});
|
||||
}
|
||||
|
||||
this.logger.succ('All poll votes have been deleted');
|
||||
this.logger.info('All poll votes have been deleted');
|
||||
}
|
||||
|
||||
{ // Delete scheduled notes
|
||||
|
|
@ -254,7 +254,7 @@ export class DeleteAccountProcessorService {
|
|||
userId: user.id,
|
||||
});
|
||||
|
||||
this.logger.succ('All scheduled notes deleted');
|
||||
this.logger.info('All scheduled notes deleted');
|
||||
}
|
||||
|
||||
{ // Delete notes
|
||||
|
|
@ -312,7 +312,7 @@ export class DeleteAccountProcessorService {
|
|||
}
|
||||
}
|
||||
|
||||
this.logger.succ('All of notes deleted');
|
||||
this.logger.info('All of notes deleted');
|
||||
}
|
||||
|
||||
{ // Delete files
|
||||
|
|
@ -341,7 +341,7 @@ export class DeleteAccountProcessorService {
|
|||
}
|
||||
}
|
||||
|
||||
this.logger.succ('All of files deleted');
|
||||
this.logger.info('All of files deleted');
|
||||
}
|
||||
|
||||
{ // Delete actor logs
|
||||
|
|
@ -353,7 +353,7 @@ export class DeleteAccountProcessorService {
|
|||
await this.apLogService.deleteInboxLogs(user.id)
|
||||
.catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
|
||||
|
||||
this.logger.succ('All AP logs deleted');
|
||||
this.logger.info('All AP logs deleted');
|
||||
}
|
||||
|
||||
// Do this BEFORE deleting the account!
|
||||
|
|
@ -379,7 +379,7 @@ export class DeleteAccountProcessorService {
|
|||
await this.usersRepository.delete(user.id);
|
||||
}
|
||||
|
||||
this.logger.succ('Account data deleted');
|
||||
this.logger.info('Account data deleted');
|
||||
}
|
||||
|
||||
{ // Send email notification
|
||||
|
|
|
|||
|
|
@ -74,6 +74,6 @@ export class DeleteDriveFilesProcessorService {
|
|||
job.updateProgress(deletedCount / total);
|
||||
}
|
||||
|
||||
this.logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
|
||||
this.logger.info(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,23 +138,18 @@ export class DeliverProcessorService {
|
|||
}
|
||||
});
|
||||
|
||||
if (res instanceof StatusError) {
|
||||
if (res instanceof StatusError && !res.isRetryable) {
|
||||
// 4xx
|
||||
if (!res.isRetryable) {
|
||||
// 相手が閉鎖していることを明示しているため、配送停止する
|
||||
if (job.data.isSharedInbox && res.statusCode === 410) {
|
||||
this.federatedInstanceService.fetchOrRegister(host).then(i => {
|
||||
this.federatedInstanceService.update(i.id, {
|
||||
suspensionState: 'goneSuspended',
|
||||
});
|
||||
// 相手が閉鎖していることを明示しているため、配送停止する
|
||||
if (job.data.isSharedInbox && res.statusCode === 410) {
|
||||
this.federatedInstanceService.fetchOrRegister(host).then(i => {
|
||||
this.federatedInstanceService.update(i.id, {
|
||||
suspensionState: 'goneSuspended',
|
||||
});
|
||||
throw new Bull.UnrecoverableError(`${host} is gone`);
|
||||
}
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
});
|
||||
throw new Bull.UnrecoverableError(`${host} is gone`);
|
||||
}
|
||||
|
||||
// 5xx etc.
|
||||
throw new Error(`${res.statusCode} ${res.statusMessage}`);
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
} else {
|
||||
// DNS error, socket error, timeout ...
|
||||
throw res;
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import { Packed } from '@/misc/json-schema.js';
|
|||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { DownloadService } from '@/core/DownloadService.js';
|
||||
import { EmailService } from '@/core/EmailService.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
|
||||
|
|
@ -85,21 +86,23 @@ export class ExportAccountDataProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job): Promise<void> {
|
||||
this.logger.info('Exporting Account Data...');
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
const profile = await this.userProfilesRepository.findOneBy({ userId: job.data.user.id });
|
||||
if (profile == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} has no profile`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting account data for ${job.data.user.id} ...`);
|
||||
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
// User Export
|
||||
|
||||
|
|
@ -113,7 +116,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
userStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing user:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -145,7 +148,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
profileStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing profile:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -179,7 +182,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
ipStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing IPs:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -214,7 +217,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
notesStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing notes:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -275,7 +278,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
followingStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing following:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -345,7 +348,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
followerStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing followers:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -406,7 +409,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
filesStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing drive:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -432,7 +435,7 @@ export class ExportAccountDataProcessorService {
|
|||
await this.downloadService.downloadUrl(file.url, filePath);
|
||||
downloaded = true;
|
||||
} catch (e) {
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error(`Error writing drive file ${file.id} (${file.name}): ${renderInlineError(e)}`);
|
||||
}
|
||||
|
||||
if (!downloaded) {
|
||||
|
|
@ -464,7 +467,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
mutingStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing mutings:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -527,7 +530,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
blockingStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing blockings:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -589,7 +592,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
favoriteStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing favorites:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -650,7 +653,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
antennaStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing antennas:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -708,7 +711,7 @@ export class ExportAccountDataProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
listStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error writing lists:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -744,12 +747,12 @@ export class ExportAccountDataProcessorService {
|
|||
zlib: { level: 0 },
|
||||
});
|
||||
archiveStream.on('close', async () => {
|
||||
this.logger.succ(`Exported to: ${archivePath}`);
|
||||
this.logger.debug(`Exported to path: ${archivePath}`);
|
||||
|
||||
const fileName = 'data-request-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
|
||||
const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to drive: ${driveFile.id}`);
|
||||
cleanup();
|
||||
archiveCleanup();
|
||||
if (profile.email) {
|
||||
|
|
|
|||
|
|
@ -45,15 +45,19 @@ export class ExportAntennasProcessorService {
|
|||
public async process(job: Bull.Job<DBExportAntennasData>): Promise<void> {
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting antennas of ${job.data.user.id} ...`);
|
||||
|
||||
const [path, cleanup] = await createTemp();
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
const write = (input: string): Promise<void> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
stream.write(input, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting antennas:', err);
|
||||
reject();
|
||||
} else {
|
||||
resolve();
|
||||
|
|
@ -96,7 +100,7 @@ export class ExportAntennasProcessorService {
|
|||
|
||||
const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
|
||||
this.logger.succ('Exported to: ' + driveFile.id);
|
||||
this.logger.debug('Exported to: ' + driveFile.id);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'antenna',
|
||||
|
|
|
|||
|
|
@ -40,17 +40,18 @@ export class ExportBlockingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting blocking of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting blocking of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
|
|
@ -87,7 +88,7 @@ export class ExportBlockingProcessorService {
|
|||
await new Promise<void>((res, rej) => {
|
||||
stream.write(content + '\n', err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting blocking:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -105,12 +106,12 @@ export class ExportBlockingProcessorService {
|
|||
}
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'blocking',
|
||||
|
|
|
|||
|
|
@ -51,17 +51,18 @@ export class ExportClipsProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting clips of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting clips of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' }));
|
||||
|
|
@ -75,12 +76,12 @@ export class ExportClipsProcessorService {
|
|||
await writer.write(']');
|
||||
await writer.close();
|
||||
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'clip',
|
||||
|
|
|
|||
|
|
@ -45,16 +45,17 @@ export class ExportCustomEmojisProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job): Promise<void> {
|
||||
this.logger.info('Exporting custom emojis ...');
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting custom emojis of ${job.data.user.id} ...`);
|
||||
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
const metaPath = path + '/meta.json';
|
||||
|
||||
|
|
@ -66,7 +67,7 @@ export class ExportCustomEmojisProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
metaStream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting custom emojis:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -101,7 +102,7 @@ export class ExportCustomEmojisProcessorService {
|
|||
await this.downloadService.downloadUrl(emoji.originalUrl, emojiPath);
|
||||
downloaded = true;
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error('Error exporting custom emojis:', e as Error);
|
||||
}
|
||||
|
||||
if (!downloaded) {
|
||||
|
|
@ -130,12 +131,12 @@ export class ExportCustomEmojisProcessorService {
|
|||
zlib: { level: 0 },
|
||||
});
|
||||
archiveStream.on('close', async () => {
|
||||
this.logger.succ(`Exported to: ${archivePath}`);
|
||||
this.logger.debug(`Exported to: ${archivePath}`);
|
||||
|
||||
const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
|
||||
const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'customEmoji',
|
||||
|
|
|
|||
|
|
@ -45,17 +45,18 @@ export class ExportFavoritesProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting favorites of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting favorites of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
|
|
@ -64,7 +65,7 @@ export class ExportFavoritesProcessorService {
|
|||
return new Promise<void>((res, rej) => {
|
||||
stream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting favorites:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -119,12 +120,12 @@ export class ExportFavoritesProcessorService {
|
|||
await write(']');
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'favorites-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'favorite',
|
||||
|
|
|
|||
|
|
@ -44,17 +44,18 @@ export class ExportFollowingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbExportFollowingData>): Promise<void> {
|
||||
this.logger.info(`Exporting following of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting following of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
|
|
@ -98,7 +99,7 @@ export class ExportFollowingProcessorService {
|
|||
await new Promise<void>((res, rej) => {
|
||||
stream.write(content + '\n', err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting following:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -109,12 +110,12 @@ export class ExportFollowingProcessorService {
|
|||
}
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'following',
|
||||
|
|
|
|||
|
|
@ -40,17 +40,18 @@ export class ExportMutingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting muting of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`Exporting muting of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
|
|
@ -88,7 +89,7 @@ export class ExportMutingProcessorService {
|
|||
await new Promise<void>((res, rej) => {
|
||||
stream.write(content + '\n', err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting mutings:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -106,12 +107,12 @@ export class ExportMutingProcessorService {
|
|||
}
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'muting',
|
||||
|
|
|
|||
|
|
@ -120,17 +120,18 @@ export class ExportNotesProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting notes of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting notes of ${job.data.user.id} ...`);
|
||||
|
||||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
// メモリが足りなくならないようにストリームで処理する
|
||||
|
|
@ -146,12 +147,12 @@ export class ExportNotesProcessorService {
|
|||
.pipeThrough(new TextEncoderStream())
|
||||
.pipeTo(new FileWriterStream(path));
|
||||
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'note',
|
||||
|
|
|
|||
|
|
@ -43,13 +43,14 @@ export class ExportUserListsProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
|
||||
this.logger.info(`Exporting user lists of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Exporting user lists of ${job.data.user.id} ...`);
|
||||
|
||||
const lists = await this.userListsRepository.findBy({
|
||||
userId: user.id,
|
||||
});
|
||||
|
|
@ -57,7 +58,7 @@ export class ExportUserListsProcessorService {
|
|||
// Create temp file
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp file is ${path}`);
|
||||
this.logger.debug(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
|
|
@ -74,7 +75,7 @@ export class ExportUserListsProcessorService {
|
|||
await new Promise<void>((res, rej) => {
|
||||
stream.write(content + '\n', err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error exporting lists:', err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
|
|
@ -85,12 +86,12 @@ export class ExportUserListsProcessorService {
|
|||
}
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
this.logger.debug(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
|
||||
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
|
||||
|
||||
this.logger.succ(`Exported to: ${driveFile.id}`);
|
||||
this.logger.debug(`Exported to: ${driveFile.id}`);
|
||||
|
||||
this.notificationService.createNotification(user.id, 'exportCompleted', {
|
||||
exportedEntity: 'userList',
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import _Ajv from 'ajv';
|
|||
import { IdService } from '@/core/IdService.js';
|
||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import Logger from '@/logger.js';
|
||||
import type { AntennasRepository } from '@/models/_.js';
|
||||
import type { AntennasRepository, UsersRepository } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
|
|
@ -59,6 +59,9 @@ export class ImportAntennasProcessorService {
|
|||
@Inject(DI.antennasRepository)
|
||||
private antennasRepository: AntennasRepository,
|
||||
|
||||
@Inject(DI.usersRepository)
|
||||
private usersRepository: UsersRepository,
|
||||
|
||||
private queueLoggerService: QueueLoggerService,
|
||||
private idService: IdService,
|
||||
private globalEventService: GlobalEventService,
|
||||
|
|
@ -68,12 +71,20 @@ export class ImportAntennasProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DBAntennaImportJobData>): Promise<void> {
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`Importing blocking of ${job.data.user.id} ...`);
|
||||
|
||||
const now = new Date();
|
||||
try {
|
||||
for (const antenna of job.data.antenna) {
|
||||
if (antenna.keywords.length === 0 || antenna.keywords[0].every(x => x === '')) continue;
|
||||
if (!validate(antenna)) {
|
||||
this.logger.warn('Validation Failed');
|
||||
this.logger.warn('Antenna validation failed');
|
||||
continue;
|
||||
}
|
||||
const result = await this.antennasRepository.insertOne({
|
||||
|
|
@ -92,11 +103,11 @@ export class ImportAntennasProcessorService {
|
|||
withReplies: antenna.withReplies,
|
||||
withFile: antenna.withFile,
|
||||
});
|
||||
this.logger.succ('Antenna created: ' + result.id);
|
||||
this.logger.debug('Antenna created: ' + result.id);
|
||||
this.globalEventService.publishInternalEvent('antennaCreated', result);
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.error(err);
|
||||
this.logger.error('Error importing antennas:', err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,10 +40,9 @@ export class ImportBlockingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
|
||||
this.logger.info(`Importing blocking of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -51,14 +50,17 @@ export class ImportBlockingProcessorService {
|
|||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`Importing blocking of ${job.data.user.id} ...`);
|
||||
|
||||
const csv = await this.downloadService.downloadTextFile(file.url);
|
||||
const targets = csv.trim().split('\n');
|
||||
this.queueService.createImportBlockingToDbJob({ id: user.id }, targets);
|
||||
|
||||
this.logger.succ('Import jobs created');
|
||||
this.logger.debug('Import jobs created');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -93,11 +95,11 @@ export class ImportBlockingProcessorService {
|
|||
// skip myself
|
||||
if (target.id === job.data.user.id) return;
|
||||
|
||||
this.logger.info(`Block ${target.id} ...`);
|
||||
this.logger.debug(`Block ${target.id} ...`);
|
||||
|
||||
this.queueService.createBlockJob([{ from: { id: user.id }, to: { id: target.id }, silent: true }]);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error: ${e}`);
|
||||
this.logger.error('Error importing blockings:', e as Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import { DriveService } from '@/core/DriveService.js';
|
|||
import { DownloadService } from '@/core/DownloadService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { Config } from '@/config.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbUserImportJobData } from '../types.js';
|
||||
|
|
@ -45,18 +46,19 @@ export class ImportCustomEmojisProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
|
||||
this.logger.info('Importing custom emojis ...');
|
||||
|
||||
const file = await this.driveFilesRepository.findOneBy({
|
||||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Importing custom emojis from ${file.id} (${file.name}) ...`);
|
||||
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
const destPath = path + '/emojis.zip';
|
||||
|
||||
|
|
@ -65,14 +67,14 @@ export class ImportCustomEmojisProcessorService {
|
|||
await this.downloadService.downloadUrl(file.url, destPath, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize });
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
this.logger.error('Error importing custom emojis:', e as Error);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
const outputPath = path + '/emojis';
|
||||
try {
|
||||
this.logger.succ(`Unzipping to ${outputPath}`);
|
||||
this.logger.debug(`Unzipping to ${outputPath}`);
|
||||
ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath));
|
||||
const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8');
|
||||
const meta = JSON.parse(metaRaw);
|
||||
|
|
@ -117,7 +119,7 @@ export class ImportCustomEmojisProcessorService {
|
|||
});
|
||||
} catch (e) {
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${e}`);
|
||||
this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${renderInlineError(e)}`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
|
@ -125,11 +127,9 @@ export class ImportCustomEmojisProcessorService {
|
|||
|
||||
cleanup();
|
||||
|
||||
this.logger.succ('Imported');
|
||||
this.logger.debug('Imported');
|
||||
} catch (e) {
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
}
|
||||
this.logger.error('Error importing custom emojis:', e as Error);
|
||||
cleanup();
|
||||
throw e;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,10 +40,9 @@ export class ImportFollowingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
|
||||
this.logger.info(`Importing following of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -51,14 +50,17 @@ export class ImportFollowingProcessorService {
|
|||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Importing following of ${job.data.user.id} ...`);
|
||||
|
||||
const csv = await this.downloadService.downloadTextFile(file.url);
|
||||
const targets = csv.trim().split('\n');
|
||||
this.queueService.createImportFollowingToDbJob({ id: user.id }, targets, job.data.withReplies);
|
||||
|
||||
this.logger.succ('Import jobs created');
|
||||
this.logger.debug('Import jobs created');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -93,11 +95,11 @@ export class ImportFollowingProcessorService {
|
|||
// skip myself
|
||||
if (target.id === job.data.user.id) return;
|
||||
|
||||
this.logger.info(`Follow ${target.id} ${job.data.withReplies ? 'with replies' : 'without replies'} ...`);
|
||||
this.logger.debug(`Follow ${target.id} ${job.data.withReplies ? 'with replies' : 'without replies'} ...`);
|
||||
|
||||
this.queueService.createFollowJob([{ from: user, to: { id: target.id }, silent: true, withReplies: job.data.withReplies }]);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error: ${e}`);
|
||||
this.logger.error('Error importing followings:', e as Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import { DownloadService } from '@/core/DownloadService.js';
|
|||
import { UserMutingService } from '@/core/UserMutingService.js';
|
||||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbUserImportJobData } from '../types.js';
|
||||
|
|
@ -40,10 +41,9 @@ export class ImportMutingProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
|
||||
this.logger.info(`Importing muting of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -51,9 +51,12 @@ export class ImportMutingProcessorService {
|
|||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Importing muting of ${job.data.user.id} ...`);
|
||||
|
||||
const csv = await this.downloadService.downloadTextFile(file.url);
|
||||
|
||||
let linenum = 0;
|
||||
|
|
@ -88,14 +91,14 @@ export class ImportMutingProcessorService {
|
|||
// skip myself
|
||||
if (target.id === job.data.user.id) continue;
|
||||
|
||||
this.logger.info(`Mute[${linenum}] ${target.id} ...`);
|
||||
this.logger.debug(`Mute[${linenum}] ${target.id} ...`);
|
||||
|
||||
await this.userMutingService.mute(user, target);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error in line:${linenum} ${e}`);
|
||||
this.logger.warn(`Error in line:${linenum} ${renderInlineError(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.succ('Imported');
|
||||
this.logger.debug('Imported');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,10 +159,9 @@ export class ImportNotesProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbNoteImportJobData>): Promise<void> {
|
||||
this.logger.info(`Starting note import of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -170,9 +169,12 @@ export class ImportNotesProcessorService {
|
|||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Starting note import of ${job.data.user.id} ...`);
|
||||
|
||||
let folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id });
|
||||
if (folder == null) {
|
||||
await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Imports', userId: job.data.user.id });
|
||||
|
|
@ -184,7 +186,7 @@ export class ImportNotesProcessorService {
|
|||
if (type === 'Twitter' || file.name.startsWith('twitter') && file.name.endsWith('.zip')) {
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
const destPath = path + '/twitter.zip';
|
||||
|
||||
|
|
@ -192,15 +194,13 @@ export class ImportNotesProcessorService {
|
|||
await fsp.writeFile(destPath, '', 'binary');
|
||||
await this.downloadUrl(file.url, destPath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
}
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
throw e;
|
||||
}
|
||||
|
||||
const outputPath = path + '/twitter';
|
||||
try {
|
||||
this.logger.succ(`Unzipping to ${outputPath}`);
|
||||
this.logger.debug(`Unzipping to ${outputPath}`);
|
||||
ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
|
||||
|
||||
const unprocessedTweets = this.parseTwitterFile(await fsp.readFile(outputPath + '/data/tweets.js', 'utf-8'));
|
||||
|
|
@ -214,7 +214,7 @@ export class ImportNotesProcessorService {
|
|||
} else if (type === 'Facebook' || file.name.startsWith('facebook-') && file.name.endsWith('.zip')) {
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
const destPath = path + '/facebook.zip';
|
||||
|
||||
|
|
@ -222,15 +222,13 @@ export class ImportNotesProcessorService {
|
|||
await fsp.writeFile(destPath, '', 'binary');
|
||||
await this.downloadUrl(file.url, destPath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
}
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
throw e;
|
||||
}
|
||||
|
||||
const outputPath = path + '/facebook';
|
||||
try {
|
||||
this.logger.succ(`Unzipping to ${outputPath}`);
|
||||
this.logger.debug(`Unzipping to ${outputPath}`);
|
||||
ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
|
||||
const postsJson = await fsp.readFile(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8');
|
||||
const posts = JSON.parse(postsJson);
|
||||
|
|
@ -247,7 +245,7 @@ export class ImportNotesProcessorService {
|
|||
} else if (file.name.endsWith('.zip')) {
|
||||
const [path, cleanup] = await createTempDir();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
const destPath = path + '/unknown.zip';
|
||||
|
||||
|
|
@ -255,15 +253,13 @@ export class ImportNotesProcessorService {
|
|||
await fsp.writeFile(destPath, '', 'binary');
|
||||
await this.downloadUrl(file.url, destPath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
}
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
throw e;
|
||||
}
|
||||
|
||||
const outputPath = path + '/unknown';
|
||||
try {
|
||||
this.logger.succ(`Unzipping to ${outputPath}`);
|
||||
this.logger.debug(`Unzipping to ${outputPath}`);
|
||||
ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
|
||||
const isInstagram = type === 'Instagram' || fs.existsSync(outputPath + '/instagram_live') || fs.existsSync(outputPath + '/instagram_ads_and_businesses');
|
||||
const isOutbox = type === 'Mastodon' || fs.existsSync(outputPath + '/outbox.json');
|
||||
|
|
@ -307,15 +303,13 @@ export class ImportNotesProcessorService {
|
|||
} else if (job.data.type === 'Misskey' || file.name.startsWith('notes-') && file.name.endsWith('.json')) {
|
||||
const [path, cleanup] = await createTemp();
|
||||
|
||||
this.logger.info(`Temp dir is ${path}`);
|
||||
this.logger.debug(`Temp dir is ${path}`);
|
||||
|
||||
try {
|
||||
await fsp.writeFile(path, '', 'utf-8');
|
||||
await this.downloadUrl(file.url, path);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
if (e instanceof Error || typeof e === 'string') {
|
||||
this.logger.error(e);
|
||||
}
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
|
@ -326,7 +320,7 @@ export class ImportNotesProcessorService {
|
|||
cleanup();
|
||||
}
|
||||
|
||||
this.logger.succ('Import jobs created');
|
||||
this.logger.debug('Import jobs created');
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
@ -365,7 +359,7 @@ export class ImportNotesProcessorService {
|
|||
try {
|
||||
await this.downloadUrl(file.url, filePath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
}
|
||||
const driveFile = await this.driveService.addFile({
|
||||
user: user,
|
||||
|
|
@ -504,7 +498,7 @@ export class ImportNotesProcessorService {
|
|||
try {
|
||||
await this.downloadUrl(file.url, filePath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
}
|
||||
const driveFile = await this.driveService.addFile({
|
||||
user: user,
|
||||
|
|
@ -628,7 +622,7 @@ export class ImportNotesProcessorService {
|
|||
try {
|
||||
await this.downloadUrl(videos[0].url, filePath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
}
|
||||
const driveFile = await this.driveService.addFile({
|
||||
user: user,
|
||||
|
|
@ -653,7 +647,7 @@ export class ImportNotesProcessorService {
|
|||
try {
|
||||
await this.downloadUrl(file.media_url_https, filePath);
|
||||
} catch (e) { // TODO: 何度か再試行
|
||||
this.logger.error(e instanceof Error ? e : new Error(e as string));
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
}
|
||||
|
||||
const driveFile = await this.driveService.addFile({
|
||||
|
|
@ -673,7 +667,7 @@ export class ImportNotesProcessorService {
|
|||
const createdNote = await this.noteCreateService.import(user, { createdAt: date, reply: parentNote, text: text, files: files });
|
||||
if (tweet.childNotes) this.queueService.createImportTweetsToDbJob(user, tweet.childNotes, createdNote.id);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error: ${e}`);
|
||||
this.logger.error('Error importing notes:', e as Error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import { UserListService } from '@/core/UserListService.js';
|
|||
import { IdService } from '@/core/IdService.js';
|
||||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbUserImportJobData } from '../types.js';
|
||||
|
|
@ -48,10 +49,9 @@ export class ImportUserListsProcessorService {
|
|||
|
||||
@bindThis
|
||||
public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
|
||||
this.logger.info(`Importing user lists of ${job.data.user.id} ...`);
|
||||
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
this.logger.debug(`Skip: user ${job.data.user.id} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -59,9 +59,12 @@ export class ImportUserListsProcessorService {
|
|||
id: job.data.fileId,
|
||||
});
|
||||
if (file == null) {
|
||||
this.logger.debug(`Skip: file ${job.data.fileId} does not exist`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Importing user lists of ${job.data.user.id} ...`);
|
||||
|
||||
const csv = await this.downloadService.downloadTextFile(file.url);
|
||||
|
||||
let linenum = 0;
|
||||
|
|
@ -102,10 +105,10 @@ export class ImportUserListsProcessorService {
|
|||
|
||||
this.userListService.addMember(target, list!, user);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error in line:${linenum} ${e}`);
|
||||
this.logger.warn(`Error in line:${linenum} ${renderInlineError(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.succ('Imported');
|
||||
this.logger.debug('Imported');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ import { SkApInboxLog } from '@/models/_.js';
|
|||
import type { Config } from '@/config.js';
|
||||
import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
|
||||
import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js';
|
||||
import { isRetryableError } from '@/misc/is-retryable-error.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type { InboxJobData } from '../types.js';
|
||||
|
||||
|
|
@ -125,6 +127,14 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
return `Old keyId is no longer supported. ${keyIdLower}`;
|
||||
}
|
||||
|
||||
if (activity.actor as unknown == null || (Array.isArray(activity.actor) && activity.actor.length < 1)) {
|
||||
return 'skip: activity has no actor';
|
||||
}
|
||||
if (typeof(activity.actor) !== 'string' && typeof(activity.actor) !== 'object') {
|
||||
return `skip: activity actor has invalid type: ${typeof(activity.actor)}`;
|
||||
}
|
||||
const actorId = getApId(activity.actor);
|
||||
|
||||
// HTTP-Signature keyIdを元にDBから取得
|
||||
let authUser: {
|
||||
user: MiRemoteUser;
|
||||
|
|
@ -134,26 +144,25 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
// keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
|
||||
if (authUser == null) {
|
||||
try {
|
||||
authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
|
||||
authUser = await this.apDbResolverService.getAuthUserFromApId(actorId);
|
||||
} catch (err) {
|
||||
// 対象が4xxならスキップ
|
||||
if (err instanceof StatusError) {
|
||||
if (!err.isRetryable) {
|
||||
throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
|
||||
}
|
||||
throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`);
|
||||
if (!isRetryableError(err)) {
|
||||
throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${actorId}`);
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
// それでもわからなければ終了
|
||||
if (authUser == null) {
|
||||
throw new Bull.UnrecoverableError(`skip: failed to resolve user ${getApId(activity.actor)}`);
|
||||
throw new Bull.UnrecoverableError(`skip: failed to resolve user ${actorId}`);
|
||||
}
|
||||
|
||||
// publicKey がなくても終了
|
||||
if (authUser.key == null) {
|
||||
throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${getApId(activity.actor)}`);
|
||||
throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${actorId}`);
|
||||
}
|
||||
|
||||
// HTTP-Signatureの検証
|
||||
|
|
@ -168,7 +177,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
// また、signatureのsignerは、activity.actorと一致する必要がある
|
||||
if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) {
|
||||
if (!httpSignatureValidated || authUser.user.uri !== actorId) {
|
||||
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
|
||||
const ldSignature = activity.signature;
|
||||
if (ldSignature) {
|
||||
|
|
@ -213,13 +222,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
activity.signature = ldSignature;
|
||||
|
||||
// もう一度actorチェック
|
||||
if (authUser.user.uri !== activity.actor) {
|
||||
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
|
||||
if (authUser.user.uri !== actorId) {
|
||||
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${actorId})`);
|
||||
}
|
||||
|
||||
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
|
||||
if (!this.utilityService.isFederationAllowedHost(ldHost)) {
|
||||
throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
|
||||
throw new Bull.UnrecoverableError(`skip: request host is blocked: ${ldHost}`);
|
||||
}
|
||||
} else {
|
||||
throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`);
|
||||
|
|
@ -292,16 +301,8 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
}
|
||||
}
|
||||
|
||||
if (e instanceof StatusError && !e.isRetryable) {
|
||||
return `skip: permanent error ${e.statusCode}`;
|
||||
}
|
||||
|
||||
if (e instanceof IdentifiableError && !e.isRetryable) {
|
||||
if (e.message) {
|
||||
return `skip: permanent error ${e.id}: ${e.message}`;
|
||||
} else {
|
||||
return `skip: permanent error ${e.id}`;
|
||||
}
|
||||
if (!isRetryableError(e)) {
|
||||
return `skip: permanent error ${renderInlineError(e)}`;
|
||||
}
|
||||
|
||||
throw e;
|
||||
|
|
|
|||
|
|
@ -36,6 +36,6 @@ export class ResyncChartsProcessorService {
|
|||
await this.notesChart.resync();
|
||||
await this.usersChart.resync();
|
||||
|
||||
this.logger.succ('All charts successfully resynced.');
|
||||
this.logger.info('All charts successfully resynced.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import { DI } from '@/di-symbols.js';
|
|||
import { NotificationService } from '@/core/NotificationService.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
import type { MiScheduleNoteType } from '@/models/NoteSchedule.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { ScheduleNotePostJobData } from '../types.js';
|
||||
|
|
@ -129,10 +130,11 @@ export class ScheduleNotePostProcessorService {
|
|||
channel,
|
||||
}).catch(async (err: IdentifiableError) => {
|
||||
this.notificationService.createNotification(me.id, 'scheduledNoteFailed', {
|
||||
reason: err.message,
|
||||
reason: renderInlineError(err),
|
||||
});
|
||||
await this.noteScheduleRepository.remove(data);
|
||||
throw this.logger.error(`Schedule Note Failed Reason: ${err.message}`);
|
||||
this.logger.error(`Scheduled note failed: ${renderInlineError(err)}`);
|
||||
throw err;
|
||||
});
|
||||
await this.noteScheduleRepository.remove(data);
|
||||
this.notificationService.createNotification(me.id, 'scheduledNotePosted', {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import type Logger from '@/logger.js';
|
|||
import { HttpRequestService } from '@/core/HttpRequestService.js';
|
||||
import { StatusError } from '@/misc/status-error.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { renderInlineError } from '@/misc/render-inline-error.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import { SystemWebhookDeliverJobData } from '../types.js';
|
||||
|
||||
|
|
@ -63,21 +64,16 @@ export class SystemWebhookDeliverProcessorService {
|
|||
|
||||
return 'Success';
|
||||
} catch (res) {
|
||||
this.logger.error(res as Error);
|
||||
this.logger.error(`Failed to send webhook: ${renderInlineError(res)}`);
|
||||
|
||||
this.systemWebhooksRepository.update({ id: job.data.webhookId }, {
|
||||
latestSentAt: new Date(),
|
||||
latestStatus: res instanceof StatusError ? res.statusCode : 1,
|
||||
});
|
||||
|
||||
if (res instanceof StatusError) {
|
||||
if (res instanceof StatusError && !res.isRetryable) {
|
||||
// 4xx
|
||||
if (!res.isRetryable) {
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
}
|
||||
|
||||
// 5xx etc.
|
||||
throw new Error(`${res.statusCode} ${res.statusMessage}`);
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
} else {
|
||||
// DNS error, socket error, timeout ...
|
||||
throw res;
|
||||
|
|
|
|||
|
|
@ -62,6 +62,6 @@ export class TickChartsProcessorService {
|
|||
await this.perUserDriveChart.tick(false);
|
||||
await this.apRequestChart.tick(false);
|
||||
|
||||
this.logger.succ('All charts successfully ticked.');
|
||||
this.logger.info('All charts successfully ticked.');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,14 +69,9 @@ export class UserWebhookDeliverProcessorService {
|
|||
latestStatus: res instanceof StatusError ? res.statusCode : 1,
|
||||
});
|
||||
|
||||
if (res instanceof StatusError) {
|
||||
if (res instanceof StatusError && !res.isRetryable) {
|
||||
// 4xx
|
||||
if (!res.isRetryable) {
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
}
|
||||
|
||||
// 5xx etc.
|
||||
throw new Error(`${res.statusCode} ${res.statusMessage}`);
|
||||
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
|
||||
} else {
|
||||
// DNS error, socket error, timeout ...
|
||||
throw res;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue