/**
 * Feed processing pipeline
 * @module polling/processor
 */
import { getRedisClient, publishEvent } from "../cache/redis.js";
import { detectCapabilities } from "../feeds/capabilities.js";
import { fetchAndParseFeed } from "../feeds/fetcher.js";
import { getChannel } from "../storage/channels.js";
import {
  updateFeed,
  updateFeedAfterFetch,
  updateFeedStatus,
  updateFeedWebsub,
} from "../storage/feeds.js";
import { passesRegexFilter, passesTypeFilter } from "../storage/filters.js";
import { addItem } from "../storage/items.js";
import {
  subscribe as websubSubscribe,
  getCallbackUrl,
} from "../websub/subscriber.js";
import { calculateNewTier } from "./tier.js";

/** Upper bound applied when bumping a feed's tier after a failed fetch */
const MAX_ERROR_TIER = 10;

/** URL substrings that indicate a Bluesky source */
const BLUESKY_MARKERS = ["bsky.app", "bluesky"];

/** URL substrings that indicate a Mastodon-compatible (fediverse) source */
const FEDIVERSE_MARKERS = [
  "mastodon.",
  "mstdn.",
  "fosstodon.",
  "pleroma.",
  "misskey.",
  "pixelfed.",
];

/**
 * Extract a printable message from any thrown value
 * @param {unknown} error - Thrown value (usually an Error)
 * @returns {string} Error message
 */
function errorMessage(error) {
  return error?.message ?? String(error);
}

/**
 * Process a single feed: fetch it, store new items, update scheduling
 * metadata, and kick off WebSub/capability discovery.
 *
 * Never rejects for fetch/storage failures: those are reported through
 * `result.error` with `result.success === false`.
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 * @returns {Promise<object>} Processing result with `feedId`, `url`,
 *   `success`, `itemsAdded`, `error`, `duration`, and (when available)
 *   `tier` and `notModified`
 */
export async function processFeed(application, feed) {
  const startTime = Date.now();
  const result = {
    feedId: feed._id,
    url: feed.url,
    success: false,
    itemsAdded: 0,
    error: undefined,
  };

  try {
    // Get Redis client for caching
    const redis = getRedisClient(application);

    // Fetch and parse the feed (conditional request via etag/lastModified)
    const parsed = await fetchAndParseFeed(feed.url, {
      etag: feed.etag,
      lastModified: feed.lastModified,
      redis,
    });

    // Handle 304 Not Modified
    if (parsed.notModified) {
      const tierResult = calculateNewTier({
        currentTier: feed.tier,
        hasNewItems: false,
        consecutiveUnchanged: feed.unmodified || 0,
      });

      await updateFeedAfterFetch(application, feed._id, false, {
        tier: tierResult.tier,
        unmodified: tierResult.consecutiveUnchanged,
        nextFetchAt: tierResult.nextFetchAt,
      });

      result.success = true;
      result.notModified = true;
      result.tier = tierResult.tier;
      // `finally` below still records the duration for this early return
      return result;
    }

    // Get channel for filtering
    const channel = await getChannel(application, feed.channelId);
    const items = parsed.items ?? [];

    const newItemCount = await storeNewItems(
      application,
      feed,
      parsed,
      items,
      channel,
      redis,
    );
    result.itemsAdded = newItemCount;

    // Update tier based on whether we found new items
    const tierResult = calculateNewTier({
      currentTier: feed.tier,
      hasNewItems: newItemCount > 0,
      consecutiveUnchanged: newItemCount > 0 ? 0 : feed.unmodified || 0,
    });

    await updateFeedAfterFetch(
      application,
      feed._id,
      newItemCount > 0,
      buildFeedUpdate(feed, parsed, tierResult),
    );

    await syncWebsub(application, feed, parsed);

    // Update feed status to active on success
    await updateFeedStatus(application, feed._id, {
      success: true,
      itemCount: items.length,
    });

    // Detect source capabilities on first successful fetch (if not yet detected)
    if (!feed.capabilities) {
      scheduleCapabilityDetection(application, feed);
    }

    // Only mark success once every awaited step has completed, so a late
    // failure can never leave both `success` and `error` set.
    result.tier = tierResult.tier;
    result.success = true;
  } catch (error) {
    result.success = false;
    result.error = errorMessage(error);
    await recordFailure(application, feed, result.error);
  } finally {
    result.duration = Date.now() - startTime;
  }

  return result;
}

