feat: Hollo-inspired federation patterns — outbox failure handling, reply chains, forwarding, visibility
- Add outbox permanent failure handling with smart cleanup: - 410 Gone: immediate full cleanup (follower + timeline + notifications) - 404: strike system (3 failures over 7+ days triggers cleanup) - Strike reset on inbound activity (proves actor is alive) - Add recursive reply chain fetching (depth 5) with isContext flag - Add reply forwarding to followers for public replies to our posts - Add write-time visibility classification (public/unlisted/private/direct) Confab-Link: http://localhost:8080/sessions/af5f8b45-6b8d-442d-8f25-78c326190709
This commit is contained in:
+8
-17
@@ -41,6 +41,7 @@ import { MongoKvStore } from "./kv-store.js";
|
|||||||
import { registerInboxListeners } from "./inbox-listeners.js";
|
import { registerInboxListeners } from "./inbox-listeners.js";
|
||||||
import { jf2ToAS2Activity, resolvePostUrl } from "./jf2-to-as2.js";
|
import { jf2ToAS2Activity, resolvePostUrl } from "./jf2-to-as2.js";
|
||||||
import { cachedQuery } from "./redis-cache.js";
|
import { cachedQuery } from "./redis-cache.js";
|
||||||
|
import { onOutboxPermanentFailure } from "./outbox-failure.js";
|
||||||
|
|
||||||
const COLLECTION_CACHE_TTL = 300; // 5 minutes
|
const COLLECTION_CACHE_TTL = 300; // 5 minutes
|
||||||
|
|
||||||
@@ -342,25 +343,15 @@ export function setupFederation(options) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Handle permanent delivery failures (Fedify 2.0).
|
// Handle permanent delivery failures (Fedify 2.0).
|
||||||
// Fires when a remote inbox returns 404/410 — the server is gone.
|
// Fires when a remote inbox returns 404/410.
|
||||||
// Log it and let the admin see which followers are unreachable.
|
// 410: immediate full cleanup. 404: strike system (3 strikes over 7 days).
|
||||||
federation.setOutboxPermanentFailureHandler(async (_ctx, values) => {
|
federation.setOutboxPermanentFailureHandler(async (_ctx, values) => {
|
||||||
const { inbox, error, actorIds } = values;
|
await onOutboxPermanentFailure(
|
||||||
const inboxUrl = inbox?.href || String(inbox);
|
values.statusCode,
|
||||||
const actors = actorIds?.map((id) => id?.href || String(id)) || [];
|
values.actorIds,
|
||||||
console.warn(
|
values.inbox,
|
||||||
`[ActivityPub] Permanent delivery failure to ${inboxUrl}: ${error?.message || "unknown"}` +
|
collections,
|
||||||
(actors.length ? ` (actors: ${actors.join(", ")})` : ""),
|
|
||||||
);
|
);
|
||||||
collections.ap_activities.insertOne({
|
|
||||||
direction: "outbound",
|
|
||||||
type: "DeliveryFailed",
|
|
||||||
actorUrl: publicationUrl,
|
|
||||||
objectUrl: inboxUrl,
|
|
||||||
summary: `Permanent delivery failure to ${inboxUrl}: ${error?.message || "unknown"}`,
|
|
||||||
affectedActors: actors,
|
|
||||||
receivedAt: new Date().toISOString(),
|
|
||||||
}).catch(() => {});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wrap with debug dashboard if enabled. The debugger proxies the
|
// Wrap with debug dashboard if enabled. The debugger proxies the
|
||||||
|
|||||||
+82
-1
@@ -152,6 +152,73 @@ function isDirectMessage(object, ourActorUrl, followersUrl) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute post visibility from to/cc addressing fields.
|
||||||
|
* Matches Hollo's write-time visibility classification.
|
||||||
|
*
|
||||||
|
* @param {object} object - Fedify object (Note, Article, etc.)
|
||||||
|
* @returns {"public"|"unlisted"|"private"|"direct"}
|
||||||
|
*/
|
||||||
|
function computeVisibility(object) {
|
||||||
|
const to = new Set((object.toIds || []).map((u) => u.href));
|
||||||
|
const cc = new Set((object.ccIds || []).map((u) => u.href));
|
||||||
|
|
||||||
|
if (to.has(PUBLIC)) return "public";
|
||||||
|
if (cc.has(PUBLIC)) return "unlisted";
|
||||||
|
// Without knowing the remote actor's followers URL, we can't distinguish
|
||||||
|
// "private" (followers-only) from "direct". Both are non-public.
|
||||||
|
if (to.size > 0 || cc.size > 0) return "private";
|
||||||
|
return "direct";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively fetch and store ancestor posts for a reply chain.
|
||||||
|
* Each ancestor is stored with isContext: true so it can be filtered
|
||||||
|
* from the main timeline while being available for thread views.
|
||||||
|
*
|
||||||
|
* @param {object} object - Fedify object (Note, Article, etc.)
|
||||||
|
* @param {object} collections - MongoDB collections
|
||||||
|
* @param {object} authLoader - Authenticated document loader
|
||||||
|
* @param {number} maxDepth - Maximum recursion depth
|
||||||
|
*/
|
||||||
|
async function fetchReplyChain(object, collections, authLoader, maxDepth) {
|
||||||
|
if (maxDepth <= 0) return;
|
||||||
|
const parentUrl = object.replyTargetId?.href;
|
||||||
|
if (!parentUrl) return;
|
||||||
|
|
||||||
|
// Skip if we already have this post
|
||||||
|
if (collections.ap_timeline) {
|
||||||
|
const existing = await collections.ap_timeline.findOne({ uid: parentUrl });
|
||||||
|
if (existing) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the parent post
|
||||||
|
let parent;
|
||||||
|
try {
|
||||||
|
parent = await object.getReplyTarget({ documentLoader: authLoader });
|
||||||
|
} catch {
|
||||||
|
// Remote server unreachable — stop climbing
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!parent || !parent.id) return;
|
||||||
|
|
||||||
|
// Store as context item
|
||||||
|
try {
|
||||||
|
const timelineItem = await extractObjectData(parent, {
|
||||||
|
documentLoader: authLoader,
|
||||||
|
});
|
||||||
|
timelineItem.isContext = true;
|
||||||
|
timelineItem.visibility = computeVisibility(parent);
|
||||||
|
await addTimelineItem(collections, timelineItem);
|
||||||
|
} catch {
|
||||||
|
// Extraction failed — stop climbing
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recurse for the parent's parent
|
||||||
|
await fetchReplyChain(parent, collections, authLoader, maxDepth - 1);
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Individual handlers
|
// Individual handlers
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -515,7 +582,7 @@ export async function handleAnnounce(item, collections, ctx, handle) {
|
|||||||
boostedAt: announce.published ? String(announce.published) : new Date().toISOString(),
|
boostedAt: announce.published ? String(announce.published) : new Date().toISOString(),
|
||||||
documentLoader: authLoader,
|
documentLoader: authLoader,
|
||||||
});
|
});
|
||||||
|
timelineItem.visibility = computeVisibility(object);
|
||||||
await addTimelineItem(collections, timelineItem);
|
await addTimelineItem(collections, timelineItem);
|
||||||
|
|
||||||
// Fire-and-forget quote enrichment for boosted posts
|
// Fire-and-forget quote enrichment for boosted posts
|
||||||
@@ -688,6 +755,18 @@ export async function handleCreate(item, collections, ctx, handle) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Recursive reply chain fetching ---
|
||||||
|
// Fetch and store ancestor posts so conversation threads have context.
|
||||||
|
// Each ancestor is stored with isContext: true to distinguish from organic timeline items.
|
||||||
|
if (inReplyTo) {
|
||||||
|
try {
|
||||||
|
await fetchReplyChain(object, collections, authLoader, 5);
|
||||||
|
} catch (error) {
|
||||||
|
// Non-critical — incomplete context is acceptable
|
||||||
|
console.warn("[inbox-handlers] Reply chain fetch failed:", error.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Check for mentions of our actor
|
// Check for mentions of our actor
|
||||||
if (object.tag) {
|
if (object.tag) {
|
||||||
const tags = Array.isArray(object.tag) ? object.tag : [object.tag];
|
const tags = Array.isArray(object.tag) ? object.tag : [object.tag];
|
||||||
@@ -728,6 +807,7 @@ export async function handleCreate(item, collections, ctx, handle) {
|
|||||||
actorFallback: actorObj,
|
actorFallback: actorObj,
|
||||||
documentLoader: authLoader,
|
documentLoader: authLoader,
|
||||||
});
|
});
|
||||||
|
timelineItem.visibility = computeVisibility(object);
|
||||||
await addTimelineItem(collections, timelineItem);
|
await addTimelineItem(collections, timelineItem);
|
||||||
|
|
||||||
// Fire-and-forget OG unfurling for notes and articles (not boosts)
|
// Fire-and-forget OG unfurling for notes and articles (not boosts)
|
||||||
@@ -768,6 +848,7 @@ export async function handleCreate(item, collections, ctx, handle) {
|
|||||||
actorFallback: actorObj,
|
actorFallback: actorObj,
|
||||||
documentLoader: authLoader,
|
documentLoader: authLoader,
|
||||||
});
|
});
|
||||||
|
timelineItem.visibility = computeVisibility(object);
|
||||||
await addTimelineItem(collections, timelineItem);
|
await addTimelineItem(collections, timelineItem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+48
-1
@@ -27,6 +27,7 @@ import {
|
|||||||
|
|
||||||
import { isServerBlocked } from "./storage/server-blocks.js";
|
import { isServerBlocked } from "./storage/server-blocks.js";
|
||||||
import { touchKeyFreshness } from "./key-refresh.js";
|
import { touchKeyFreshness } from "./key-refresh.js";
|
||||||
|
import { resetDeliveryStrikes } from "./outbox-failure.js";
|
||||||
import { enqueueActivity } from "./inbox-queue.js";
|
import { enqueueActivity } from "./inbox-queue.js";
|
||||||
import { extractActorInfo } from "./timeline-store.js";
|
import { extractActorInfo } from "./timeline-store.js";
|
||||||
import { addNotification } from "./storage/notifications.js";
|
import { addNotification } from "./storage/notifications.js";
|
||||||
@@ -53,6 +54,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = follow.actorId?.href || "";
|
const actorUrl = follow.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
const authLoader = await getAuthLoader(ctx);
|
const authLoader = await getAuthLoader(ctx);
|
||||||
const followerActor = await follow.getActor({ documentLoader: authLoader });
|
const followerActor = await follow.getActor({ documentLoader: authLoader });
|
||||||
@@ -152,6 +154,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = undo.actorId?.href || "";
|
const actorUrl = undo.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Undo",
|
activityType: "Undo",
|
||||||
@@ -165,6 +168,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = accept.actorId?.href || "";
|
const actorUrl = accept.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Accept",
|
activityType: "Accept",
|
||||||
@@ -178,6 +182,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = reject.actorId?.href || "";
|
const actorUrl = reject.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Reject",
|
activityType: "Reject",
|
||||||
@@ -191,6 +196,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = like.actorId?.href || "";
|
const actorUrl = like.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Like",
|
activityType: "Like",
|
||||||
@@ -205,6 +211,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = announce.actorId?.href || "";
|
const actorUrl = announce.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Announce",
|
activityType: "Announce",
|
||||||
@@ -219,11 +226,47 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = create.actorId?.href || "";
|
const actorUrl = create.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
|
// Forward public replies to our posts to our followers.
|
||||||
|
// Must happen here (not in async handler) because forwardActivity
|
||||||
|
// is only available on InboxContext, not base Context.
|
||||||
|
const objectUrl = create.objectId?.href || "";
|
||||||
|
try {
|
||||||
|
const obj = await create.getObject();
|
||||||
|
const inReplyTo = obj?.replyTargetId?.href || "";
|
||||||
|
if (
|
||||||
|
inReplyTo &&
|
||||||
|
collections._publicationUrl &&
|
||||||
|
inReplyTo.startsWith(collections._publicationUrl)
|
||||||
|
) {
|
||||||
|
// Check if the reply is public (to/cc includes PUBLIC collection)
|
||||||
|
const toUrls = (obj.toIds || []).map((u) => u.href);
|
||||||
|
const ccUrls = (obj.ccIds || []).map((u) => u.href);
|
||||||
|
const isPublic = [...toUrls, ...ccUrls].includes(
|
||||||
|
"https://www.w3.org/ns/activitystreams#Public",
|
||||||
|
);
|
||||||
|
if (isPublic) {
|
||||||
|
await ctx.forwardActivity(
|
||||||
|
{ identifier: handle },
|
||||||
|
"followers",
|
||||||
|
{
|
||||||
|
skipIfUnsigned: true,
|
||||||
|
preferSharedInbox: true,
|
||||||
|
excludeBaseUris: [new URL(ctx.origin)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// Non-critical — forwarding failure shouldn't block processing
|
||||||
|
console.warn("[inbox-listeners] Reply forwarding failed:", error.message);
|
||||||
|
}
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Create",
|
activityType: "Create",
|
||||||
actorUrl,
|
actorUrl,
|
||||||
objectUrl: create.objectId?.href || "",
|
objectUrl,
|
||||||
rawJson: await create.toJsonLd(),
|
rawJson: await create.toJsonLd(),
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
@@ -233,6 +276,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = del.actorId?.href || "";
|
const actorUrl = del.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Delete",
|
activityType: "Delete",
|
||||||
@@ -247,6 +291,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = move.actorId?.href || "";
|
const actorUrl = move.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Move",
|
activityType: "Move",
|
||||||
@@ -260,6 +305,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = update.actorId?.href || "";
|
const actorUrl = update.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Update",
|
activityType: "Update",
|
||||||
@@ -299,6 +345,7 @@ export function registerInboxListeners(inboxChain, options) {
|
|||||||
const actorUrl = flag.actorId?.href || "";
|
const actorUrl = flag.actorId?.href || "";
|
||||||
if (await isServerBlocked(actorUrl, collections)) return;
|
if (await isServerBlocked(actorUrl, collections)) return;
|
||||||
await touchKeyFreshness(collections, actorUrl);
|
await touchKeyFreshness(collections, actorUrl);
|
||||||
|
await resetDeliveryStrikes(collections, actorUrl);
|
||||||
|
|
||||||
await enqueueActivity(collections, {
|
await enqueueActivity(collections, {
|
||||||
activityType: "Flag",
|
activityType: "Flag",
|
||||||
|
|||||||
@@ -0,0 +1,139 @@
|
|||||||
|
/**
|
||||||
|
* Outbox permanent failure handling.
|
||||||
|
* Cleans up dead followers when delivery permanently fails.
|
||||||
|
*
|
||||||
|
* - 410 Gone: Immediate full cleanup (actor is permanently gone)
|
||||||
|
* - 404: Strike system — 3 failures over 7+ days triggers full cleanup
|
||||||
|
*
|
||||||
|
* @module outbox-failure
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { logActivity } from "./activity-log.js";
|
||||||
|
|
||||||
|
const STRIKE_THRESHOLD = 3;
|
||||||
|
const STRIKE_WINDOW_MS = 7 * 24 * 60 * 60 * 1000; // 7 days
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean up all data associated with an actor.
|
||||||
|
* Removes follower record, their timeline items, and their notifications.
|
||||||
|
*
|
||||||
|
* @param {object} collections - MongoDB collections
|
||||||
|
* @param {string} actorUrl - Actor URL to clean up
|
||||||
|
* @param {string} reason - Reason for cleanup (for logging)
|
||||||
|
*/
|
||||||
|
async function cleanupActor(collections, actorUrl, reason) {
|
||||||
|
const { ap_followers, ap_timeline, ap_notifications } = collections;
|
||||||
|
|
||||||
|
// Remove from followers
|
||||||
|
const deleted = await ap_followers.deleteOne({ actorUrl });
|
||||||
|
|
||||||
|
// Remove their timeline items
|
||||||
|
if (ap_timeline) {
|
||||||
|
await ap_timeline.deleteMany({ "author.url": actorUrl });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove their notifications
|
||||||
|
if (ap_notifications) {
|
||||||
|
await ap_notifications.deleteMany({ actorUrl });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (deleted.deletedCount > 0) {
|
||||||
|
console.info(`[outbox-failure] Cleaned up actor ${actorUrl}: ${reason}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle permanent outbox delivery failure.
|
||||||
|
* Called by Fedify's setOutboxPermanentFailureHandler.
|
||||||
|
*
|
||||||
|
* @param {number} statusCode - HTTP status code (404, 410, etc.)
|
||||||
|
* @param {readonly URL[]} actorIds - Array of actor ID URLs
|
||||||
|
* @param {URL} inbox - The inbox URL that failed
|
||||||
|
* @param {object} collections - MongoDB collections
|
||||||
|
*/
|
||||||
|
export async function onOutboxPermanentFailure(statusCode, actorIds, inbox, collections) {
|
||||||
|
const inboxUrl = inbox?.href || String(inbox);
|
||||||
|
|
||||||
|
for (const actorId of actorIds) {
|
||||||
|
const actorUrl = actorId?.href || String(actorId);
|
||||||
|
|
||||||
|
if (statusCode === 410) {
|
||||||
|
// 410 Gone — immediate full cleanup
|
||||||
|
await cleanupActor(collections, actorUrl, `410 Gone from ${inboxUrl}`);
|
||||||
|
|
||||||
|
await logActivity(collections.ap_activities, {
|
||||||
|
direction: "outbound",
|
||||||
|
type: "DeliveryFailed:410",
|
||||||
|
actorUrl,
|
||||||
|
objectUrl: inboxUrl,
|
||||||
|
summary: `Permanent delivery failure (410 Gone) to ${inboxUrl} — actor cleaned up`,
|
||||||
|
}, {});
|
||||||
|
} else {
|
||||||
|
// 404 or other — strike system
|
||||||
|
const now = new Date();
|
||||||
|
const result = await collections.ap_followers.findOneAndUpdate(
|
||||||
|
{ actorUrl },
|
||||||
|
{
|
||||||
|
$inc: { deliveryFailures: 1 },
|
||||||
|
$setOnInsert: { firstFailureAt: now.toISOString() },
|
||||||
|
$set: { lastFailureAt: now.toISOString() },
|
||||||
|
},
|
||||||
|
{ returnDocument: "after" },
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
// Not a follower — nothing to track or clean up
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const failures = result.deliveryFailures || 1;
|
||||||
|
const firstFailure = result.firstFailureAt
|
||||||
|
? new Date(result.firstFailureAt)
|
||||||
|
: now;
|
||||||
|
const windowElapsed = now.getTime() - firstFailure.getTime() >= STRIKE_WINDOW_MS;
|
||||||
|
|
||||||
|
if (failures >= STRIKE_THRESHOLD && windowElapsed) {
|
||||||
|
// Confirmed dead — full cleanup
|
||||||
|
await cleanupActor(
|
||||||
|
collections,
|
||||||
|
actorUrl,
|
||||||
|
`${failures} failures over ${Math.round((now.getTime() - firstFailure.getTime()) / 86400000)}d (HTTP ${statusCode})`,
|
||||||
|
);
|
||||||
|
|
||||||
|
await logActivity(collections.ap_activities, {
|
||||||
|
direction: "outbound",
|
||||||
|
type: `DeliveryFailed:${statusCode}:cleanup`,
|
||||||
|
actorUrl,
|
||||||
|
objectUrl: inboxUrl,
|
||||||
|
summary: `${failures} delivery failures over 7+ days — actor cleaned up`,
|
||||||
|
}, {});
|
||||||
|
} else {
|
||||||
|
// Strike recorded, not yet confirmed dead
|
||||||
|
await logActivity(collections.ap_activities, {
|
||||||
|
direction: "outbound",
|
||||||
|
type: `DeliveryFailed:${statusCode}:strike`,
|
||||||
|
actorUrl,
|
||||||
|
objectUrl: inboxUrl,
|
||||||
|
summary: `Delivery strike ${failures}/${STRIKE_THRESHOLD} for ${actorUrl} (HTTP ${statusCode})`,
|
||||||
|
}, {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset delivery failure strikes for an actor.
|
||||||
|
* Called when we receive an inbound activity from an actor,
|
||||||
|
* proving they are alive despite previous delivery failures.
|
||||||
|
*
|
||||||
|
* @param {object} collections - MongoDB collections
|
||||||
|
* @param {string} actorUrl - Actor URL
|
||||||
|
*/
|
||||||
|
export async function resetDeliveryStrikes(collections, actorUrl) {
|
||||||
|
if (!actorUrl) return;
|
||||||
|
// Only update if the fields exist — avoid unnecessary writes
|
||||||
|
await collections.ap_followers.updateOne(
|
||||||
|
{ actorUrl, deliveryFailures: { $exists: true } },
|
||||||
|
{ $unset: { deliveryFailures: "", firstFailureAt: "", lastFailureAt: "" } },
|
||||||
|
);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user