perf(inbox-queue): parallel batch processing to drain queue backlog

Sequential await in batch loop meant 20 items took 20x per-item time.
Switch to Promise.allSettled so all BATCH_SIZE workers run concurrently —
each claims its own item atomically via findOneAndUpdate, no double-processing.
Also bumps BATCH_SIZE from 10→20. Incoming rate was exceeding drain rate,
causing unbounded queue growth.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
svemagie
2026-04-25 19:04:19 +02:00
parent c64b7877a2
commit 2d67c640d7
+9 -6
View File
@@ -76,7 +76,7 @@ export async function enqueueActivity(collections, { activityType, actorUrl, obj
});
}
const BATCH_SIZE = 10;
const BATCH_SIZE = 20;
const POLL_INTERVAL_MS = 1_000;
/**
@@ -91,15 +91,18 @@ export function startInboxProcessor(collections, getCtx, handle) {
try {
const ctx = getCtx();
if (!ctx) return;
for (let i = 0; i < BATCH_SIZE; i++) {
const hadWork = await processNextItem(collections, ctx, handle);
if (!hadWork) break; // Queue empty, stop early
}
// Run BATCH_SIZE workers in parallel — each atomically claims its own item
// via findOneAndUpdate, so there is no double-processing risk.
await Promise.allSettled(
Array.from({ length: BATCH_SIZE }, () =>
processNextItem(collections, ctx, handle),
),
);
} catch (error) {
console.error("[inbox-queue] Processor error:", error.message);
}
}, POLL_INTERVAL_MS);
console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE})`);
console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE}, parallel)`);
return intervalId;
}