feat: migrate Fedify KV store and plugin cache from MongoDB to Redis

Replace unbounded ap_kv MongoDB collection (169K docs, 49MB) with Redis:
- Fedify KV store uses @fedify/redis RedisKvStore (native TTL support)
- Plugin cache (fedidb, batch-refollow state, migration flags) uses new
  redis-cache.js utility with indiekit: key prefix
- All controllers updated to remove kvCollection parameter passing
- Addresses OOM kills caused by ap_kv growing ~14K entries/day
This commit is contained in:
Ricardo
2026-03-01 16:26:17 +01:00
parent 5c2fd09f8f
commit abf1b94bd6
10 changed files with 165 additions and 114 deletions
+6
View File
@@ -1,6 +1,7 @@
import express from "express"; import express from "express";
import { setupFederation, buildPersonActor } from "./lib/federation-setup.js"; import { setupFederation, buildPersonActor } from "./lib/federation-setup.js";
import { initRedisCache } from "./lib/redis-cache.js";
import { import {
createFedifyMiddleware, createFedifyMiddleware,
} from "./lib/federation-bridge.js"; } from "./lib/federation-bridge.js";
@@ -1059,6 +1060,11 @@ export default class ActivityPubEndpoint {
console.warn("[ActivityPub] Profile seed failed:", error.message); console.warn("[ActivityPub] Profile seed failed:", error.message);
}); });
// Initialize Redis cache for plugin-level KV (fedidb, batch-refollow, etc.)
if (this.options.redisUrl) {
initRedisCache(this.options.redisUrl);
}
// Set up Fedify Federation instance // Set up Fedify Federation instance
const { federation } = setupFederation({ const { federation } = setupFederation({
collections: this._collections, collections: this._collections,
+23 -23
View File
@@ -13,6 +13,7 @@
import { Follow } from "@fedify/fedify/vocab"; import { Follow } from "@fedify/fedify/vocab";
import { logActivity } from "./activity-log.js"; import { logActivity } from "./activity-log.js";
import { cacheGet, cacheSet } from "./redis-cache.js";
const BATCH_SIZE = 10; const BATCH_SIZE = 10;
const DELAY_PER_FOLLOW = 3_000; const DELAY_PER_FOLLOW = 3_000;
@@ -58,7 +59,7 @@ export async function startBatchRefollow(options) {
); );
// Set job state to running // Set job state to running
await setJobState(collections, "running"); await setJobState("running");
// Schedule first batch after startup delay // Schedule first batch after startup delay
_timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY); _timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY);
@@ -81,7 +82,7 @@ export async function pauseBatchRefollow(collections) {
{ $set: { source: "import" } }, { $set: { source: "import" } },
); );
await setJobState(collections, "paused"); await setJobState("paused");
console.info("[ActivityPub] Batch refollow: paused"); console.info("[ActivityPub] Batch refollow: paused");
} }
@@ -100,7 +101,7 @@ export async function resumeBatchRefollow(options) {
_timer = null; _timer = null;
} }
await setJobState(options.collections, "running"); await setJobState("running");
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
console.info("[ActivityPub] Batch refollow: resumed"); console.info("[ActivityPub] Batch refollow: resumed");
} }
@@ -112,8 +113,8 @@ export async function resumeBatchRefollow(options) {
* @returns {Promise<object>} Status object * @returns {Promise<object>} Status object
*/ */
export async function getBatchRefollowStatus(collections) { export async function getBatchRefollowStatus(collections) {
const state = await collections.ap_kv.findOne({ _id: KV_KEY }); const state = await cacheGet(KV_KEY);
const status = state?.value?.status || "idle"; const status = state?.status || "idle";
const [remaining, sent, failed, federated] = await Promise.all([ const [remaining, sent, failed, federated] = await Promise.all([
collections.ap_following.countDocuments({ source: "import" }), collections.ap_following.countDocuments({ source: "import" }),
@@ -138,8 +139,8 @@ export async function getBatchRefollowStatus(collections) {
federated, federated,
completed, completed,
progressPercent, progressPercent,
startedAt: state?.value?.startedAt || null, startedAt: state?.startedAt || null,
updatedAt: state?.value?.updatedAt || null, updatedAt: state?.updatedAt || null,
}; };
} }
@@ -152,8 +153,8 @@ async function processNextBatch(options) {
const { federation, collections, handle, publicationUrl } = options; const { federation, collections, handle, publicationUrl } = options;
_timer = null; _timer = null;
const state = await collections.ap_kv.findOne({ _id: KV_KEY }); const state = await cacheGet(KV_KEY);
if (state?.value?.status !== "running") return; if (state?.status !== "running") return;
// Claim a batch atomically: set source to "refollow:pending" // Claim a batch atomically: set source to "refollow:pending"
const entries = []; const entries = [];
@@ -196,7 +197,7 @@ async function processNextBatch(options) {
); );
} }
await setJobState(collections, "completed"); await setJobState("completed");
console.info("[ActivityPub] Batch refollow: completed"); console.info("[ActivityPub] Batch refollow: completed");
return; return;
} }
@@ -212,7 +213,7 @@ async function processNextBatch(options) {
} }
// Update job state timestamp // Update job state timestamp
await setJobState(collections, "running"); await setJobState("running");
// Schedule next batch // Schedule next batch
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
@@ -306,25 +307,24 @@ async function processOneFollow(options, entry) {
} }
/** /**
* Set the batch re-follow job state in ap_kv. * Set the batch re-follow job state in Redis.
*/ */
async function setJobState(collections, status) { async function setJobState(status) {
const now = new Date().toISOString(); const now = new Date().toISOString();
const update = { const existing = (await cacheGet(KV_KEY)) || {};
$set: {
"value.status": status, const newState = {
"value.updatedAt": now, ...existing,
}, status,
$setOnInsert: { _id: KV_KEY }, updatedAt: now,
}; };
// Only set startedAt on initial start or resume // Only set startedAt on initial start or resume
const existing = await collections.ap_kv.findOne({ _id: KV_KEY }); if (!existing.startedAt || (status === "running" && existing.status !== "running")) {
if (!existing?.value?.startedAt || status === "running" && existing?.value?.status !== "running") { newState.startedAt = now;
update.$set["value.startedAt"] = now;
} }
await collections.ap_kv.updateOne({ _id: KV_KEY }, update, { upsert: true }); await cacheSet(KV_KEY, newState);
} }
function sleep(ms) { function sleep(ms) {
-1
View File
@@ -40,7 +40,6 @@ export function dashboardController(mountPath) {
// Get batch re-follow status for the progress section // Get batch re-follow status for the progress section
const refollowStatus = await getBatchRefollowStatus({ const refollowStatus = await getBatchRefollowStatus({
ap_following: followingCollection, ap_following: followingCollection,
ap_kv: application?.collections?.get("ap_kv"),
}); });
response.render("activitypub-dashboard", { response.render("activitypub-dashboard", {
+3 -12
View File
@@ -233,10 +233,7 @@ export function instanceSearchApiController(mountPath) {
return response.json([]); return response.json([]);
} }
const { application } = request.app.locals; const results = await searchInstances(q, 8);
const kvCollection = application?.collections?.get("ap_kv") || null;
const results = await searchInstances(kvCollection, q, 8);
response.json(results); response.json(results);
} catch (error) { } catch (error) {
next(error); next(error);
@@ -262,10 +259,7 @@ export function instanceCheckApiController(mountPath) {
return response.status(400).json({ supported: false, error: "Invalid domain" }); return response.status(400).json({ supported: false, error: "Invalid domain" });
} }
const { application } = request.app.locals; const result = await checkInstanceTimeline(validated);
const kvCollection = application?.collections?.get("ap_kv") || null;
const result = await checkInstanceTimeline(kvCollection, validated);
response.json(result); response.json(result);
} catch (error) { } catch (error) {
next(error); next(error);
@@ -280,10 +274,7 @@ export function instanceCheckApiController(mountPath) {
export function popularAccountsApiController(mountPath) { export function popularAccountsApiController(mountPath) {
return async (request, response, next) => { return async (request, response, next) => {
try { try {
const { application } = request.app.locals; const accounts = await getPopularAccounts(50);
const kvCollection = application?.collections?.get("ap_kv") || null;
const accounts = await getPopularAccounts(kvCollection, 50);
response.json(accounts); response.json(accounts);
} catch (error) { } catch (error) {
next(error); next(error);
-2
View File
@@ -24,7 +24,6 @@ export function refollowPauseController(mountPath, plugin) {
const { application } = request.app.locals; const { application } = request.app.locals;
const collections = { const collections = {
ap_following: application.collections.get("ap_following"), ap_following: application.collections.get("ap_following"),
ap_kv: application.collections.get("ap_kv"),
}; };
await pauseBatchRefollow(collections); await pauseBatchRefollow(collections);
@@ -72,7 +71,6 @@ export function refollowStatusController(mountPath) {
const { application } = request.app.locals; const { application } = request.app.locals;
const collections = { const collections = {
ap_following: application.collections.get("ap_following"), ap_following: application.collections.get("ap_following"),
ap_kv: application.collections.get("ap_kv"),
}; };
const status = await getBatchRefollowStatus(collections); const status = await getBatchRefollowStatus(collections);
+10 -3
View File
@@ -34,7 +34,7 @@ import {
Service, Service,
} from "@fedify/fedify/vocab"; } from "@fedify/fedify/vocab";
import { configure, getConsoleSink } from "@logtape/logtape"; import { configure, getConsoleSink } from "@logtape/logtape";
import { RedisMessageQueue } from "@fedify/redis"; import { RedisMessageQueue, RedisKvStore } from "@fedify/redis";
import { createFederationDebugger } from "@fedify/debugger"; import { createFederationDebugger } from "@fedify/debugger";
import Redis from "ioredis"; import Redis from "ioredis";
import { MongoKvStore } from "./kv-store.js"; import { MongoKvStore } from "./kv-store.js";
@@ -100,6 +100,7 @@ export function setupFederation(options) {
} }
let queue; let queue;
let kv;
if (redisUrl) { if (redisUrl) {
const redisQueue = new RedisMessageQueue(() => new Redis(redisUrl)); const redisQueue = new RedisMessageQueue(() => new Redis(redisUrl));
if (parallelWorkers > 1) { if (parallelWorkers > 1) {
@@ -111,15 +112,21 @@ export function setupFederation(options) {
queue = redisQueue; queue = redisQueue;
console.info("[ActivityPub] Using Redis message queue (single worker)"); console.info("[ActivityPub] Using Redis message queue (single worker)");
} }
// Use Redis for Fedify KV store — idempotence records, public key cache,
// remote document cache. Redis handles TTL natively so entries auto-expire
// instead of growing unbounded in MongoDB.
kv = new RedisKvStore(new Redis(redisUrl));
console.info("[ActivityPub] Using Redis KV store for Fedify");
} else { } else {
queue = new InProcessMessageQueue(); queue = new InProcessMessageQueue();
kv = new MongoKvStore(collections.ap_kv);
console.warn( console.warn(
"[ActivityPub] Using in-process message queue (not recommended for production)", "[ActivityPub] Using in-process message queue + MongoDB KV store (not recommended for production)",
); );
} }
const federation = createFederation({ const federation = createFederation({
kv: new MongoKvStore(collections.ap_kv), kv,
queue, queue,
}); });
+17 -59
View File
@@ -1,5 +1,5 @@
/** /**
* FediDB API client with MongoDB caching. * FediDB API client with Redis caching.
* *
* Wraps https://api.fedidb.org/v1/ endpoints: * Wraps https://api.fedidb.org/v1/ endpoints:
* - /servers — cursor-paginated list of known fediverse instances (ranked by size) * - /servers — cursor-paginated list of known fediverse instances (ranked by size)
@@ -9,12 +9,14 @@
* returns the same ranked list. We paginate through ~500 servers, cache the full * returns the same ranked list. We paginate through ~500 servers, cache the full
* corpus for 24 hours, and filter locally when the user searches. * corpus for 24 hours, and filter locally when the user searches.
* *
* Cache TTL: 24 hours for both datasets. * Cache TTL: 24 hours for both datasets (enforced by Redis TTL).
*/ */
import { cacheGet, cacheSet } from "./redis-cache.js";
const API_BASE = "https://api.fedidb.org/v1"; const API_BASE = "https://api.fedidb.org/v1";
const FETCH_TIMEOUT_MS = 8_000; const FETCH_TIMEOUT_MS = 8_000;
const CACHE_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours const CACHE_TTL_SECONDS = 24 * 60 * 60; // 24 hours
/** /**
* Fetch with timeout helper. * Fetch with timeout helper.
@@ -35,60 +37,21 @@ async function fetchWithTimeout(url) {
} }
} }
/**
* Get cached data from ap_kv, or null if expired/missing.
* @param {object} kvCollection - MongoDB ap_kv collection
* @param {string} cacheKey - Key to look up
* @returns {Promise<object|null>} Cached data or null
*/
async function getFromCache(kvCollection, cacheKey) {
if (!kvCollection) return null;
try {
const doc = await kvCollection.findOne({ _id: cacheKey });
if (!doc?.value?.data) return null;
const age = Date.now() - (doc.value.cachedAt || 0);
if (age > CACHE_TTL_MS) return null;
return doc.value.data;
} catch {
return null;
}
}
/**
* Write data to ap_kv cache.
* @param {object} kvCollection - MongoDB ap_kv collection
* @param {string} cacheKey - Key to store under
* @param {object} data - Data to cache
*/
async function writeToCache(kvCollection, cacheKey, data) {
if (!kvCollection) return;
try {
await kvCollection.updateOne(
{ _id: cacheKey },
{ $set: { value: { data, cachedAt: Date.now() } } },
{ upsert: true }
);
} catch {
// Cache write failure is non-critical
}
}
/** /**
* Fetch the FediDB server catalogue by paginating through cursor-based results. * Fetch the FediDB server catalogue by paginating through cursor-based results.
* Cached for 24 hours as a single entry. The API ignores the `q` param and * Cached for 24 hours as a single entry. The API ignores the `q` param and
* always returns a ranked list, so we collect a large corpus and filter locally. * always returns a ranked list, so we collect a large corpus and filter locally.
* *
* Paginates up to MAX_PAGES (13 pages × 40 = ~520 servers), which covers * Paginates up to MAX_PAGES (13 pages × 40 = ~520 servers), which covers
* all well-known instances. Results are cached in ap_kv for 24 hours. * all well-known instances. Results are cached in Redis for 24 hours.
* *
* @param {object} kvCollection - MongoDB ap_kv collection
* @returns {Promise<Array>} * @returns {Promise<Array>}
*/ */
const MAX_PAGES = 13; const MAX_PAGES = 13;
async function getAllServers(kvCollection) { async function getAllServers() {
const cacheKey = "fedidb:servers-all"; const cacheKey = "fedidb:servers-all";
const cached = await getFromCache(kvCollection, cacheKey); const cached = await cacheGet(cacheKey);
if (cached) return cached; if (cached) return cached;
const results = []; const results = [];
@@ -123,7 +86,7 @@ async function getAllServers(kvCollection) {
} }
if (results.length > 0) { if (results.length > 0) {
await writeToCache(kvCollection, cacheKey, results); await cacheSet(cacheKey, results, CACHE_TTL_SECONDS);
} }
} catch { } catch {
// Return whatever we collected so far // Return whatever we collected so far
@@ -137,19 +100,16 @@ async function getAllServers(kvCollection) {
* Returns a flat array of { domain, software, description, mau, openRegistration }. * Returns a flat array of { domain, software, description, mau, openRegistration }.
* *
* Fetches the full server list once (cached 24h) and filters by domain/software match. * Fetches the full server list once (cached 24h) and filters by domain/software match.
* FediDB's /v1/servers endpoint ignores the `q` param and always returns a static
* ranked list, so server-side filtering is the only way to get relevant results.
* *
* @param {object} kvCollection - MongoDB ap_kv collection
* @param {string} query - Search term (e.g. "mast") * @param {string} query - Search term (e.g. "mast")
* @param {number} [limit=10] - Max results * @param {number} [limit=10] - Max results
* @returns {Promise<Array>} * @returns {Promise<Array>}
*/ */
export async function searchInstances(kvCollection, query, limit = 10) { export async function searchInstances(query, limit = 10) {
const q = (query || "").trim().toLowerCase(); const q = (query || "").trim().toLowerCase();
if (!q) return []; if (!q) return [];
const allServers = await getAllServers(kvCollection); const allServers = await getAllServers();
return allServers return allServers
.filter( .filter(
@@ -166,13 +126,12 @@ export async function searchInstances(kvCollection, query, limit = 10) {
* *
* Cached per domain for 24 hours. * Cached per domain for 24 hours.
* *
* @param {object} kvCollection - MongoDB ap_kv collection
* @param {string} domain - Instance hostname * @param {string} domain - Instance hostname
* @returns {Promise<{ supported: boolean, error: string|null }>} * @returns {Promise<{ supported: boolean, error: string|null }>}
*/ */
export async function checkInstanceTimeline(kvCollection, domain) { export async function checkInstanceTimeline(domain) {
const cacheKey = `fedidb:timeline-check:${domain}`; const cacheKey = `fedidb:timeline-check:${domain}`;
const cached = await getFromCache(kvCollection, cacheKey); const cached = await cacheGet(cacheKey);
if (cached) return cached; if (cached) return cached;
try { try {
@@ -193,7 +152,7 @@ export async function checkInstanceTimeline(kvCollection, domain) {
result = { supported: false, error: errorMsg }; result = { supported: false, error: errorMsg };
} }
await writeToCache(kvCollection, cacheKey, result); await cacheSet(cacheKey, result, CACHE_TTL_SECONDS);
return result; return result;
} catch { } catch {
return { supported: false, error: "Connection failed" }; return { supported: false, error: "Connection failed" };
@@ -206,13 +165,12 @@ export async function checkInstanceTimeline(kvCollection, domain) {
* *
* Cached for 24 hours (single cache entry). * Cached for 24 hours (single cache entry).
* *
* @param {object} kvCollection - MongoDB ap_kv collection
* @param {number} [limit=50] - Max accounts to fetch * @param {number} [limit=50] - Max accounts to fetch
* @returns {Promise<Array>} * @returns {Promise<Array>}
*/ */
export async function getPopularAccounts(kvCollection, limit = 50) { export async function getPopularAccounts(limit = 50) {
const cacheKey = `fedidb:popular-accounts:${limit}`; const cacheKey = `fedidb:popular-accounts:${limit}`;
const cached = await getFromCache(kvCollection, cacheKey); const cached = await cacheGet(cacheKey);
if (cached) return cached; if (cached) return cached;
try { try {
@@ -234,7 +192,7 @@ export async function getPopularAccounts(kvCollection, limit = 50) {
bio: (a.bio || "").replace(/<[^>]*>/g, "").slice(0, 120), bio: (a.bio || "").replace(/<[^>]*>/g, "").slice(0, 120),
})); }));
await writeToCache(kvCollection, cacheKey, results); await cacheSet(cacheKey, results, CACHE_TTL_SECONDS);
return results; return results;
} catch { } catch {
return []; return [];
+7 -13
View File
@@ -12,6 +12,8 @@
* New items will have URLs populated by the fixed extractObjectData() (Task 1). * New items will have URLs populated by the fixed extractObjectData() (Task 1).
*/ */
import { cacheGet, cacheSet } from "../redis-cache.js";
const MIGRATION_KEY = "migration:separate-mentions"; const MIGRATION_KEY = "migration:separate-mentions";
/** /**
@@ -20,11 +22,11 @@ const MIGRATION_KEY = "migration:separate-mentions";
* @returns {Promise<{ skipped: boolean, updated: number }>} * @returns {Promise<{ skipped: boolean, updated: number }>}
*/ */
export async function runSeparateMentionsMigration(collections) { export async function runSeparateMentionsMigration(collections) {
const { ap_kv, ap_timeline } = collections; const { ap_timeline } = collections;
// Check if already completed // Check if already completed
const state = await ap_kv.findOne({ _id: MIGRATION_KEY }); const state = await cacheGet(MIGRATION_KEY);
if (state?.value?.completed) { if (state?.completed) {
return { skipped: true, updated: 0 }; return { skipped: true, updated: 0 };
} }
@@ -35,11 +37,7 @@ export async function runSeparateMentionsMigration(collections) {
if (docs.length === 0) { if (docs.length === 0) {
// No docs to migrate — mark complete immediately // No docs to migrate — mark complete immediately
await ap_kv.updateOne( await cacheSet(MIGRATION_KEY, { completed: true, date: new Date().toISOString(), updated: 0 });
{ _id: MIGRATION_KEY },
{ $set: { value: { completed: true, date: new Date().toISOString(), updated: 0 } } },
{ upsert: true }
);
return { skipped: false, updated: 0 }; return { skipped: false, updated: 0 };
} }
@@ -78,11 +76,7 @@ export async function runSeparateMentionsMigration(collections) {
const updated = result.modifiedCount || 0; const updated = result.modifiedCount || 0;
// Mark migration complete // Mark migration complete
await ap_kv.updateOne( await cacheSet(MIGRATION_KEY, { completed: true, date: new Date().toISOString(), updated });
{ _id: MIGRATION_KEY },
{ $set: { value: { completed: true, date: new Date().toISOString(), updated } } },
{ upsert: true }
);
return { skipped: false, updated }; return { skipped: false, updated };
} }
+98
View File
@@ -0,0 +1,98 @@
/**
* Redis-backed cache for plugin-level key-value storage.
*
* Replaces direct MongoDB ap_kv reads/writes for fedidb cache,
* batch-refollow state, and migration flags. Uses the same Redis
* connection as the Fedify message queue and KV store.
*
* All keys are prefixed with "indiekit:" to avoid collisions with
* Fedify's "fedify::" prefix.
*/
import Redis from "ioredis";
// Namespace prefix applied to every key so plugin entries stay distinct
// from Fedify's "fedify::"-prefixed records in the same Redis instance.
const KEY_PREFIX = "indiekit:";
// Module-level singleton ioredis client; null until initRedisCache() runs.
// All cache helpers below no-op (or return null/false) while this is null.
let _redis = null;
/**
 * Create the shared Redis connection used by the cache helpers.
 * Idempotent: once a client exists, later calls are no-ops, and a
 * missing URL leaves the cache disabled (helpers silently no-op).
 * @param {string} redisUrl - Redis connection URL
 */
export function initRedisCache(redisUrl) {
  if (_redis || !redisUrl) return;
  _redis = new Redis(redisUrl);
}
/**
 * Expose the underlying ioredis client for callers that need raw Redis
 * commands beyond the get/set/delete helpers (e.g. Fedify setup).
 * @returns {import("ioredis").Redis|null} The client, or null before
 *   initRedisCache() has been called with a URL
 */
export function getRedisClient() {
return _redis;
}
/**
 * Read and deserialize a cached value.
 * Returns null when the cache is uninitialized, the key is absent,
 * Redis errors out, or the stored payload is not valid JSON.
 * @param {string} key - Key without the "indiekit:" prefix
 * @returns {Promise<unknown|null>} Parsed value, or null
 */
export async function cacheGet(key) {
  if (!_redis) return null;
  try {
    const payload = await _redis.get(`${KEY_PREFIX}${key}`);
    return payload === null ? null : JSON.parse(payload);
  } catch {
    return null;
  }
}
/**
 * Serialize and store a value, optionally with an expiry.
 * Failures are swallowed: caching is best-effort and must never
 * break the caller.
 * @param {string} key - Key without the "indiekit:" prefix
 * @param {unknown} value - Must be JSON-serializable
 * @param {number} [ttlSeconds] - Expiry in seconds; 0 (default) persists forever
 */
export async function cacheSet(key, value, ttlSeconds = 0) {
  if (!_redis) return;
  const namespaced = KEY_PREFIX + key;
  try {
    const payload = JSON.stringify(value);
    await (ttlSeconds > 0
      ? _redis.set(namespaced, payload, "EX", ttlSeconds)
      : _redis.set(namespaced, payload));
  } catch {
    // Best-effort: a failed write only means a cache miss later.
  }
}
/**
 * Remove a key from the cache, ignoring any Redis errors.
 * @param {string} key - Key without the "indiekit:" prefix
 */
export async function cacheDelete(key) {
  if (!_redis) return;
  try {
    await _redis.del(`${KEY_PREFIX}${key}`);
  } catch {
    // Deletion failures are harmless; the stale entry just lingers.
  }
}
/**
 * Test whether a key is present in the cache.
 * @param {string} key - Key without the "indiekit:" prefix
 * @returns {Promise<boolean>} True when the key exists; false on miss,
 *   Redis error, or when the cache is uninitialized
 */
export async function cacheExists(key) {
  if (!_redis) return false;
  try {
    const count = await _redis.exists(`${KEY_PREFIX}${key}`);
    return count === 1;
  } catch {
    return false;
  }
}
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@rmdes/indiekit-endpoint-activitypub", "name": "@rmdes/indiekit-endpoint-activitypub",
"version": "2.1.2", "version": "2.2.0",
"description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.", "description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.",
"keywords": [ "keywords": [
"indiekit", "indiekit",