diff --git a/index.js b/index.js index 5d837b5..3d4c474 100644 --- a/index.js +++ b/index.js @@ -36,6 +36,8 @@ import { unmuteController, blockController, unblockController, + blockServerController, + unblockServerController, moderationController, filterModeController, } from "./lib/controllers/moderation.js"; @@ -103,6 +105,9 @@ import { startBatchRefollow } from "./lib/batch-refollow.js"; import { logActivity } from "./lib/activity-log.js"; import { scheduleCleanup } from "./lib/timeline-cleanup.js"; import { runSeparateMentionsMigration } from "./lib/migrations/separate-mentions.js"; +import { loadBlockedServersToRedis } from "./lib/storage/server-blocks.js"; +import { scheduleKeyRefresh } from "./lib/key-refresh.js"; +import { startInboxProcessor } from "./lib/inbox-queue.js"; import { deleteFederationController } from "./lib/controllers/federation-delete.js"; import { federationMgmtController, @@ -308,6 +313,8 @@ export default class ActivityPubEndpoint { router.post("/admin/reader/unmute", unmuteController(mp, this)); router.post("/admin/reader/block", blockController(mp, this)); router.post("/admin/reader/unblock", unblockController(mp, this)); + router.post("/admin/reader/block-server", blockServerController(mp)); + router.post("/admin/reader/unblock-server", unblockServerController(mp)); router.get("/admin/followers", followersController(mp)); router.post("/admin/followers/approve", approveFollowController(mp, this)); router.post("/admin/followers/reject", rejectFollowController(mp, this)); @@ -1124,6 +1131,12 @@ export default class ActivityPubEndpoint { Indiekit.addCollection("ap_reports"); // Pending follow requests (manual approval) Indiekit.addCollection("ap_pending_follows"); + // Server-level blocks + Indiekit.addCollection("ap_blocked_servers"); + // Key freshness tracking for proactive refresh + Indiekit.addCollection("ap_key_freshness"); + // Async inbox processing queue + Indiekit.addCollection("ap_inbox_queue"); // Store collection references (posts resolved lazily) const indiekitCollections = Indiekit.collections; @@ -1151,6 +1164,12 @@ export default class ActivityPubEndpoint { ap_reports: indiekitCollections.get("ap_reports"), // Pending follow requests (manual approval) ap_pending_follows: indiekitCollections.get("ap_pending_follows"), + // Server-level blocks + ap_blocked_servers: indiekitCollections.get("ap_blocked_servers"), + // Key freshness tracking + ap_key_freshness: indiekitCollections.get("ap_key_freshness"), + // Async inbox processing queue + ap_inbox_queue: indiekitCollections.get("ap_inbox_queue"), get posts() { return indiekitCollections.get("posts"); }, @@ -1351,6 +1370,27 @@ export default class ActivityPubEndpoint { { requestedAt: -1 }, { background: true }, ); + // Server-level blocks + this._collections.ap_blocked_servers.createIndex( + { hostname: 1 }, + { unique: true, background: true }, + ); + // Key freshness tracking + this._collections.ap_key_freshness.createIndex( + { actorUrl: 1 }, + { unique: true, background: true }, + ); + + // Inbox queue indexes + this._collections.ap_inbox_queue.createIndex( + { status: 1, receivedAt: 1 }, + { background: true }, + ); + // TTL: auto-prune completed items after 24h + this._collections.ap_inbox_queue.createIndex( + { processedAt: 1 }, + { expireAfterSeconds: 86_400, background: true }, + ); } catch { // Index creation failed — collections not yet available. // Indexes already exist from previous startups; non-fatal. @@ -1446,6 +1486,34 @@ export default class ActivityPubEndpoint { if (this.options.timelineRetention > 0) { scheduleCleanup(this._collections, this.options.timelineRetention); } + + // Load server blocks into Redis for fast inbox checks + loadBlockedServersToRedis(this._collections).catch((error) => { + console.warn("[ActivityPub] Failed to load blocked servers to Redis:", error.message); + }); + + // Schedule proactive key refresh for stale follower keys (runs on startup + every 24h) + const keyRefreshHandle = this.options.actor.handle; + const keyRefreshFederation = this._federation; + const keyRefreshPubUrl = this._publicationUrl; + scheduleKeyRefresh( + this._collections, + () => keyRefreshFederation?.createContext(new URL(keyRefreshPubUrl), { + handle: keyRefreshHandle, + publicationUrl: keyRefreshPubUrl, + }), + keyRefreshHandle, + ); + + // Start async inbox queue processor (processes one item every 3s) + this._inboxProcessorInterval = startInboxProcessor( + this._collections, + () => this._federation?.createContext(new URL(this._publicationUrl), { + handle: this.options.actor.handle, + publicationUrl: this._publicationUrl, + }), + this.options.actor.handle, + ); } /** diff --git a/lib/controllers/moderation.js b/lib/controllers/moderation.js index 5e61335..80abf45 100644 --- a/lib/controllers/moderation.js +++ b/lib/controllers/moderation.js @@ -14,6 +14,11 @@ import { getFilterMode, setFilterMode, } from "../storage/moderation.js"; +import { + addBlockedServer, + removeBlockedServer, + getAllBlockedServers, +} from "../storage/server-blocks.js"; /** * Helper to get moderation collections from request. @@ -23,6 +28,7 @@ function getModerationCollections(request) { return { ap_muted: application?.collections?.get("ap_muted"), ap_blocked: application?.collections?.get("ap_blocked"), + ap_blocked_servers: application?.collections?.get("ap_blocked_servers"), ap_timeline: application?.collections?.get("ap_timeline"), ap_profile: application?.collections?.get("ap_profile"), }; @@ -282,6 +288,77 @@ export function unblockController(mountPath, plugin) { }; } +/** + * POST /admin/reader/block-server — Block a server by hostname. + */ +export function blockServerController(mountPath) { + return async (request, response, next) => { + try { + if (!validateToken(request)) { + return response.status(403).json({ + success: false, + error: "Invalid CSRF token", + }); + } + + const { hostname, reason } = request.body; + if (!hostname) { + return response.status(400).json({ + success: false, + error: "Missing hostname", + }); + } + + const collections = getModerationCollections(request); + await addBlockedServer(collections, hostname, reason); + + console.info(`[ActivityPub] Blocked server: ${hostname}`); + return response.json({ success: true, type: "block-server", hostname }); + } catch (error) { + console.error("[ActivityPub] Block server failed:", error.message); + return response.status(500).json({ + success: false, + error: "Operation failed. Please try again later.", + }); + } + }; +} + +/** + * POST /admin/reader/unblock-server — Unblock a server. + */ +export function unblockServerController(mountPath) { + return async (request, response, next) => { + try { + if (!validateToken(request)) { + return response.status(403).json({ + success: false, + error: "Invalid CSRF token", + }); + } + + const { hostname } = request.body; + if (!hostname) { + return response.status(400).json({ + success: false, + error: "Missing hostname", + }); + } + + const collections = getModerationCollections(request); + await removeBlockedServer(collections, hostname); + + console.info(`[ActivityPub] Unblocked server: ${hostname}`); + return response.json({ success: true, type: "unblock-server", hostname }); + } catch (error) { + return response.status(500).json({ + success: false, + error: "Operation failed. Please try again later.", + }); + } + }; +} + /** * GET /admin/reader/moderation — View muted/blocked lists. */ @@ -291,9 +368,10 @@ export function moderationController(mountPath) { const collections = getModerationCollections(request); const csrfToken = getToken(request.session); - const [muted, blocked, filterMode] = await Promise.all([ + const [muted, blocked, blockedServers, filterMode] = await Promise.all([ getAllMuted(collections), getAllBlocked(collections), + getAllBlockedServers(collections), getFilterMode(collections), ]); @@ -305,6 +383,7 @@ export function moderationController(mountPath) { readerParent: { href: `${mountPath}/admin/reader`, text: response.locals.__("activitypub.reader.title") }, muted, blocked, + blockedServers, mutedActors, mutedKeywords, filterMode, diff --git a/lib/federation-setup.js b/lib/federation-setup.js index b7db806..26815aa 100644 --- a/lib/federation-setup.js +++ b/lib/federation-setup.js @@ -40,6 +40,9 @@ import Redis from "ioredis"; import { MongoKvStore } from "./kv-store.js"; import { registerInboxListeners } from "./inbox-listeners.js"; import { jf2ToAS2Activity, resolvePostUrl } from "./jf2-to-as2.js"; +import { cachedQuery } from "./redis-cache.js"; + +const COLLECTION_CACHE_TTL = 300; // 5 minutes /** * Create and configure a Fedify Federation instance. @@ -404,10 +407,12 @@ function setupFollowers(federation, mountPath, handle, collections) { // as Recipient objects so sendActivity("followers") can deliver. // See: https://fedify.dev/manual/collections#one-shot-followers-collection-for-gathering-recipients if (cursor == null) { - const docs = await collections.ap_followers - .find() - .sort({ followedAt: -1 }) - .toArray(); + const docs = await cachedQuery("col:followers:recipients", COLLECTION_CACHE_TTL, async () => { + return await collections.ap_followers + .find() + .sort({ followedAt: -1 }) + .toArray(); + }); return { items: docs.map((f) => ({ id: new URL(f.actorUrl), @@ -422,13 +427,16 @@ function setupFollowers(federation, mountPath, handle, collections) { // Paginated collection: for remote browsing of /followers endpoint const pageSize = 20; const skip = Number.parseInt(cursor, 10); - const docs = await collections.ap_followers - .find() - .sort({ followedAt: -1 }) - .skip(skip) - .limit(pageSize) - .toArray(); - const total = await collections.ap_followers.countDocuments(); + const [docs, total] = await cachedQuery(`col:followers:page:${cursor}`, COLLECTION_CACHE_TTL, async () => { + const d = await collections.ap_followers + .find() + .sort({ followedAt: -1 }) + .skip(skip) + .limit(pageSize) + .toArray(); + const t = await collections.ap_followers.countDocuments(); + return [d, t]; + }); return { items: docs.map((f) => new URL(f.actorUrl)), @@ -439,7 +447,9 @@ function setupFollowers(federation, mountPath, handle, collections) { ) .setCounter(async (ctx, identifier) => { if (identifier !== handle) return 0; - return await collections.ap_followers.countDocuments(); + return await cachedQuery("col:followers:count", COLLECTION_CACHE_TTL, async () => { + return await collections.ap_followers.countDocuments(); + }); }) .setFirstCursor(async () => "0"); } @@ -452,13 +462,16 @@ function setupFollowing(federation, mountPath, handle, collections) { if (identifier !== handle) return null; const pageSize = 20; const skip = cursor ? Number.parseInt(cursor, 10) : 0; - const docs = await collections.ap_following - .find() - .sort({ followedAt: -1 }) - .skip(skip) - .limit(pageSize) - .toArray(); - const total = await collections.ap_following.countDocuments(); + const [docs, total] = await cachedQuery(`col:following:page:${cursor}`, COLLECTION_CACHE_TTL, async () => { + const d = await collections.ap_following + .find() + .sort({ followedAt: -1 }) + .skip(skip) + .limit(pageSize) + .toArray(); + const t = await collections.ap_following.countDocuments(); + return [d, t]; + }); return { items: docs.map((f) => new URL(f.actorUrl)), @@ -469,7 +482,9 @@ function setupFollowing(federation, mountPath, handle, collections) { ) .setCounter(async (ctx, identifier) => { if (identifier !== handle) return 0; - return await collections.ap_following.countDocuments(); + return await cachedQuery("col:following:count", COLLECTION_CACHE_TTL, async () => { + return await collections.ap_following.countDocuments(); + }); }) .setFirstCursor(async () => "0"); } @@ -485,13 +500,16 @@ function setupLiked(federation, mountPath, handle, collections) { const pageSize = 20; const skip = cursor ? Number.parseInt(cursor, 10) : 0; const query = { "properties.post-type": "like" }; - const docs = await collections.posts - .find(query) - .sort({ "properties.published": -1 }) - .skip(skip) - .limit(pageSize) - .toArray(); - const total = await collections.posts.countDocuments(query); + const [docs, total] = await cachedQuery(`col:liked:page:${cursor}`, COLLECTION_CACHE_TTL, async () => { + const d = await collections.posts + .find(query) + .sort({ "properties.published": -1 }) + .skip(skip) + .limit(pageSize) + .toArray(); + const t = await collections.posts.countDocuments(query); + return [d, t]; + }); const items = docs .map((d) => { @@ -510,8 +528,10 @@ function setupLiked(federation, mountPath, handle, collections) { .setCounter(async (ctx, identifier) => { if (identifier !== handle) return 0; if (!collections.posts) return 0; - return await collections.posts.countDocuments({ - "properties.post-type": "like", + return await cachedQuery("col:liked:count", COLLECTION_CACHE_TTL, async () => { + return await collections.posts.countDocuments({ + "properties.post-type": "like", + }); }); }) .setFirstCursor(async () => "0"); diff --git a/lib/inbox-handlers.js b/lib/inbox-handlers.js new file mode 100644 index 0000000..3e736dc --- /dev/null +++ b/lib/inbox-handlers.js @@ -0,0 +1,1021 @@ +/** + * Inbox handler functions for each ActivityPub activity type. + * + * These handlers are extracted from inbox-listeners.js so they can be + * invoked from a background queue processor. Each handler receives a + * queue item document instead of a live Fedify activity object. + * + * Design notes: + * - Follow handler: only logs activity. Follower storage, Accept/Reject + * response, pending follow storage, and notifications are all handled + * synchronously in the inbox listener before the item is enqueued. + * - Block handler: only logs activity. Follower removal is done + * synchronously in the inbox listener. + * - All other handlers: perform full processing. + */ + +import { + Accept, + Announce, + Article, + Block, + Create, + Delete, + Flag, + Follow, + Like, + Move, + Note, + Reject, + Undo, + Update, +} from "@fedify/fedify/vocab"; + +import { logActivity as logActivityShared } from "./activity-log.js"; +import { sanitizeContent, extractActorInfo, extractObjectData } from "./timeline-store.js"; +import { addTimelineItem, deleteTimelineItem, updateTimelineItem } from "./storage/timeline.js"; +import { addNotification } from "./storage/notifications.js"; +import { addMessage } from "./storage/messages.js"; +import { fetchAndStorePreviews, fetchAndStoreQuote } from "./og-unfurl.js"; +import { getFollowedTags } from "./storage/followed-tags.js"; + +/** @type {string} ActivityStreams Public Collection constant */ +const PUBLIC = "https://www.w3.org/ns/activitystreams#Public"; + +// --------------------------------------------------------------------------- +// Router +// --------------------------------------------------------------------------- + +/** + * Route a queued inbox item to the appropriate handler. + * + * @param {object} item - Queue document + * @param {string} item.activityType - Activity type name (e.g. "Follow") + * @param {string} item.actorUrl - Actor URL + * @param {string} [item.objectUrl] - Object URL (if applicable) + * @param {object} item.rawJson - Raw JSON-LD activity payload + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Local actor handle + */ +export async function routeToHandler(item, collections, ctx, handle) { + const { activityType } = item; + switch (activityType) { + case "Follow": + return handleFollow(item, collections); + case "Undo": + return handleUndo(item, collections, ctx, handle); + case "Accept": + return handleAccept(item, collections, ctx, handle); + case "Reject": + return handleReject(item, collections, ctx, handle); + case "Like": + return handleLike(item, collections, ctx, handle); + case "Announce": + return handleAnnounce(item, collections, ctx, handle); + case "Create": + return handleCreate(item, collections, ctx, handle); + case "Delete": + return handleDelete(item, collections); + case "Move": + return handleMove(item, collections, ctx, handle); + case "Update": + return handleUpdate(item, collections, ctx, handle); + case "Block": + return handleBlock(item, collections); + case "Flag": + return handleFlag(item, collections, ctx, handle); + default: + console.warn(`[inbox-handlers] Unknown activity type: ${activityType}`); + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Get an authenticated DocumentLoader that signs outbound fetches with + * our actor's key. + * + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + * @returns {Promise} + */ +function getAuthLoader(ctx, handle) { + return ctx.getDocumentLoader({ identifier: handle }); +} + +/** + * Log an activity to the ap_activities collection. + * + * @param {object} collections - MongoDB collections + * @param {object} record - Activity record fields + */ +async function logActivity(collections, record) { + await logActivityShared(collections.ap_activities, record, {}); +} + +// --------------------------------------------------------------------------- +// isDirectMessage +// --------------------------------------------------------------------------- + +/** + * Determine if an object is a direct message (DM). + * A DM is addressed only to specific actors — no PUBLIC_COLLECTION, + * no followers collection, and includes our actor URL. + * + * Duplicated from inbox-listeners.js (not exported there). + * + * @param {object} object - Fedify object (Note, Article, etc.) + * @param {string} ourActorUrl - Our actor's URL + * @param {string} followersUrl - Our followers collection URL + * @returns {boolean} + */ +function isDirectMessage(object, ourActorUrl, followersUrl) { + const allAddressed = [ + ...object.toIds.map((u) => u.href), + ...object.ccIds.map((u) => u.href), + ...object.btoIds.map((u) => u.href), + ...object.bccIds.map((u) => u.href), + ]; + + // Must be addressed to us + if (!allAddressed.includes(ourActorUrl)) return false; + + // Must NOT include public collection + if (allAddressed.some((u) => u === PUBLIC || u === "as:Public")) return false; + + // Must NOT include our followers collection + if (followersUrl && allAddressed.includes(followersUrl)) return false; + + return true; +} + +// --------------------------------------------------------------------------- +// Individual handlers +// --------------------------------------------------------------------------- + +/** + * Handle Follow activity. + * + * The synchronous inbox listener already handled: + * - follower storage (or pending follow storage) + * - Accept/Reject response + * - notification creation + * + * This async handler only logs the activity. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + */ +export async function handleFollow(item, collections) { + await logActivity(collections, { + direction: "inbound", + type: "Follow", + actorUrl: item.actorUrl, + summary: `${item.actorUrl} follow activity processed`, + }); +} + +/** + * Handle Undo activity. + * + * Undoes a Follow, Like, or Announce depending on the inner object type. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleUndo(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + const actorUrl = item.actorUrl; + + let undo; + try { + undo = await Undo.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Undo from rawJson:", error.message); + return; + } + + let inner; + try { + inner = await undo.getObject({ documentLoader: authLoader }); + } catch { + // Inner activity not dereferenceable — can't determine what was undone + return; + } + + if (inner instanceof Follow) { + await collections.ap_followers.deleteOne({ actorUrl }); + await logActivity(collections, { + direction: "inbound", + type: "Undo(Follow)", + actorUrl, + summary: `${actorUrl} unfollowed you`, + }); + } else if (inner instanceof Like) { + const objectId = inner.objectId?.href || ""; + await collections.ap_activities.deleteOne({ + type: "Like", + actorUrl, + objectUrl: objectId, + }); + } else if (inner instanceof Announce) { + const objectId = inner.objectId?.href || ""; + await collections.ap_activities.deleteOne({ + type: "Announce", + actorUrl, + objectUrl: objectId, + }); + } else { + const typeName = inner?.constructor?.name || "unknown"; + await logActivity(collections, { + direction: "inbound", + type: `Undo(${typeName})`, + actorUrl, + summary: `${actorUrl} undid ${typeName}`, + }); + } +} + +/** + * Handle Accept activity. + * + * Marks a pending follow in ap_following as accepted ("federation"). + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleAccept(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let accept; + try { + accept = await Accept.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Accept from rawJson:", error.message); + return; + } + + // We match against ap_following rather than inspecting the inner object + // because Fedify often resolves the Follow's target to a Person instead + // of the Follow itself. Any Accept from this actor confirms our pending follow. + const actorObj = await accept.getActor({ documentLoader: authLoader }); + const actorUrl = actorObj?.id?.href || ""; + if (!actorUrl) return; + + const result = await collections.ap_following.findOneAndUpdate( + { + actorUrl, + source: { $in: ["refollow:sent", "reader", "microsub-reader"] }, + }, + { + $set: { + source: "federation", + acceptedAt: new Date().toISOString(), + }, + $unset: { + refollowAttempts: "", + refollowLastAttempt: "", + refollowError: "", + }, + }, + { returnDocument: "after" }, + ); + + if (result) { + const actorName = result.name || result.handle || actorUrl; + await logActivity(collections, { + direction: "inbound", + type: "Accept(Follow)", + actorUrl, + actorName, + summary: `${actorName} accepted our Follow`, + }); + } +} + +/** + * Handle Reject activity. + * + * Marks a pending follow in ap_following as rejected. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleReject(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let reject; + try { + reject = await Reject.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Reject from rawJson:", error.message); + return; + } + + const actorObj = await reject.getActor({ documentLoader: authLoader }); + const actorUrl = actorObj?.id?.href || ""; + if (!actorUrl) return; + + const result = await collections.ap_following.findOneAndUpdate( + { + actorUrl, + source: { $in: ["refollow:sent", "reader", "microsub-reader"] }, + }, + { + $set: { + source: "rejected", + rejectedAt: new Date().toISOString(), + }, + }, + { returnDocument: "after" }, + ); + + if (result) { + const actorName = result.name || result.handle || actorUrl; + await logActivity(collections, { + direction: "inbound", + type: "Reject(Follow)", + actorUrl, + actorName, + summary: `${actorName} rejected our Follow`, + }); + } +} + +/** + * Handle Like activity. + * + * Only logs likes of our own content and creates a notification. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleLike(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let like; + try { + like = await Like.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Like from rawJson:", error.message); + return; + } + + const objectId = like.objectId?.href || ""; + + // Only log likes of our own content + const pubUrl = collections._publicationUrl; + if (!objectId || (pubUrl && !objectId.startsWith(pubUrl))) return; + + const actorUrl = like.actorId?.href || ""; + let actorObj; + try { + actorObj = await like.getActor({ documentLoader: authLoader }); + } catch { + actorObj = null; + } + + const actorName = + actorObj?.name?.toString() || + actorObj?.preferredUsername?.toString() || + actorUrl; + + // Extract actor info (including avatar) before logging so we can store it + const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); + + await logActivity(collections, { + direction: "inbound", + type: "Like", + actorUrl, + actorName, + actorAvatar: actorInfo.photo || "", + objectUrl: objectId, + summary: `${actorName} liked ${objectId}`, + }); + + // Store notification + await addNotification(collections, { + uid: like.id?.href || `like:${actorUrl}:${objectId}`, + type: "like", + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + targetUrl: objectId, + targetName: "", // Could fetch post title, but not critical + published: like.published ? String(like.published) : new Date().toISOString(), + createdAt: new Date().toISOString(), + }); +} + +/** + * Handle Announce (boost) activity. + * + * PATH 1: If boost of OUR content → notification. + * PATH 2: If from followed account → store timeline item, quote enrichment. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleAnnounce(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let announce; + try { + announce = await Announce.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Announce from rawJson:", error.message); + return; + } + + const objectId = announce.objectId?.href || ""; + if (!objectId) return; + + const actorUrl = announce.actorId?.href || ""; + const pubUrl = collections._publicationUrl; + + // PATH 1: Boost of OUR content → Notification + if (pubUrl && objectId.startsWith(pubUrl)) { + let actorObj; + try { + actorObj = await announce.getActor({ documentLoader: authLoader }); + } catch { + actorObj = null; + } + + const actorName = + actorObj?.name?.toString() || + actorObj?.preferredUsername?.toString() || + actorUrl; + + // Extract actor info (including avatar) before logging so we can store it + const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); + + // Log the boost activity + await logActivity(collections, { + direction: "inbound", + type: "Announce", + actorUrl, + actorName, + actorAvatar: actorInfo.photo || "", + objectUrl: objectId, + summary: `${actorName} boosted ${objectId}`, + }); + + // Create notification + await addNotification(collections, { + uid: announce.id?.href || `${actorUrl}#boost-${objectId}`, + type: "boost", + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + targetUrl: objectId, + targetName: "", // Could fetch post title, but not critical + published: announce.published ? String(announce.published) : new Date().toISOString(), + createdAt: new Date().toISOString(), + }); + + // Don't return — fall through to check if actor is also followed + } + + // PATH 2: Boost from someone we follow → Timeline (store original post) + const following = await collections.ap_following.findOne({ actorUrl }); + if (following) { + try { + // Fetch the original object being boosted (authenticated for Secure Mode servers) + const object = await announce.getObject({ documentLoader: authLoader }); + if (!object) return; + + // Skip non-content objects (Lemmy/PieFed like/create activities + // that resolve to activity IDs instead of actual Note/Article posts) + const hasContent = object.content?.toString() || object.name?.toString(); + if (!hasContent) return; + + // Get booster actor info + const boosterActor = await announce.getActor({ documentLoader: authLoader }); + const boosterInfo = await extractActorInfo(boosterActor, { documentLoader: authLoader }); + + // Extract and store with boost metadata + const timelineItem = await extractObjectData(object, { + boostedBy: boosterInfo, + boostedAt: announce.published ? String(announce.published) : new Date().toISOString(), + documentLoader: authLoader, + }); + + await addTimelineItem(collections, timelineItem); + + // Fire-and-forget quote enrichment for boosted posts + if (timelineItem.quoteUrl) { + fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader) + .catch((error) => { + console.error(`[inbox-handlers] Quote fetch failed for ${timelineItem.uid}:`, error.message); + }); + } + } catch (error) { + // Remote object unreachable (timeout, Authorized Fetch, deleted, etc.) — skip + const cause = error?.cause?.code || error?.message || "unknown"; + console.warn(`[inbox-handlers] Skipped boost from ${actorUrl}: ${cause}`); + } + } +} + +/** + * Handle Create activity. + * + * Processes DMs, replies, mentions, and timeline storage. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleCreate(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let create; + try { + create = await Create.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Create from rawJson:", error.message); + return; + } + + let object; + try { + object = await create.getObject({ documentLoader: authLoader }); + } catch { + // Remote object not dereferenceable (deleted, etc.) + return; + } + if (!object) return; + + const actorUrl = create.actorId?.href || ""; + let actorObj; + try { + actorObj = await create.getActor({ documentLoader: authLoader }); + } catch { + // Actor not dereferenceable — use URL as fallback + actorObj = null; + } + const actorName = + actorObj?.name?.toString() || + actorObj?.preferredUsername?.toString() || + actorUrl; + + // --- DM detection --- + // Check if this is a direct message before processing as reply/mention/timeline. + // DMs are handled separately and stored in ap_messages instead of ap_timeline. + const ourActorUrl = ctx.getActorUri(handle).href; + const followersUrl = ctx.getFollowersUri(handle)?.href || ""; + + if (isDirectMessage(object, ourActorUrl, followersUrl)) { + const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); + const rawHtml = object.content?.toString() || ""; + const contentHtml = sanitizeContent(rawHtml); + const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 500); + const published = object.published ? String(object.published) : new Date().toISOString(); + const inReplyToDM = object.replyTargetId?.href || null; + + // Store as message + await addMessage(collections, { + uid: object.id?.href || `dm:${actorUrl}:${Date.now()}`, + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + content: { + text: contentText, + html: contentHtml, + }, + inReplyTo: inReplyToDM, + conversationId: actorInfo.url, + direction: "inbound", + published, + createdAt: new Date().toISOString(), + }); + + // Also create a notification so DMs appear in the notification tab + await addNotification(collections, { + uid: `dm:${object.id?.href || `${actorUrl}:${Date.now()}`}`, + url: object.url?.href || object.id?.href || "", + type: "dm", + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + content: { + text: contentText, + html: contentHtml, + }, + published, + createdAt: new Date().toISOString(), + }); + + await logActivity(collections, { + direction: "inbound", + type: "DirectMessage", + actorUrl, + actorName, + actorAvatar: actorInfo.photo || "", + objectUrl: object.id?.href || "", + content: contentText.substring(0, 100), + summary: `${actorName} sent a direct message`, + }); + + return; // Don't process DMs as timeline/mention/reply + } + + // Use replyTargetId (non-fetching) for the inReplyTo URL + const inReplyTo = object.replyTargetId?.href || null; + + // Log replies to our posts (existing behavior for conversations) + const pubUrl = collections._publicationUrl; + if (inReplyTo) { + const content = object.content?.toString() || ""; + + // Extract actor info (including avatar) before logging so we can store it + const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); + + await logActivity(collections, { + direction: "inbound", + type: "Reply", + actorUrl, + actorName, + actorAvatar: actorInfo.photo || "", + objectUrl: object.id?.href || "", + targetUrl: inReplyTo, + content, + summary: `${actorName} replied to ${inReplyTo}`, + }); + + // Create notification if reply is to one of OUR posts + if (pubUrl && inReplyTo.startsWith(pubUrl)) { + const rawHtml = object.content?.toString() || ""; + const contentHtml = sanitizeContent(rawHtml); + const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 200); + + await addNotification(collections, { + uid: object.id?.href || `reply:${actorUrl}:${inReplyTo}`, + url: object.url?.href || object.id?.href || "", + type: "reply", + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + targetUrl: inReplyTo, + targetName: "", + content: { + text: contentText, + html: contentHtml, + }, + published: object.published ? String(object.published) : new Date().toISOString(), + createdAt: new Date().toISOString(), + }); + } + } + + // Check for mentions of our actor + if (object.tag) { + const tags = Array.isArray(object.tag) ? object.tag : [object.tag]; + + for (const tag of tags) { + if (tag.type === "Mention" && tag.href?.href === ourActorUrl) { + const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); + const rawMentionHtml = object.content?.toString() || ""; + const mentionHtml = sanitizeContent(rawMentionHtml); + const contentText = rawMentionHtml.replace(/<[^>]*>/g, "").substring(0, 200); + + await addNotification(collections, { + uid: object.id?.href || `mention:${actorUrl}:${object.id?.href}`, + url: object.url?.href || object.id?.href || "", + type: "mention", + actorUrl: actorInfo.url, + actorName: actorInfo.name, + actorPhoto: actorInfo.photo, + actorHandle: actorInfo.handle, + content: { + text: contentText, + html: mentionHtml, + }, + published: object.published ? String(object.published) : new Date().toISOString(), + createdAt: new Date().toISOString(), + }); + + break; // Only create one mention notification per post + } + } + } + + // Store timeline items from accounts we follow (native storage) + const following = await collections.ap_following.findOne({ actorUrl }); + if (following) { + try { + const timelineItem = await extractObjectData(object, { + actorFallback: actorObj, + documentLoader: authLoader, + }); + await addTimelineItem(collections, timelineItem); + + // Fire-and-forget OG unfurling for notes and articles (not boosts) + if (timelineItem.type === "note" || timelineItem.type === "article") { + fetchAndStorePreviews(collections, timelineItem.uid, timelineItem.content.html) + .catch((error) => { + console.error(`[inbox-handlers] OG unfurl failed for ${timelineItem.uid}:`, error); + }); + } + + // Fire-and-forget quote enrichment + if (timelineItem.quoteUrl) { + fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader) + .catch((error) => { + console.error(`[inbox-handlers] Quote fetch failed for ${timelineItem.uid}:`, error.message); + }); + } + } catch (error) { + // Log extraction errors but don't fail the entire handler + console.error("[inbox-handlers] Failed to store timeline item:", error); + } + } else if (collections.ap_followed_tags) { + // Not a followed account — check if the post's hashtags match any followed tags + // so tagged posts from across the fediverse appear in the timeline + try { + const objectTags = Array.isArray(object.tag) ? object.tag : (object.tag ? [object.tag] : []); + const postHashtags = objectTags + .filter((t) => t.type === "Hashtag" && t.name) + .map((t) => t.name.toString().replace(/^#/, "").toLowerCase()); + + if (postHashtags.length > 0) { + const followedTags = await getFollowedTags(collections); + const followedSet = new Set(followedTags.map((t) => t.toLowerCase())); + const hasMatchingTag = postHashtags.some((tag) => followedSet.has(tag)); + + if (hasMatchingTag) { + const timelineItem = await extractObjectData(object, { + actorFallback: actorObj, + documentLoader: authLoader, + }); + await addTimelineItem(collections, timelineItem); + } + } + } catch (error) { + // Non-critical — don't fail the handler + console.error("[inbox-handlers] Followed tag check failed:", error.message); + } + } +} + +/** + * Handle Delete activity. + * + * Removes from ap_activities and timeline by object URL. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + */ +export async function handleDelete(item, collections) { + const objectId = item.objectUrl; + if (objectId) { + // Remove from activity log + await collections.ap_activities.deleteMany({ objectUrl: objectId }); + + // Remove from timeline + await deleteTimelineItem(collections, objectId); + } +} + +/** + * Handle Move activity. + * + * Updates ap_followers to reflect the actor's new URL. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleMove(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let move; + try { + move = await Move.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Move from rawJson:", error.message); + return; + } + + const oldActorObj = await move.getActor({ documentLoader: authLoader }); + const oldActorUrl = oldActorObj?.id?.href || ""; + const target = await move.getTarget({ documentLoader: authLoader }); + const newActorUrl = target?.id?.href || ""; + + if (oldActorUrl && newActorUrl) { + await collections.ap_followers.updateOne( + { actorUrl: oldActorUrl }, + { $set: { actorUrl: newActorUrl, movedFrom: oldActorUrl } }, + ); + } + + await logActivity(collections, { + direction: "inbound", + type: "Move", + actorUrl: oldActorUrl, + objectUrl: newActorUrl, + summary: `${oldActorUrl} moved to ${newActorUrl}`, + }); +} + +/** + * Handle Update activity. + * + * PATH 1: If Note/Article → update timeline item content. + * PATH 2: Otherwise → refresh stored follower data. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleUpdate(item, collections, ctx, handle) { + const authLoader = await getAuthLoader(ctx, handle); + + let update; + try { + update = await Update.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Update from rawJson:", error.message); + return; + } + + // Try to get the object being updated + let object; + try { + object = await update.getObject({ documentLoader: authLoader }); + } catch { + object = null; + } + + // PATH 1: If object is a Note/Article → Update timeline item content + if (object && (object instanceof Note || object instanceof Article)) { + const objectUrl = object.id?.href || ""; + if (objectUrl) { + try { + // Extract updated content + const contentHtml = object.content?.toString() || ""; + const contentText = object.source?.content?.toString() || contentHtml.replace(/<[^>]*>/g, ""); + + const updates = { + content: { + text: contentText, + html: contentHtml, + }, + name: object.name?.toString() || "", + summary: object.summary?.toString() || "", + sensitive: object.sensitive || false, + }; + + await updateTimelineItem(collections, objectUrl, updates); + } catch (error) { + console.error("[inbox-handlers] Failed to update timeline item:", error); + } + } + return; + } + + // PATH 2: Otherwise, assume profile update — refresh stored follower data + const actorObj = await update.getActor({ documentLoader: authLoader }); + const actorUrl = actorObj?.id?.href || ""; + if (!actorUrl) return; + + const existing = await collections.ap_followers.findOne({ actorUrl }); + if (existing) { + await collections.ap_followers.updateOne( + { actorUrl }, + { + $set: { + name: + actorObj.name?.toString() || + actorObj.preferredUsername?.toString() || + actorUrl, + handle: actorObj.preferredUsername?.toString() || "", + avatar: actorObj.icon + ? (await actorObj.icon)?.url?.href || "" + : "", + updatedAt: new Date().toISOString(), + }, + }, + ); + } +} + +/** + * Handle Block activity. + * + * The synchronous inbox listener already handled follower removal. + * This async handler only logs the activity. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + */ +export async function handleBlock(item, collections) { + await logActivity(collections, { + direction: "inbound", + type: "Block", + actorUrl: item.actorUrl, + summary: `${item.actorUrl} block activity processed`, + }); +} + +/** + * Handle Flag (report) activity. + * + * Stores the report in ap_reports, creates a notification, and logs the activity. + * + * @param {object} item - Queue document + * @param {object} collections - MongoDB collections + * @param {import("@fedify/fedify").Context} ctx - Fedify context + * @param {string} handle - Actor handle + */ +export async function handleFlag(item, collections, ctx, handle) { + try { + const authLoader = await getAuthLoader(ctx, handle); + + let flag; + try { + flag = await Flag.fromJsonLd(item.rawJson, { documentLoader: authLoader }); + } catch (error) { + console.warn("[inbox-handlers] Failed to reconstruct Flag from rawJson:", error.message); + return; + } + + const actorObj = await flag.getActor({ documentLoader: authLoader }).catch(() => null); + + const reporterUrl = actorObj?.id?.href || flag.actorId?.href || ""; + const reporterName = actorObj?.name?.toString() || actorObj?.preferredUsername?.toString() || reporterUrl; + + // Extract reported objects — Flag can report actors or posts + const reportedIds = flag.objectIds?.map((u) => u.href) || []; + const reason = flag.content?.toString() || flag.summary?.toString() || ""; + + if (reportedIds.length === 0 && !reason) { + console.info("[inbox-handlers] Ignoring empty Flag from", reporterUrl); + return; + } + + // Store report + if (collections.ap_reports) { + await collections.ap_reports.insertOne({ + reporterUrl, + reporterName, + reportedUrls: reportedIds, + reason, + createdAt: new Date().toISOString(), + read: false, + }); + } + + // Create notification + if (collections.ap_notifications) { + await addNotification(collections, { + uid: `flag:${reporterUrl}:${Date.now()}`, + type: "report", + actorUrl: reporterUrl, + actorName: reporterName, + actorPhoto: actorObj?.iconUrl?.href || actorObj?.icon?.url?.href || "", + actorHandle: actorObj?.preferredUsername + ? `@${actorObj.preferredUsername}@${new URL(reporterUrl).hostname}` + : reporterUrl, + objectUrl: reportedIds[0] || "", + summary: reason ? reason.slice(0, 200) : "Report received", + published: new Date().toISOString(), + createdAt: new Date().toISOString(), + }); + } + + await logActivity(collections, { + direction: "inbound", + type: "Flag", + actorUrl: reporterUrl, + objectUrl: reportedIds[0] || "", + summary: `Report from ${reporterName}: ${reason.slice(0, 100)}`, + }); + + console.info(`[inbox-handlers] Flag received from ${reporterName} — ${reportedIds.length} objects reported`); + } catch (error) { + console.warn("[inbox-handlers] Flag handler error:", error.message); + } +} diff --git a/lib/inbox-listeners.js b/lib/inbox-listeners.js index da169c7..209e602 100644 --- a/lib/inbox-listeners.js +++ b/lib/inbox-listeners.js @@ -1,15 +1,17 @@ /** * Inbox listener registrations for the Fedify Federation instance. * - * Each listener handles a specific ActivityPub activity type received - * in the actor's inbox (Follow, Undo, Like, Announce, Create, Delete, Move). + * Each listener is a thin shim that: + * 1. Checks server-level blocks (Redis, O(1)) + * 2. Updates key freshness tracking + * 3. Performs synchronous-only work (Follow Accept, Block follower removal) + * 4. Enqueues the activity for async processing */ import { Accept, Add, Announce, - Article, Block, Create, Delete, @@ -17,20 +19,17 @@ import { Follow, Like, Move, - Note, Reject, Remove, Undo, Update, } from "@fedify/fedify/vocab"; -import { logActivity as logActivityShared } from "./activity-log.js"; -import { sanitizeContent, extractActorInfo, extractObjectData } from "./timeline-store.js"; -import { addTimelineItem, deleteTimelineItem, updateTimelineItem } from "./storage/timeline.js"; +import { isServerBlocked } from "./storage/server-blocks.js"; +import { touchKeyFreshness } from "./key-refresh.js"; +import { enqueueActivity } from "./inbox-queue.js"; +import { extractActorInfo } from "./timeline-store.js"; import { addNotification } from "./storage/notifications.js"; -import { addMessage } from "./storage/messages.js"; -import { fetchAndStorePreviews, fetchAndStoreQuote } from "./og-unfurl.js"; -import { getFollowedTags } from "./storage/followed-tags.js"; /** * Register all inbox listeners on a federation's inbox chain. @@ -41,54 +40,20 @@ import { getFollowedTags } from "./storage/followed-tags.js"; * @param {string} options.handle - Actor handle * @param {boolean} options.storeRawActivities - Whether to store raw JSON */ -/** @type {string} ActivityStreams Public Collection constant */ -const PUBLIC = "https://www.w3.org/ns/activitystreams#Public"; - -/** - * Determine if an object is a direct message (DM). - * A DM is addressed only to specific actors — no PUBLIC_COLLECTION, - * no followers collection, and includes our actor URL. - * - * @param {object} object - Fedify object (Note, Article, etc.) - * @param {string} ourActorUrl - Our actor's URL - * @param {string} followersUrl - Our followers collection URL - * @returns {boolean} - */ -function isDirectMessage(object, ourActorUrl, followersUrl) { - const allAddressed = [ - ...object.toIds.map((u) => u.href), - ...object.ccIds.map((u) => u.href), - ...object.btoIds.map((u) => u.href), - ...object.bccIds.map((u) => u.href), - ]; - - // Must be addressed to us - if (!allAddressed.includes(ourActorUrl)) return false; - - // Must NOT include public collection - if (allAddressed.some((u) => u === PUBLIC || u === "as:Public")) return false; - - // Must NOT include our followers collection - if (followersUrl && allAddressed.includes(followersUrl)) return false; - - return true; -} - export function registerInboxListeners(inboxChain, options) { - const { collections, handle, storeRawActivities } = options; + const { collections, handle } = options; - /** - * Get an authenticated DocumentLoader that signs outbound fetches with - * our actor's key. This allows .getActor()/.getObject() to succeed - * against Authorized Fetch (Secure Mode) servers like hachyderm.io. - * - * @param {import("@fedify/fedify").Context} ctx - Fedify context - * @returns {Promise} - */ const getAuthLoader = (ctx) => ctx.getDocumentLoader({ identifier: handle }); inboxChain + // ── Follow ────────────────────────────────────────────────────── + // Synchronous: Accept/Reject + follower storage (federation requirement) + // Async: notification + activity log .on(Follow, async (ctx, follow) => { + const actorUrl = follow.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + const authLoader = await getAuthLoader(ctx); const followerActor = await follow.getActor({ documentLoader: authLoader }); if (!followerActor?.id) return; @@ -99,7 +64,6 @@ export function registerInboxListeners(inboxChain, options) { followerActor.preferredUsername?.toString() || followerUrl; - // Build common follower data const followerData = { actorUrl: followerUrl, handle: followerActor.preferredUsername?.toString() || "", @@ -111,12 +75,10 @@ export function registerInboxListeners(inboxChain, options) { sharedInbox: followerActor.endpoints?.sharedInbox?.href || "", }; - // Check if manual approval is enabled const profile = await collections.ap_profile.findOne({}); const manualApproval = profile?.manuallyApprovesFollowers || false; if (manualApproval && collections.ap_pending_follows) { - // Store as pending — do NOT send Accept yet await collections.ap_pending_follows.updateOne( { actorUrl: followerUrl }, { @@ -129,15 +91,7 @@ export function registerInboxListeners(inboxChain, options) { { upsert: true }, ); - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Follow", - actorUrl: followerUrl, - actorName: followerName, - summary: `${followerName} requested to follow you`, - }); - - // Notification with type "follow_request" + // Notification for follow request (synchronous — needed for UI) const followerInfo = await extractActorInfo(followerActor, { documentLoader: authLoader }); await addNotification(collections, { uid: follow.id?.href || `follow_request:${followerUrl}`, @@ -150,7 +104,6 @@ export function registerInboxListeners(inboxChain, options) { createdAt: new Date().toISOString(), }); } else { - // Auto-accept: store follower + send Accept back await collections.ap_followers.updateOne( { actorUrl: followerUrl }, { @@ -172,15 +125,7 @@ export function registerInboxListeners(inboxChain, options) { { orderingKey: followerUrl }, ); - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Follow", - actorUrl: followerUrl, - actorName: followerName, - summary: `${followerName} followed you`, - }); - - // Store notification + // Notification for follow (synchronous — needed for UI) const followerInfo = await extractActorInfo(followerActor, { documentLoader: authLoader }); await addNotification(collections, { uid: follow.id?.href || `follow:${followerUrl}`, @@ -193,680 +138,172 @@ export function registerInboxListeners(inboxChain, options) { createdAt: new Date().toISOString(), }); } + + // Enqueue async portion (activity log) + await enqueueActivity(collections, { + activityType: "Follow", + actorUrl, + rawJson: await follow.toJsonLd(), + }); }) + + // ── Undo ──────────────────────────────────────────────────────── .on(Undo, async (ctx, undo) => { const actorUrl = undo.actorId?.href || ""; - const authLoader = await getAuthLoader(ctx); - let inner; - try { - inner = await undo.getObject({ documentLoader: authLoader }); - } catch { - // Inner activity not dereferenceable — can't determine what was undone - return; - } + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); - if (inner instanceof Follow) { - await collections.ap_followers.deleteOne({ actorUrl }); - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Undo(Follow)", - actorUrl, - summary: `${actorUrl} unfollowed you`, - }); - } else if (inner instanceof Like) { - const objectId = inner.objectId?.href || ""; - await collections.ap_activities.deleteOne({ - type: "Like", - actorUrl, - objectUrl: objectId, - }); - } else if (inner instanceof Announce) { - const objectId = inner.objectId?.href || ""; - await collections.ap_activities.deleteOne({ - type: "Announce", - actorUrl, - objectUrl: objectId, - }); - } else { - const typeName = inner?.constructor?.name || "unknown"; - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: `Undo(${typeName})`, - actorUrl, - summary: `${actorUrl} undid ${typeName}`, - }); - } - }) - .on(Accept, async (ctx, accept) => { - // Handle Accept(Follow) — remote server accepted our Follow request. - // We don't inspect the inner object type because Fedify often resolves - // it to a Person (the Follow's target) rather than the Follow itself. - // Instead, we match directly against ap_following — if we have a - // pending follow for this actor, any Accept from them confirms it. - const authLoader = await getAuthLoader(ctx); - const actorObj = await accept.getActor({ documentLoader: authLoader }); - const actorUrl = actorObj?.id?.href || ""; - if (!actorUrl) return; - - const result = await collections.ap_following.findOneAndUpdate( - { - actorUrl, - source: { $in: ["refollow:sent", "reader", "microsub-reader"] }, - }, - { - $set: { - source: "federation", - acceptedAt: new Date().toISOString(), - }, - $unset: { - refollowAttempts: "", - refollowLastAttempt: "", - refollowError: "", - }, - }, - { returnDocument: "after" }, - ); - - if (result) { - const actorName = - result.name || result.handle || actorUrl; - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Accept(Follow)", - actorUrl, - actorName, - summary: `${actorName} accepted our Follow`, - }); - } - }) - .on(Reject, async (ctx, reject) => { - const authLoader = await getAuthLoader(ctx); - const actorObj = await reject.getActor({ documentLoader: authLoader }); - const actorUrl = actorObj?.id?.href || ""; - if (!actorUrl) return; - - // Mark rejected follow in ap_following - const result = await collections.ap_following.findOneAndUpdate( - { - actorUrl, - source: { $in: ["refollow:sent", "reader", "microsub-reader"] }, - }, - { - $set: { - source: "rejected", - rejectedAt: new Date().toISOString(), - }, - }, - { returnDocument: "after" }, - ); - - if (result) { - const actorName = result.name || result.handle || actorUrl; - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Reject(Follow)", - actorUrl, - actorName, - summary: `${actorName} rejected our Follow`, - }); - } - }) - .on(Like, async (ctx, like) => { - // Use .objectId (non-fetching) for the liked URL — we only need the - // URL to filter and log, not the full remote object. - const objectId = like.objectId?.href || ""; - - // Only log likes of our own content - const pubUrl = collections._publicationUrl; - if (!objectId || (pubUrl && !objectId.startsWith(pubUrl))) return; - - const authLoader = await getAuthLoader(ctx); - const actorUrl = like.actorId?.href || ""; - let actorObj; - try { - actorObj = await like.getActor({ documentLoader: authLoader }); - } catch { - actorObj = null; - } - - const actorName = - actorObj?.name?.toString() || - actorObj?.preferredUsername?.toString() || - actorUrl; - - // Extract actor info (including avatar) before logging so we can store it - const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); - - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Like", + await enqueueActivity(collections, { + activityType: "Undo", actorUrl, - actorName, - actorAvatar: actorInfo.photo || "", - objectUrl: objectId, - summary: `${actorName} liked ${objectId}`, - }); - - // Store notification - await addNotification(collections, { - uid: like.id?.href || `like:${actorUrl}:${objectId}`, - type: "like", - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - targetUrl: objectId, - targetName: "", // Could fetch post title, but not critical - published: like.published ? String(like.published) : new Date().toISOString(), - createdAt: new Date().toISOString(), + rawJson: await undo.toJsonLd(), }); }) + + // ── Accept ────────────────────────────────────────────────────── + .on(Accept, async (ctx, accept) => { + const actorUrl = accept.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Accept", + actorUrl, + rawJson: await accept.toJsonLd(), + }); + }) + + // ── Reject ────────────────────────────────────────────────────── + .on(Reject, async (ctx, reject) => { + const actorUrl = reject.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Reject", + actorUrl, + rawJson: await reject.toJsonLd(), + }); + }) + + // ── Like ──────────────────────────────────────────────────────── + .on(Like, async (ctx, like) => { + const actorUrl = like.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Like", + actorUrl, + objectUrl: like.objectId?.href || "", + rawJson: await like.toJsonLd(), + }); + }) + + // ── Announce ──────────────────────────────────────────────────── .on(Announce, async (ctx, announce) => { - const objectId = announce.objectId?.href || ""; - if (!objectId) return; - - const authLoader = await getAuthLoader(ctx); const actorUrl = announce.actorId?.href || ""; - const pubUrl = collections._publicationUrl; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); - // Dual path logic: Notification vs Timeline - - // PATH 1: Boost of OUR content → Notification - if (pubUrl && objectId.startsWith(pubUrl)) { - let actorObj; - try { - actorObj = await announce.getActor({ documentLoader: authLoader }); - } catch { - actorObj = null; - } - - const actorName = - actorObj?.name?.toString() || - actorObj?.preferredUsername?.toString() || - actorUrl; - - // Extract actor info (including avatar) before logging so we can store it - const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); - - // Log the boost activity - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Announce", - actorUrl, - actorName, - actorAvatar: actorInfo.photo || "", - objectUrl: objectId, - summary: `${actorName} boosted ${objectId}`, - }); - - // Create notification - await addNotification(collections, { - uid: announce.id?.href || `${actorUrl}#boost-${objectId}`, - type: "boost", - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - targetUrl: objectId, - targetName: "", // Could fetch post title, but not critical - published: announce.published ? String(announce.published) : new Date().toISOString(), - createdAt: new Date().toISOString(), - }); - - // Don't return — fall through to check if actor is also followed - } - - // PATH 2: Boost from someone we follow → Timeline (store original post) - const following = await collections.ap_following.findOne({ actorUrl }); - if (following) { - try { - // Fetch the original object being boosted (authenticated for Secure Mode servers) - const object = await announce.getObject({ documentLoader: authLoader }); - if (!object) return; - - // Skip non-content objects (Lemmy/PieFed like/create activities - // that resolve to activity IDs instead of actual Note/Article posts) - const hasContent = object.content?.toString() || object.name?.toString(); - if (!hasContent) return; - - // Get booster actor info - const boosterActor = await announce.getActor({ documentLoader: authLoader }); - const boosterInfo = await extractActorInfo(boosterActor, { documentLoader: authLoader }); - - // Extract and store with boost metadata - const timelineItem = await extractObjectData(object, { - boostedBy: boosterInfo, - boostedAt: announce.published ? String(announce.published) : new Date().toISOString(), - documentLoader: authLoader, - }); - - await addTimelineItem(collections, timelineItem); - - // Fire-and-forget quote enrichment for boosted posts - if (timelineItem.quoteUrl) { - fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader) - .catch((error) => { - console.error(`[inbox] Quote fetch failed for ${timelineItem.uid}:`, error.message); - }); - } - } catch (error) { - // Remote object unreachable (timeout, Authorized Fetch, deleted, etc.) — skip - const cause = error?.cause?.code || error?.message || "unknown"; - console.warn(`[AP] Skipped boost from ${actorUrl}: ${cause}`); - } - } - }) - .on(Create, async (ctx, create) => { - const authLoader = await getAuthLoader(ctx); - let object; - try { - object = await create.getObject({ documentLoader: authLoader }); - } catch { - // Remote object not dereferenceable (deleted, etc.) - return; - } - if (!object) return; - - const actorUrl = create.actorId?.href || ""; - let actorObj; - try { - actorObj = await create.getActor({ documentLoader: authLoader }); - } catch { - // Actor not dereferenceable — use URL as fallback - actorObj = null; - } - const actorName = - actorObj?.name?.toString() || - actorObj?.preferredUsername?.toString() || - actorUrl; - - // --- DM detection --- - // Check if this is a direct message before processing as reply/mention/timeline. - // DMs are handled separately and stored in ap_messages instead of ap_timeline. - const ourActorUrl = ctx.getActorUri(handle).href; - const followersUrl = ctx.getFollowersUri(handle)?.href || ""; - - if (isDirectMessage(object, ourActorUrl, followersUrl)) { - const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); - const rawHtml = object.content?.toString() || ""; - const contentHtml = sanitizeContent(rawHtml); - const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 500); - const published = object.published ? String(object.published) : new Date().toISOString(); - const inReplyToDM = object.replyTargetId?.href || null; - - // Store as message - await addMessage(collections, { - uid: object.id?.href || `dm:${actorUrl}:${Date.now()}`, - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - content: { - text: contentText, - html: contentHtml, - }, - inReplyTo: inReplyToDM, - conversationId: actorInfo.url, - direction: "inbound", - published, - createdAt: new Date().toISOString(), - }); - - // Also create a notification so DMs appear in the notification tab - await addNotification(collections, { - uid: `dm:${object.id?.href || `${actorUrl}:${Date.now()}`}`, - url: object.url?.href || object.id?.href || "", - type: "dm", - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - content: { - text: contentText, - html: contentHtml, - }, - published, - createdAt: new Date().toISOString(), - }); - - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "DirectMessage", - actorUrl, - actorName, - actorAvatar: actorInfo.photo || "", - objectUrl: object.id?.href || "", - content: contentText.substring(0, 100), - summary: `${actorName} sent a direct message`, - }); - - return; // Don't process DMs as timeline/mention/reply - } - - // Use replyTargetId (non-fetching) for the inReplyTo URL - const inReplyTo = object.replyTargetId?.href || null; - - // Log replies to our posts (existing behavior for conversations) - const pubUrl = collections._publicationUrl; - if (inReplyTo) { - const content = object.content?.toString() || ""; - - // Extract actor info (including avatar) before logging so we can store it - const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); - - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Reply", - actorUrl, - actorName, - actorAvatar: actorInfo.photo || "", - objectUrl: object.id?.href || "", - targetUrl: inReplyTo, - content, - summary: `${actorName} replied to ${inReplyTo}`, - }); - - // Create notification if reply is to one of OUR posts - if (pubUrl && inReplyTo.startsWith(pubUrl)) { - const rawHtml = object.content?.toString() || ""; - const contentHtml = sanitizeContent(rawHtml); - const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 200); - - await addNotification(collections, { - uid: object.id?.href || `reply:${actorUrl}:${inReplyTo}`, - url: object.url?.href || object.id?.href || "", - type: "reply", - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - targetUrl: inReplyTo, - targetName: "", - content: { - text: contentText, - html: contentHtml, - }, - published: object.published ? String(object.published) : new Date().toISOString(), - createdAt: new Date().toISOString(), - }); - } - } - - // Check for mentions of our actor - if (object.tag) { - const tags = Array.isArray(object.tag) ? object.tag : [object.tag]; - const ourActorUrl = ctx.getActorUri(handle).href; - - for (const tag of tags) { - if (tag.type === "Mention" && tag.href?.href === ourActorUrl) { - const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader }); - const rawMentionHtml = object.content?.toString() || ""; - const mentionHtml = sanitizeContent(rawMentionHtml); - const contentText = rawMentionHtml.replace(/<[^>]*>/g, "").substring(0, 200); - - await addNotification(collections, { - uid: object.id?.href || `mention:${actorUrl}:${object.id?.href}`, - url: object.url?.href || object.id?.href || "", - type: "mention", - actorUrl: actorInfo.url, - actorName: actorInfo.name, - actorPhoto: actorInfo.photo, - actorHandle: actorInfo.handle, - content: { - text: contentText, - html: mentionHtml, - }, - published: object.published ? String(object.published) : new Date().toISOString(), - createdAt: new Date().toISOString(), - }); - - break; // Only create one mention notification per post - } - } - } - - // Store timeline items from accounts we follow (native storage) - const following = await collections.ap_following.findOne({ actorUrl }); - if (following) { - try { - const timelineItem = await extractObjectData(object, { - actorFallback: actorObj, - documentLoader: authLoader, - }); - await addTimelineItem(collections, timelineItem); - - // Fire-and-forget OG unfurling for notes and articles (not boosts) - if (timelineItem.type === "note" || timelineItem.type === "article") { - fetchAndStorePreviews(collections, timelineItem.uid, timelineItem.content.html) - .catch((error) => { - console.error(`[inbox] OG unfurl failed for ${timelineItem.uid}:`, error); - }); - } - - // Fire-and-forget quote enrichment - if (timelineItem.quoteUrl) { - fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader) - .catch((error) => { - console.error(`[inbox] Quote fetch failed for ${timelineItem.uid}:`, error.message); - }); - } - } catch (error) { - // Log extraction errors but don't fail the entire handler - console.error("Failed to store timeline item:", error); - } - } else if (collections.ap_followed_tags) { - // Not a followed account — check if the post's hashtags match any followed tags - // so tagged posts from across the fediverse appear in the timeline - try { - const objectTags = Array.isArray(object.tag) ? object.tag : (object.tag ? [object.tag] : []); - const postHashtags = objectTags - .filter((t) => t.type === "Hashtag" && t.name) - .map((t) => t.name.toString().replace(/^#/, "").toLowerCase()); - - if (postHashtags.length > 0) { - const followedTags = await getFollowedTags(collections); - const followedSet = new Set(followedTags.map((t) => t.toLowerCase())); - const hasMatchingTag = postHashtags.some((tag) => followedSet.has(tag)); - - if (hasMatchingTag) { - const timelineItem = await extractObjectData(object, { - actorFallback: actorObj, - documentLoader: authLoader, - }); - await addTimelineItem(collections, timelineItem); - } - } - } catch (error) { - // Non-critical — don't fail the handler - console.error("[inbox] Followed tag check failed:", error.message); - } - } - - }) - .on(Delete, async (ctx, del) => { - const objectId = del.objectId?.href || ""; - if (objectId) { - // Remove from activity log - await collections.ap_activities.deleteMany({ objectUrl: objectId }); - - // Remove from timeline - await deleteTimelineItem(collections, objectId); - } - }) - .on(Move, async (ctx, move) => { - const authLoader = await getAuthLoader(ctx); - const oldActorObj = await move.getActor({ documentLoader: authLoader }); - const oldActorUrl = oldActorObj?.id?.href || ""; - const target = await move.getTarget({ documentLoader: authLoader }); - const newActorUrl = target?.id?.href || ""; - - if (oldActorUrl && newActorUrl) { - await collections.ap_followers.updateOne( - { actorUrl: oldActorUrl }, - { $set: { actorUrl: newActorUrl, movedFrom: oldActorUrl } }, - ); - } - - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Move", - actorUrl: oldActorUrl, - objectUrl: newActorUrl, - summary: `${oldActorUrl} moved to ${newActorUrl}`, + await enqueueActivity(collections, { + activityType: "Announce", + actorUrl, + objectUrl: announce.objectId?.href || "", + rawJson: await announce.toJsonLd(), }); }) - .on(Update, async (ctx, update) => { - // Update can be for a profile OR for a post (edited content) - const authLoader = await getAuthLoader(ctx); - // Try to get the object being updated - let object; - try { - object = await update.getObject({ documentLoader: authLoader }); - } catch { - object = null; - } + // ── Create ────────────────────────────────────────────────────── + .on(Create, async (ctx, create) => { + const actorUrl = create.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); - // PATH 1: If object is a Note/Article → Update timeline item content - if (object && (object instanceof Note || object instanceof Article)) { - const objectUrl = object.id?.href || ""; - if (objectUrl) { - try { - // Extract updated content - const contentHtml = object.content?.toString() || ""; - const contentText = object.source?.content?.toString() || contentHtml.replace(/<[^>]*>/g, ""); - - const updates = { - content: { - text: contentText, - html: contentHtml, - }, - name: object.name?.toString() || "", - summary: object.summary?.toString() || "", - sensitive: object.sensitive || false, - }; - - await updateTimelineItem(collections, objectUrl, updates); - } catch (error) { - console.error("Failed to update timeline item:", error); - } - } - return; - } - - // PATH 2: Otherwise, assume profile update — refresh stored follower data - const actorObj = await update.getActor({ documentLoader: authLoader }); - const actorUrl = actorObj?.id?.href || ""; - if (!actorUrl) return; - - const existing = await collections.ap_followers.findOne({ actorUrl }); - if (existing) { - await collections.ap_followers.updateOne( - { actorUrl }, - { - $set: { - name: - actorObj.name?.toString() || - actorObj.preferredUsername?.toString() || - actorUrl, - handle: actorObj.preferredUsername?.toString() || "", - avatar: actorObj.icon - ? (await actorObj.icon)?.url?.href || "" - : "", - updatedAt: new Date().toISOString(), - }, - }, - ); - } + await enqueueActivity(collections, { + activityType: "Create", + actorUrl, + objectUrl: create.objectId?.href || "", + rawJson: await create.toJsonLd(), + }); }) + + // ── Delete ────────────────────────────────────────────────────── + .on(Delete, async (ctx, del) => { + const actorUrl = del.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Delete", + actorUrl, + objectUrl: del.objectId?.href || "", + rawJson: await del.toJsonLd(), + }); + }) + + // ── Move ──────────────────────────────────────────────────────── + .on(Move, async (ctx, move) => { + const actorUrl = move.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Move", + actorUrl, + rawJson: await move.toJsonLd(), + }); + }) + + // ── Update ────────────────────────────────────────────────────── + .on(Update, async (ctx, update) => { + const actorUrl = update.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); + + await enqueueActivity(collections, { + activityType: "Update", + actorUrl, + rawJson: await update.toJsonLd(), + }); + }) + + // ── Block ─────────────────────────────────────────────────────── + // Synchronous: remove from followers (immediate) + // Async: activity log .on(Block, async (ctx, block) => { - // Remote actor blocked us — remove them from followers + const actorUrl = block.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + + // Synchronous: remove from followers immediately const authLoader = await getAuthLoader(ctx); const actorObj = await block.getActor({ documentLoader: authLoader }); - const actorUrl = actorObj?.id?.href || ""; - if (actorUrl) { - await collections.ap_followers.deleteOne({ actorUrl }); + const resolvedUrl = actorObj?.id?.href || ""; + if (resolvedUrl) { + await collections.ap_followers.deleteOne({ actorUrl: resolvedUrl }); } + + await enqueueActivity(collections, { + activityType: "Block", + actorUrl: resolvedUrl || actorUrl, + rawJson: await block.toJsonLd(), + }); }) - .on(Add, async () => { - // Mastodon uses Add for pinning posts to featured collections — safe to ignore - }) - .on(Remove, async () => { - // Mastodon uses Remove for unpinning posts from featured collections — safe to ignore - }) - // ── Flag (Report) ────────────────────────────────────────────── + + // ── Add / Remove (no-ops) ─────────────────────────────────────── + .on(Add, async () => {}) + .on(Remove, async () => {}) + + // ── Flag ──────────────────────────────────────────────────────── .on(Flag, async (ctx, flag) => { - try { - const authLoader = await getAuthLoader(ctx); - const actorObj = await flag.getActor({ documentLoader: authLoader }).catch(() => null); + const actorUrl = flag.actorId?.href || ""; + if (await isServerBlocked(actorUrl, collections)) return; + await touchKeyFreshness(collections, actorUrl); - const reporterUrl = actorObj?.id?.href || flag.actorId?.href || ""; - const reporterName = actorObj?.name?.toString() || actorObj?.preferredUsername?.toString() || reporterUrl; - - // Extract reported objects — Flag can report actors or posts - const reportedIds = flag.objectIds?.map((u) => u.href) || []; - const reason = flag.content?.toString() || flag.summary?.toString() || ""; - - if (reportedIds.length === 0 && !reason) { - console.info("[ActivityPub] Ignoring empty Flag from", reporterUrl); - return; - } - - // Store report - if (collections.ap_reports) { - await collections.ap_reports.insertOne({ - reporterUrl, - reporterName, - reportedUrls: reportedIds, - reason, - createdAt: new Date().toISOString(), - read: false, - }); - } - - // Create notification - if (collections.ap_notifications) { - await addNotification(collections, { - uid: `flag:${reporterUrl}:${Date.now()}`, - type: "report", - actorUrl: reporterUrl, - actorName: reporterName, - actorPhoto: actorObj?.iconUrl?.href || actorObj?.icon?.url?.href || "", - actorHandle: actorObj?.preferredUsername - ? `@${actorObj.preferredUsername}@${new URL(reporterUrl).hostname}` - : reporterUrl, - objectUrl: reportedIds[0] || "", - summary: reason ? reason.slice(0, 200) : "Report received", - published: new Date().toISOString(), - createdAt: new Date().toISOString(), - }); - } - - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Flag", - actorUrl: reporterUrl, - objectUrl: reportedIds[0] || "", - summary: `Report from ${reporterName}: ${reason.slice(0, 100)}`, - }); - - console.info(`[ActivityPub] Flag received from ${reporterName} — ${reportedIds.length} objects reported`); - } catch (error) { - console.warn("[ActivityPub] Flag handler error:", error.message); - } + await enqueueActivity(collections, { + activityType: "Flag", + actorUrl, + rawJson: await flag.toJsonLd(), + }); }); } - -/** - * Log an activity to the ap_activities collection. - * Wrapper around the shared utility that accepts the (collections, storeRaw, record) signature - * used throughout this file. - */ -async function logActivity(collections, storeRaw, record, rawJson) { - await logActivityShared( - collections.ap_activities, - record, - storeRaw && rawJson ? { rawJson } : {}, - ); -} - diff --git a/lib/inbox-queue.js b/lib/inbox-queue.js new file mode 100644 index 0000000..920a0a6 --- /dev/null +++ b/lib/inbox-queue.js @@ -0,0 +1,99 @@ +/** + * MongoDB-backed inbox processing queue. + * Runs a setInterval-based processor that dequeues and processes + * one activity at a time from ap_inbox_queue. + * @module inbox-queue + */ + +import { routeToHandler } from "./inbox-handlers.js"; + +/** + * Process the next pending item from the inbox queue. + * Uses findOneAndUpdate for atomic claim (prevents double-processing). + * + * @param {object} collections - MongoDB collections + * @param {object} ctx - Fedify context + * @param {string} handle - Our actor handle + */ +async function processNextItem(collections, ctx, handle) { + const { ap_inbox_queue } = collections; + if (!ap_inbox_queue) return; + + const item = await ap_inbox_queue.findOneAndUpdate( + { status: "pending" }, + { $set: { status: "processing" } }, + { sort: { receivedAt: 1 }, returnDocument: "after" }, + ); + if (!item) return; + + try { + await routeToHandler(item, collections, ctx, handle); + await ap_inbox_queue.updateOne( + { _id: item._id }, + { $set: { status: "completed", processedAt: new Date().toISOString() } }, + ); + } catch (error) { + const attempts = (item.attempts || 0) + 1; + await ap_inbox_queue.updateOne( + { _id: item._id }, + { + $set: { + status: attempts >= (item.maxAttempts || 3) ? "failed" : "pending", + attempts, + error: error.message, + }, + }, + ); + console.error(`[inbox-queue] Failed processing ${item.activityType} from ${item.actorUrl}: ${error.message}`); + } +} + +/** + * Enqueue an activity for async processing. + * @param {object} collections - MongoDB collections + * @param {object} params + * @param {string} params.activityType - Activity type name + * @param {string} params.actorUrl - Actor URL + * @param {string} [params.objectUrl] - Object URL + * @param {object} params.rawJson - Full activity JSON-LD + */ +export async function enqueueActivity(collections, { activityType, actorUrl, objectUrl, rawJson }) { + const { ap_inbox_queue } = collections; + if (!ap_inbox_queue) return; + + await ap_inbox_queue.insertOne({ + activityType, + actorUrl: actorUrl || "", + objectUrl: objectUrl || "", + rawJson, + status: "pending", + attempts: 0, + maxAttempts: 3, + receivedAt: new Date().toISOString(), + processedAt: null, + error: null, + }); +} + +/** + * Start the background inbox processor. + * @param {object} collections - MongoDB collections + * @param {Function} getCtx - Function returning a Fedify context + * @param {string} handle - Our actor handle + * @returns {NodeJS.Timeout} Interval ID (for cleanup) + */ +export function startInboxProcessor(collections, getCtx, handle) { + const intervalId = setInterval(async () => { + try { + const ctx = getCtx(); + if (ctx) { + await processNextItem(collections, ctx, handle); + } + } catch (error) { + console.error("[inbox-queue] Processor error:", error.message); + } + }, 3_000); // Every 3 seconds + + console.info("[ActivityPub] Inbox queue processor started (3s interval)"); + return intervalId; +} diff --git a/lib/key-refresh.js b/lib/key-refresh.js new file mode 100644 index 0000000..c5fa4a5 --- /dev/null +++ b/lib/key-refresh.js @@ -0,0 +1,138 @@ +/** + * Proactive key refresh for remote actors. + * Periodically re-fetches actor documents for active followers + * whose keys may have rotated, keeping Fedify's KV cache fresh. + * @module key-refresh + */ + +import { lookupWithSecurity } from "./lookup-helpers.js"; + +/** + * Update key freshness tracking after successfully processing + * an activity from a remote actor. + * @param {object} collections - MongoDB collections + * @param {string} actorUrl - Remote actor URL + */ +export async function touchKeyFreshness(collections, actorUrl) { + if (!actorUrl || !collections.ap_key_freshness) return; + try { + await collections.ap_key_freshness.updateOne( + { actorUrl }, + { + $set: { lastSeenAt: new Date().toISOString() }, + $setOnInsert: { lastRefreshedAt: new Date().toISOString() }, + }, + { upsert: true }, + ); + } catch { + // Non-critical + } +} + +/** + * Refresh stale keys for active followers. + * Finds followers whose keys haven't been refreshed in 7+ days + * and re-fetches their actor documents (up to 10 per cycle). + * + * @param {object} collections - MongoDB collections + * @param {object} ctx - Fedify context (for lookupObject) + * @param {string} handle - Our actor handle + */ +export async function refreshStaleKeys(collections, ctx, handle) { + if (!collections.ap_key_freshness || !collections.ap_followers) return; + + const sevenDaysAgo = new Date(Date.now() - 7 * 86_400_000).toISOString(); + + // Find actors with stale keys who are still our followers + const staleActors = await collections.ap_key_freshness + .aggregate([ + { + $match: { + lastRefreshedAt: { $lt: sevenDaysAgo }, + }, + }, + { + $lookup: { + from: "ap_followers", + localField: "actorUrl", + foreignField: "actorUrl", + as: "follower", + }, + }, + { $match: { "follower.0": { $exists: true } } }, + { $limit: 10 }, + ]) + .toArray(); + + if (staleActors.length === 0) return; + + console.info(`[ActivityPub] Refreshing keys for ${staleActors.length} stale actors`); + + const documentLoader = await ctx.getDocumentLoader({ identifier: handle }); + + for (const entry of staleActors) { + try { + const result = await lookupWithSecurity(ctx, new URL(entry.actorUrl), { + documentLoader, + }); + + await collections.ap_key_freshness.updateOne( + { actorUrl: entry.actorUrl }, + { $set: { lastRefreshedAt: new Date().toISOString() } }, + ); + + if (!result) { + // Actor gone — log as stale + await collections.ap_activities?.insertOne({ + direction: "system", + type: "StaleActor", + actorUrl: entry.actorUrl, + summary: `Actor ${entry.actorUrl} could not be resolved during key refresh`, + receivedAt: new Date().toISOString(), + }); + } + } catch (error) { + const status = error?.cause?.status || error?.message || "unknown"; + if (status === 410 || String(status).includes("410")) { + // 410 Gone — actor deleted + await collections.ap_activities?.insertOne({ + direction: "system", + type: "StaleActor", + actorUrl: entry.actorUrl, + summary: `Actor ${entry.actorUrl} returned 410 Gone during key refresh`, + receivedAt: new Date().toISOString(), + }); + } + // Update lastRefreshedAt even on failure to avoid retrying every cycle + await collections.ap_key_freshness.updateOne( + { actorUrl: entry.actorUrl }, + { $set: { lastRefreshedAt: new Date().toISOString() } }, + ); + } + } +} + +/** + * Schedule key refresh job (runs on startup + every 24h). + * @param {object} collections - MongoDB collections + * @param {Function} getCtx - Function returning a Fedify context + * @param {string} handle - Our actor handle + */ +export function scheduleKeyRefresh(collections, getCtx, handle) { + const run = async () => { + try { + const ctx = getCtx(); + if (ctx) { + await refreshStaleKeys(collections, ctx, handle); + } + } catch (error) { + console.error("[ActivityPub] Key refresh failed:", error.message); + } + }; + + // Run once on startup (delayed to let federation initialize) + setTimeout(run, 30_000); + + // Then every 24 hours + setInterval(run, 86_400_000); +} diff --git a/lib/redis-cache.js b/lib/redis-cache.js index 5d3a465..97e07f6 100644 --- a/lib/redis-cache.js +++ b/lib/redis-cache.js @@ -96,3 +96,19 @@ export async function cacheExists(key) { return false; } } + +/** + * Cache-aside wrapper for query functions. + * Returns cached result if available, otherwise runs queryFn and caches result. + * @param {string} key - Cache key (without prefix — cacheGet/cacheSet add it) + * @param {number} ttlSeconds - TTL in seconds + * @param {Function} queryFn - Async function to run on cache miss + * @returns {Promise} + */ +export async function cachedQuery(key, ttlSeconds, queryFn) { + const cached = await cacheGet(key); + if (cached !== null) return cached; + const result = await queryFn(); + await cacheSet(key, result, ttlSeconds); + return result; +} diff --git a/lib/storage/server-blocks.js b/lib/storage/server-blocks.js new file mode 100644 index 0000000..6dd6e33 --- /dev/null +++ b/lib/storage/server-blocks.js @@ -0,0 +1,121 @@ +/** + * Server-level blocking storage operations. + * Blocks entire instances by hostname, checked in inbox listeners + * before any expensive work is done. + * @module storage/server-blocks + */ + +import { getRedisClient } from "../redis-cache.js"; + +const REDIS_KEY = "indiekit:blocked_servers"; + +/** + * Add a server block by hostname. + * @param {object} collections - MongoDB collections + * @param {string} hostname - Hostname to block (lowercase, no protocol) + * @param {string} [reason] - Optional admin note + */ +export async function addBlockedServer(collections, hostname, reason) { + const { ap_blocked_servers } = collections; + const normalized = hostname.toLowerCase().trim(); + + await ap_blocked_servers.updateOne( + { hostname: normalized }, + { + $setOnInsert: { + hostname: normalized, + blockedAt: new Date().toISOString(), + ...(reason ? { reason } : {}), + }, + }, + { upsert: true }, + ); + + // Incremental Redis update + const redis = getRedisClient(); + if (redis) { + try { + await redis.sadd(REDIS_KEY, normalized); + } catch { + // Non-critical + } + } +} + +/** + * Remove a server block by hostname. + * @param {object} collections - MongoDB collections + * @param {string} hostname - Hostname to unblock + */ +export async function removeBlockedServer(collections, hostname) { + const { ap_blocked_servers } = collections; + const normalized = hostname.toLowerCase().trim(); + + await ap_blocked_servers.deleteOne({ hostname: normalized }); + + const redis = getRedisClient(); + if (redis) { + try { + await redis.srem(REDIS_KEY, normalized); + } catch { + // Non-critical + } + } +} + +/** + * Get all blocked servers. + * @param {object} collections - MongoDB collections + * @returns {Promise} Array of block entries + */ +export async function getAllBlockedServers(collections) { + const { ap_blocked_servers } = collections; + return await ap_blocked_servers.find({}).sort({ blockedAt: -1 }).toArray(); +} + +/** + * Check if a server is blocked by actor URL. + * Uses Redis Set (O(1)) with MongoDB fallback. + * @param {string} actorUrl - Full actor URL + * @param {object} collections - MongoDB collections (fallback only) + * @returns {Promise} + */ +export async function isServerBlocked(actorUrl, collections) { + if (!actorUrl) return false; + try { + const hostname = new URL(actorUrl).hostname.toLowerCase(); + const redis = getRedisClient(); + if (redis) { + return (await redis.sismember(REDIS_KEY, hostname)) === 1; + } + // Fallback: direct MongoDB check + const { ap_blocked_servers } = collections; + return !!(await ap_blocked_servers.findOne({ hostname })); + } catch { + return false; + } +} + +/** + * Load all blocked hostnames into Redis Set on startup. + * Replaces existing set contents entirely. + * @param {object} collections - MongoDB collections + */ +export async function loadBlockedServersToRedis(collections) { + const redis = getRedisClient(); + if (!redis) return; + + try { + const { ap_blocked_servers } = collections; + const docs = await ap_blocked_servers.find({}).toArray(); + const hostnames = docs.map((d) => d.hostname); + + // Replace: delete existing set, then add all + await redis.del(REDIS_KEY); + if (hostnames.length > 0) { + await redis.sadd(REDIS_KEY, ...hostnames); + } + } catch { + // Non-critical — isServerBlocked falls back to MongoDB + } +} diff --git a/views/activitypub-moderation.njk b/views/activitypub-moderation.njk index b8fb18d..e229ca9 100644 --- a/views/activitypub-moderation.njk +++ b/views/activitypub-moderation.njk @@ -26,6 +26,38 @@ + {# Blocked servers #} +
+

Blocked Servers

+

Block entire instances by hostname. Activities from blocked servers are rejected before any processing.

+ {% if blockedServers and blockedServers.length > 0 %} +
    + {% for entry in blockedServers %} +
  • + {{ entry.hostname }} + {% if entry.reason %}({{ entry.reason }}){% endif %} + +
  • + {% endfor %} +
+ {% else %} +

No servers blocked.

+ {% endif %} + +
+ + +
+
+ {# Blocked actors #}

{{ __("activitypub.moderation.blockedTitle") }}

@@ -108,6 +140,7 @@ document.addEventListener('alpine:init', () => { Alpine.data('moderationPage', () => ({ newKeyword: '', + newServerHostname: '', submitting: false, error: '', @@ -157,6 +190,50 @@ this.submitting = false; }, + async addBlockedServer() { + const hostname = this.newServerHostname.trim(); + if (!hostname) return; + this.submitting = true; + this.error = ''; + try { + const res = await fetch(this.mountPath + '/admin/reader/block-server', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-CSRF-Token': this.csrfToken, + }, + body: JSON.stringify({ hostname }), + }); + const data = await res.json(); + if (data.success) { + const list = this.$refs.serverList; + if (list) { + const li = document.createElement('li'); + li.className = 'ap-moderation__entry'; + li.dataset.hostname = hostname; + const code = document.createElement('code'); + code.textContent = hostname; + const btn = document.createElement('button'); + btn.className = 'ap-moderation__remove'; + btn.textContent = 'Unblock'; + btn.addEventListener('click', () => { + this.removeEntry(btn, 'unblock-server', { hostname }); + }); + li.append(code, btn); + list.appendChild(li); + } + if (this.$refs.serverEmpty) this.$refs.serverEmpty.remove(); + this.newServerHostname = ''; + this.$refs.serverInput.focus(); + } else { + this.error = data.error || 'Failed to block server'; + } + } catch (e) { + this.error = 'Request failed'; + } + this.submitting = false; + }, + async removeEntry(el, action, payload) { const li = el.closest('li'); if (!li) return;