From e40ffbf61dc301273d7fafd8675ea00197a6c096 Mon Sep 17 00:00:00 2001 From: Ricardo Date: Thu, 19 Feb 2026 18:11:28 +0100 Subject: [PATCH] feat: add outbound follow/unfollow, activity logging, and Microsub timeline integration - Add followActor() and unfollowActor() methods for sending Follow/Undo(Follow) activities - Add shared activity-log.js utility for logging to ap_activities collection - Log all outbound activities (syndication, follow, unfollow) with success/failure details - Update inbox Create listener to store timeline items from followed accounts - Add Microsub collection accessors for cross-plugin timeline integration - Refactor inbox-listeners to use shared activity logging utility --- index.js | 219 +++++++++++++++++++++++++++++++++++++++++ lib/activity-log.js | 31 ++++++ lib/inbox-listeners.js | 188 +++++++++++++++++++++++++++++++---- 3 files changed, 419 insertions(+), 19 deletions(-) create mode 100644 lib/activity-log.js diff --git a/index.js b/index.js index 16181a1..6e3518e 100644 --- a/index.js +++ b/index.js @@ -21,6 +21,7 @@ import { profileGetController, profilePostController, } from "./lib/controllers/profile.js"; +import { logActivity } from "./lib/activity-log.js"; const defaults = { mountPath: "/activitypub", @@ -256,6 +257,13 @@ export default class ActivityPubEndpoint { ); if (!activity) { + await logActivity(self._collections.ap_activities, { + direction: "outbound", + type: "Syndicate", + actorUrl: self._publicationUrl, + objectUrl: properties.url, + summary: `Syndication skipped: could not convert post to AS2`, + }); return undefined; } @@ -264,21 +272,225 @@ export default class ActivityPubEndpoint { {}, ); + // Count followers for logging + const followerCount = + await self._collections.ap_followers.countDocuments(); + + console.info( + `[ActivityPub] Sending ${activity.constructor?.name || "activity"} for ${properties.url} to ${followerCount} followers`, + ); + await ctx.sendActivity( { identifier: self.options.actor.handle }, "followers", activity, ); + // Determine activity type name + const typeName = + activity.constructor?.name || "Create"; + + await logActivity(self._collections.ap_activities, { + direction: "outbound", + type: typeName, + actorUrl: self._publicationUrl, + objectUrl: properties.url, + summary: `Sent ${typeName} for ${properties.url} to ${followerCount} followers`, + }); + + console.info( + `[ActivityPub] Syndication queued: ${typeName} for ${properties.url}`, + ); + return properties.url || undefined; } catch (error) { console.error("[ActivityPub] Syndication failed:", error.message); + await logActivity(self._collections.ap_activities, { + direction: "outbound", + type: "Syndicate", + actorUrl: self._publicationUrl, + objectUrl: properties.url, + summary: `Syndication failed: ${error.message}`, + }).catch(() => {}); return undefined; } }, }; } + /** + * Send a Follow activity to a remote actor and store in ap_following. + * @param {string} actorUrl - The remote actor's URL + * @param {object} [actorInfo] - Optional pre-fetched actor info + * @param {string} [actorInfo.name] - Actor display name + * @param {string} [actorInfo.handle] - Actor handle + * @param {string} [actorInfo.photo] - Actor avatar URL + * @returns {Promise<{ok: boolean, error?: string}>} + */ + async followActor(actorUrl, actorInfo = {}) { + if (!this._federation) { + return { ok: false, error: "Federation not initialized" }; + } + + try { + const { Follow } = await import("@fedify/fedify"); + const handle = this.options.actor.handle; + const ctx = this._federation.createContext( + new URL(this._publicationUrl), + {}, + ); + + // Resolve the remote actor to get their inbox + const remoteActor = await ctx.lookupObject(actorUrl); + if (!remoteActor) { + return { ok: false, error: "Could not resolve remote actor" }; + } + + // Send Follow activity + const follow = new Follow({ + actor: ctx.getActorUri(handle), + object: new URL(actorUrl), + }); + + await ctx.sendActivity({ identifier: handle }, remoteActor, follow); + + // Store in ap_following + const name = + actorInfo.name || + remoteActor.name?.toString() || + remoteActor.preferredUsername?.toString() || + actorUrl; + const actorHandle = + actorInfo.handle || + remoteActor.preferredUsername?.toString() || + ""; + const avatar = + actorInfo.photo || + (remoteActor.icon + ? (await remoteActor.icon)?.url?.href || "" + : ""); + const inbox = remoteActor.inbox?.id?.href || ""; + const sharedInbox = remoteActor.endpoints?.sharedInbox?.href || ""; + + await this._collections.ap_following.updateOne( + { actorUrl }, + { + $set: { + actorUrl, + handle: actorHandle, + name, + avatar, + inbox, + sharedInbox, + followedAt: new Date().toISOString(), + source: "microsub-reader", + }, + }, + { upsert: true }, + ); + + console.info(`[ActivityPub] Sent Follow to ${actorUrl}`); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Follow", + actorUrl: this._publicationUrl, + objectUrl: actorUrl, + actorName: name, + summary: `Sent Follow to ${name} (${actorUrl})`, + }); + + return { ok: true }; + } catch (error) { + console.error(`[ActivityPub] Follow failed for ${actorUrl}:`, error.message); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Follow", + actorUrl: this._publicationUrl, + objectUrl: actorUrl, + summary: `Follow failed for ${actorUrl}: ${error.message}`, + }).catch(() => {}); + + return { ok: false, error: error.message }; + } + } + + /** + * Send an Undo(Follow) activity and remove from ap_following. + * @param {string} actorUrl - The remote actor's URL + * @returns {Promise<{ok: boolean, error?: string}>} + */ + async unfollowActor(actorUrl) { + if (!this._federation) { + return { ok: false, error: "Federation not initialized" }; + } + + try { + const { Follow, Undo } = await import("@fedify/fedify"); + const handle = this.options.actor.handle; + const ctx = this._federation.createContext( + new URL(this._publicationUrl), + {}, + ); + + const remoteActor = await ctx.lookupObject(actorUrl); + if (!remoteActor) { + // Even if we can't resolve, remove locally + await this._collections.ap_following.deleteOne({ actorUrl }); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Undo(Follow)", + actorUrl: this._publicationUrl, + objectUrl: actorUrl, + summary: `Removed ${actorUrl} locally (could not resolve remote actor)`, + }).catch(() => {}); + + return { ok: true }; + } + + const follow = new Follow({ + actor: ctx.getActorUri(handle), + object: new URL(actorUrl), + }); + + const undo = new Undo({ + actor: ctx.getActorUri(handle), + object: follow, + }); + + await ctx.sendActivity({ identifier: handle }, remoteActor, undo); + await this._collections.ap_following.deleteOne({ actorUrl }); + + console.info(`[ActivityPub] Sent Undo(Follow) to ${actorUrl}`); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Undo(Follow)", + actorUrl: this._publicationUrl, + objectUrl: actorUrl, + summary: `Sent Undo(Follow) to ${actorUrl}`, + }); + + return { ok: true }; + } catch (error) { + console.error(`[ActivityPub] Unfollow failed for ${actorUrl}:`, error.message); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Undo(Follow)", + actorUrl: this._publicationUrl, + objectUrl: actorUrl, + summary: `Unfollow failed for ${actorUrl}: ${error.message}`, + }).catch(() => {}); + + // Remove locally even if remote delivery fails + await this._collections.ap_following.deleteOne({ actorUrl }).catch(() => {}); + return { ok: false, error: error.message }; + } + } + /** * Build the full actor URL from config. * @returns {string} @@ -316,6 +528,13 @@ export default class ActivityPubEndpoint { get posts() { return indiekitCollections.get("posts"); }, + // Lazy access to Microsub collections (may not exist if plugin not loaded) + get microsub_items() { + return indiekitCollections.get("microsub_items"); + }, + get microsub_channels() { + return indiekitCollections.get("microsub_channels"); + }, _publicationUrl: this._publicationUrl, }; diff --git a/lib/activity-log.js b/lib/activity-log.js new file mode 100644 index 0000000..abf9637 --- /dev/null +++ b/lib/activity-log.js @@ -0,0 +1,31 @@ +/** + * Shared activity logging utility. + * + * Logs inbound and outbound ActivityPub activities to the ap_activities + * collection so they appear in the Activity Log admin UI. + */ + +/** + * Log an activity to the ap_activities collection. + * + * @param {object} collection - The ap_activities MongoDB collection + * @param {object} record - Activity record + * @param {string} record.direction - "inbound" or "outbound" + * @param {string} record.type - Activity type (e.g. "Create", "Follow", "Undo(Follow)") + * @param {string} [record.actorUrl] - Actor URL + * @param {string} [record.actorName] - Actor display name + * @param {string} [record.objectUrl] - Object URL + * @param {string} [record.targetUrl] - Target URL (e.g. reply target) + * @param {string} [record.content] - Content excerpt + * @param {string} record.summary - Human-readable summary + */ +export async function logActivity(collection, record) { + try { + await collection.insertOne({ + ...record, + receivedAt: new Date().toISOString(), + }); + } catch (error) { + console.warn("[ActivityPub] Failed to log activity:", error.message); + } +} diff --git a/lib/inbox-listeners.js b/lib/inbox-listeners.js index 6871f4b..5e7090f 100644 --- a/lib/inbox-listeners.js +++ b/lib/inbox-listeners.js @@ -21,6 +21,8 @@ import { Update, } from "@fedify/fedify"; +import { logActivity as logActivityShared } from "./activity-log.js"; + /** * Register all inbox listeners on a federation's inbox chain. * @@ -157,12 +159,6 @@ export function registerInboxListeners(inboxChain, options) { const object = await create.getObject(); if (!object) return; - const inReplyTo = - object instanceof Note - ? (await object.getInReplyTo())?.id?.href - : null; - if (!inReplyTo) return; - const actorObj = await create.getActor(); const actorUrl = actorObj?.id?.href || ""; const actorName = @@ -170,18 +166,33 @@ export function registerInboxListeners(inboxChain, options) { actorObj?.preferredUsername?.toString() || actorUrl; - // Extract reply content (HTML) - const content = object.content?.toString() || ""; + const inReplyTo = + object instanceof Note + ? (await object.getInReplyTo())?.id?.href + : null; - await logActivity(collections, storeRawActivities, { - direction: "inbound", - type: "Reply", + // Log replies to our posts (existing behavior for conversations) + if (inReplyTo) { + const content = object.content?.toString() || ""; + await logActivity(collections, storeRawActivities, { + direction: "inbound", + type: "Reply", + actorUrl, + actorName, + objectUrl: object.id?.href || "", + targetUrl: inReplyTo, + content, + summary: `${actorName} replied to ${inReplyTo}`, + }); + } + + // Store timeline items from accounts we follow + await storeTimelineItem(collections, { actorUrl, actorName, - objectUrl: object.id?.href || "", - targetUrl: inReplyTo, - content, - summary: `${actorName} replied to ${inReplyTo}`, + actorObj, + object, + inReplyTo, }); }) .on(Delete, async (ctx, del) => { @@ -255,10 +266,149 @@ export function registerInboxListeners(inboxChain, options) { /** * Log an activity to the ap_activities collection. + * Wrapper around the shared utility that accepts the (collections, storeRaw, record) signature + * used throughout this file. */ async function logActivity(collections, storeRaw, record) { - await collections.ap_activities.insertOne({ - ...record, - receivedAt: new Date().toISOString(), - }); + await logActivityShared(collections.ap_activities, record); +} + +// Cached ActivityPub channel ObjectId +let _apChannelId = null; + +/** + * Look up the ActivityPub channel's ObjectId (cached after first call). + * @param {object} collections - MongoDB collections + * @returns {Promise} + */ +async function getApChannelId(collections) { + if (_apChannelId) return _apChannelId; + const channel = await collections.microsub_channels?.findOne({ + uid: "activitypub", + }); + _apChannelId = channel?._id || null; + return _apChannelId; +} + +/** + * Store a Create activity as a Microsub timeline item if the actor + * is someone we follow. Skips gracefully if the Microsub plugin + * isn't loaded or the AP channel doesn't exist yet. + * + * @param {object} collections - MongoDB collections + * @param {object} params + * @param {string} params.actorUrl - Actor URL + * @param {string} params.actorName - Actor display name + * @param {object} params.actorObj - Fedify actor object + * @param {object} params.object - Fedify Note/Article object + * @param {string|null} params.inReplyTo - URL this is a reply to (if any) + */ +async function storeTimelineItem( + collections, + { actorUrl, actorName, actorObj, object, inReplyTo }, +) { + // Skip if Microsub plugin not loaded + if (!collections.microsub_items || !collections.microsub_channels) return; + + // Only store posts from accounts we follow + const following = await collections.ap_following.findOne({ actorUrl }); + if (!following) return; + + const channelId = await getApChannelId(collections); + if (!channelId) return; + + const objectUrl = object.id?.href || ""; + if (!objectUrl) return; + + // Extract content + const contentHtml = object.content?.toString() || ""; + const contentText = contentHtml.replace(/<[^>]*>/g, "").trim(); + + // Name (usually only on Article, not Note) + const name = object.name?.toString() || undefined; + const summary = object.summary?.toString() || undefined; + + // Published date — Fedify returns Temporal.Instant + let published; + if (object.published) { + try { + published = new Date(Number(object.published.epochMilliseconds)); + } catch { + published = new Date(); + } + } + + // Author avatar + let authorPhoto = ""; + try { + if (actorObj.icon) { + const iconObj = await actorObj.icon; + authorPhoto = iconObj?.url?.href || ""; + } + } catch { + /* remote fetch may fail */ + } + + // Tags / categories + const category = []; + try { + for await (const tag of object.getTags()) { + const tagName = tag.name?.toString(); + if (tagName) category.push(tagName.replace(/^#/, "")); + } + } catch { + /* ignore */ + } + + // Attachments (photos, videos, audio) + const photo = []; + const video = []; + const audio = []; + try { + for await (const att of object.getAttachments()) { + const mediaType = att.mediaType?.toString() || ""; + const url = att.url?.href || att.id?.href || ""; + if (!url) continue; + if (mediaType.startsWith("image/")) photo.push(url); + else if (mediaType.startsWith("video/")) video.push(url); + else if (mediaType.startsWith("audio/")) audio.push(url); + } + } catch { + /* ignore */ + } + + const item = { + channelId, + feedId: null, + uid: objectUrl, + type: "entry", + url: objectUrl, + name, + content: contentHtml ? { text: contentText, html: contentHtml } : undefined, + summary, + published: published || new Date(), + author: { + name: actorName, + url: actorUrl, + photo: authorPhoto, + }, + category, + photo, + video, + audio, + inReplyTo: inReplyTo ? [inReplyTo] : [], + source: { + type: "activitypub", + actorUrl, + }, + readBy: [], + createdAt: new Date().toISOString(), + }; + + // Atomic upsert — prevents duplicates without a separate check+insert + await collections.microsub_items.updateOne( + { channelId, uid: objectUrl }, + { $setOnInsert: item }, + { upsert: true }, + ); }