diff --git a/lib/inbox-queue.js b/lib/inbox-queue.js index 2df019f..092fb21 100644 --- a/lib/inbox-queue.js +++ b/lib/inbox-queue.js @@ -76,7 +76,7 @@ export async function enqueueActivity(collections, { activityType, actorUrl, obj }); } -const BATCH_SIZE = 10; +const BATCH_SIZE = 20; const POLL_INTERVAL_MS = 1_000; /** @@ -91,15 +91,18 @@ export function startInboxProcessor(collections, getCtx, handle) { try { const ctx = getCtx(); if (!ctx) return; - for (let i = 0; i < BATCH_SIZE; i++) { - const hadWork = await processNextItem(collections, ctx, handle); - if (!hadWork) break; // Queue empty, stop early - } + // Run BATCH_SIZE workers in parallel — each atomically claims its own item + // via findOneAndUpdate, so there is no double-processing risk. + await Promise.allSettled( + Array.from({ length: BATCH_SIZE }, () => + processNextItem(collections, ctx, handle), + ), + ); } catch (error) { console.error("[inbox-queue] Processor error:", error.message); } }, POLL_INTERVAL_MS); - console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE})`); + console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE}, parallel)`); return intervalId; }