fix sporadic update failures caused by lack of data
This commit is contained in:
parent
63f2073af1
commit
1baa9b2f9d
1 changed files with 62 additions and 39 deletions
|
|
@ -104,37 +104,52 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
followingCountDelta: (oldJob.followingCountDelta ?? 0) + (newJob.followingCountDelta ?? 0),
|
||||
followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0),
|
||||
}),
|
||||
async (id, job) => await this.federatedInstanceService.update(id, {
|
||||
// Direct update if defined
|
||||
latestRequestReceivedAt: job.latestRequestReceivedAt,
|
||||
async (id, job) => {
|
||||
// Have to check this because all properties are optional
|
||||
if (
|
||||
job.latestRequestReceivedAt ||
|
||||
job.notRespondingSince !== undefined ||
|
||||
job.shouldSuspendNotResponding ||
|
||||
job.shouldSuspendGone ||
|
||||
job.shouldUnsuspend ||
|
||||
job.notesCountDelta ||
|
||||
job.usersCountDelta ||
|
||||
job.followingCountDelta ||
|
||||
job.followersCountDelta
|
||||
) {
|
||||
await this.federatedInstanceService.update(id, {
|
||||
// Direct update if defined
|
||||
latestRequestReceivedAt: job.latestRequestReceivedAt,
|
||||
|
||||
// null (responding) > Date (not responding)
|
||||
notRespondingSince: job.latestRequestReceivedAt
|
||||
? null
|
||||
: job.notRespondingSince,
|
||||
// null (responding) > Date (not responding)
|
||||
notRespondingSince: job.latestRequestReceivedAt
|
||||
? null
|
||||
: job.notRespondingSince,
|
||||
|
||||
// false (responding) > true (not responding)
|
||||
isNotResponding: job.latestRequestReceivedAt
|
||||
? false
|
||||
: job.notRespondingSince
|
||||
? true
|
||||
: undefined,
|
||||
// false (responding) > true (not responding)
|
||||
isNotResponding: job.latestRequestReceivedAt
|
||||
? false
|
||||
: job.notRespondingSince
|
||||
? true
|
||||
: undefined,
|
||||
|
||||
// gone > none > auto
|
||||
suspensionState: job.shouldSuspendGone
|
||||
? 'goneSuspended'
|
||||
: job.shouldUnsuspend
|
||||
? 'none'
|
||||
: job.shouldSuspendNotResponding
|
||||
? 'autoSuspendedForNotResponding'
|
||||
: undefined,
|
||||
// gone > none > auto
|
||||
suspensionState: job.shouldSuspendGone
|
||||
? 'goneSuspended'
|
||||
: job.shouldUnsuspend
|
||||
? 'none'
|
||||
: job.shouldSuspendNotResponding
|
||||
? 'autoSuspendedForNotResponding'
|
||||
: undefined,
|
||||
|
||||
// Increment if defined
|
||||
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
|
||||
usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined,
|
||||
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
|
||||
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
|
||||
}),
|
||||
// Increment if defined
|
||||
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
|
||||
usersCount: job.usersCountDelta ? () => `"usersCount" + ${job.usersCountDelta}` : undefined,
|
||||
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
|
||||
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
|
||||
});
|
||||
}
|
||||
},
|
||||
{
|
||||
onError: this.onQueueError,
|
||||
concurrency: 2, // Low concurrency, this table is slow for some reason
|
||||
|
|
@ -161,13 +176,16 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
followersCountDelta: (oldJob.followersCountDelta ?? 0) + (newJob.followersCountDelta ?? 0),
|
||||
}),
|
||||
async (id, job) => {
|
||||
await this.usersRepository.update({ id }, {
|
||||
updatedAt: job.updatedAt,
|
||||
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
|
||||
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
|
||||
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
|
||||
});
|
||||
await this.internalEventService.emit('userUpdated', { id });
|
||||
// Have to check this because all properties are optional
|
||||
if (job.updatedAt || job.notesCountDelta || job.followingCountDelta || job.followersCountDelta) {
|
||||
await this.usersRepository.update({ id }, {
|
||||
updatedAt: job.updatedAt,
|
||||
notesCount: job.notesCountDelta ? () => `"notesCount" + ${job.notesCountDelta}` : undefined,
|
||||
followingCount: job.followingCountDelta ? () => `"followingCount" + ${job.followingCountDelta}` : undefined,
|
||||
followersCount: job.followersCountDelta ? () => `"followersCount" + ${job.followersCountDelta}` : undefined,
|
||||
});
|
||||
await this.internalEventService.emit('userUpdated', { id });
|
||||
}
|
||||
},
|
||||
{
|
||||
onError: this.onQueueError,
|
||||
|
|
@ -190,11 +208,16 @@ export class CollapsedQueueService implements OnApplicationShutdown {
|
|||
renoteCountDelta: (oldJob.renoteCountDelta ?? 0) + (newJob.renoteCountDelta ?? 0),
|
||||
clippedCountDelta: (oldJob.clippedCountDelta ?? 0) + (newJob.clippedCountDelta ?? 0),
|
||||
}),
|
||||
async (id, job) => await this.notesRepository.update({ id }, {
|
||||
repliesCount: job.repliesCountDelta ? () => `"repliesCount" + ${job.repliesCountDelta}` : undefined,
|
||||
renoteCount: job.renoteCountDelta ? () => `"renoteCount" + ${job.renoteCountDelta}` : undefined,
|
||||
clippedCount: job.clippedCountDelta ? () => `"clippedCount" + ${job.clippedCountDelta}` : undefined,
|
||||
}),
|
||||
async (id, job) => {
|
||||
// Have to check this because all properties are optional
|
||||
if (job.repliesCountDelta || job.renoteCountDelta || job.clippedCountDelta) {
|
||||
await this.notesRepository.update({ id }, {
|
||||
repliesCount: job.repliesCountDelta ? () => `"repliesCount" + ${job.repliesCountDelta}` : undefined,
|
||||
renoteCount: job.renoteCountDelta ? () => `"renoteCount" + ${job.renoteCountDelta}` : undefined,
|
||||
clippedCount: job.clippedCountDelta ? () => `"clippedCount" + ${job.clippedCountDelta}` : undefined,
|
||||
});
|
||||
}
|
||||
},
|
||||
{
|
||||
onError: this.onQueueError,
|
||||
concurrency: 4, // High concurrency - this queue gets a lot of activity
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue