From 2d67c640d7d12c6a6c453b708be295377ba87a91 Mon Sep 17 00:00:00 2001 From: svemagie <869694+svemagie@users.noreply.github.com> Date: Sat, 25 Apr 2026 19:04:19 +0200 Subject: [PATCH] perf(inbox-queue): parallel batch processing to drain queue backlog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sequential await in batch loop meant 20 items took 20x per-item time. Switch to Promise.allSettled so all BATCH_SIZE workers run concurrently — each claims its own item atomically via findOneAndUpdate, no double-processing. Also bumps BATCH_SIZE from 10→20. Incoming rate was exceeding drain rate, causing unbounded queue growth. Co-Authored-By: Claude Sonnet 4.6 --- lib/inbox-queue.js | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/inbox-queue.js b/lib/inbox-queue.js index 2df019f..092fb21 100644 --- a/lib/inbox-queue.js +++ b/lib/inbox-queue.js @@ -76,7 +76,7 @@ export async function enqueueActivity(collections, { activityType, actorUrl, obj }); } -const BATCH_SIZE = 10; +const BATCH_SIZE = 20; const POLL_INTERVAL_MS = 1_000; /** @@ -91,15 +91,18 @@ export function startInboxProcessor(collections, getCtx, handle) { try { const ctx = getCtx(); if (!ctx) return; - for (let i = 0; i < BATCH_SIZE; i++) { - const hadWork = await processNextItem(collections, ctx, handle); - if (!hadWork) break; // Queue empty, stop early - } + // Run BATCH_SIZE workers in parallel — each atomically claims its own item + // via findOneAndUpdate, so there is no double-processing risk. + await Promise.allSettled( + Array.from({ length: BATCH_SIZE }, () => + processNextItem(collections, ctx, handle), + ), + ); } catch (error) { console.error("[inbox-queue] Processor error:", error.message); } }, POLL_INTERVAL_MS); - console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE})`); + console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE}, parallel)`); return intervalId; }