/**
 * Filter, enrich and store parsed items, publishing an event per new item
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 * @param {object} parsed - Parsed feed returned by fetchAndParseFeed
 * @param {Array} items - Parsed items to consider
 * @param {object|undefined} channel - Channel document (for filters)
 * @param {object|undefined} redis - Redis client, if configured
 * @returns {Promise<number>} Number of items reported as newly stored
 */
async function storeNewItems(application, feed, parsed, items, channel, redis) {
  let newItemCount = 0;

  for (const item of items) {
    // Apply channel filters
    if (channel?.settings && !passesFilters(item, channel.settings)) {
      continue;
    }

    enrichItemSource(item, feed, parsed);

    const stored = await addItem(application, {
      channelId: feed.channelId,
      feedId: feed._id,
      uid: item.uid,
      item,
    });

    if (!stored) {
      continue;
    }

    newItemCount++;

    // Publish real-time event
    if (redis) {
      await publishNewItem(redis, feed, stored);
    }
  }

  return newItemCount;
}

/**
 * Attach feed-level source metadata (name, source_type) to an item
 * @param {object} item - Parsed feed item (mutated in place)
 * @param {object} feed - Feed document from database
 * @param {object} parsed - Parsed feed returned by fetchAndParseFeed
 */
function enrichItemSource(item, feed, parsed) {
  // Create the source object first so the name is applied to every item
  item._source = item._source || {};

  // Don't clobber an existing name with undefined when no title is known
  const sourceName = feed.title || parsed.name;
  if (sourceName) {
    item._source.name = sourceName;
  }

  // Attach source_type from feed capabilities (for protocol indicators).
  // Falls back to URL-based inference when capabilities haven't been detected yet.
  item._source.source_type =
    feed.capabilities?.source_type || inferSourceType(feed.url);
}

/**
 * Publish a `new-item` event; failures are logged, not thrown, so a Redis
 * hiccup cannot abort storage of the remaining items.
 * @param {object} redis - Redis client
 * @param {object} feed - Feed document from database
 * @param {object} stored - Stored item returned by addItem
 * @returns {Promise<void>}
 */
async function publishNewItem(redis, feed, stored) {
  try {
    await publishEvent(redis, `microsub:${feed.channelId}`, {
      type: "new-item",
      channelId: feed.channelId.toString(),
      item: stored,
    });
  } catch (error) {
    console.error(
      `[Microsub] Failed to publish new item for ${feed.url}:`,
      errorMessage(error),
    );
  }
}

/**
 * Build the metadata update applied after a successful (non-304) fetch
 * @param {object} feed - Feed document from database
 * @param {object} parsed - Parsed feed returned by fetchAndParseFeed
 * @param {object} tierResult - Result of calculateNewTier
 * @returns {object} Fields to pass to updateFeedAfterFetch
 */
function buildFeedUpdate(feed, parsed, tierResult) {
  const updateData = {
    tier: tierResult.tier,
    unmodified: tierResult.consecutiveUnchanged,
    nextFetchAt: tierResult.nextFetchAt,
    etag: parsed.etag,
    lastModified: parsed.lastModified,
  };

  // Fill in title/photo/feedType only if the feed doesn't have them yet
  if (parsed.name && !feed.title) {
    updateData.title = parsed.name;
  }
  if (parsed.photo && !feed.photo) {
    updateData.photo = parsed.photo;
  }
  if (parsed.feedType && !feed.feedType) {
    updateData.feedType = parsed.feedType;
  }

  return updateData;
}

/**
 * Record a newly discovered (or changed) WebSub hub and start a
 * subscription in the background when the application has a base URL.
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 * @param {object} parsed - Parsed feed returned by fetchAndParseFeed
 * @returns {Promise<void>}
 */
async function syncWebsub(application, feed, parsed) {
  if (!parsed.hub || feed.websub?.hub === parsed.hub) {
    return;
  }

  const topic = parsed.self || feed.url;
  await updateFeedWebsub(application, feed._id, { hub: parsed.hub, topic });

  // Auto-subscribe to WebSub hub if we have a callback URL
  const baseUrl = application.url;
  if (!baseUrl) {
    return;
  }

  const callbackUrl = getCallbackUrl(baseUrl, feed._id.toString());
  const updatedFeed = { ...feed, websub: { hub: parsed.hub, topic } };

  // Fire-and-forget: the hub handshake must not delay feed processing
  websubSubscribe(application, updatedFeed, callbackUrl)
    .then((subscribed) => {
      if (subscribed) {
        console.info(`[Microsub] WebSub subscription initiated for ${feed.url}`);
      }
    })
    .catch((error) => {
      console.error(
        `[Microsub] WebSub subscription error for ${feed.url}:`,
        error.message,
      );
    });
}

