2d67c640d7
Sequential await in batch loop meant 20 items took 20x per-item time. Switch to Promise.allSettled so all BATCH_SIZE workers run concurrently — each claims its own item atomically via findOneAndUpdate, no double-processing. Also bumps BATCH_SIZE from 10→20. Incoming rate was exceeding drain rate, causing unbounded queue growth. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
109 lines
3.4 KiB
JavaScript
109 lines
3.4 KiB
JavaScript
/**
 * MongoDB-backed inbox processing queue.
 *
 * Runs a setInterval-based processor that claims and processes up to
 * BATCH_SIZE activities concurrently from ap_inbox_queue, each worker
 * claiming its own item atomically.
 *
 * @module inbox-queue
 */
|
|
|
|
import { routeToHandler } from "./inbox-handlers.js";
|
|
|
|
/**
 * Process the next pending item from the inbox queue.
 * Uses findOneAndUpdate for atomic claim (prevents double-processing),
 * so any number of concurrent callers can safely share the queue.
 *
 * On success the item is deleted immediately; on failure it is requeued
 * as "pending" until maxAttempts is reached, then marked "failed".
 *
 * @param {object} collections - MongoDB collections
 * @param {object} ctx - Fedify context
 * @param {string} handle - Our actor handle
 * @returns {Promise<boolean>} true if an item was claimed (even if its
 *   handler failed), false if the queue was empty or unavailable
 */
async function processNextItem(collections, ctx, handle) {
  const { ap_inbox_queue } = collections;
  if (!ap_inbox_queue) return false;

  // Atomically flip the oldest pending item (receivedAt ascending = FIFO)
  // to "processing" and get the updated document back.
  // NOTE(review): assumes a MongoDB driver (v6+) where findOneAndUpdate
  // resolves to the document itself rather than { value: doc } — confirm
  // against the project's driver version.
  const item = await ap_inbox_queue.findOneAndUpdate(
    { status: "pending" },
    { $set: { status: "processing" } },
    { sort: { receivedAt: 1 }, returnDocument: "after" },
  );
  if (!item) return false;

  try {
    await routeToHandler(item, collections, ctx, handle);
    // Delete completed items immediately — prevents unbounded collection growth
    // that caused the inbox processor to hang on restart (95K+ documents).
    await ap_inbox_queue.deleteOne({ _id: item._id });
  } catch (error) {
    // Handlers may throw non-Error values (strings, plain objects) which
    // have no .message; normalize so we never store/log "undefined".
    const message = error instanceof Error ? error.message : String(error);
    const attempts = (item.attempts || 0) + 1;
    await ap_inbox_queue.updateOne(
      { _id: item._id },
      {
        $set: {
          // Terminal "failed" after maxAttempts; otherwise requeue for retry.
          status: attempts >= (item.maxAttempts || 3) ? "failed" : "pending",
          attempts,
          error: message,
        },
      },
    );
    console.error(`[inbox-queue] Failed processing ${item.activityType} from ${item.actorUrl}: ${message}`);
  }

  return true;
}
|
|
|
|
/**
 * Enqueue an activity for async processing.
 * @param {object} collections - MongoDB collections
 * @param {object} params
 * @param {string} params.activityType - Activity type name
 * @param {string} params.actorUrl - Actor URL
 * @param {string} [params.objectUrl] - Object URL
 * @param {object} params.rawJson - Full activity JSON-LD
 */
export async function enqueueActivity(collections, { activityType, actorUrl, objectUrl, rawJson }) {
  const { ap_inbox_queue } = collections;
  if (!ap_inbox_queue) return;

  // New items start "pending" with zeroed retry bookkeeping; the background
  // processor claims them in receivedAt order and drains the queue.
  const queueDoc = {
    activityType,
    actorUrl: actorUrl || "",
    objectUrl: objectUrl || "",
    rawJson,
    status: "pending",
    attempts: 0,
    maxAttempts: 3,
    receivedAt: new Date().toISOString(),
    processedAt: null,
    error: null,
  };

  await ap_inbox_queue.insertOne(queueDoc);
}
|
|
|
|
// Max number of items claimed concurrently per poll tick.
const BATCH_SIZE = 20;
// How often the background processor polls ap_inbox_queue for pending items.
const POLL_INTERVAL_MS = 1_000;
|
|
|
|
/**
 * Start the background inbox processor.
 *
 * Every POLL_INTERVAL_MS, claims and processes up to BATCH_SIZE pending
 * items concurrently. A reentrancy guard skips a tick while the previous
 * batch is still in flight, so a slow batch cannot stack with the next
 * interval fire and spawn unbounded concurrent workers.
 *
 * @param {object} collections - MongoDB collections
 * @param {Function} getCtx - Function returning a Fedify context
 * @param {string} handle - Our actor handle
 * @returns {NodeJS.Timeout} Interval ID (for cleanup)
 */
export function startInboxProcessor(collections, getCtx, handle) {
  let draining = false; // true while a batch is in flight

  const intervalId = setInterval(async () => {
    // setInterval does not await async callbacks — without this guard,
    // a batch slower than POLL_INTERVAL_MS overlaps with the next tick.
    if (draining) return;
    draining = true;
    try {
      const ctx = getCtx();
      if (!ctx) return;
      // Run BATCH_SIZE workers in parallel — each atomically claims its own item
      // via findOneAndUpdate, so there is no double-processing risk.
      // allSettled: one worker's rejection must not cancel the others.
      await Promise.allSettled(
        Array.from({ length: BATCH_SIZE }, () =>
          processNextItem(collections, ctx, handle),
        ),
      );
    } catch (error) {
      console.error("[inbox-queue] Processor error:", error.message);
    } finally {
      draining = false;
    }
  }, POLL_INTERVAL_MS);

  console.info(`[ActivityPub] Inbox queue processor started (${POLL_INTERVAL_MS}ms interval, batch size ${BATCH_SIZE}, parallel)`);
  return intervalId;
}
|