diff --git a/lib/federation-setup.js b/lib/federation-setup.js index 26815aa..5fb7ab0 100644 --- a/lib/federation-setup.js +++ b/lib/federation-setup.js @@ -41,6 +41,7 @@ import { MongoKvStore } from "./kv-store.js"; import { registerInboxListeners } from "./inbox-listeners.js"; import { jf2ToAS2Activity, resolvePostUrl } from "./jf2-to-as2.js"; import { cachedQuery } from "./redis-cache.js"; +import { onOutboxPermanentFailure } from "./outbox-failure.js"; const COLLECTION_CACHE_TTL = 300; // 5 minutes @@ -342,25 +343,15 @@ export function setupFederation(options) { }); // Handle permanent delivery failures (Fedify 2.0). - // Fires when a remote inbox returns 404/410 — the server is gone. - // Log it and let the admin see which followers are unreachable. + // Fires when a remote inbox returns 404/410. + // 410: immediate full cleanup. 404: strike system (3 strikes over 7 days). federation.setOutboxPermanentFailureHandler(async (_ctx, values) => { - const { inbox, error, actorIds } = values; - const inboxUrl = inbox?.href || String(inbox); - const actors = actorIds?.map((id) => id?.href || String(id)) || []; - console.warn( - `[ActivityPub] Permanent delivery failure to ${inboxUrl}: ${error?.message || "unknown"}` + - (actors.length ? ` (actors: ${actors.join(", ")})` : ""), + await onOutboxPermanentFailure( + values.statusCode, + values.actorIds, + values.inbox, + collections, ); - collections.ap_activities.insertOne({ - direction: "outbound", - type: "DeliveryFailed", - actorUrl: publicationUrl, - objectUrl: inboxUrl, - summary: `Permanent delivery failure to ${inboxUrl}: ${error?.message || "unknown"}`, - affectedActors: actors, - receivedAt: new Date().toISOString(), - }).catch(() => {}); }); // Wrap with debug dashboard if enabled. The debugger proxies the diff --git a/lib/inbox-handlers.js b/lib/inbox-handlers.js index 3e736dc..f40ca46 100644 --- a/lib/inbox-handlers.js +++ b/lib/inbox-handlers.js @@ -152,6 +152,73 @@ function isDirectMessage(object, ourActorUrl, followersUrl) { return true; } +/** + * Compute post visibility from to/cc addressing fields. + * Matches Hollo's write-time visibility classification. + * + * @param {object} object - Fedify object (Note, Article, etc.) + * @returns {"public"|"unlisted"|"private"|"direct"} + */ +function computeVisibility(object) { + const to = new Set((object.toIds || []).map((u) => u.href)); + const cc = new Set((object.ccIds || []).map((u) => u.href)); + + if (to.has(PUBLIC)) return "public"; + if (cc.has(PUBLIC)) return "unlisted"; + // Without knowing the remote actor's followers URL, we can't distinguish + // "private" (followers-only) from "direct". Both are non-public. + if (to.size > 0 || cc.size > 0) return "private"; + return "direct"; +} + +/** + * Recursively fetch and store ancestor posts for a reply chain. + * Each ancestor is stored with isContext: true so it can be filtered + * from the main timeline while being available for thread views. + * + * @param {object} object - Fedify object (Note, Article, etc.) + * @param {object} collections - MongoDB collections + * @param {object} authLoader - Authenticated document loader + * @param {number} maxDepth - Maximum recursion depth + */ +async function fetchReplyChain(object, collections, authLoader, maxDepth) { + if (maxDepth <= 0) return; + const parentUrl = object.replyTargetId?.href; + if (!parentUrl) return; + + // Skip if we already have this post + if (collections.ap_timeline) { + const existing = await collections.ap_timeline.findOne({ uid: parentUrl }); + if (existing) return; + } + + // Fetch the parent post + let parent; + try { + parent = await object.getReplyTarget({ documentLoader: authLoader }); + } catch { + // Remote server unreachable — stop climbing + return; + } + if (!parent || !parent.id) return; + + // Store as context item + try { + const timelineItem = await extractObjectData(parent, { + documentLoader: authLoader, + }); + timelineItem.isContext = true; + timelineItem.visibility = computeVisibility(parent); + await addTimelineItem(collections, timelineItem); + } catch { + // Extraction failed — stop climbing + return; + } + + // Recurse for the parent's parent + await fetchReplyChain(parent, collections, authLoader, maxDepth - 1); +} + // --------------------------------------------------------------------------- // Individual handlers // --------------------------------------------------------------------------- @@ -515,7 +582,7 @@ export async function handleAnnounce(item, collections, ctx, handle) { boostedAt: announce.published ? String(announce.published) : new Date().toISOString(), documentLoader: authLoader, }); - + timelineItem.visibility = computeVisibility(object); await addTimelineItem(collections, timelineItem); // Fire-and-forget quote enrichment for boosted posts @@ -688,6 +755,18 @@ export async function handleCreate(item, collections, ctx, handle) { } } + // --- Recursive reply chain fetching --- + // Fetch and store ancestor posts so conversation threads have context. + // Each ancestor is stored with isContext: true to distinguish from organic timeline items. + if (inReplyTo) { + try { + await fetchReplyChain(object, collections, authLoader, 5); + } catch (error) { + // Non-critical — incomplete context is acceptable + console.warn("[inbox-handlers] Reply chain fetch failed:", error.message); + } + } + // Check for mentions of our actor if (object.tag) { const tags = Array.isArray(object.tag) ? object.tag : [object.tag]; @@ -728,6 +807,7 @@ export async function handleCreate(item, collections, ctx, handle) { actorFallback: actorObj, documentLoader: authLoader, }); + timelineItem.visibility = computeVisibility(object); await addTimelineItem(collections, timelineItem); // Fire-and-forget OG unfurling for notes and articles (not boosts) @@ -768,6 +848,7 @@ export async function handleCreate(item, collections, ctx, handle) { actorFallback: actorObj, documentLoader: authLoader, }); + timelineItem.visibility = computeVisibility(object); await addTimelineItem(collections, timelineItem); } } diff --git a/lib/inbox-listeners.js b/lib/inbox-listeners.js index 209e602..12c845e 100644 --- a/lib/inbox-listeners.js +++ b/lib/inbox-listeners.js @@ -27,6 +27,7 @@ import { import { isServerBlocked } from "./storage/server-blocks.js"; import { touchKeyFreshness } from "./key-refresh.js"; +import { resetDeliveryStrikes } from "./outbox-failure.js"; import { enqueueActivity } from "./inbox-queue.js"; import { extractActorInfo } from "./timeline-store.js"; import { addNotification } from "./storage/notifications.js"; @@ -53,6 +54,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = follow.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); const authLoader = await getAuthLoader(ctx); const followerActor = await follow.getActor({ documentLoader: authLoader }); @@ -152,6 +154,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = undo.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Undo", @@ -165,6 +168,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = accept.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Accept", @@ -178,6 +182,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = reject.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Reject", @@ -191,6 +196,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = like.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Like", @@ -205,6 +211,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = announce.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Announce", @@ -219,11 +226,47 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = create.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); + + // Forward public replies to our posts to our followers. + // Must happen here (not in async handler) because forwardActivity + // is only available on InboxContext, not base Context. + const objectUrl = create.objectId?.href || ""; + try { + const obj = await create.getObject(); + const inReplyTo = obj?.replyTargetId?.href || ""; + if ( + inReplyTo && + collections._publicationUrl && + inReplyTo.startsWith(collections._publicationUrl) + ) { + // Check if the reply is public (to/cc includes PUBLIC collection) + const toUrls = (obj.toIds || []).map((u) => u.href); + const ccUrls = (obj.ccIds || []).map((u) => u.href); + const isPublic = [...toUrls, ...ccUrls].includes( + "https://www.w3.org/ns/activitystreams#Public", + ); + if (isPublic) { + await ctx.forwardActivity( + { identifier: handle }, + "followers", + { + skipIfUnsigned: true, + preferSharedInbox: true, + excludeBaseUris: [new URL(ctx.origin)], + }, + ); + } + } + } catch (error) { + // Non-critical — forwarding failure shouldn't block processing + console.warn("[inbox-listeners] Reply forwarding failed:", error.message); + } await enqueueActivity(collections, { activityType: "Create", actorUrl, - objectUrl: create.objectId?.href || "", + objectUrl, rawJson: await create.toJsonLd(), }); }) @@ -233,6 +276,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = del.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Delete", @@ -247,6 +291,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = move.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Move", @@ -260,6 +305,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = update.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Update", @@ -299,6 +345,7 @@ export function registerInboxListeners(inboxChain, options) { const actorUrl = flag.actorId?.href || ""; if (await isServerBlocked(actorUrl, collections)) return; await touchKeyFreshness(collections, actorUrl); + await resetDeliveryStrikes(collections, actorUrl); await enqueueActivity(collections, { activityType: "Flag", diff --git a/lib/outbox-failure.js b/lib/outbox-failure.js new file mode 100644 index 0000000..9cadc74 --- /dev/null +++ b/lib/outbox-failure.js @@ -0,0 +1,139 @@ +/** + * Outbox permanent failure handling. + * Cleans up dead followers when delivery permanently fails. + * + * - 410 Gone: Immediate full cleanup (actor is permanently gone) + * - 404: Strike system — 3 failures over 7+ days triggers full cleanup + * + * @module outbox-failure + */ + +import { logActivity } from "./activity-log.js"; + +const STRIKE_THRESHOLD = 3; +const STRIKE_WINDOW_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +/** + * Clean up all data associated with an actor. + * Removes follower record, their timeline items, and their notifications. + * + * @param {object} collections - MongoDB collections + * @param {string} actorUrl - Actor URL to clean up + * @param {string} reason - Reason for cleanup (for logging) + */ +async function cleanupActor(collections, actorUrl, reason) { + const { ap_followers, ap_timeline, ap_notifications } = collections; + + // Remove from followers + const deleted = await ap_followers.deleteOne({ actorUrl }); + + // Remove their timeline items + if (ap_timeline) { + await ap_timeline.deleteMany({ "author.url": actorUrl }); + } + + // Remove their notifications + if (ap_notifications) { + await ap_notifications.deleteMany({ actorUrl }); + } + + if (deleted.deletedCount > 0) { + console.info(`[outbox-failure] Cleaned up actor ${actorUrl}: ${reason}`); + } +} + +/** + * Handle permanent outbox delivery failure. + * Called by Fedify's setOutboxPermanentFailureHandler. + * + * @param {number} statusCode - HTTP status code (404, 410, etc.) + * @param {readonly URL[]} actorIds - Array of actor ID URLs + * @param {URL} inbox - The inbox URL that failed + * @param {object} collections - MongoDB collections + */ +export async function onOutboxPermanentFailure(statusCode, actorIds, inbox, collections) { + const inboxUrl = inbox?.href || String(inbox); + + for (const actorId of actorIds) { + const actorUrl = actorId?.href || String(actorId); + + if (statusCode === 410) { + // 410 Gone — immediate full cleanup + await cleanupActor(collections, actorUrl, `410 Gone from ${inboxUrl}`); + + await logActivity(collections.ap_activities, { + direction: "outbound", + type: "DeliveryFailed:410", + actorUrl, + objectUrl: inboxUrl, + summary: `Permanent delivery failure (410 Gone) to ${inboxUrl} — actor cleaned up`, + }, {}); + } else { + // 404 or other — strike system + const now = new Date(); + const result = await collections.ap_followers.findOneAndUpdate( + { actorUrl }, + { + $inc: { deliveryFailures: 1 }, + $setOnInsert: { firstFailureAt: now.toISOString() }, + $set: { lastFailureAt: now.toISOString() }, + }, + { returnDocument: "after" }, + ); + + if (!result) { + // Not a follower — nothing to track or clean up + continue; + } + + const failures = result.deliveryFailures || 1; + const firstFailure = result.firstFailureAt + ? new Date(result.firstFailureAt) + : now; + const windowElapsed = now.getTime() - firstFailure.getTime() >= STRIKE_WINDOW_MS; + + if (failures >= STRIKE_THRESHOLD && windowElapsed) { + // Confirmed dead — full cleanup + await cleanupActor( + collections, + actorUrl, + `${failures} failures over ${Math.round((now.getTime() - firstFailure.getTime()) / 86400000)}d (HTTP ${statusCode})`, + ); + + await logActivity(collections.ap_activities, { + direction: "outbound", + type: `DeliveryFailed:${statusCode}:cleanup`, + actorUrl, + objectUrl: inboxUrl, + summary: `${failures} delivery failures over 7+ days — actor cleaned up`, + }, {}); + } else { + // Strike recorded, not yet confirmed dead + await logActivity(collections.ap_activities, { + direction: "outbound", + type: `DeliveryFailed:${statusCode}:strike`, + actorUrl, + objectUrl: inboxUrl, + summary: `Delivery strike ${failures}/${STRIKE_THRESHOLD} for ${actorUrl} (HTTP ${statusCode})`, + }, {}); + } + } + } +} + +/** + * Reset delivery failure strikes for an actor. + * Called when we receive an inbound activity from an actor, + * proving they are alive despite previous delivery failures. + * + * @param {object} collections - MongoDB collections + * @param {string} actorUrl - Actor URL + */ +export async function resetDeliveryStrikes(collections, actorUrl) { + if (!actorUrl) return; + // Only update if the fields exist — avoid unnecessary writes + await collections.ap_followers.updateOne( + { actorUrl, deliveryFailures: { $exists: true } }, + { $unset: { deliveryFailures: "", firstFailureAt: "", lastFailureAt: "" } }, + ); +}