feat: batch re-follow system for imported AP accounts

After Mastodon migration, imported accounts exist only locally — no
Follow activities were sent. This adds a gradual background processor
that sends Follow activities to all source:"import" accounts so remote
servers start delivering Create activities to our inbox.

- New batch engine (lib/batch-refollow.js) processes 10 accounts per
  batch with 3s between follows and 30s between batches
- Accept(Follow) inbox listener transitions source to "federation"
  and cleans up tracking fields
- Admin API: pause, resume, and status JSON endpoints
- Dashboard progress bar with Alpine.js polling (10s interval)
- Following list badges for refollow:sent and refollow:failed states
- Restart recovery resets stale refollow:pending back to import
- 3 retries with 1-hour cooldown before permanent failure
This commit is contained in:
Ricardo
2026-02-20 08:10:45 +01:00
parent 656b66c780
commit 84122cc470
9 changed files with 592 additions and 2 deletions
+22
View File
@@ -21,6 +21,12 @@ import {
profileGetController, profileGetController,
profilePostController, profilePostController,
} from "./lib/controllers/profile.js"; } from "./lib/controllers/profile.js";
import {
refollowPauseController,
refollowResumeController,
refollowStatusController,
} from "./lib/controllers/refollow.js";
import { startBatchRefollow } from "./lib/batch-refollow.js";
import { logActivity } from "./lib/activity-log.js"; import { logActivity } from "./lib/activity-log.js";
const defaults = { const defaults = {
@@ -137,6 +143,9 @@ export default class ActivityPubEndpoint {
"/admin/migrate/import", "/admin/migrate/import",
migrateImportController(mp, this.options), migrateImportController(mp, this.options),
); );
router.post("/admin/refollow/pause", refollowPauseController(mp, this));
router.post("/admin/refollow/resume", refollowResumeController(mp, this));
router.get("/admin/refollow/status", refollowStatusController(mp));
return router; return router;
} }
@@ -575,6 +584,19 @@ export default class ActivityPubEndpoint {
// Register syndicator (appears in post editing UI) // Register syndicator (appears in post editing UI)
Indiekit.addSyndicator(this.syndicator); Indiekit.addSyndicator(this.syndicator);
// Start batch re-follow processor after federation settles
const refollowOptions = {
federation: this._federation,
collections: this._collections,
handle: this.options.actor.handle,
publicationUrl: this._publicationUrl,
};
setTimeout(() => {
startBatchRefollow(refollowOptions).catch((error) => {
console.error("[ActivityPub] Batch refollow start failed:", error.message);
});
}, 10_000);
} }
/** /**
+314
View File
@@ -0,0 +1,314 @@
/**
* Batch re-follow processor for imported accounts.
*
* After a Mastodon migration, imported accounts (source: "import") exist only
* locally — no Follow activities were sent. This module gradually sends Follow
* activities to all imported accounts so remote servers start delivering
* Create activities to our inbox.
*
* Source field state machine:
* import → refollow:sent → federation (happy path)
* import → refollow:sent → refollow:failed (after MAX_RETRIES)
*/
import { Follow } from "@fedify/fedify";
import { logActivity } from "./activity-log.js";
const BATCH_SIZE = 10;
const DELAY_PER_FOLLOW = 3_000;
const DELAY_BETWEEN_BATCHES = 30_000;
const STARTUP_DELAY = 30_000;
const RETRY_COOLDOWN = 60 * 60 * 1_000; // 1 hour
const MAX_RETRIES = 3;
const KV_KEY = "batch-refollow/state";
let _timer = null;
/**
* Start the batch re-follow processor.
*
* @param {object} options
* @param {import("@fedify/fedify").Federation} options.federation
* @param {object} options.collections - MongoDB collections
* @param {string} options.handle - Actor handle
* @param {string} options.publicationUrl - Publication base URL
*/
export async function startBatchRefollow(options) {
const { collections } = options;
// Restart recovery: reset any stale "refollow:pending" back to "import"
await collections.ap_following.updateMany(
{ source: "refollow:pending" },
{ $set: { source: "import" } },
);
// Check if there's work to do
const importCount = await collections.ap_following.countDocuments({
source: "import",
});
if (importCount === 0) {
console.info("[ActivityPub] Batch refollow: no imported accounts to process");
return;
}
console.info(
`[ActivityPub] Batch refollow: ${importCount} imported accounts to process`,
);
// Set job state to running
await setJobState(collections, "running");
// Schedule first batch after startup delay
_timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY);
}
/**
* Pause the batch re-follow processor.
*
* @param {object} collections - MongoDB collections
*/
export async function pauseBatchRefollow(collections) {
if (_timer) {
clearTimeout(_timer);
_timer = null;
}
// Reset any pending back to import so they get picked up on resume
await collections.ap_following.updateMany(
{ source: "refollow:pending" },
{ $set: { source: "import" } },
);
await setJobState(collections, "paused");
console.info("[ActivityPub] Batch refollow: paused");
}
/**
* Resume the batch re-follow processor.
*
* @param {object} options
* @param {import("@fedify/fedify").Federation} options.federation
* @param {object} options.collections - MongoDB collections
* @param {string} options.handle - Actor handle
* @param {string} options.publicationUrl - Publication base URL
*/
export async function resumeBatchRefollow(options) {
if (_timer) {
clearTimeout(_timer);
_timer = null;
}
await setJobState(options.collections, "running");
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
console.info("[ActivityPub] Batch refollow: resumed");
}
/**
* Get current batch re-follow status.
*
* @param {object} collections - MongoDB collections
* @returns {Promise<object>} Status object
*/
export async function getBatchRefollowStatus(collections) {
const state = await collections.ap_kv.findOne({ _id: KV_KEY });
const status = state?.value?.status || "idle";
const [remaining, sent, failed, federated] = await Promise.all([
collections.ap_following.countDocuments({ source: "import" }),
collections.ap_following.countDocuments({ source: "refollow:sent" }),
collections.ap_following.countDocuments({ source: "refollow:failed" }),
collections.ap_following.countDocuments({ source: "federation" }),
]);
const total = remaining + sent + failed;
const completed = sent + failed;
const progressPercent =
total > 0 ? Math.round((completed / total) * 100) : 100;
return {
status,
total,
remaining,
sent,
failed,
federated,
completed,
progressPercent,
startedAt: state?.value?.startedAt || null,
updatedAt: state?.value?.updatedAt || null,
};
}
// --- Internal helpers ---
/**
* Process the next batch of imported accounts.
*/
async function processNextBatch(options) {
const { federation, collections, handle, publicationUrl } = options;
_timer = null;
const state = await collections.ap_kv.findOne({ _id: KV_KEY });
if (state?.value?.status !== "running") return;
// Claim a batch atomically: set source to "refollow:pending"
const entries = [];
for (let i = 0; i < BATCH_SIZE; i++) {
const doc = await collections.ap_following.findOneAndUpdate(
{ source: "import" },
{ $set: { source: "refollow:pending" } },
{ returnDocument: "after" },
);
if (!doc) break;
entries.push(doc);
}
// Also pick up retryable entries (failed but not permanently)
const retryCutoff = new Date(Date.now() - RETRY_COOLDOWN).toISOString();
const retrySlots = BATCH_SIZE - entries.length;
for (let i = 0; i < retrySlots; i++) {
const doc = await collections.ap_following.findOneAndUpdate(
{
source: "refollow:sent",
refollowAttempts: { $lt: MAX_RETRIES },
refollowLastAttempt: { $lt: retryCutoff },
},
{ $set: { source: "refollow:pending" } },
{ returnDocument: "after" },
);
if (!doc) break;
entries.push(doc);
}
if (entries.length === 0) {
// Check if there are still sent entries awaiting Accept
const pendingAccepts = await collections.ap_following.countDocuments({
source: "refollow:sent",
});
if (pendingAccepts > 0) {
console.info(
`[ActivityPub] Batch refollow: all sent, ${pendingAccepts} awaiting Accept`,
);
}
await setJobState(collections, "completed");
console.info("[ActivityPub] Batch refollow: completed");
return;
}
console.info(
`[ActivityPub] Batch refollow: processing batch of ${entries.length}`,
);
for (const entry of entries) {
await processOneFollow(options, entry);
// Delay between individual follows
await sleep(DELAY_PER_FOLLOW);
}
// Update job state timestamp
await setJobState(collections, "running");
// Schedule next batch
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
}
/**
* Send a Follow activity for a single imported account.
*/
async function processOneFollow(options, entry) {
const { federation, collections, handle, publicationUrl } = options;
try {
const ctx = federation.createContext(new URL(publicationUrl), {});
// Resolve the remote actor
const remoteActor = await ctx.lookupObject(entry.actorUrl);
if (!remoteActor) {
throw new Error("Could not resolve remote actor");
}
// Send Follow activity
const follow = new Follow({
actor: ctx.getActorUri(handle),
object: new URL(entry.actorUrl),
});
await ctx.sendActivity({ identifier: handle }, remoteActor, follow);
// Mark as sent
await collections.ap_following.updateOne(
{ _id: entry._id },
{
$set: {
source: "refollow:sent",
refollowLastAttempt: new Date().toISOString(),
refollowError: null,
},
$inc: { refollowAttempts: 1 },
},
);
console.info(
`[ActivityPub] Batch refollow: sent Follow to ${entry.actorUrl}`,
);
await logActivity(collections.ap_activities, {
direction: "outbound",
type: "Follow",
actorUrl: publicationUrl,
objectUrl: entry.actorUrl,
actorName: entry.name || entry.actorUrl,
summary: `Batch refollow: sent Follow to ${entry.name || entry.actorUrl}`,
});
} catch (error) {
const attempts = (entry.refollowAttempts || 0) + 1;
const newSource =
attempts >= MAX_RETRIES ? "refollow:failed" : "refollow:sent";
await collections.ap_following.updateOne(
{ _id: entry._id },
{
$set: {
source: newSource,
refollowLastAttempt: new Date().toISOString(),
refollowError: error.message,
},
$inc: { refollowAttempts: 1 },
},
);
console.warn(
`[ActivityPub] Batch refollow: failed for ${entry.actorUrl} (attempt ${attempts}/${MAX_RETRIES}): ${error.message}`,
);
}
}
/**
* Set the batch re-follow job state in ap_kv.
*/
async function setJobState(collections, status) {
const now = new Date().toISOString();
const update = {
$set: {
"value.status": status,
"value.updatedAt": now,
},
$setOnInsert: { _id: KV_KEY },
};
// Only set startedAt on initial start or resume
const existing = await collections.ap_kv.findOne({ _id: KV_KEY });
if (!existing?.value?.startedAt || status === "running" && existing?.value?.status !== "running") {
update.$set["value.startedAt"] = now;
}
await collections.ap_kv.updateOne({ _id: KV_KEY }, update, { upsert: true });
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
+10
View File
@@ -1,6 +1,9 @@
/** /**
* Dashboard controller — shows follower/following counts and recent activity. * Dashboard controller — shows follower/following counts and recent activity.
*/ */
import { getBatchRefollowStatus } from "../batch-refollow.js";
export function dashboardController(mountPath) { export function dashboardController(mountPath) {
return async (request, response, next) => { return async (request, response, next) => {
try { try {
@@ -25,11 +28,18 @@ export function dashboardController(mountPath) {
.toArray() .toArray()
: []; : [];
// Get batch re-follow status for the progress section
const refollowStatus = await getBatchRefollowStatus({
ap_following: followingCollection,
ap_kv: application?.collections?.get("ap_kv"),
});
response.render("activitypub-dashboard", { response.render("activitypub-dashboard", {
title: response.locals.__("activitypub.title"), title: response.locals.__("activitypub.title"),
followerCount, followerCount,
followingCount, followingCount,
recentActivities, recentActivities,
refollowStatus,
mountPath, mountPath,
}); });
} catch (error) { } catch (error) {
+84
View File
@@ -0,0 +1,84 @@
/**
* Admin controllers for the batch re-follow system.
*
* Provides pause, resume, and status endpoints for managing the
* background batch processor from the admin UI.
*/
import {
pauseBatchRefollow,
resumeBatchRefollow,
getBatchRefollowStatus,
} from "../batch-refollow.js";
/**
* POST /admin/refollow/pause — pause the batch processor.
*
* @param {string} mountPath - Plugin mount path
* @param {object} plugin - Plugin instance (for federation/collections access)
* @returns {Function} Express route handler
*/
export function refollowPauseController(mountPath, plugin) {
return async (request, response, next) => {
try {
const { application } = request.app.locals;
const collections = {
ap_following: application.collections.get("ap_following"),
ap_kv: application.collections.get("ap_kv"),
};
await pauseBatchRefollow(collections);
response.json({ ok: true, status: "paused" });
} catch (error) {
next(error);
}
};
}
/**
* POST /admin/refollow/resume — resume the batch processor.
*
* @param {string} mountPath - Plugin mount path
* @param {object} plugin - Plugin instance
* @returns {Function} Express route handler
*/
export function refollowResumeController(mountPath, plugin) {
return async (request, response, next) => {
try {
await resumeBatchRefollow({
federation: plugin._federation,
collections: plugin._collections,
handle: plugin.options.actor.handle,
publicationUrl: plugin._publicationUrl,
});
response.json({ ok: true, status: "running" });
} catch (error) {
next(error);
}
};
}
/**
* GET /admin/refollow/status — get current batch processor status.
*
* @param {string} mountPath - Plugin mount path
* @returns {Function} Express route handler
*/
export function refollowStatusController(mountPath) {
return async (request, response, next) => {
try {
const { application } = request.app.locals;
const collections = {
ap_following: application.collections.get("ap_following"),
ap_kv: application.collections.get("ap_kv"),
};
const status = await getBatchRefollowStatus(collections);
response.json(status);
} catch (error) {
next(error);
}
};
}
+41
View File
@@ -119,6 +119,47 @@ export function registerInboxListeners(inboxChain, options) {
}); });
} }
}) })
.on(Accept, async (ctx, accept) => {
// Handle Accept(Follow) — remote server accepted our Follow request
const actorObj = await accept.getActor();
const actorUrl = actorObj?.id?.href || "";
if (!actorUrl) return;
const inner = await accept.getObject();
if (!(inner instanceof Follow)) return;
// Match against our following list for refollow or microsub-reader follows
const result = await collections.ap_following.findOneAndUpdate(
{
actorUrl,
source: { $in: ["refollow:sent", "microsub-reader"] },
},
{
$set: {
source: "federation",
acceptedAt: new Date().toISOString(),
},
$unset: {
refollowAttempts: "",
refollowLastAttempt: "",
refollowError: "",
},
},
{ returnDocument: "after" },
);
if (result) {
const actorName =
result.name || result.handle || actorUrl;
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Accept(Follow)",
actorUrl,
actorName,
summary: `${actorName} accepted our Follow`,
});
}
})
.on(Like, async (ctx, like) => { .on(Like, async (ctx, like) => {
const actorObj = await like.getActor(); const actorObj = await like.getActor();
const actorUrl = actorObj?.id?.href || ""; const actorUrl = actorObj?.id?.href || "";
+18
View File
@@ -16,6 +16,8 @@
"sourceImport": "Mastodon import", "sourceImport": "Mastodon import",
"sourceManual": "Manual", "sourceManual": "Manual",
"sourceFederation": "Federation", "sourceFederation": "Federation",
"sourceRefollowPending": "Re-follow pending",
"sourceRefollowFailed": "Re-follow failed",
"direction": "Direction", "direction": "Direction",
"directionInbound": "Received", "directionInbound": "Received",
"directionOutbound": "Sent", "directionOutbound": "Sent",
@@ -64,6 +66,22 @@
"failedList": "Could not resolve: %s", "failedList": "Could not resolve: %s",
"failedListSummary": "Failed handles", "failedListSummary": "Failed handles",
"aliasSuccess": "Alias saved — your actor document now includes this account as alsoKnownAs." "aliasSuccess": "Alias saved — your actor document now includes this account as alsoKnownAs."
},
"refollow": {
"title": "Batch re-follow",
"progress": "Re-follow progress",
"remaining": "Remaining",
"awaitingAccept": "Awaiting accept",
"accepted": "Accepted",
"failed": "Failed",
"pause": "Pause",
"resume": "Resume",
"status": {
"idle": "Idle",
"running": "Running",
"paused": "Paused",
"completed": "Completed"
}
} }
} }
} }
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@rmdes/indiekit-endpoint-activitypub", "name": "@rmdes/indiekit-endpoint-activitypub",
"version": "1.0.10", "version": "1.0.11",
"description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.", "description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.",
"keywords": [ "keywords": [
"indiekit", "indiekit",
+96
View File
@@ -32,6 +32,102 @@
} }
]}) }} ]}) }}
{% if refollowStatus and refollowStatus.status !== "idle" %}
<section x-data="refollowProgress('{{ mountPath }}')" class="s-refollow" style="margin-block-end: var(--space-l);">
{{ heading({ text: __("activitypub.refollow.title"), level: 2 }) }}
{# Progress bar #}
<div style="background: var(--color-offset); border-radius: 4px; height: 1.5rem; margin-block-end: var(--space-m); overflow: hidden;">
<div
x-bind:style="'width:' + progress + '%; background: var(--color-accent); height: 100%; transition: width 0.5s ease;'"
style="width: {{ refollowStatus.progressPercent }}%; background: var(--color-accent); height: 100%; transition: width 0.5s ease;">
</div>
</div>
{# Stats grid #}
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(10rem, 1fr)); gap: var(--space-s); margin-block-end: var(--space-m);">
<div style="padding: var(--space-s); background: var(--color-offset); border-radius: 4px; text-align: center;">
<div style="font-size: var(--font-size-xl);" x-text="remaining">{{ refollowStatus.remaining }}</div>
<div style="font-size: var(--font-size-s); color: var(--color-text-offset);">{{ __("activitypub.refollow.remaining") }}</div>
</div>
<div style="padding: var(--space-s); background: var(--color-offset); border-radius: 4px; text-align: center;">
<div style="font-size: var(--font-size-xl);" x-text="sent">{{ refollowStatus.sent }}</div>
<div style="font-size: var(--font-size-s); color: var(--color-text-offset);">{{ __("activitypub.refollow.awaitingAccept") }}</div>
</div>
<div style="padding: var(--space-s); background: var(--color-offset); border-radius: 4px; text-align: center;">
<div style="font-size: var(--font-size-xl);" x-text="federated">{{ refollowStatus.federated }}</div>
<div style="font-size: var(--font-size-s); color: var(--color-text-offset);">{{ __("activitypub.refollow.accepted") }}</div>
</div>
<div style="padding: var(--space-s); background: var(--color-offset); border-radius: 4px; text-align: center;">
<div style="font-size: var(--font-size-xl);" x-text="failed">{{ refollowStatus.failed }}</div>
<div style="font-size: var(--font-size-s); color: var(--color-text-offset);">{{ __("activitypub.refollow.failed") }}</div>
</div>
</div>
{# Status + controls #}
<div style="display: flex; align-items: center; gap: var(--space-s);">
{{ badge({ text: __("activitypub.refollow.status." + refollowStatus.status) }) }}
{% if refollowStatus.status === "running" %}
<form method="post" action="{{ mountPath }}/admin/refollow/pause" x-on:submit.prevent="pause">
<button type="submit" class="button" style="font-size: var(--font-size-s);">{{ __("activitypub.refollow.pause") }}</button>
</form>
{% elif refollowStatus.status === "paused" %}
<form method="post" action="{{ mountPath }}/admin/refollow/resume" x-on:submit.prevent="resume">
<button type="submit" class="button" style="font-size: var(--font-size-s);">{{ __("activitypub.refollow.resume") }}</button>
</form>
{% endif %}
</div>
</section>
<script>
function refollowProgress(mountPath) {
return {
progress: {{ refollowStatus.progressPercent }},
remaining: {{ refollowStatus.remaining }},
sent: {{ refollowStatus.sent }},
federated: {{ refollowStatus.federated }},
failed: {{ refollowStatus.failed }},
status: '{{ refollowStatus.status }}',
interval: null,
init() {
if (this.status === 'running' || this.status === 'paused') {
this.interval = setInterval(() => this.poll(), 10000);
}
},
destroy() {
if (this.interval) clearInterval(this.interval);
},
async poll() {
try {
const res = await fetch(mountPath + '/admin/refollow/status');
const data = await res.json();
this.progress = data.progressPercent;
this.remaining = data.remaining;
this.sent = data.sent;
this.federated = data.federated;
this.failed = data.failed;
this.status = data.status;
if (data.status === 'completed' || data.status === 'idle') {
clearInterval(this.interval);
}
} catch {}
},
async pause() {
await fetch(mountPath + '/admin/refollow/pause', { method: 'POST' });
this.status = 'paused';
},
async resume() {
await fetch(mountPath + '/admin/refollow/resume', { method: 'POST' });
this.status = 'running';
if (!this.interval) {
this.interval = setInterval(() => this.poll(), 10000);
}
}
};
}
</script>
{% endif %}
{{ heading({ text: __("activitypub.recentActivity"), level: 2 }) }} {{ heading({ text: __("activitypub.recentActivity"), level: 2 }) }}
{% if recentActivities.length > 0 %} {% if recentActivities.length > 0 %}
+6 -1
View File
@@ -16,7 +16,12 @@
url: account.actorUrl, url: account.actorUrl,
description: { text: "@" + account.handle if account.handle }, description: { text: "@" + account.handle if account.handle },
published: account.followedAt, published: account.followedAt,
badges: [{ text: __("activitypub.sourceImport") if account.source === "import" else __("activitypub.sourceFederation") }] badges: [{
text: __("activitypub.sourceImport") if account.source === "import"
else __("activitypub.sourceRefollowPending") if account.source === "refollow:sent"
else __("activitypub.sourceRefollowFailed") if account.source === "refollow:failed"
else __("activitypub.sourceFederation")
}]
}) }} }) }}
{% endfor %} {% endfor %}