From 84122cc470adc69da82b33cb4023763d9d154e4c Mon Sep 17 00:00:00 2001 From: Ricardo Date: Fri, 20 Feb 2026 08:10:45 +0100 Subject: [PATCH] feat: batch re-follow system for imported AP accounts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After Mastodon migration, imported accounts exist only locally — no Follow activities were sent. This adds a gradual background processor that sends Follow activities to all source:"import" accounts so remote servers start delivering Create activities to our inbox. - New batch engine (lib/batch-refollow.js) processes 10 accounts per batch with 3s between follows and 30s between batches - Accept(Follow) inbox listener transitions source to "federation" and cleans up tracking fields - Admin API: pause, resume, and status JSON endpoints - Dashboard progress bar with Alpine.js polling (10s interval) - Following list badges for refollow:sent and refollow:failed states - Restart recovery resets stale refollow:pending back to import - 3 retries with 1-hour cooldown before permanent failure --- index.js | 22 +++ lib/batch-refollow.js | 314 ++++++++++++++++++++++++++++++++ lib/controllers/dashboard.js | 10 + lib/controllers/refollow.js | 84 +++++++++ lib/inbox-listeners.js | 41 +++++ locales/en.json | 18 ++ package.json | 2 +- views/activitypub-dashboard.njk | 96 ++++++++++ views/activitypub-following.njk | 7 +- 9 files changed, 592 insertions(+), 2 deletions(-) create mode 100644 lib/batch-refollow.js create mode 100644 lib/controllers/refollow.js diff --git a/index.js b/index.js index 6e3518e..0e2fb19 100644 --- a/index.js +++ b/index.js @@ -21,6 +21,12 @@ import { profileGetController, profilePostController, } from "./lib/controllers/profile.js"; +import { + refollowPauseController, + refollowResumeController, + refollowStatusController, +} from "./lib/controllers/refollow.js"; +import { startBatchRefollow } from "./lib/batch-refollow.js"; import { logActivity } from "./lib/activity-log.js"; const defaults = { @@ -137,6 +143,9 @@ export default class ActivityPubEndpoint { "/admin/migrate/import", migrateImportController(mp, this.options), ); + router.post("/admin/refollow/pause", refollowPauseController(mp, this)); + router.post("/admin/refollow/resume", refollowResumeController(mp, this)); + router.get("/admin/refollow/status", refollowStatusController(mp)); return router; } @@ -575,6 +584,19 @@ export default class ActivityPubEndpoint { // Register syndicator (appears in post editing UI) Indiekit.addSyndicator(this.syndicator); + + // Start batch re-follow processor after federation settles + const refollowOptions = { + federation: this._federation, + collections: this._collections, + handle: this.options.actor.handle, + publicationUrl: this._publicationUrl, + }; + setTimeout(() => { + startBatchRefollow(refollowOptions).catch((error) => { + console.error("[ActivityPub] Batch refollow start failed:", error.message); + }); + }, 10_000); } /** diff --git a/lib/batch-refollow.js b/lib/batch-refollow.js new file mode 100644 index 0000000..2d88028 --- /dev/null +++ b/lib/batch-refollow.js @@ -0,0 +1,314 @@ +/** + * Batch re-follow processor for imported accounts. + * + * After a Mastodon migration, imported accounts (source: "import") exist only + * locally — no Follow activities were sent. This module gradually sends Follow + * activities to all imported accounts so remote servers start delivering + * Create activities to our inbox. + * + * Source field state machine: + * import → refollow:sent → federation (happy path) + * import → refollow:sent → refollow:failed (after MAX_RETRIES) + */ + +import { Follow } from "@fedify/fedify"; +import { logActivity } from "./activity-log.js"; + +const BATCH_SIZE = 10; +const DELAY_PER_FOLLOW = 3_000; +const DELAY_BETWEEN_BATCHES = 30_000; +const STARTUP_DELAY = 30_000; +const RETRY_COOLDOWN = 60 * 60 * 1_000; // 1 hour +const MAX_RETRIES = 3; + +const KV_KEY = "batch-refollow/state"; + +let _timer = null; + +/** + * Start the batch re-follow processor. + * + * @param {object} options + * @param {import("@fedify/fedify").Federation} options.federation + * @param {object} options.collections - MongoDB collections + * @param {string} options.handle - Actor handle + * @param {string} options.publicationUrl - Publication base URL + */ +export async function startBatchRefollow(options) { + const { collections } = options; + + // Restart recovery: reset any stale "refollow:pending" back to "import" + await collections.ap_following.updateMany( + { source: "refollow:pending" }, + { $set: { source: "import" } }, + ); + + // Check if there's work to do + const importCount = await collections.ap_following.countDocuments({ + source: "import", + }); + + if (importCount === 0) { + console.info("[ActivityPub] Batch refollow: no imported accounts to process"); + return; + } + + console.info( + `[ActivityPub] Batch refollow: ${importCount} imported accounts to process`, + ); + + // Set job state to running + await setJobState(collections, "running"); + + // Schedule first batch after startup delay + _timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY); +} + +/** + * Pause the batch re-follow processor. + * + * @param {object} collections - MongoDB collections + */ +export async function pauseBatchRefollow(collections) { + if (_timer) { + clearTimeout(_timer); + _timer = null; + } + + // Reset any pending back to import so they get picked up on resume + await collections.ap_following.updateMany( + { source: "refollow:pending" }, + { $set: { source: "import" } }, + ); + + await setJobState(collections, "paused"); + console.info("[ActivityPub] Batch refollow: paused"); +} + +/** + * Resume the batch re-follow processor. + * + * @param {object} options + * @param {import("@fedify/fedify").Federation} options.federation + * @param {object} options.collections - MongoDB collections + * @param {string} options.handle - Actor handle + * @param {string} options.publicationUrl - Publication base URL + */ +export async function resumeBatchRefollow(options) { + if (_timer) { + clearTimeout(_timer); + _timer = null; + } + + await setJobState(options.collections, "running"); + _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); + console.info("[ActivityPub] Batch refollow: resumed"); +} + +/** + * Get current batch re-follow status. + * + * @param {object} collections - MongoDB collections + * @returns {Promise} Status object + */ +export async function getBatchRefollowStatus(collections) { + const state = await collections.ap_kv.findOne({ _id: KV_KEY }); + const status = state?.value?.status || "idle"; + + const [remaining, sent, failed, federated] = await Promise.all([ + collections.ap_following.countDocuments({ source: "import" }), + collections.ap_following.countDocuments({ source: "refollow:sent" }), + collections.ap_following.countDocuments({ source: "refollow:failed" }), + collections.ap_following.countDocuments({ source: "federation" }), + ]); + + const total = remaining + sent + failed; + const completed = sent + failed; + const progressPercent = + total > 0 ? Math.round((completed / total) * 100) : 100; + + return { + status, + total, + remaining, + sent, + failed, + federated, + completed, + progressPercent, + startedAt: state?.value?.startedAt || null, + updatedAt: state?.value?.updatedAt || null, + }; +} + +// --- Internal helpers --- + +/** + * Process the next batch of imported accounts. + */ +async function processNextBatch(options) { + const { federation, collections, handle, publicationUrl } = options; + _timer = null; + + const state = await collections.ap_kv.findOne({ _id: KV_KEY }); + if (state?.value?.status !== "running") return; + + // Claim a batch atomically: set source to "refollow:pending" + const entries = []; + for (let i = 0; i < BATCH_SIZE; i++) { + const doc = await collections.ap_following.findOneAndUpdate( + { source: "import" }, + { $set: { source: "refollow:pending" } }, + { returnDocument: "after" }, + ); + if (!doc) break; + entries.push(doc); + } + + // Also pick up retryable entries (failed but not permanently) + const retryCutoff = new Date(Date.now() - RETRY_COOLDOWN).toISOString(); + const retrySlots = BATCH_SIZE - entries.length; + for (let i = 0; i < retrySlots; i++) { + const doc = await collections.ap_following.findOneAndUpdate( + { + source: "refollow:sent", + refollowAttempts: { $lt: MAX_RETRIES }, + refollowLastAttempt: { $lt: retryCutoff }, + }, + { $set: { source: "refollow:pending" } }, + { returnDocument: "after" }, + ); + if (!doc) break; + entries.push(doc); + } + + if (entries.length === 0) { + // Check if there are still sent entries awaiting Accept + const pendingAccepts = await collections.ap_following.countDocuments({ + source: "refollow:sent", + }); + + if (pendingAccepts > 0) { + console.info( + `[ActivityPub] Batch refollow: all sent, ${pendingAccepts} awaiting Accept`, + ); + } + + await setJobState(collections, "completed"); + console.info("[ActivityPub] Batch refollow: completed"); + return; + } + + console.info( + `[ActivityPub] Batch refollow: processing batch of ${entries.length}`, + ); + + for (const entry of entries) { + await processOneFollow(options, entry); + // Delay between individual follows + await sleep(DELAY_PER_FOLLOW); + } + + // Update job state timestamp + await setJobState(collections, "running"); + + // Schedule next batch + _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); +} + +/** + * Send a Follow activity for a single imported account. + */ +async function processOneFollow(options, entry) { + const { federation, collections, handle, publicationUrl } = options; + + try { + const ctx = federation.createContext(new URL(publicationUrl), {}); + + // Resolve the remote actor + const remoteActor = await ctx.lookupObject(entry.actorUrl); + if (!remoteActor) { + throw new Error("Could not resolve remote actor"); + } + + // Send Follow activity + const follow = new Follow({ + actor: ctx.getActorUri(handle), + object: new URL(entry.actorUrl), + }); + + await ctx.sendActivity({ identifier: handle }, remoteActor, follow); + + // Mark as sent + await collections.ap_following.updateOne( + { _id: entry._id }, + { + $set: { + source: "refollow:sent", + refollowLastAttempt: new Date().toISOString(), + refollowError: null, + }, + $inc: { refollowAttempts: 1 }, + }, + ); + + console.info( + `[ActivityPub] Batch refollow: sent Follow to ${entry.actorUrl}`, + ); + + await logActivity(collections.ap_activities, { + direction: "outbound", + type: "Follow", + actorUrl: publicationUrl, + objectUrl: entry.actorUrl, + actorName: entry.name || entry.actorUrl, + summary: `Batch refollow: sent Follow to ${entry.name || entry.actorUrl}`, + }); + } catch (error) { + const attempts = (entry.refollowAttempts || 0) + 1; + const newSource = + attempts >= MAX_RETRIES ? "refollow:failed" : "refollow:sent"; + + await collections.ap_following.updateOne( + { _id: entry._id }, + { + $set: { + source: newSource, + refollowLastAttempt: new Date().toISOString(), + refollowError: error.message, + }, + $inc: { refollowAttempts: 1 }, + }, + ); + + console.warn( + `[ActivityPub] Batch refollow: failed for ${entry.actorUrl} (attempt ${attempts}/${MAX_RETRIES}): ${error.message}`, + ); + } +} + +/** + * Set the batch re-follow job state in ap_kv. + */ +async function setJobState(collections, status) { + const now = new Date().toISOString(); + const update = { + $set: { + "value.status": status, + "value.updatedAt": now, + }, + $setOnInsert: { _id: KV_KEY }, + }; + + // Only set startedAt on initial start or resume + const existing = await collections.ap_kv.findOne({ _id: KV_KEY }); + if (!existing?.value?.startedAt || status === "running" && existing?.value?.status !== "running") { + update.$set["value.startedAt"] = now; + } + + await collections.ap_kv.updateOne({ _id: KV_KEY }, update, { upsert: true }); +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/lib/controllers/dashboard.js b/lib/controllers/dashboard.js index 65433b3..efe05e8 100644 --- a/lib/controllers/dashboard.js +++ b/lib/controllers/dashboard.js @@ -1,6 +1,9 @@ /** * Dashboard controller — shows follower/following counts and recent activity. */ + +import { getBatchRefollowStatus } from "../batch-refollow.js"; + export function dashboardController(mountPath) { return async (request, response, next) => { try { @@ -25,11 +28,18 @@ export function dashboardController(mountPath) { .toArray() : []; + // Get batch re-follow status for the progress section + const refollowStatus = await getBatchRefollowStatus({ + ap_following: followingCollection, + ap_kv: application?.collections?.get("ap_kv"), + }); + response.render("activitypub-dashboard", { title: response.locals.__("activitypub.title"), followerCount, followingCount, recentActivities, + refollowStatus, mountPath, }); } catch (error) { diff --git a/lib/controllers/refollow.js b/lib/controllers/refollow.js new file mode 100644 index 0000000..58e25cf --- /dev/null +++ b/lib/controllers/refollow.js @@ -0,0 +1,84 @@ +/** + * Admin controllers for the batch re-follow system. + * + * Provides pause, resume, and status endpoints for managing the + * background batch processor from the admin UI. + */ + +import { + pauseBatchRefollow, + resumeBatchRefollow, + getBatchRefollowStatus, +} from "../batch-refollow.js"; + +/** + * POST /admin/refollow/pause — pause the batch processor. + * + * @param {string} mountPath - Plugin mount path + * @param {object} plugin - Plugin instance (for federation/collections access) + * @returns {Function} Express route handler + */ +export function refollowPauseController(mountPath, plugin) { + return async (request, response, next) => { + try { + const { application } = request.app.locals; + const collections = { + ap_following: application.collections.get("ap_following"), + ap_kv: application.collections.get("ap_kv"), + }; + + await pauseBatchRefollow(collections); + + response.json({ ok: true, status: "paused" }); + } catch (error) { + next(error); + } + }; +} + +/** + * POST /admin/refollow/resume — resume the batch processor. + * + * @param {string} mountPath - Plugin mount path + * @param {object} plugin - Plugin instance + * @returns {Function} Express route handler + */ +export function refollowResumeController(mountPath, plugin) { + return async (request, response, next) => { + try { + await resumeBatchRefollow({ + federation: plugin._federation, + collections: plugin._collections, + handle: plugin.options.actor.handle, + publicationUrl: plugin._publicationUrl, + }); + + response.json({ ok: true, status: "running" }); + } catch (error) { + next(error); + } + }; +} + +/** + * GET /admin/refollow/status — get current batch processor status. + * + * @param {string} mountPath - Plugin mount path + * @returns {Function} Express route handler + */ +export function refollowStatusController(mountPath) { + return async (request, response, next) => { + try { + const { application } = request.app.locals; + const collections = { + ap_following: application.collections.get("ap_following"), + ap_kv: application.collections.get("ap_kv"), + }; + + const status = await getBatchRefollowStatus(collections); + response.json(status); + } catch (error) { + next(error); + } + }; +} diff --git a/lib/inbox-listeners.js b/lib/inbox-listeners.js index 5e7090f..e7df6b6 100644 --- a/lib/inbox-listeners.js +++ b/lib/inbox-listeners.js @@ -119,6 +119,47 @@ export function registerInboxListeners(inboxChain, options) { }); } }) + .on(Accept, async (ctx, accept) => { + // Handle Accept(Follow) — remote server accepted our Follow request + const actorObj = await accept.getActor(); + const actorUrl = actorObj?.id?.href || ""; + if (!actorUrl) return; + + const inner = await accept.getObject(); + if (!(inner instanceof Follow)) return; + + // Match against our following list for refollow or microsub-reader follows + const result = await collections.ap_following.findOneAndUpdate( + { + actorUrl, + source: { $in: ["refollow:sent", "microsub-reader"] }, + }, + { + $set: { + source: "federation", + acceptedAt: new Date().toISOString(), + }, + $unset: { + refollowAttempts: "", + refollowLastAttempt: "", + refollowError: "", + }, + }, + { returnDocument: "after" }, + ); + + if (result) { + const actorName = + result.name || result.handle || actorUrl; + await logActivity(collections, storeRawActivities, { + direction: "inbound", + type: "Accept(Follow)", + actorUrl, + actorName, + summary: `${actorName} accepted our Follow`, + }); + } + }) .on(Like, async (ctx, like) => { const actorObj = await like.getActor(); const actorUrl = actorObj?.id?.href || ""; diff --git a/locales/en.json b/locales/en.json index fcaf014..a2bdc58 100644 --- a/locales/en.json +++ b/locales/en.json @@ -16,6 +16,8 @@ "sourceImport": "Mastodon import", "sourceManual": "Manual", "sourceFederation": "Federation", + "sourceRefollowPending": "Re-follow pending", + "sourceRefollowFailed": "Re-follow failed", "direction": "Direction", "directionInbound": "Received", "directionOutbound": "Sent", @@ -64,6 +66,22 @@ "failedList": "Could not resolve: %s", "failedListSummary": "Failed handles", "aliasSuccess": "Alias saved — your actor document now includes this account as alsoKnownAs." + }, + "refollow": { + "title": "Batch re-follow", + "progress": "Re-follow progress", + "remaining": "Remaining", + "awaitingAccept": "Awaiting accept", + "accepted": "Accepted", + "failed": "Failed", + "pause": "Pause", + "resume": "Resume", + "status": { + "idle": "Idle", + "running": "Running", + "paused": "Paused", + "completed": "Completed" + } } } } diff --git a/package.json b/package.json index 1d4a656..8506ece 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@rmdes/indiekit-endpoint-activitypub", - "version": "1.0.10", + "version": "1.0.11", "description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.", "keywords": [ "indiekit", diff --git a/views/activitypub-dashboard.njk b/views/activitypub-dashboard.njk index 33f1a0f..2554b47 100644 --- a/views/activitypub-dashboard.njk +++ b/views/activitypub-dashboard.njk @@ -32,6 +32,102 @@ } ]}) }} + {% if refollowStatus and refollowStatus.status !== "idle" %} +
+ {{ heading({ text: __("activitypub.refollow.title"), level: 2 }) }} + + {# Progress bar #} +
+
+
+
+ + {# Stats grid #} +
+
+
{{ refollowStatus.remaining }}
+
{{ __("activitypub.refollow.remaining") }}
+
+
+
{{ refollowStatus.sent }}
+
{{ __("activitypub.refollow.awaitingAccept") }}
+
+
+
{{ refollowStatus.federated }}
+
{{ __("activitypub.refollow.accepted") }}
+
+
+
{{ refollowStatus.failed }}
+
{{ __("activitypub.refollow.failed") }}
+
+
+ + {# Status + controls #} +
+ {{ badge({ text: __("activitypub.refollow.status." + refollowStatus.status) }) }} + {% if refollowStatus.status === "running" %} +
+ +
+ {% elif refollowStatus.status === "paused" %} +
+ +
+ {% endif %} +
+
+ + + {% endif %} + {{ heading({ text: __("activitypub.recentActivity"), level: 2 }) }} {% if recentActivities.length > 0 %} diff --git a/views/activitypub-following.njk b/views/activitypub-following.njk index 5285a95..f0c5b05 100644 --- a/views/activitypub-following.njk +++ b/views/activitypub-following.njk @@ -16,7 +16,12 @@ url: account.actorUrl, description: { text: "@" + account.handle if account.handle }, published: account.followedAt, - badges: [{ text: __("activitypub.sourceImport") if account.source === "import" else __("activitypub.sourceFederation") }] + badges: [{ + text: __("activitypub.sourceImport") if account.source === "import" + else __("activitypub.sourceRefollowPending") if account.source === "refollow:sent" + else __("activitypub.sourceRefollowFailed") if account.source === "refollow:failed" + else __("activitypub.sourceFederation") + }] }) }} {% endfor %}