From abf1b94bd672c160b8e4119b7237c519e13084fd Mon Sep 17 00:00:00 2001 From: Ricardo Date: Sun, 1 Mar 2026 16:26:17 +0100 Subject: [PATCH] feat: migrate Fedify KV store and plugin cache from MongoDB to Redis Replace unbounded ap_kv MongoDB collection (169K docs, 49MB) with Redis: - Fedify KV store uses @fedify/redis RedisKvStore (native TTL support) - Plugin cache (fedidb, batch-refollow state, migration flags) uses new redis-cache.js utility with indiekit: key prefix - All controllers updated to remove kvCollection parameter passing - Addresses OOM kills caused by ap_kv growing ~14K entries/day --- index.js | 6 ++ lib/batch-refollow.js | 46 +++++++------- lib/controllers/dashboard.js | 1 - lib/controllers/explore.js | 15 +---- lib/controllers/refollow.js | 2 - lib/federation-setup.js | 13 +++- lib/fedidb.js | 76 +++++----------------- lib/migrations/separate-mentions.js | 20 +++--- lib/redis-cache.js | 98 +++++++++++++++++++++++++++++ package.json | 2 +- 10 files changed, 165 insertions(+), 114 deletions(-) create mode 100644 lib/redis-cache.js diff --git a/index.js b/index.js index 9b0fb26..de974f8 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ import express from "express"; import { setupFederation, buildPersonActor } from "./lib/federation-setup.js"; +import { initRedisCache } from "./lib/redis-cache.js"; import { createFedifyMiddleware, } from "./lib/federation-bridge.js"; @@ -1059,6 +1060,11 @@ export default class ActivityPubEndpoint { console.warn("[ActivityPub] Profile seed failed:", error.message); }); + // Initialize Redis cache for plugin-level KV (fedidb, batch-refollow, etc.) 
+ if (this.options.redisUrl) { + initRedisCache(this.options.redisUrl); + } + // Set up Fedify Federation instance const { federation } = setupFederation({ collections: this._collections, diff --git a/lib/batch-refollow.js b/lib/batch-refollow.js index 4452abe..99ac690 100644 --- a/lib/batch-refollow.js +++ b/lib/batch-refollow.js @@ -13,6 +13,7 @@ import { Follow } from "@fedify/fedify/vocab"; import { logActivity } from "./activity-log.js"; +import { cacheGet, cacheSet } from "./redis-cache.js"; const BATCH_SIZE = 10; const DELAY_PER_FOLLOW = 3_000; @@ -58,7 +59,7 @@ export async function startBatchRefollow(options) { ); // Set job state to running - await setJobState(collections, "running"); + await setJobState("running"); // Schedule first batch after startup delay _timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY); @@ -81,7 +82,7 @@ export async function pauseBatchRefollow(collections) { { $set: { source: "import" } }, ); - await setJobState(collections, "paused"); + await setJobState("paused"); console.info("[ActivityPub] Batch refollow: paused"); } @@ -100,7 +101,7 @@ export async function resumeBatchRefollow(options) { _timer = null; } - await setJobState(options.collections, "running"); + await setJobState("running"); _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); console.info("[ActivityPub] Batch refollow: resumed"); } @@ -112,8 +113,8 @@ export async function resumeBatchRefollow(options) { * @returns {Promise} Status object */ export async function getBatchRefollowStatus(collections) { - const state = await collections.ap_kv.findOne({ _id: KV_KEY }); - const status = state?.value?.status || "idle"; + const state = await cacheGet(KV_KEY); + const status = state?.status || "idle"; const [remaining, sent, failed, federated] = await Promise.all([ collections.ap_following.countDocuments({ source: "import" }), @@ -138,8 +139,8 @@ export async function getBatchRefollowStatus(collections) { federated, completed, 
progressPercent, - startedAt: state?.value?.startedAt || null, - updatedAt: state?.value?.updatedAt || null, + startedAt: state?.startedAt || null, + updatedAt: state?.updatedAt || null, }; } @@ -152,8 +153,8 @@ async function processNextBatch(options) { const { federation, collections, handle, publicationUrl } = options; _timer = null; - const state = await collections.ap_kv.findOne({ _id: KV_KEY }); - if (state?.value?.status !== "running") return; + const state = await cacheGet(KV_KEY); + if (state?.status !== "running") return; // Claim a batch atomically: set source to "refollow:pending" const entries = []; @@ -196,7 +197,7 @@ async function processNextBatch(options) { ); } - await setJobState(collections, "completed"); + await setJobState("completed"); console.info("[ActivityPub] Batch refollow: completed"); return; } @@ -212,7 +213,7 @@ async function processNextBatch(options) { } // Update job state timestamp - await setJobState(collections, "running"); + await setJobState("running"); // Schedule next batch _timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); @@ -306,25 +307,24 @@ async function processOneFollow(options, entry) { } /** - * Set the batch re-follow job state in ap_kv. + * Set the batch re-follow job state in Redis. 
*/ -async function setJobState(collections, status) { +async function setJobState(status) { const now = new Date().toISOString(); - const update = { - $set: { - "value.status": status, - "value.updatedAt": now, - }, - $setOnInsert: { _id: KV_KEY }, + const existing = (await cacheGet(KV_KEY)) || {}; + + const newState = { + ...existing, + status, + updatedAt: now, }; // Only set startedAt on initial start or resume - const existing = await collections.ap_kv.findOne({ _id: KV_KEY }); - if (!existing?.value?.startedAt || status === "running" && existing?.value?.status !== "running") { - update.$set["value.startedAt"] = now; + if (!existing.startedAt || (status === "running" && existing.status !== "running")) { + newState.startedAt = now; } - await collections.ap_kv.updateOne({ _id: KV_KEY }, update, { upsert: true }); + await cacheSet(KV_KEY, newState); } function sleep(ms) { diff --git a/lib/controllers/dashboard.js b/lib/controllers/dashboard.js index 8c1b4d7..b14d630 100644 --- a/lib/controllers/dashboard.js +++ b/lib/controllers/dashboard.js @@ -40,7 +40,6 @@ export function dashboardController(mountPath) { // Get batch re-follow status for the progress section const refollowStatus = await getBatchRefollowStatus({ ap_following: followingCollection, - ap_kv: application?.collections?.get("ap_kv"), }); response.render("activitypub-dashboard", { diff --git a/lib/controllers/explore.js b/lib/controllers/explore.js index ea4bea3..0f19bce 100644 --- a/lib/controllers/explore.js +++ b/lib/controllers/explore.js @@ -233,10 +233,7 @@ export function instanceSearchApiController(mountPath) { return response.json([]); } - const { application } = request.app.locals; - const kvCollection = application?.collections?.get("ap_kv") || null; - - const results = await searchInstances(kvCollection, q, 8); + const results = await searchInstances(q, 8); response.json(results); } catch (error) { next(error); @@ -262,10 +259,7 @@ export function instanceCheckApiController(mountPath) { 
return response.status(400).json({ supported: false, error: "Invalid domain" }); } - const { application } = request.app.locals; - const kvCollection = application?.collections?.get("ap_kv") || null; - - const result = await checkInstanceTimeline(kvCollection, validated); + const result = await checkInstanceTimeline(validated); response.json(result); } catch (error) { next(error); @@ -280,10 +274,7 @@ export function instanceCheckApiController(mountPath) { export function popularAccountsApiController(mountPath) { return async (request, response, next) => { try { - const { application } = request.app.locals; - const kvCollection = application?.collections?.get("ap_kv") || null; - - const accounts = await getPopularAccounts(kvCollection, 50); + const accounts = await getPopularAccounts(50); response.json(accounts); } catch (error) { next(error); diff --git a/lib/controllers/refollow.js b/lib/controllers/refollow.js index 58e25cf..c97860e 100644 --- a/lib/controllers/refollow.js +++ b/lib/controllers/refollow.js @@ -24,7 +24,6 @@ export function refollowPauseController(mountPath, plugin) { const { application } = request.app.locals; const collections = { ap_following: application.collections.get("ap_following"), - ap_kv: application.collections.get("ap_kv"), }; await pauseBatchRefollow(collections); @@ -72,7 +71,6 @@ export function refollowStatusController(mountPath) { const { application } = request.app.locals; const collections = { ap_following: application.collections.get("ap_following"), - ap_kv: application.collections.get("ap_kv"), }; const status = await getBatchRefollowStatus(collections); diff --git a/lib/federation-setup.js b/lib/federation-setup.js index bc7f2e3..b7db806 100644 --- a/lib/federation-setup.js +++ b/lib/federation-setup.js @@ -34,7 +34,7 @@ import { Service, } from "@fedify/fedify/vocab"; import { configure, getConsoleSink } from "@logtape/logtape"; -import { RedisMessageQueue } from "@fedify/redis"; +import { RedisMessageQueue, RedisKvStore 
} from "@fedify/redis"; import { createFederationDebugger } from "@fedify/debugger"; import Redis from "ioredis"; import { MongoKvStore } from "./kv-store.js"; @@ -100,6 +100,7 @@ export function setupFederation(options) { } let queue; + let kv; if (redisUrl) { const redisQueue = new RedisMessageQueue(() => new Redis(redisUrl)); if (parallelWorkers > 1) { @@ -111,15 +112,21 @@ export function setupFederation(options) { queue = redisQueue; console.info("[ActivityPub] Using Redis message queue (single worker)"); } + // Use Redis for Fedify KV store — idempotence records, public key cache, + // remote document cache. Redis handles TTL natively so entries auto-expire + // instead of growing unbounded in MongoDB. + kv = new RedisKvStore(new Redis(redisUrl)); + console.info("[ActivityPub] Using Redis KV store for Fedify"); } else { queue = new InProcessMessageQueue(); + kv = new MongoKvStore(collections.ap_kv); console.warn( - "[ActivityPub] Using in-process message queue (not recommended for production)", + "[ActivityPub] Using in-process message queue + MongoDB KV store (not recommended for production)", ); } const federation = createFederation({ - kv: new MongoKvStore(collections.ap_kv), + kv, queue, }); diff --git a/lib/fedidb.js b/lib/fedidb.js index f5e1768..86a587f 100644 --- a/lib/fedidb.js +++ b/lib/fedidb.js @@ -1,5 +1,5 @@ /** - * FediDB API client with MongoDB caching. + * FediDB API client with Redis caching. * * Wraps https://api.fedidb.org/v1/ endpoints: * - /servers — cursor-paginated list of known fediverse instances (ranked by size) @@ -9,12 +9,14 @@ * returns the same ranked list. We paginate through ~500 servers, cache the full * corpus for 24 hours, and filter locally when the user searches. * - * Cache TTL: 24 hours for both datasets. + * Cache TTL: 24 hours for both datasets (enforced by Redis TTL). 
*/ +import { cacheGet, cacheSet } from "./redis-cache.js"; + const API_BASE = "https://api.fedidb.org/v1"; const FETCH_TIMEOUT_MS = 8_000; -const CACHE_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours +const CACHE_TTL_SECONDS = 24 * 60 * 60; // 24 hours /** * Fetch with timeout helper. @@ -35,60 +37,21 @@ async function fetchWithTimeout(url) { } } -/** - * Get cached data from ap_kv, or null if expired/missing. - * @param {object} kvCollection - MongoDB ap_kv collection - * @param {string} cacheKey - Key to look up - * @returns {Promise} Cached data or null - */ -async function getFromCache(kvCollection, cacheKey) { - if (!kvCollection) return null; - try { - const doc = await kvCollection.findOne({ _id: cacheKey }); - if (!doc?.value?.data) return null; - const age = Date.now() - (doc.value.cachedAt || 0); - if (age > CACHE_TTL_MS) return null; - return doc.value.data; - } catch { - return null; - } -} - -/** - * Write data to ap_kv cache. - * @param {object} kvCollection - MongoDB ap_kv collection - * @param {string} cacheKey - Key to store under - * @param {object} data - Data to cache - */ -async function writeToCache(kvCollection, cacheKey, data) { - if (!kvCollection) return; - try { - await kvCollection.updateOne( - { _id: cacheKey }, - { $set: { value: { data, cachedAt: Date.now() } } }, - { upsert: true } - ); - } catch { - // Cache write failure is non-critical - } -} - /** * Fetch the FediDB server catalogue by paginating through cursor-based results. * Cached for 24 hours as a single entry. The API ignores the `q` param and * always returns a ranked list, so we collect a large corpus and filter locally. * * Paginates up to MAX_PAGES (13 pages × 40 = ~520 servers), which covers - * all well-known instances. Results are cached in ap_kv for 24 hours. + * all well-known instances. Results are cached in Redis for 24 hours. 
* - * @param {object} kvCollection - MongoDB ap_kv collection * @returns {Promise} */ const MAX_PAGES = 13; -async function getAllServers(kvCollection) { +async function getAllServers() { const cacheKey = "fedidb:servers-all"; - const cached = await getFromCache(kvCollection, cacheKey); + const cached = await cacheGet(cacheKey); if (cached) return cached; const results = []; @@ -123,7 +86,7 @@ async function getAllServers(kvCollection) { } if (results.length > 0) { - await writeToCache(kvCollection, cacheKey, results); + await cacheSet(cacheKey, results, CACHE_TTL_SECONDS); } } catch { // Return whatever we collected so far @@ -137,19 +100,16 @@ async function getAllServers(kvCollection) { * Returns a flat array of { domain, software, description, mau, openRegistration }. * * Fetches the full server list once (cached 24h) and filters by domain/software match. - * FediDB's /v1/servers endpoint ignores the `q` param and always returns a static - * ranked list, so server-side filtering is the only way to get relevant results. * - * @param {object} kvCollection - MongoDB ap_kv collection * @param {string} query - Search term (e.g. "mast") * @param {number} [limit=10] - Max results * @returns {Promise} */ -export async function searchInstances(kvCollection, query, limit = 10) { +export async function searchInstances(query, limit = 10) { const q = (query || "").trim().toLowerCase(); if (!q) return []; - const allServers = await getAllServers(kvCollection); + const allServers = await getAllServers(); return allServers .filter( @@ -166,13 +126,12 @@ export async function searchInstances(kvCollection, query, limit = 10) { * * Cached per domain for 24 hours. 
* - * @param {object} kvCollection - MongoDB ap_kv collection * @param {string} domain - Instance hostname * @returns {Promise<{ supported: boolean, error: string|null }>} */ -export async function checkInstanceTimeline(kvCollection, domain) { +export async function checkInstanceTimeline(domain) { const cacheKey = `fedidb:timeline-check:${domain}`; - const cached = await getFromCache(kvCollection, cacheKey); + const cached = await cacheGet(cacheKey); if (cached) return cached; try { @@ -193,7 +152,7 @@ export async function checkInstanceTimeline(kvCollection, domain) { result = { supported: false, error: errorMsg }; } - await writeToCache(kvCollection, cacheKey, result); + await cacheSet(cacheKey, result, CACHE_TTL_SECONDS); return result; } catch { return { supported: false, error: "Connection failed" }; @@ -206,13 +165,12 @@ export async function checkInstanceTimeline(kvCollection, domain) { * * Cached for 24 hours (single cache entry). * - * @param {object} kvCollection - MongoDB ap_kv collection * @param {number} [limit=50] - Max accounts to fetch * @returns {Promise} */ -export async function getPopularAccounts(kvCollection, limit = 50) { +export async function getPopularAccounts(limit = 50) { const cacheKey = `fedidb:popular-accounts:${limit}`; - const cached = await getFromCache(kvCollection, cacheKey); + const cached = await cacheGet(cacheKey); if (cached) return cached; try { @@ -234,7 +192,7 @@ export async function getPopularAccounts(kvCollection, limit = 50) { bio: (a.bio || "").replace(/<[^>]*>/g, "").slice(0, 120), })); - await writeToCache(kvCollection, cacheKey, results); + await cacheSet(cacheKey, results, CACHE_TTL_SECONDS); return results; } catch { return []; diff --git a/lib/migrations/separate-mentions.js b/lib/migrations/separate-mentions.js index 4ce1952..cfbea51 100644 --- a/lib/migrations/separate-mentions.js +++ b/lib/migrations/separate-mentions.js @@ -12,6 +12,8 @@ * New items will have URLs populated by the fixed extractObjectData() 
(Task 1). */ +import { cacheGet, cacheSet } from "../redis-cache.js"; + const MIGRATION_KEY = "migration:separate-mentions"; /** @@ -20,11 +22,11 @@ const MIGRATION_KEY = "migration:separate-mentions"; * @returns {Promise<{ skipped: boolean, updated: number }>} */ export async function runSeparateMentionsMigration(collections) { - const { ap_kv, ap_timeline } = collections; + const { ap_timeline } = collections; // Check if already completed - const state = await ap_kv.findOne({ _id: MIGRATION_KEY }); - if (state?.value?.completed) { + const state = await cacheGet(MIGRATION_KEY); + if (state?.completed) { return { skipped: true, updated: 0 }; } @@ -35,11 +37,7 @@ export async function runSeparateMentionsMigration(collections) { if (docs.length === 0) { // No docs to migrate — mark complete immediately - await ap_kv.updateOne( - { _id: MIGRATION_KEY }, - { $set: { value: { completed: true, date: new Date().toISOString(), updated: 0 } } }, - { upsert: true } - ); + await cacheSet(MIGRATION_KEY, { completed: true, date: new Date().toISOString(), updated: 0 }); return { skipped: false, updated: 0 }; } @@ -78,11 +76,7 @@ export async function runSeparateMentionsMigration(collections) { const updated = result.modifiedCount || 0; // Mark migration complete - await ap_kv.updateOne( - { _id: MIGRATION_KEY }, - { $set: { value: { completed: true, date: new Date().toISOString(), updated } } }, - { upsert: true } - ); + await cacheSet(MIGRATION_KEY, { completed: true, date: new Date().toISOString(), updated }); return { skipped: false, updated }; } diff --git a/lib/redis-cache.js b/lib/redis-cache.js new file mode 100644 index 0000000..5d3a465 --- /dev/null +++ b/lib/redis-cache.js @@ -0,0 +1,98 @@ +/** + * Redis-backed cache for plugin-level key-value storage. + * + * Replaces direct MongoDB ap_kv reads/writes for fedidb cache, + * batch-refollow state, and migration flags. Opens its own connection to the + * Redis server used by the Fedify message queue and KV store. 
+ * + * All keys are prefixed with "indiekit:" to avoid collisions with + * Fedify's "fedify::" prefix. + */ + +import Redis from "ioredis"; + +const KEY_PREFIX = "indiekit:"; + +let _redis = null; + +/** + * Initialize the Redis cache with a connection URL. + * Safe to call multiple times — reuses existing connection. + * @param {string} redisUrl - Redis connection URL + */ +export function initRedisCache(redisUrl) { + if (_redis) return; + if (!redisUrl) return; + _redis = new Redis(redisUrl); +} + +/** + * Get the Redis client instance (for direct use if needed). + * @returns {import("ioredis").Redis|null} + */ +export function getRedisClient() { + return _redis; +} + +/** + * Get a value from Redis cache. + * @param {string} key + * @returns {Promise} + */ +export async function cacheGet(key) { + if (!_redis) return null; + try { + const raw = await _redis.get(KEY_PREFIX + key); + if (raw === null) return null; + return JSON.parse(raw); + } catch { + return null; + } +} + +/** + * Set a value in Redis cache with optional TTL. + * @param {string} key + * @param {unknown} value - Must be JSON-serializable + * @param {number} [ttlSeconds] - Optional TTL in seconds (0 = no expiry) + */ +export async function cacheSet(key, value, ttlSeconds = 0) { + if (!_redis) return; + try { + const raw = JSON.stringify(value); + if (ttlSeconds > 0) { + await _redis.set(KEY_PREFIX + key, raw, "EX", ttlSeconds); + } else { + await _redis.set(KEY_PREFIX + key, raw); + } + } catch { + // Cache write failure is non-critical + } +} + +/** + * Delete a key from Redis cache. + * @param {string} key + */ +export async function cacheDelete(key) { + if (!_redis) return; + try { + await _redis.del(KEY_PREFIX + key); + } catch { + // Ignore + } +} + +/** + * Check if a key exists in Redis cache. 
+ * @param {string} key + * @returns {Promise} + */ +export async function cacheExists(key) { + if (!_redis) return false; + try { + return (await _redis.exists(KEY_PREFIX + key)) === 1; + } catch { + return false; + } +} diff --git a/package.json b/package.json index e100a7f..b7ac4dc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@rmdes/indiekit-endpoint-activitypub", - "version": "2.1.2", + "version": "2.2.0", "description": "ActivityPub federation endpoint for Indiekit via Fedify. Adds full fediverse support: actor, inbox, outbox, followers, following, syndication, and Mastodon migration.", "keywords": [ "indiekit",