feat: add outbound follow/unfollow, activity logging, and Microsub timeline integration
- Add followActor() and unfollowActor() methods for sending Follow/Undo(Follow) activities - Add shared activity-log.js utility for logging to ap_activities collection - Log all outbound activities (syndication, follow, unfollow) with success/failure details - Update inbox Create listener to store timeline items from followed accounts - Add Microsub collection accessors for cross-plugin timeline integration - Refactor inbox-listeners to use shared activity logging utility
This commit is contained in:
@@ -21,6 +21,7 @@ import {
|
||||
profileGetController,
|
||||
profilePostController,
|
||||
} from "./lib/controllers/profile.js";
|
||||
import { logActivity } from "./lib/activity-log.js";
|
||||
|
||||
const defaults = {
|
||||
mountPath: "/activitypub",
|
||||
@@ -256,6 +257,13 @@ export default class ActivityPubEndpoint {
|
||||
);
|
||||
|
||||
if (!activity) {
|
||||
await logActivity(self._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Syndicate",
|
||||
actorUrl: self._publicationUrl,
|
||||
objectUrl: properties.url,
|
||||
summary: `Syndication skipped: could not convert post to AS2`,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@@ -264,21 +272,225 @@ export default class ActivityPubEndpoint {
|
||||
{},
|
||||
);
|
||||
|
||||
// Count followers for logging
|
||||
const followerCount =
|
||||
await self._collections.ap_followers.countDocuments();
|
||||
|
||||
console.info(
|
||||
`[ActivityPub] Sending ${activity.constructor?.name || "activity"} for ${properties.url} to ${followerCount} followers`,
|
||||
);
|
||||
|
||||
await ctx.sendActivity(
|
||||
{ identifier: self.options.actor.handle },
|
||||
"followers",
|
||||
activity,
|
||||
);
|
||||
|
||||
// Determine activity type name
|
||||
const typeName =
|
||||
activity.constructor?.name || "Create";
|
||||
|
||||
await logActivity(self._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: typeName,
|
||||
actorUrl: self._publicationUrl,
|
||||
objectUrl: properties.url,
|
||||
summary: `Sent ${typeName} for ${properties.url} to ${followerCount} followers`,
|
||||
});
|
||||
|
||||
console.info(
|
||||
`[ActivityPub] Syndication queued: ${typeName} for ${properties.url}`,
|
||||
);
|
||||
|
||||
return properties.url || undefined;
|
||||
} catch (error) {
|
||||
console.error("[ActivityPub] Syndication failed:", error.message);
|
||||
await logActivity(self._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Syndicate",
|
||||
actorUrl: self._publicationUrl,
|
||||
objectUrl: properties.url,
|
||||
summary: `Syndication failed: ${error.message}`,
|
||||
}).catch(() => {});
|
||||
return undefined;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a Follow activity to a remote actor and store in ap_following.
|
||||
* @param {string} actorUrl - The remote actor's URL
|
||||
* @param {object} [actorInfo] - Optional pre-fetched actor info
|
||||
* @param {string} [actorInfo.name] - Actor display name
|
||||
* @param {string} [actorInfo.handle] - Actor handle
|
||||
* @param {string} [actorInfo.photo] - Actor avatar URL
|
||||
* @returns {Promise<{ok: boolean, error?: string}>}
|
||||
*/
|
||||
async followActor(actorUrl, actorInfo = {}) {
|
||||
if (!this._federation) {
|
||||
return { ok: false, error: "Federation not initialized" };
|
||||
}
|
||||
|
||||
try {
|
||||
const { Follow } = await import("@fedify/fedify");
|
||||
const handle = this.options.actor.handle;
|
||||
const ctx = this._federation.createContext(
|
||||
new URL(this._publicationUrl),
|
||||
{},
|
||||
);
|
||||
|
||||
// Resolve the remote actor to get their inbox
|
||||
const remoteActor = await ctx.lookupObject(actorUrl);
|
||||
if (!remoteActor) {
|
||||
return { ok: false, error: "Could not resolve remote actor" };
|
||||
}
|
||||
|
||||
// Send Follow activity
|
||||
const follow = new Follow({
|
||||
actor: ctx.getActorUri(handle),
|
||||
object: new URL(actorUrl),
|
||||
});
|
||||
|
||||
await ctx.sendActivity({ identifier: handle }, remoteActor, follow);
|
||||
|
||||
// Store in ap_following
|
||||
const name =
|
||||
actorInfo.name ||
|
||||
remoteActor.name?.toString() ||
|
||||
remoteActor.preferredUsername?.toString() ||
|
||||
actorUrl;
|
||||
const actorHandle =
|
||||
actorInfo.handle ||
|
||||
remoteActor.preferredUsername?.toString() ||
|
||||
"";
|
||||
const avatar =
|
||||
actorInfo.photo ||
|
||||
(remoteActor.icon
|
||||
? (await remoteActor.icon)?.url?.href || ""
|
||||
: "");
|
||||
const inbox = remoteActor.inbox?.id?.href || "";
|
||||
const sharedInbox = remoteActor.endpoints?.sharedInbox?.href || "";
|
||||
|
||||
await this._collections.ap_following.updateOne(
|
||||
{ actorUrl },
|
||||
{
|
||||
$set: {
|
||||
actorUrl,
|
||||
handle: actorHandle,
|
||||
name,
|
||||
avatar,
|
||||
inbox,
|
||||
sharedInbox,
|
||||
followedAt: new Date().toISOString(),
|
||||
source: "microsub-reader",
|
||||
},
|
||||
},
|
||||
{ upsert: true },
|
||||
);
|
||||
|
||||
console.info(`[ActivityPub] Sent Follow to ${actorUrl}`);
|
||||
|
||||
await logActivity(this._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Follow",
|
||||
actorUrl: this._publicationUrl,
|
||||
objectUrl: actorUrl,
|
||||
actorName: name,
|
||||
summary: `Sent Follow to ${name} (${actorUrl})`,
|
||||
});
|
||||
|
||||
return { ok: true };
|
||||
} catch (error) {
|
||||
console.error(`[ActivityPub] Follow failed for ${actorUrl}:`, error.message);
|
||||
|
||||
await logActivity(this._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Follow",
|
||||
actorUrl: this._publicationUrl,
|
||||
objectUrl: actorUrl,
|
||||
summary: `Follow failed for ${actorUrl}: ${error.message}`,
|
||||
}).catch(() => {});
|
||||
|
||||
return { ok: false, error: error.message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an Undo(Follow) activity and remove from ap_following.
|
||||
* @param {string} actorUrl - The remote actor's URL
|
||||
* @returns {Promise<{ok: boolean, error?: string}>}
|
||||
*/
|
||||
async unfollowActor(actorUrl) {
|
||||
if (!this._federation) {
|
||||
return { ok: false, error: "Federation not initialized" };
|
||||
}
|
||||
|
||||
try {
|
||||
const { Follow, Undo } = await import("@fedify/fedify");
|
||||
const handle = this.options.actor.handle;
|
||||
const ctx = this._federation.createContext(
|
||||
new URL(this._publicationUrl),
|
||||
{},
|
||||
);
|
||||
|
||||
const remoteActor = await ctx.lookupObject(actorUrl);
|
||||
if (!remoteActor) {
|
||||
// Even if we can't resolve, remove locally
|
||||
await this._collections.ap_following.deleteOne({ actorUrl });
|
||||
|
||||
await logActivity(this._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Undo(Follow)",
|
||||
actorUrl: this._publicationUrl,
|
||||
objectUrl: actorUrl,
|
||||
summary: `Removed ${actorUrl} locally (could not resolve remote actor)`,
|
||||
}).catch(() => {});
|
||||
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
const follow = new Follow({
|
||||
actor: ctx.getActorUri(handle),
|
||||
object: new URL(actorUrl),
|
||||
});
|
||||
|
||||
const undo = new Undo({
|
||||
actor: ctx.getActorUri(handle),
|
||||
object: follow,
|
||||
});
|
||||
|
||||
await ctx.sendActivity({ identifier: handle }, remoteActor, undo);
|
||||
await this._collections.ap_following.deleteOne({ actorUrl });
|
||||
|
||||
console.info(`[ActivityPub] Sent Undo(Follow) to ${actorUrl}`);
|
||||
|
||||
await logActivity(this._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Undo(Follow)",
|
||||
actorUrl: this._publicationUrl,
|
||||
objectUrl: actorUrl,
|
||||
summary: `Sent Undo(Follow) to ${actorUrl}`,
|
||||
});
|
||||
|
||||
return { ok: true };
|
||||
} catch (error) {
|
||||
console.error(`[ActivityPub] Unfollow failed for ${actorUrl}:`, error.message);
|
||||
|
||||
await logActivity(this._collections.ap_activities, {
|
||||
direction: "outbound",
|
||||
type: "Undo(Follow)",
|
||||
actorUrl: this._publicationUrl,
|
||||
objectUrl: actorUrl,
|
||||
summary: `Unfollow failed for ${actorUrl}: ${error.message}`,
|
||||
}).catch(() => {});
|
||||
|
||||
// Remove locally even if remote delivery fails
|
||||
await this._collections.ap_following.deleteOne({ actorUrl }).catch(() => {});
|
||||
return { ok: false, error: error.message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the full actor URL from config.
|
||||
* @returns {string}
|
||||
@@ -316,6 +528,13 @@ export default class ActivityPubEndpoint {
|
||||
get posts() {
|
||||
return indiekitCollections.get("posts");
|
||||
},
|
||||
// Lazy access to Microsub collections (may not exist if plugin not loaded)
|
||||
get microsub_items() {
|
||||
return indiekitCollections.get("microsub_items");
|
||||
},
|
||||
get microsub_channels() {
|
||||
return indiekitCollections.get("microsub_channels");
|
||||
},
|
||||
_publicationUrl: this._publicationUrl,
|
||||
};
|
||||
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Shared activity logging utility.
|
||||
*
|
||||
* Logs inbound and outbound ActivityPub activities to the ap_activities
|
||||
* collection so they appear in the Activity Log admin UI.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Log an activity to the ap_activities collection.
|
||||
*
|
||||
* @param {object} collection - The ap_activities MongoDB collection
|
||||
* @param {object} record - Activity record
|
||||
* @param {string} record.direction - "inbound" or "outbound"
|
||||
* @param {string} record.type - Activity type (e.g. "Create", "Follow", "Undo(Follow)")
|
||||
* @param {string} [record.actorUrl] - Actor URL
|
||||
* @param {string} [record.actorName] - Actor display name
|
||||
* @param {string} [record.objectUrl] - Object URL
|
||||
* @param {string} [record.targetUrl] - Target URL (e.g. reply target)
|
||||
* @param {string} [record.content] - Content excerpt
|
||||
* @param {string} record.summary - Human-readable summary
|
||||
*/
|
||||
export async function logActivity(collection, record) {
|
||||
try {
|
||||
await collection.insertOne({
|
||||
...record,
|
||||
receivedAt: new Date().toISOString(),
|
||||
});
|
||||
} catch (error) {
|
||||
console.warn("[ActivityPub] Failed to log activity:", error.message);
|
||||
}
|
||||
}
|
||||
+162
-12
@@ -21,6 +21,8 @@ import {
|
||||
Update,
|
||||
} from "@fedify/fedify";
|
||||
|
||||
import { logActivity as logActivityShared } from "./activity-log.js";
|
||||
|
||||
/**
|
||||
* Register all inbox listeners on a federation's inbox chain.
|
||||
*
|
||||
@@ -157,12 +159,6 @@ export function registerInboxListeners(inboxChain, options) {
|
||||
const object = await create.getObject();
|
||||
if (!object) return;
|
||||
|
||||
const inReplyTo =
|
||||
object instanceof Note
|
||||
? (await object.getInReplyTo())?.id?.href
|
||||
: null;
|
||||
if (!inReplyTo) return;
|
||||
|
||||
const actorObj = await create.getActor();
|
||||
const actorUrl = actorObj?.id?.href || "";
|
||||
const actorName =
|
||||
@@ -170,9 +166,14 @@ export function registerInboxListeners(inboxChain, options) {
|
||||
actorObj?.preferredUsername?.toString() ||
|
||||
actorUrl;
|
||||
|
||||
// Extract reply content (HTML)
|
||||
const content = object.content?.toString() || "";
|
||||
const inReplyTo =
|
||||
object instanceof Note
|
||||
? (await object.getInReplyTo())?.id?.href
|
||||
: null;
|
||||
|
||||
// Log replies to our posts (existing behavior for conversations)
|
||||
if (inReplyTo) {
|
||||
const content = object.content?.toString() || "";
|
||||
await logActivity(collections, storeRawActivities, {
|
||||
direction: "inbound",
|
||||
type: "Reply",
|
||||
@@ -183,6 +184,16 @@ export function registerInboxListeners(inboxChain, options) {
|
||||
content,
|
||||
summary: `${actorName} replied to ${inReplyTo}`,
|
||||
});
|
||||
}
|
||||
|
||||
// Store timeline items from accounts we follow
|
||||
await storeTimelineItem(collections, {
|
||||
actorUrl,
|
||||
actorName,
|
||||
actorObj,
|
||||
object,
|
||||
inReplyTo,
|
||||
});
|
||||
})
|
||||
.on(Delete, async (ctx, del) => {
|
||||
const objectId = (await del.getObject())?.id?.href || "";
|
||||
@@ -255,10 +266,149 @@ export function registerInboxListeners(inboxChain, options) {
|
||||
|
||||
/**
|
||||
* Log an activity to the ap_activities collection.
|
||||
* Wrapper around the shared utility that accepts the (collections, storeRaw, record) signature
|
||||
* used throughout this file.
|
||||
*/
|
||||
async function logActivity(collections, storeRaw, record) {
|
||||
await collections.ap_activities.insertOne({
|
||||
...record,
|
||||
receivedAt: new Date().toISOString(),
|
||||
});
|
||||
await logActivityShared(collections.ap_activities, record);
|
||||
}
|
||||
|
||||
// Cached ActivityPub channel ObjectId
|
||||
let _apChannelId = null;
|
||||
|
||||
/**
|
||||
* Look up the ActivityPub channel's ObjectId (cached after first call).
|
||||
* @param {object} collections - MongoDB collections
|
||||
* @returns {Promise<import("mongodb").ObjectId|null>}
|
||||
*/
|
||||
async function getApChannelId(collections) {
|
||||
if (_apChannelId) return _apChannelId;
|
||||
const channel = await collections.microsub_channels?.findOne({
|
||||
uid: "activitypub",
|
||||
});
|
||||
_apChannelId = channel?._id || null;
|
||||
return _apChannelId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a Create activity as a Microsub timeline item if the actor
|
||||
* is someone we follow. Skips gracefully if the Microsub plugin
|
||||
* isn't loaded or the AP channel doesn't exist yet.
|
||||
*
|
||||
* @param {object} collections - MongoDB collections
|
||||
* @param {object} params
|
||||
* @param {string} params.actorUrl - Actor URL
|
||||
* @param {string} params.actorName - Actor display name
|
||||
* @param {object} params.actorObj - Fedify actor object
|
||||
* @param {object} params.object - Fedify Note/Article object
|
||||
* @param {string|null} params.inReplyTo - URL this is a reply to (if any)
|
||||
*/
|
||||
async function storeTimelineItem(
|
||||
collections,
|
||||
{ actorUrl, actorName, actorObj, object, inReplyTo },
|
||||
) {
|
||||
// Skip if Microsub plugin not loaded
|
||||
if (!collections.microsub_items || !collections.microsub_channels) return;
|
||||
|
||||
// Only store posts from accounts we follow
|
||||
const following = await collections.ap_following.findOne({ actorUrl });
|
||||
if (!following) return;
|
||||
|
||||
const channelId = await getApChannelId(collections);
|
||||
if (!channelId) return;
|
||||
|
||||
const objectUrl = object.id?.href || "";
|
||||
if (!objectUrl) return;
|
||||
|
||||
// Extract content
|
||||
const contentHtml = object.content?.toString() || "";
|
||||
const contentText = contentHtml.replace(/<[^>]*>/g, "").trim();
|
||||
|
||||
// Name (usually only on Article, not Note)
|
||||
const name = object.name?.toString() || undefined;
|
||||
const summary = object.summary?.toString() || undefined;
|
||||
|
||||
// Published date — Fedify returns Temporal.Instant
|
||||
let published;
|
||||
if (object.published) {
|
||||
try {
|
||||
published = new Date(Number(object.published.epochMilliseconds));
|
||||
} catch {
|
||||
published = new Date();
|
||||
}
|
||||
}
|
||||
|
||||
// Author avatar
|
||||
let authorPhoto = "";
|
||||
try {
|
||||
if (actorObj.icon) {
|
||||
const iconObj = await actorObj.icon;
|
||||
authorPhoto = iconObj?.url?.href || "";
|
||||
}
|
||||
} catch {
|
||||
/* remote fetch may fail */
|
||||
}
|
||||
|
||||
// Tags / categories
|
||||
const category = [];
|
||||
try {
|
||||
for await (const tag of object.getTags()) {
|
||||
const tagName = tag.name?.toString();
|
||||
if (tagName) category.push(tagName.replace(/^#/, ""));
|
||||
}
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
// Attachments (photos, videos, audio)
|
||||
const photo = [];
|
||||
const video = [];
|
||||
const audio = [];
|
||||
try {
|
||||
for await (const att of object.getAttachments()) {
|
||||
const mediaType = att.mediaType?.toString() || "";
|
||||
const url = att.url?.href || att.id?.href || "";
|
||||
if (!url) continue;
|
||||
if (mediaType.startsWith("image/")) photo.push(url);
|
||||
else if (mediaType.startsWith("video/")) video.push(url);
|
||||
else if (mediaType.startsWith("audio/")) audio.push(url);
|
||||
}
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
const item = {
|
||||
channelId,
|
||||
feedId: null,
|
||||
uid: objectUrl,
|
||||
type: "entry",
|
||||
url: objectUrl,
|
||||
name,
|
||||
content: contentHtml ? { text: contentText, html: contentHtml } : undefined,
|
||||
summary,
|
||||
published: published || new Date(),
|
||||
author: {
|
||||
name: actorName,
|
||||
url: actorUrl,
|
||||
photo: authorPhoto,
|
||||
},
|
||||
category,
|
||||
photo,
|
||||
video,
|
||||
audio,
|
||||
inReplyTo: inReplyTo ? [inReplyTo] : [],
|
||||
source: {
|
||||
type: "activitypub",
|
||||
actorUrl,
|
||||
},
|
||||
readBy: [],
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
// Atomic upsert — prevents duplicates without a separate check+insert
|
||||
await collections.microsub_items.updateOne(
|
||||
{ channelId, uid: objectUrl },
|
||||
{ $setOnInsert: item },
|
||||
{ upsert: true },
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user