למה האפליקציה שלך מרגישה איטית, ואיך Streaming פותר את זה
דמיין שיצרת chatbot לסטארטאפ ת"א. המשתמש שואל שאלה, רואה loading spinner, ומחכה 8 שניות עד שמופיע הכל בבת אחת. ברוב המקרים, הוא עוזב. Streaming פותר בדיוק את זה.
ב-API רגיל, שולחים בקשה ומחכים שהמודל יסיים לייצר את כל התשובה לפני שמשהו מגיע אליך. בתשובה ארוכה זה אומר 5-15 שניות של שתיקה, וגם סיכון ל-timeout. Streaming עובד אחרת: התשובה מגיעה פיסה-פיסה ברגע שכל מילה נוצרת, בדיוק כמו שרואים ב-claude.ai עצמו.
ה-time-to-first-token עם streaming הוא בדרך כלל מתחת ל-500ms. המשתמש רואה משהו מיד, ומרגיש שהמערכת חיה.
איך SSE עובד מאחורי הקלעים
Claude API משתמש ב-Server-Sent Events (SSE), פרוטוקול HTTP סטנדרטי שבו השרת שומר חיבור פתוח ודוחף נתונים ללקוח. בניגוד ל-WebSocket (דו-כיווני), SSE הוא חד-כיווני: שרת ללקוח בלבד. זה מספיק לחלוטין ל-streaming של טקסט.
כל event מגיע בפורמט קבוע:
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"שלום"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" עולם"}}
event: message_stop
data: {"type":"message_stop"}
סוגי events שצריך להכיר:
- message_start, תחילת ההודעה, כולל message ID ו-input_tokens
- content_block_delta עם
text_delta, פיסת הטקסט עצמה (הכי חשוב) - content_block_delta עם
input_json_delta, בעת שימוש ב-Tool Use - message_stop, הסיום; אחריו מגיע usage עם output_tokens
- ping, שרת שולח כל ~10 שניות כ-keepalive
Python: שלושה שלבים ל-Streaming
ה-Python SDK מציע את הדרך הנקייה ביותר למימוש. Context manager מבטיח שהחיבור ייסגר תמיד, גם בשגיאה:
import anthropic
client = anthropic.Anthropic()
# דוגמה: chatbot לצוות HR בחברה ישראלית
with client.messages.stream(
model="claude-opus-4-7",
max_tokens=1024,
system="אתה עוזר HR של חברת טכנולוגיה ישראלית. ענה בעברית.",
messages=[{"role": "user", "content": "כתוב תיאור משרה למפתח backend בכיר"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# Usage זמין רק אחרי שה-stream הסתיים
final = stream.get_final_message()
print(f"\nTokens: {final.usage.input_tokens} in / {final.usage.output_tokens} out")
שלוש נקודות חשובות: flush=True מבטיח הדפסה מיידית לטרמינל; stream.text_stream מסנן אוטומטית רק את פיסות הטקסט ומסתיר את שאר ה-events; get_final_message() מחזיר את ה-usage ו-stop_reason רק לאחר שהזרם נסגר.
TypeScript: Streaming ב-Next.js
ה-TypeScript SDK משתמש ב-event listeners. הנה Route Handler מלא לצ'אט ב-Next.js App Router:
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
export async function POST(req: Request) {
const { message } = await req.json();
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
const stream = client.messages.stream({
model: "claude-opus-4-7",
max_tokens: 1024,
messages: [{ role: "user", content: message }],
});
// לטקסט בלבד, הדרך הקצרה
stream.on("text", (text) => {
controller.enqueue(encoder.encode(text));
});
// בסיום, נקה ואסוף usage
const final = await stream.finalMessage();
console.log("Usage:", final.usage);
controller.close();
},
});
return new Response(readable, {
headers: { "Content-Type": "text/plain; charset=utf-8" },
});
}
הלקוח ב-React יכול לקרוא את ה-stream עם Fetch ו-ReadableStream Reader בצד הדפדפן, כל chunk מגיע ומתווסף ל-state, מה שגורם ל-UI להתעדכן בזמן אמת.
שגיאות נפוצות שמפתחים עושים
טעות 1: לנסות לparse JSON בזמן streaming. אם אתה מצפה לתוצאה JSON מ-Claude (למשל structured output), אל תנסה לparse כל chunk, אתה תקבל JSON חלקי שיזרוק שגיאה. אסוף את כל ה-chunks ל-string, וparse רק בסוף, אחרי ה-`message_stop` event.
טעות 2: לא לסגור stream. אי-שימוש ב-context manager ב-Python (או אי-המתנה ל-`finalMessage()` ב-TypeScript) יגרום לדליפת חיבורי HTTP. לאורך זמן זה מוביל לבעיות ביצועים ולimits של connections במקביל.
טעות 3: להשתמש ב-streaming לכל בקשה. Streaming שימושי רק כשמשתמש צופה בתגובה בזמן אמת. ל-batch processing, עיבוד ברקע, ובניית pipelines, בקשה רגילה עדיפה: קוד פשוט יותר, קל יותר לparse תוצאות.
טיפול בשגיאות ו-Retry
שגיאות streaming שונות משגיאות API רגיל: החיבור יכול להיפסק באמצע תשובה. האסטרטגיה המומלצת:
import time
import anthropic
client = anthropic.Anthropic()
def stream_with_retry(messages: list, max_retries: int = 3) -> str:
"""Stream עם retry, שומר טקסט שהתקבל עד כה"""
for attempt in range(max_retries):
try:
result = ""
with client.messages.stream(
model="claude-opus-4-7",
max_tokens=1024,
messages=messages
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
result += text
return result
except anthropic.APIConnectionError as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # exponential backoff: 1s, 2s, 4s
print(f"\nחיבור נקטע, מנסה שוב בעוד {wait}s...")
time.sleep(wait)
else:
raise
return ""
נקודה חשובה: לא ניתן ל-resume stream שנקטע מאמצע, צריך להתחיל מחדש. אם זה קריטי לאפליקציה שלך (תשובות ארוכות מאוד), שקול לשמור את ה-result בזמן אמת ולהציג למשתמש מה שהתקבל במקרה של כשל.
Streaming vs. Regular, מתי לבחור מה
| שימוש | Streaming | Regular |
|---|---|---|
| Chatbot / UI אינטראקטיבי | מומלץ | UX גרוע |
| Batch / עיבוד ברקע | מיותר | עדיף |
| JSON structured output | מסובך (parse בסוף בלבד) | פשוט יותר |
| תשובות ארוכות (1000+ מילים) | מניע timeout | סיכון timeout |
| Tool Use מורכב | מורכב (input_json_delta) | פשוט יותר |
