fix: use XHR instead of fetch for SSE streaming

Obsidian patches the global fetch in a way that buffers the full response,
causing "Failed to fetch" errors. XHR with onprogress delivers chunks
incrementally and is unaffected by Obsidian's patching.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
svemagie
2026-03-31 00:44:07 +02:00
parent 5c801b0df8
commit 116681cf64
2 changed files with 133 additions and 109 deletions
+74 -53
View File
@@ -32728,65 +32728,86 @@ var ClaudeClient = class {
"anthropic-version": "2023-06-01" "anthropic-version": "2023-06-01"
}; };
} }
/** Stream a chat completion via fetch + SSE, yielding text chunks as they arrive. */ /**
* Stream a chat completion via XHR + SSE, yielding text chunks as they arrive.
* Uses XHR instead of fetch because Obsidian patches the global fetch in a way
* that buffers the full response, breaking streaming.
*/
async *streamChat(messages, options) { async *streamChat(messages, options) {
let response; const queue = [];
try { let done = false;
response = await fetch(this.baseUrl, { let wakeup = null;
method: "POST", const push = (c) => {
headers: this.headers(options.apiKey), queue.push(c);
body: JSON.stringify({ wakeup?.();
model: options.model, wakeup = null;
max_tokens: options.maxTokens ?? 8192, };
system: options.systemPrompt, const finish = () => {
messages, done = true;
stream: true wakeup?.();
}) wakeup = null;
}); };
} catch (e) { const xhr = new XMLHttpRequest();
yield { type: "error", error: e.message }; xhr.open("POST", this.baseUrl, true);
return; for (const [k, v] of Object.entries(this.headers(options.apiKey))) {
xhr.setRequestHeader(k, v);
} }
if (!response.ok) { let linesCursor = 0;
const text = await response.text(); const parseSse = (allDone) => {
yield { type: "error", error: `API Error ${response.status}: ${text}` }; const lines = xhr.responseText.split("\n");
return; const limit = allDone ? lines.length : lines.length - 1;
} for (let i = linesCursor; i < limit; i++) {
const reader = response.body?.getReader(); const line = lines[i];
if (!reader) { if (!line.startsWith("data: "))
yield { type: "error", error: "No response body" }; continue;
return; const data = line.slice(6).trim();
} if (data === "[DONE]")
const decoder = new TextDecoder(); return;
let buffer = ""; try {
try { const ev = JSON.parse(data);
while (true) { if (ev.type === "content_block_delta" && ev.delta?.type === "text_delta") {
const { done, value } = await reader.read(); push({ type: "text", text: ev.delta.text });
if (done)
break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: "))
continue;
const data = line.slice(6).trim();
if (data === "[DONE]") {
yield { type: "done" };
return;
}
try {
const event = JSON.parse(data);
if (event.type === "content_block_delta" && event.delta?.type === "text_delta") {
yield { type: "text", text: event.delta.text };
}
} catch {
} }
} catch {
} }
} }
} finally { linesCursor = limit;
reader.releaseLock(); };
xhr.onprogress = () => parseSse(false);
xhr.onload = () => {
if (xhr.status >= 400) {
push({ type: "error", error: `API Error ${xhr.status}: ${xhr.responseText}` });
} else {
parseSse(true);
}
finish();
};
xhr.onerror = () => {
push({ type: "error", error: "Network error" });
finish();
};
xhr.ontimeout = () => {
push({ type: "error", error: "Request timed out" });
finish();
};
xhr.send(JSON.stringify({
model: options.model,
max_tokens: options.maxTokens ?? 8192,
system: options.systemPrompt,
messages,
stream: true
}));
while (true) {
while (queue.length)
yield queue.shift();
if (done)
break;
await new Promise((r) => {
wakeup = r;
});
} }
while (queue.length)
yield queue.shift();
yield { type: "done" }; yield { type: "done" };
} }
/** Non-streaming convenience wrapper */ /** Non-streaming convenience wrapper */
+59 -56
View File
@@ -30,71 +30,74 @@ export class ClaudeClient {
}; };
} }
/**
 * Stream a chat completion via XHR + SSE, yielding text chunks as they arrive.
 * Uses XHR instead of fetch because Obsidian patches the global fetch in a way
 * that buffers the full response, breaking streaming.
 */
async *streamChat(
  messages: ClaudeMessage[],
  options: ClaudeOptions
): AsyncGenerator<ClaudeStreamChunk> {
  // Chunks produced by the XHR event handlers are buffered here and drained
  // by the generator loop at the bottom; `wakeup` resolves the generator's
  // pending wait whenever new data (or completion) arrives.
  const queue: ClaudeStreamChunk[] = [];
  let done = false;
  let sawDone = false; // set once the SSE "[DONE]" sentinel has been seen
  let wakeup: (() => void) | null = null;

  const push = (c: ClaudeStreamChunk) => { queue.push(c); wakeup?.(); wakeup = null; };
  const finish = () => { done = true; wakeup?.(); wakeup = null; };

  const xhr = new XMLHttpRequest();
  xhr.open("POST", this.baseUrl, true);
  for (const [k, v] of Object.entries(this.headers(options.apiKey))) {
    xhr.setRequestHeader(k, v);
  }

  // Parse SSE lines from xhr.responseText; linesCursor avoids reprocessing old lines.
  let linesCursor = 0;
  const parseSse = (allDone: boolean) => {
    if (sawDone) return; // stream already terminated by "[DONE]"
    const lines = xhr.responseText.split("\n");
    const limit = allDone ? lines.length : lines.length - 1; // skip last (may be partial)
    for (let i = linesCursor; i < limit; i++) {
      const line = lines[i];
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6).trim();
      if (data === "[DONE]") {
        // BUGFIX: advance the cursor before returning. Previously the early
        // return left linesCursor stale, so a later onprogress/onload call
        // re-scanned the same lines and re-pushed duplicate text chunks.
        sawDone = true;
        linesCursor = lines.length;
        return;
      }
      try {
        const ev = JSON.parse(data);
        if (ev.type === "content_block_delta" && ev.delta?.type === "text_delta") {
          push({ type: "text", text: ev.delta.text });
        }
      } catch { /* skip malformed lines */ }
    }
    linesCursor = limit;
  };

  xhr.onprogress = () => parseSse(false);
  xhr.onload = () => {
    if (xhr.status >= 400) {
      push({ type: "error", error: `API Error ${xhr.status}: ${xhr.responseText}` });
    } else {
      parseSse(true);
    }
    finish();
  };
  xhr.onerror = () => { push({ type: "error", error: "Network error" }); finish(); };
  xhr.ontimeout = () => { push({ type: "error", error: "Request timed out" }); finish(); };
  xhr.send(JSON.stringify({
    model: options.model,
    max_tokens: options.maxTokens ?? 8192,
    system: options.systemPrompt,
    messages,
    stream: true,
  }));

  // Drain queued chunks; suspend until a handler signals more data or the end.
  while (true) {
    while (queue.length) yield queue.shift()!;
    if (done) break;
    await new Promise<void>(r => { wakeup = r; });
  }
  while (queue.length) yield queue.shift()!;
  yield { type: "done" };
}