/**
 * Detect and store source capabilities in the background (best effort)
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 */
function scheduleCapabilityDetection(application, feed) {
  detectCapabilities(feed.url)
    .then((capabilities) =>
      updateFeed(application, feed._id, { capabilities }).catch((error) => {
        console.debug(
          `[Microsub] Failed to store capabilities for ${feed.url}:`,
          errorMessage(error),
        );
      }),
    )
    .catch((error) => {
      console.debug(
        `[Microsub] Capability detection skipped for ${feed.url}:`,
        error.message,
      );
    });
}

/**
 * Mark a feed as errored and push back its next fetch.
 *
 * Both steps run independently so a failure writing the status can't
 * prevent the backoff (which exists to prevent retry storms). Never throws.
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 * @param {string} message - Error message to record
 * @returns {Promise<void>}
 */
async function recordFailure(application, feed, message) {
  try {
    await updateFeedStatus(application, feed._id, {
      success: false,
      error: message,
    });
  } catch (statusError) {
    console.error(
      `[Microsub] Failed to record error status for ${feed.url}:`,
      errorMessage(statusError),
    );
  }

  try {
    const tierResult = calculateNewTier({
      currentTier: feed.tier,
      hasNewItems: false,
      consecutiveUnchanged: (feed.unmodified || 0) + 1,
    });

    // NOTE(review): nextFetchAt comes from calculateNewTier's tier, not the
    // bumped tier stored here — confirm this is intended.
    await updateFeedAfterFetch(application, feed._id, false, {
      tier: Math.min(tierResult.tier + 1, MAX_ERROR_TIER), // Increase tier on error
      unmodified: tierResult.consecutiveUnchanged,
      nextFetchAt: tierResult.nextFetchAt,
    });
  } catch (updateError) {
    console.error(
      `[Microsub] Failed to back off feed ${feed.url} after error:`,
      errorMessage(updateError),
    );
  }
}

/**
 * Infer source type from feed URL when capabilities haven't been detected yet
 * (plain case-insensitive substring match anywhere in the URL)
 * @param {string} url - Feed URL
 * @returns {string} Source type: "bluesky", "mastodon" or "web"
 */
function inferSourceType(url) {
  if (!url) return "web";
  const lower = url.toLowerCase();
  if (BLUESKY_MARKERS.some((marker) => lower.includes(marker))) {
    return "bluesky";
  }
  if (FEDIVERSE_MARKERS.some((marker) => lower.includes(marker))) {
    return "mastodon";
  }
  return "web";
}

/**
 * Check if an item passes channel filters
 * @param {object} item - Feed item
 * @param {object} settings - Channel settings
 * @returns {boolean} Whether the item passes filters
 */
function passesFilters(item, settings) {
  return passesTypeFilter(item, settings) && passesRegexFilter(item, settings);
}

/**
 * Build a failure result for a feed whose processing promise rejected
 * @param {object} feed - Feed document
 * @param {unknown} reason - Rejection reason
 * @returns {object} Result shaped like processFeed's
 */
function rejectedFeedResult(feed, reason) {
  return {
    feedId: feed?._id,
    url: feed?.url,
    success: false,
    itemsAdded: 0,
    error: errorMessage(reason),
  };
}

/**
 * Process multiple feeds in batch
 * @param {object} application - Indiekit application
 * @param {Array} feeds - Array of feed documents
 * @param {object} options - Processing options
 * @param {number} [options.concurrency=5] - Feeds processed at once
 *   (values below 1 or non-numeric are treated as 1)
 * @returns {Promise<object>} Batch processing result
 */
export async function processFeedBatch(application, feeds, options = {}) {
  const { concurrency = 5 } = options;
  // A step of 0/negative/NaN would never advance the loop below
  const batchSize = Math.max(1, Math.floor(Number(concurrency)) || 1);
  const results = [];

  // Process in batches with limited concurrency
  for (let index = 0; index < feeds.length; index += batchSize) {
    const batch = feeds.slice(index, index + batchSize);
    // allSettled: one rejecting feed must not discard the rest of the batch
    const outcomes = await Promise.allSettled(
      batch.map((feed) => processFeed(application, feed)),
    );
    results.push(
      ...outcomes.map((outcome, position) =>
        outcome.status === "fulfilled"
          ? outcome.value
          : rejectedFeedResult(batch[position], outcome.reason),
      ),
    );
  }

  const successful = results.filter((r) => r.success).length;

  return {
    total: feeds.length,
    successful,
    failed: results.length - successful,
    itemsAdded: results.reduce((sum, r) => sum + r.itemsAdded, 0),
    results,
  };
}