feat: wire settings into all consumers

This commit is contained in:
Ricardo
2026-03-31 21:49:07 +02:00
parent d0dc5e599c
commit 9443795dc9
6 changed files with 44 additions and 29 deletions
+11 -9
View File
@@ -4,9 +4,7 @@
* @module batch-broadcast * @module batch-broadcast
*/ */
import { logActivity } from "./activity-log.js"; import { logActivity } from "./activity-log.js";
import { getSettings } from "./settings.js";
const BATCH_SIZE = 25;
const BATCH_DELAY_MS = 5000;
/** /**
* Broadcast an activity to all followers via batch delivery. * Broadcast an activity to all followers via batch delivery.
@@ -29,6 +27,10 @@ export async function batchBroadcast({
label, label,
objectUrl, objectUrl,
}) { }) {
const settings = await getSettings(collections);
const batchSize = settings.broadcastBatchSize;
const batchDelay = settings.broadcastBatchDelay;
const ctx = federation.createContext(new URL(publicationUrl), { const ctx = federation.createContext(new URL(publicationUrl), {
handle, handle,
publicationUrl, publicationUrl,
@@ -54,11 +56,11 @@ export async function batchBroadcast({
console.info( console.info(
`[ActivityPub] Broadcasting ${label} to ${uniqueRecipients.length} ` + `[ActivityPub] Broadcasting ${label} to ${uniqueRecipients.length} ` +
`unique inboxes (${followers.length} followers) in batches of ${BATCH_SIZE}`, `unique inboxes (${followers.length} followers) in batches of ${batchSize}`,
); );
for (let i = 0; i < uniqueRecipients.length; i += BATCH_SIZE) { for (let i = 0; i < uniqueRecipients.length; i += batchSize) {
const batch = uniqueRecipients.slice(i, i + BATCH_SIZE); const batch = uniqueRecipients.slice(i, i + batchSize);
const recipients = batch.map((f) => ({ const recipients = batch.map((f) => ({
id: new URL(f.actorUrl), id: new URL(f.actorUrl),
inboxId: new URL(f.inbox || f.sharedInbox), inboxId: new URL(f.inbox || f.sharedInbox),
@@ -75,12 +77,12 @@ export async function batchBroadcast({
} catch (error) { } catch (error) {
failed += batch.length; failed += batch.length;
console.warn( console.warn(
`[ActivityPub] ${label} batch ${Math.floor(i / BATCH_SIZE) + 1} failed: ${error.message}`, `[ActivityPub] ${label} batch ${Math.floor(i / batchSize) + 1} failed: ${error.message}`,
); );
} }
if (i + BATCH_SIZE < uniqueRecipients.length) { if (i + batchSize < uniqueRecipients.length) {
await new Promise((resolve) => setTimeout(resolve, BATCH_DELAY_MS)); await new Promise((resolve) => setTimeout(resolve, batchDelay));
} }
} }
+13 -8
View File
@@ -16,10 +16,8 @@ import { lookupWithSecurity } from "./lookup-helpers.js";
import { Follow } from "@fedify/fedify/vocab"; import { Follow } from "@fedify/fedify/vocab";
import { logActivity } from "./activity-log.js"; import { logActivity } from "./activity-log.js";
import { cacheGet, cacheSet } from "./redis-cache.js"; import { cacheGet, cacheSet } from "./redis-cache.js";
import { getSettings } from "./settings.js";
const BATCH_SIZE = 10;
const DELAY_PER_FOLLOW = 3_000;
const DELAY_BETWEEN_BATCHES = 30_000;
const STARTUP_DELAY = 30_000; const STARTUP_DELAY = 30_000;
const RETRY_COOLDOWN = 60 * 60 * 1_000; // 1 hour const RETRY_COOLDOWN = 60 * 60 * 1_000; // 1 hour
const MAX_RETRIES = 3; const MAX_RETRIES = 3;
@@ -104,7 +102,9 @@ export async function resumeBatchRefollow(options) {
} }
await setJobState("running"); await setJobState("running");
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); const { collections: resumeCollections } = options;
const resumeSettings = await getSettings(resumeCollections);
_timer = setTimeout(() => processNextBatch(options), resumeSettings.refollowBatchDelay);
console.info("[ActivityPub] Batch refollow: resumed"); console.info("[ActivityPub] Batch refollow: resumed");
} }
@@ -158,9 +158,14 @@ async function processNextBatch(options) {
const state = await cacheGet(KV_KEY); const state = await cacheGet(KV_KEY);
if (state?.status !== "running") return; if (state?.status !== "running") return;
const settings = await getSettings(collections);
const batchSize = settings.refollowBatchSize;
const delayPerFollow = settings.refollowDelay;
const delayBetweenBatches = settings.refollowBatchDelay;
// Claim a batch atomically: set source to "refollow:pending" // Claim a batch atomically: set source to "refollow:pending"
const entries = []; const entries = [];
for (let i = 0; i < BATCH_SIZE; i++) { for (let i = 0; i < batchSize; i++) {
const doc = await collections.ap_following.findOneAndUpdate( const doc = await collections.ap_following.findOneAndUpdate(
{ source: "import" }, { source: "import" },
{ $set: { source: "refollow:pending" } }, { $set: { source: "refollow:pending" } },
@@ -172,7 +177,7 @@ async function processNextBatch(options) {
// Also pick up retryable entries (failed but not permanently) // Also pick up retryable entries (failed but not permanently)
const retryCutoff = new Date(Date.now() - RETRY_COOLDOWN).toISOString(); const retryCutoff = new Date(Date.now() - RETRY_COOLDOWN).toISOString();
const retrySlots = BATCH_SIZE - entries.length; const retrySlots = batchSize - entries.length;
for (let i = 0; i < retrySlots; i++) { for (let i = 0; i < retrySlots; i++) {
const doc = await collections.ap_following.findOneAndUpdate( const doc = await collections.ap_following.findOneAndUpdate(
{ {
@@ -211,14 +216,14 @@ async function processNextBatch(options) {
for (const entry of entries) { for (const entry of entries) {
await processOneFollow(options, entry); await processOneFollow(options, entry);
// Delay between individual follows // Delay between individual follows
await sleep(DELAY_PER_FOLLOW); await sleep(delayPerFollow);
} }
// Update job state timestamp // Update job state timestamp
await setJobState("running"); await setJobState("running");
// Schedule next batch // Schedule next batch
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES); _timer = setTimeout(() => processNextBatch(options), delayBetweenBatches);
} }
/** /**
+3 -1
View File
@@ -38,6 +38,7 @@ import { addNotification } from "./storage/notifications.js";
import { addMessage } from "./storage/messages.js"; import { addMessage } from "./storage/messages.js";
import { fetchAndStorePreviews, fetchAndStoreQuote } from "./og-unfurl.js"; import { fetchAndStorePreviews, fetchAndStoreQuote } from "./og-unfurl.js";
import { getFollowedTags } from "./storage/followed-tags.js"; import { getFollowedTags } from "./storage/followed-tags.js";
import { getSettings } from "./settings.js";
/** @type {string} ActivityStreams Public Collection constant */ /** @type {string} ActivityStreams Public Collection constant */
const PUBLIC = "https://www.w3.org/ns/activitystreams#Public"; const PUBLIC = "https://www.w3.org/ns/activitystreams#Public";
@@ -760,7 +761,8 @@ export async function handleCreate(item, collections, ctx, handle) {
// Each ancestor is stored with isContext: true to distinguish from organic timeline items. // Each ancestor is stored with isContext: true to distinguish from organic timeline items.
if (inReplyTo) { if (inReplyTo) {
try { try {
await fetchReplyChain(object, collections, authLoader, 5); const settings = await getSettings(collections);
await fetchReplyChain(object, collections, authLoader, settings.replyChainDepth);
} catch (error) { } catch (error) {
// Non-critical — incomplete context is acceptable // Non-critical — incomplete context is acceptable
console.warn("[inbox-handlers] Reply chain fetch failed:", error.message); console.warn("[inbox-handlers] Reply chain fetch failed:", error.message);
+3 -2
View File
@@ -60,10 +60,11 @@ router.get("/api/v1/accounts/verify_credentials", tokenRequired, scopeRequired("
// ─── GET /api/v1/preferences ───────────────────────────────────────────────── // ─── GET /api/v1/preferences ─────────────────────────────────────────────────
router.get("/api/v1/preferences", tokenRequired, scopeRequired("read", "read:accounts"), (req, res) => { router.get("/api/v1/preferences", tokenRequired, scopeRequired("read", "read:accounts"), (req, res) => {
const apSettings = req.app.locals.apSettings;
res.json({ res.json({
"posting:default:visibility": "public", "posting:default:visibility": apSettings?.defaultVisibility || "public",
"posting:default:sensitive": false, "posting:default:sensitive": false,
"posting:default:language": "en", "posting:default:language": apSettings?.defaultLanguage || "en",
"reading:expand:media": "default", "reading:expand:media": "default",
"reading:expand:spoilers": false, "reading:expand:spoilers": false,
}); });
+8 -6
View File
@@ -17,6 +17,7 @@ router.get("/api/v2/instance", async (req, res, next) => {
const domain = req.get("host"); const domain = req.get("host");
const collections = req.app.locals.mastodonCollections; const collections = req.app.locals.mastodonCollections;
const pluginOptions = req.app.locals.mastodonPluginOptions || {}; const pluginOptions = req.app.locals.mastodonPluginOptions || {};
const apSettings = req.app.locals.apSettings;
const profile = await collections.ap_profile.findOne({}); const profile = await collections.ap_profile.findOne({});
const contactAccount = profile const contactAccount = profile
@@ -44,7 +45,7 @@ router.get("/api/v2/instance", async (req, res, next) => {
versions: {}, versions: {},
}, },
icon: [], icon: [],
languages: ["en"], languages: apSettings?.instanceLanguages || ["en"],
configuration: { configuration: {
urls: { urls: {
streaming: "", streaming: "",
@@ -54,8 +55,8 @@ router.get("/api/v2/instance", async (req, res, next) => {
max_pinned_statuses: 10, max_pinned_statuses: 10,
}, },
statuses: { statuses: {
max_characters: 5000, max_characters: apSettings?.maxCharacters || 5000,
max_media_attachments: 4, max_media_attachments: apSettings?.maxMediaAttachments || 4,
characters_reserved_per_url: 23, characters_reserved_per_url: 23,
}, },
media_attachments: { media_attachments: {
@@ -116,6 +117,7 @@ router.get("/api/v1/instance", async (req, res, next) => {
const domain = req.get("host"); const domain = req.get("host");
const collections = req.app.locals.mastodonCollections; const collections = req.app.locals.mastodonCollections;
const pluginOptions = req.app.locals.mastodonPluginOptions || {}; const pluginOptions = req.app.locals.mastodonPluginOptions || {};
const apSettings = req.app.locals.apSettings;
const profile = await collections.ap_profile.findOne({}); const profile = await collections.ap_profile.findOne({});
@@ -160,14 +162,14 @@ router.get("/api/v1/instance", async (req, res, next) => {
domain_count: domainCount, domain_count: domainCount,
}, },
thumbnail: profile?.icon || null, thumbnail: profile?.icon || null,
languages: ["en"], languages: apSettings?.instanceLanguages || ["en"],
registrations: false, registrations: false,
approval_required: true, approval_required: true,
invites_enabled: false, invites_enabled: false,
configuration: { configuration: {
statuses: { statuses: {
max_characters: 5000, max_characters: apSettings?.maxCharacters || 5000,
max_media_attachments: 4, max_media_attachments: apSettings?.maxMediaAttachments || 4,
characters_reserved_per_url: 23, characters_reserved_per_url: 23,
}, },
media_attachments: { media_attachments: {
+6 -3
View File
@@ -502,13 +502,15 @@ router.post("/oauth/token", async (req, res, next) => {
// Rotate: new access token + new refresh token // Rotate: new access token + new refresh token
const newAccessToken = randomHex(64); const newAccessToken = randomHex(64);
const newRefreshToken = randomHex(64); const newRefreshToken = randomHex(64);
const refreshTtlDaysRotate = req.app.locals.apSettings?.refreshTokenTtlDays || 90;
const refreshTtlMsRotate = refreshTtlDaysRotate * 24 * 3600 * 1000;
await collections.ap_oauth_tokens.updateOne( await collections.ap_oauth_tokens.updateOne(
{ _id: existing._id }, { _id: existing._id },
{ {
$set: { $set: {
accessToken: newAccessToken, accessToken: newAccessToken,
refreshToken: newRefreshToken, refreshToken: newRefreshToken,
refreshExpiresAt: new Date(Date.now() + 90 * 24 * 3600 * 1000), refreshExpiresAt: new Date(Date.now() + refreshTtlMsRotate),
}, },
$unset: { expiresAt: "" }, $unset: { expiresAt: "" },
}, },
@@ -589,8 +591,9 @@ router.post("/oauth/token", async (req, res, next) => {
// Generate access token and refresh token. // Generate access token and refresh token.
// Access tokens do not expire (matching Mastodon behavior — valid until revoked). // Access tokens do not expire (matching Mastodon behavior — valid until revoked).
// Refresh tokens expire after 90 days as a safety measure. // Refresh tokens expire after a configurable number of days (default 90).
const REFRESH_TOKEN_TTL = 90 * 24 * 3600 * 1000; // 90 days const refreshTtlDays = req.app.locals.apSettings?.refreshTokenTtlDays || 90;
const REFRESH_TOKEN_TTL = refreshTtlDays * 24 * 3600 * 1000;
const accessToken = randomHex(64); const accessToken = randomHex(64);
const refreshToken = randomHex(64); const refreshToken = randomHex(64);
await collections.ap_oauth_tokens.updateOne( await collections.ap_oauth_tokens.updateOne(