diff --git a/index.js b/index.js index a6494dc..8e611c6 100644 --- a/index.js +++ b/index.js @@ -387,38 +387,6 @@ export default class ActivityPubEndpoint { const router = express.Router(); // eslint-disable-line new-cap const self = this; - // Intercept Micropub delete actions to broadcast Delete to fediverse. - // Wraps res.json to detect successful delete responses, then fires - // broadcastDelete asynchronously so remote servers remove the post. - router.use((req, res, next) => { - if (req.method !== "POST") return next(); - if (!req.path.endsWith("/micropub")) return next(); - - const action = req.query?.action || req.body?.action; - if (action !== "delete") return next(); - - const postUrl = req.query?.url || req.body?.url; - if (!postUrl) return next(); - - const originalJson = res.json.bind(res); - res.json = function (body) { - // Fire broadcastDelete after successful delete (status 200) - if (res.statusCode === 200 && body?.success === "delete") { - console.info( - `[ActivityPub] Micropub delete detected for ${postUrl}, broadcasting Delete to followers`, - ); - self.broadcastDelete(postUrl).catch((error) => { - console.warn( - `[ActivityPub] broadcastDelete after Micropub delete failed: ${error.message}`, - ); - }); - } - return originalJson(body); - }; - - return next(); - }); - // Let Fedify handle NodeInfo data (/nodeinfo/2.1) // Only pass GET/HEAD requests — POST/PUT/DELETE must not go through // Fedify here, because fromExpressRequest() consumes the body stream, @@ -1181,6 +1149,125 @@ export default class ActivityPubEndpoint { } } + /** + * Called by post-content.js when a Micropub delete succeeds. + * Broadcasts an ActivityPub Delete activity to all followers. + * @param {string} url - Full URL of the deleted post + */ + async delete(url) { + await this.broadcastDelete(url).catch((err) => + console.warn(`[ActivityPub] broadcastDelete failed for ${url}: ${err.message}`) + ); + } + + /** + * Called by post-content.js when a Micropub update succeeds. + * Broadcasts an ActivityPub Update activity for the post to all followers. + * @param {object} properties - JF2 post properties (must include url) + */ + async update(properties) { + await this.broadcastPostUpdate(properties).catch((err) => + console.warn(`[ActivityPub] broadcastPostUpdate failed for ${properties?.url}: ${err.message}`) + ); + } + + /** + * Send an Update activity to all followers for a modified post. + * Mirrors broadcastDelete() pattern: batch delivery with shared inbox dedup. + * @param {object} properties - JF2 post properties + */ + async broadcastPostUpdate(properties) { + if (!this._federation) return; + + try { + const actorUrl = this._getActorUrl(); + const activity = jf2ToAS2Activity( + properties, + actorUrl, + this._publicationUrl, + { visibility: this.options.defaultVisibility }, + ); + + if (!activity) { + console.warn(`[ActivityPub] broadcastPostUpdate: could not convert post to AS2 for ${properties?.url}`); + return; + } + + const handle = this.options.actor.handle; + const ctx = this._federation.createContext( + new URL(this._publicationUrl), + { handle, publicationUrl: this._publicationUrl }, + ); + + const followers = await this._collections.ap_followers + .find({}) + .project({ actorUrl: 1, inbox: 1, sharedInbox: 1 }) + .toArray(); + + const inboxMap = new Map(); + for (const f of followers) { + const key = f.sharedInbox || f.inbox; + if (key && !inboxMap.has(key)) { + inboxMap.set(key, f); + } + } + + const uniqueRecipients = [...inboxMap.values()]; + const BATCH_SIZE = 25; + const BATCH_DELAY_MS = 5000; + let delivered = 0; + let failed = 0; + + console.info( + `[ActivityPub] Broadcasting Update for ${properties.url} to ${uniqueRecipients.length} unique inboxes`, + ); + + for (let i = 0; i < uniqueRecipients.length; i += BATCH_SIZE) { + const batch = uniqueRecipients.slice(i, i + BATCH_SIZE); + const recipients = batch.map((f) => ({ + id: new URL(f.actorUrl), + inboxId: new URL(f.inbox || f.sharedInbox), + endpoints: f.sharedInbox + ? { sharedInbox: new URL(f.sharedInbox) } + : undefined, + })); + + try { + await ctx.sendActivity( + { identifier: handle }, + recipients, + activity, + { preferSharedInbox: true }, + ); + delivered += batch.length; + } catch (error) { + failed += batch.length; + console.warn( + `[ActivityPub] Update batch ${Math.floor(i / BATCH_SIZE) + 1} failed: ${error.message}`, + ); + } + + if (i + BATCH_SIZE < uniqueRecipients.length) { + await new Promise((resolve) => setTimeout(resolve, BATCH_DELAY_MS)); + } + } + + console.info( + `[ActivityPub] Update broadcast complete for ${properties.url}: ${delivered} delivered, ${failed} failed`, + ); + + await logActivity(this._collections.ap_activities, { + direction: "outbound", + type: "Update", + actorUrl: this._publicationUrl, + objectUrl: properties.url, + summary: `Sent Update for ${properties.url} to ${delivered} inboxes`, + }).catch(() => {}); + } catch (error) { + console.warn("[ActivityPub] broadcastPostUpdate failed:", error.message); + } + } + /** * Build the full actor URL from config. * @returns {string}