From 116681cf64f7588eb165fa30fd4923a74d94b4e5 Mon Sep 17 00:00:00 2001 From: svemagie <869694+svemagie@users.noreply.github.com> Date: Tue, 31 Mar 2026 00:44:07 +0200 Subject: [PATCH] fix: use XHR instead of fetch for SSE streaming Obsidian patches the global fetch in a way that buffers the full response, causing "Failed to fetch" errors. XHR with onprogress delivers chunks incrementally and is unaffected by Obsidian's patching. Co-Authored-By: Claude Sonnet 4.6 --- main.js | 127 ++++++++++++++++++++++++++------------------ src/ClaudeClient.ts | 115 ++++++++++++++++++++------------------- 2 files changed, 133 insertions(+), 109 deletions(-) diff --git a/main.js b/main.js index 19f93ce..23968d1 100644 --- a/main.js +++ b/main.js @@ -32728,65 +32728,86 @@ var ClaudeClient = class { "anthropic-version": "2023-06-01" }; } - /** Stream a chat completion via fetch + SSE, yielding text chunks as they arrive. */ + /** + * Stream a chat completion via XHR + SSE, yielding text chunks as they arrive. + * Uses XHR instead of fetch because Obsidian patches the global fetch in a way + * that buffers the full response, breaking streaming. + */ async *streamChat(messages, options) { - let response; - try { - response = await fetch(this.baseUrl, { - method: "POST", - headers: this.headers(options.apiKey), - body: JSON.stringify({ - model: options.model, - max_tokens: options.maxTokens ?? 8192, - system: options.systemPrompt, - messages, - stream: true - }) - }); - } catch (e) { - yield { type: "error", error: e.message }; - return; + const queue = []; + let done = false; + let wakeup = null; + const push = (c) => { + queue.push(c); + wakeup?.(); + wakeup = null; + }; + const finish = () => { + done = true; + wakeup?.(); + wakeup = null; + }; + const xhr = new XMLHttpRequest(); + xhr.open("POST", this.baseUrl, true); + for (const [k, v] of Object.entries(this.headers(options.apiKey))) { + xhr.setRequestHeader(k, v); } - if (!response.ok) { - const text = await response.text(); - yield { type: "error", error: `API Error ${response.status}: ${text}` }; - return; - } - const reader = response.body?.getReader(); - if (!reader) { - yield { type: "error", error: "No response body" }; - return; - } - const decoder = new TextDecoder(); - let buffer = ""; - try { - while (true) { - const { done, value } = await reader.read(); - if (done) - break; - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split("\n"); - buffer = lines.pop() ?? ""; - for (const line of lines) { - if (!line.startsWith("data: ")) - continue; - const data = line.slice(6).trim(); - if (data === "[DONE]") { - yield { type: "done" }; - return; - } - try { - const event = JSON.parse(data); - if (event.type === "content_block_delta" && event.delta?.type === "text_delta") { - yield { type: "text", text: event.delta.text }; - } - } catch { + let linesCursor = 0; + const parseSse = (allDone) => { + const lines = xhr.responseText.split("\n"); + const limit = allDone ? lines.length : lines.length - 1; + for (let i = linesCursor; i < limit; i++) { + const line = lines[i]; + if (!line.startsWith("data: ")) + continue; + const data = line.slice(6).trim(); + if (data === "[DONE]") + return; + try { + const ev = JSON.parse(data); + if (ev.type === "content_block_delta" && ev.delta?.type === "text_delta") { + push({ type: "text", text: ev.delta.text }); } + } catch { } } - } finally { - reader.releaseLock(); + linesCursor = limit; + }; + xhr.onprogress = () => parseSse(false); + xhr.onload = () => { + if (xhr.status >= 400) { + push({ type: "error", error: `API Error ${xhr.status}: ${xhr.responseText}` }); + } else { + parseSse(true); + } + finish(); + }; + xhr.onerror = () => { + push({ type: "error", error: "Network error" }); + finish(); + }; + xhr.ontimeout = () => { + push({ type: "error", error: "Request timed out" }); + finish(); + }; + xhr.send(JSON.stringify({ + model: options.model, + max_tokens: options.maxTokens ?? 8192, + system: options.systemPrompt, + messages, + stream: true + })); + while (true) { + while (queue.length) + yield queue.shift(); + if (done) + break; + await new Promise((r) => { + wakeup = r; + }); } + while (queue.length) + yield queue.shift(); yield { type: "done" }; } /** Non-streaming convenience wrapper */ diff --git a/src/ClaudeClient.ts b/src/ClaudeClient.ts index 19beff6..3f11b30 100644 --- a/src/ClaudeClient.ts +++ b/src/ClaudeClient.ts @@ -30,71 +30,74 @@ export class ClaudeClient { }; } - /** Stream a chat completion via fetch + SSE, yielding text chunks as they arrive. */ + /** + * Stream a chat completion via XHR + SSE, yielding text chunks as they arrive. + * Uses XHR instead of fetch because Obsidian patches the global fetch in a way + * that buffers the full response, breaking streaming. + */ async *streamChat( messages: ClaudeMessage[], options: ClaudeOptions ): AsyncGenerator { - let response: Response; - try { - response = await fetch(this.baseUrl, { - method: "POST", - headers: this.headers(options.apiKey), - body: JSON.stringify({ - model: options.model, - max_tokens: options.maxTokens ?? 8192, - system: options.systemPrompt, - messages, - stream: true, - }), - }); - } catch (e) { - yield { type: "error", error: (e as Error).message }; - return; + const queue: ClaudeStreamChunk[] = []; + let done = false; + let wakeup: (() => void) | null = null; + + const push = (c: ClaudeStreamChunk) => { queue.push(c); wakeup?.(); wakeup = null; }; + const finish = () => { done = true; wakeup?.(); wakeup = null; }; + + const xhr = new XMLHttpRequest(); + xhr.open("POST", this.baseUrl, true); + for (const [k, v] of Object.entries(this.headers(options.apiKey))) { + xhr.setRequestHeader(k, v); } - if (!response.ok) { - const text = await response.text(); - yield { type: "error", error: `API Error ${response.status}: ${text}` }; - return; - } - - const reader = response.body?.getReader(); - if (!reader) { - yield { type: "error", error: "No response body" }; - return; - } - - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split("\n"); - buffer = lines.pop() ?? ""; - - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (data === "[DONE]") { yield { type: "done" }; return; } - try { - const event = JSON.parse(data); - if (event.type === "content_block_delta" && event.delta?.type === "text_delta") { - yield { type: "text", text: event.delta.text }; - } - } catch { - // skip malformed SSE lines + // Parse SSE lines from xhr.responseText; linesCursor avoids reprocessing old lines. + let linesCursor = 0; + const parseSse = (allDone: boolean) => { + const lines = xhr.responseText.split("\n"); + const limit = allDone ? lines.length : lines.length - 1; // skip last (may be partial) + for (let i = linesCursor; i < limit; i++) { + const line = lines[i]; + if (!line.startsWith("data: ")) continue; + const data = line.slice(6).trim(); + if (data === "[DONE]") return; + try { + const ev = JSON.parse(data); + if (ev.type === "content_block_delta" && ev.delta?.type === "text_delta") { + push({ type: "text", text: ev.delta.text }); } - } + } catch { /* skip malformed lines */ } } - } finally { - reader.releaseLock(); - } + linesCursor = limit; + }; + xhr.onprogress = () => parseSse(false); + xhr.onload = () => { + if (xhr.status >= 400) { + push({ type: "error", error: `API Error ${xhr.status}: ${xhr.responseText}` }); + } else { + parseSse(true); + } + finish(); + }; + xhr.onerror = () => { push({ type: "error", error: "Network error" }); finish(); }; + xhr.ontimeout = () => { push({ type: "error", error: "Request timed out" }); finish(); }; + + xhr.send(JSON.stringify({ + model: options.model, + max_tokens: options.maxTokens ?? 8192, + system: options.systemPrompt, + messages, + stream: true, + })); + + while (true) { + while (queue.length) yield queue.shift()!; + if (done) break; + await new Promise(r => { wakeup = r; }); + } + while (queue.length) yield queue.shift()!; yield { type: "done" }; }