Streaming Guide
NeuroLink server adapters provide a robust streaming infrastructure for delivering AI responses in real-time. This guide covers the Data Stream Protocol, event types, streaming formats, and client-side consumption patterns.
Overview
Streaming enables real-time delivery of AI-generated content, tool call notifications, and error handling. NeuroLink implements a structured Data Stream Protocol compatible with the AI SDK's data stream format.
Key Benefits:
- Real-time responses - Users see content as it's generated
- Better UX - No waiting for complete responses
- Tool visibility - Stream tool calls and results as they happen
- Error handling - Graceful error reporting mid-stream
- Connection resilience - Keep-alive signals maintain connections
Quick Start
The /api/agent/stream endpoint is automatically available on all server adapters:
curl -X POST http://localhost:3000/api/agent/stream \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"input": "Write a haiku about coding"}'
Response (SSE format):
event: text-start
data: {"id":"text-1738000000000"}
event: text-delta
data: {"id":"text-1738000000000","delta":"Silent"}
event: text-delta
data: {"id":"text-1738000000000","delta":" keystrokes"}
event: text-delta
data: {"id":"text-1738000000000","delta":" flow"}
event: text-end
data: {"id":"text-1738000000000"}
event: finish
data: {"reason":"stop","usage":{"input":10,"output":15,"total":25}}
Stream Event Types
NeuroLink defines 8 event types for comprehensive streaming:
Text Events
| Event | Description | Data Fields |
|---|---|---|
text-start | Signals the beginning of a text response | id |
text-delta | Contains a chunk of generated text | id, delta |
text-end | Signals the end of a text response | id |
Tool Events
| Event | Description | Data Fields |
|---|---|---|
tool-call | Notification that a tool is being called | id, name, arguments |
tool-result | Result returned from a tool execution | id, name, result |
Control Events
| Event | Description | Data Fields |
|---|---|---|
data | Arbitrary data payload | any |
error | Error occurred during streaming | message, code |
finish | Stream completed | reason, usage |
DataStreamWriter Interface
The DataStreamWriter interface provides methods for writing structured stream events:
import { createDataStreamWriter } from "@juspay/neurolink";
const writer = createDataStreamWriter({
write: (chunk: string) => res.write(chunk),
close: () => res.end(),
format: "sse", // or "ndjson"
includeTimestamps: true,
});
// Write text events
await writer.writeTextStart("response-1");
await writer.writeTextDelta("response-1", "Hello, ");
await writer.writeTextDelta("response-1", "world!");
await writer.writeTextEnd("response-1");
// Write tool events
await writer.writeToolCall({
id: "tool-1",
name: "getCurrentTime",
arguments: { timezone: "UTC" },
});
await writer.writeToolResult({
id: "tool-1",
name: "getCurrentTime",
result: { time: "2026-02-02T10:30:00Z" },
});
// Write arbitrary data
await writer.writeData({ customField: "value" });
// Write error
await writer.writeError({
message: "Something went wrong",
code: "STREAM_ERROR",
});
// Close the stream
await writer.close();
Interface Methods
| Method | Description |
|---|---|
writeTextStart(id) | Begin a text response block |
writeTextDelta(id, delta) | Write a text chunk |
writeTextEnd(id) | End a text response block |
writeToolCall(toolCall) | Notify of a tool invocation |
writeToolResult(toolResult) | Report tool execution result |
writeData(data) | Write arbitrary JSON data |
writeError(error) | Report an error |
close() | Close the stream |
DataStreamResponse Class
For convenience, use DataStreamResponse to create a complete streaming response:
import {
DataStreamResponse,
createDataStreamResponse,
} from "@juspay/neurolink";
// Option 1: Using the class directly
const streamResponse = new DataStreamResponse({
contentType: "text/event-stream",
keepAliveInterval: 15000, // 15 seconds
includeTimestamps: true,
});
// Write events directly on the response
await streamResponse.writeTextStart("msg-1");
await streamResponse.writeTextDelta("msg-1", "Streaming content...");
await streamResponse.writeTextEnd("msg-1");
// Finish with usage statistics
await streamResponse.finish({
reason: "stop",
usage: { input: 10, output: 25, total: 35 },
});
// Option 2: Using the factory function
const response = createDataStreamResponse({
contentType: "application/x-ndjson",
keepAliveInterval: 30000,
});
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
contentType | "text/event-stream" | "application/x-ndjson" | "text/event-stream" | Stream format |
headers | Record<string, string> | {} | Additional response headers |
keepAliveInterval | number | undefined | Keep-alive ping interval (ms) |
includeTimestamps | boolean | true | Include timestamps in events |
SSE vs NDJSON Formats
NeuroLink supports two streaming formats. Choose based on your requirements:
Server-Sent Events (SSE)
Content-Type: text/event-stream
Best for:
- Browser-based clients using
EventSource - Standard HTTP/1.1 connections
- Automatic reconnection handling
- Event type differentiation
Format example:
event: text-delta
data: {"id":"msg-1","delta":"Hello"}
id: msg-1
event: text-delta
data: {"id":"msg-1","delta":" world"}
id: msg-1
Client-side usage:
const eventSource = new EventSource("/api/agent/stream");
eventSource.addEventListener("text-delta", (event) => {
const data = JSON.parse(event.data);
console.log(data.delta);
});
eventSource.addEventListener("finish", (event) => {
const data = JSON.parse(event.data);
console.log("Stream finished:", data.reason);
eventSource.close();
});
eventSource.addEventListener("error", (event) => {
console.error("Stream error:", event);
});
Newline-Delimited JSON (NDJSON)
Content-Type: application/x-ndjson
Best for:
- Server-to-server communication
- Custom stream processing
- Simpler parsing logic
- HTTP/2 connections
Format example:
{"type":"text-delta","id":"msg-1","timestamp":1738000000000,"data":{"id":"msg-1","delta":"Hello"}}
{"type":"text-delta","id":"msg-1","timestamp":1738000000001,"data":{"id":"msg-1","delta":" world"}}
{"type":"finish","timestamp":1738000000100,"data":{"reason":"stop"}}
Client-side usage:
const response = await fetch("/api/agent/stream", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/x-ndjson",
},
body: JSON.stringify({ input: "Hello" }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.trim()) {
const event = JSON.parse(line);
console.log(event.type, event.data);
}
}
}
Header Helper Functions
import { createSSEHeaders, createNDJSONHeaders } from "@juspay/neurolink";
// SSE headers
const sseHeaders = createSSEHeaders({
"X-Custom-Header": "value",
});
// Returns:
// {
// "Content-Type": "text/event-stream",
// "Cache-Control": "no-cache, no-transform",
// "Connection": "keep-alive",
// "X-Accel-Buffering": "no",
// "X-Custom-Header": "value"
// }
// NDJSON headers
const ndjsonHeaders = createNDJSONHeaders({
"X-Custom-Header": "value",
});
// Returns:
// {
// "Content-Type": "application/x-ndjson",
// "Cache-Control": "no-cache",
// "Connection": "keep-alive",
// "X-Custom-Header": "value"
// }
StreamingConfig
Configure streaming behavior in route definitions:
import type { StreamingConfig, RouteDefinition } from "@juspay/neurolink";
const streamingConfig: StreamingConfig = {
enabled: true,
contentType: "text/event-stream",
keepAliveInterval: 15000, // 15 seconds
};
const customStreamRoute: RouteDefinition = {
method: "POST",
path: "/api/custom-stream",
handler: async (ctx) => {
// Return an async iterable for streaming
return generateStream(ctx.body);
},
streaming: streamingConfig,
description: "Custom streaming endpoint",
tags: ["streaming"],
};
Configuration Fields
| Field | Type | Default | Description |
|---|---|---|---|
enabled | boolean | true | Enable streaming for this route |
contentType | "text/event-stream" | "application/x-ndjson" | SSE | Stream format |
keepAliveInterval | number | undefined | Interval for keep-alive pings (ms) |
Code Examples
Basic Streaming Response
import { NeuroLink } from "@juspay/neurolink";
import { createServer, DataStreamResponse } from "@juspay/neurolink";
const neurolink = new NeuroLink({ defaultProvider: "openai" });
const server = await createServer(neurolink, {
framework: "hono",
config: { port: 3000 },
});
// Register a custom streaming route
server.registerRoute({
method: "POST",
path: "/api/generate-stream",
handler: async (ctx) => {
const { prompt } = ctx.body as { prompt: string };
const streamResponse = new DataStreamResponse({
contentType: "text/event-stream",
keepAliveInterval: 15000,
});
// Start streaming in background
(async () => {
const textId = `text-${Date.now()}`;
try {
await streamResponse.writeTextStart(textId);
for await (const chunk of neurolink.generateStream({ prompt })) {
if (chunk.content) {
await streamResponse.writeTextDelta(textId, chunk.content);
}
}
await streamResponse.writeTextEnd(textId);
await streamResponse.finish({ reason: "stop" });
} catch (error) {
await streamResponse.writeError({
message: error.message,
code: "GENERATION_ERROR",
});
streamResponse.close();
}
})();
// Return the stream
return new Response(streamResponse.stream, {
headers: streamResponse.headers,
});
},
streaming: { enabled: true, contentType: "text/event-stream" },
description: "Stream AI-generated content",
tags: ["streaming", "generation"],
});
await server.initialize();
await server.start();
Tool Call Streaming
import {
DataStreamResponse,
pipeAsyncIterableToDataStream,
} from "@juspay/neurolink";
server.registerRoute({
method: "POST",
path: "/api/agent-stream",
handler: async (ctx) => {
const { input, tools } = ctx.body as { input: string; tools?: string[] };
const streamResponse = new DataStreamResponse();
(async () => {
const textId = `agent-${Date.now()}`;
try {
await streamResponse.writeTextStart(textId);
for await (const event of neurolink.streamWithTools({
prompt: input,
tools: tools || [],
})) {
switch (event.type) {
case "text-delta":
await streamResponse.writeTextDelta(textId, event.content);
break;
case "tool-call":
await streamResponse.writeToolCall({
id: event.toolCallId,
name: event.toolName,
arguments: event.args,
});
break;
case "tool-result":
await streamResponse.writeToolResult({
id: event.toolCallId,
name: event.toolName,
result: event.result,
});
break;
}
}
await streamResponse.writeTextEnd(textId);
await streamResponse.finish({ reason: "stop" });
} catch (error) {
await streamResponse.writeError({
message: error.message,
code: "AGENT_ERROR",
});
streamResponse.close();
}
})();
return new Response(streamResponse.stream, {
headers: streamResponse.headers,
});
},
streaming: { enabled: true },
tags: ["streaming", "tools"],
});
Error Handling in Streams
import { DataStreamResponse } from "@juspay/neurolink";
async function handleStreamWithErrors(
neurolink: NeuroLink,
prompt: string,
): Promise<Response> {
const streamResponse = new DataStreamResponse({
contentType: "text/event-stream",
});
(async () => {
const textId = `text-${Date.now()}`;
try {
await streamResponse.writeTextStart(textId);
for await (const chunk of neurolink.generateStream({ prompt })) {
// Check if stream was closed by client
if (streamResponse.isClosed()) {
console.log("Client disconnected, stopping generation");
return;
}
if (chunk.content) {
await streamResponse.writeTextDelta(textId, chunk.content);
}
}
await streamResponse.writeTextEnd(textId);
await streamResponse.finish({ reason: "stop" });
} catch (error) {
// Handle different error types
if (error.name === "AbortError") {
await streamResponse.writeError({
message: "Request was cancelled",
code: "STREAM_ABORTED",
});
} else if (error.message.includes("rate limit")) {
await streamResponse.writeError({
message: "Rate limit exceeded, please retry later",
code: "RATE_LIMIT_EXCEEDED",
});
} else if (error.message.includes("context length")) {
await streamResponse.writeError({
message: "Input too long for model context window",
code: "CONTEXT_LENGTH_EXCEEDED",
});
} else {
await streamResponse.writeError({
message: "An error occurred during generation",
code: "GENERATION_ERROR",
});
}
streamResponse.close();
}
})();
return new Response(streamResponse.stream, {
headers: streamResponse.headers,
});
}
Using pipeAsyncIterableToDataStream
For simpler cases, use the helper function:
import {
DataStreamResponse,
pipeAsyncIterableToDataStream,
} from "@juspay/neurolink";
server.registerRoute({
method: "POST",
path: "/api/simple-stream",
handler: async (ctx) => {
const { prompt } = ctx.body as { prompt: string };
const streamResponse = new DataStreamResponse();
// Pipe the async iterable directly to the stream
pipeAsyncIterableToDataStream(
neurolink.generateStream({ prompt }),
streamResponse,
{
textId: `text-${Date.now()}`,
onChunk: (chunk) => console.log("Chunk received:", chunk),
onError: (error) => console.error("Stream error:", error),
},
).catch(console.error);
return new Response(streamResponse.stream, {
headers: streamResponse.headers,
});
},
streaming: { enabled: true },
});
Client-Side Consumption (Browser)
Using EventSource (SSE):
function streamWithEventSource(input: string): void {
// Note: EventSource only supports GET requests
// Use fetch for POST requests with SSE
const eventSource = new EventSource(
`/api/agent/stream?input=${encodeURIComponent(input)}`,
);
let content = "";
eventSource.addEventListener("text-start", (event) => {
console.log("Stream started");
});
eventSource.addEventListener("text-delta", (event) => {
const data = JSON.parse(event.data);
content += data.delta;
updateUI(content);
});
eventSource.addEventListener("text-end", (event) => {
console.log("Text complete");
});
eventSource.addEventListener("tool-call", (event) => {
const data = JSON.parse(event.data);
console.log(`Tool called: ${data.name}`, data.arguments);
showToolIndicator(data.name);
});
eventSource.addEventListener("tool-result", (event) => {
const data = JSON.parse(event.data);
console.log(`Tool result: ${data.name}`, data.result);
hideToolIndicator(data.name);
});
eventSource.addEventListener("finish", (event) => {
const data = JSON.parse(event.data);
console.log("Stream finished:", data);
eventSource.close();
});
eventSource.addEventListener("error", (event) => {
console.error("Stream error:", event);
eventSource.close();
});
}
Using Fetch API (for POST requests):
async function streamWithFetch(input: string): Promise<void> {
const response = await fetch("/api/agent/stream", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body: JSON.stringify({ input }),
});
if (!response.ok) {
throw new Error(`HTTP error: ${response.status}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
let content = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE format
const lines = buffer.split("\n\n");
buffer = lines.pop() || "";
for (const block of lines) {
const eventMatch = block.match(/^event: (.+)$/m);
const dataMatch = block.match(/^data: (.+)$/m);
if (eventMatch && dataMatch) {
const eventType = eventMatch[1];
const data = JSON.parse(dataMatch[1]);
switch (eventType) {
case "text-delta":
content += data.delta;
updateUI(content);
break;
case "tool-call":
showToolCall(data);
break;
case "tool-result":
showToolResult(data);
break;
case "error":
showError(data.message);
break;
case "finish":
console.log("Complete:", data);
break;
}
}
}
}
}
React Hook Example:
import { useState, useCallback } from "react";
type StreamState = {
content: string;
isStreaming: boolean;
error: string | null;
toolCalls: Array<{ name: string; arguments: unknown; result?: unknown }>;
};
function useStream() {
const [state, setState] = useState<StreamState>({
content: "",
isStreaming: false,
error: null,
toolCalls: [],
});
const stream = useCallback(async (input: string) => {
setState({ content: "", isStreaming: true, error: null, toolCalls: [] });
try {
const response = await fetch("/api/agent/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ input }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n\n");
buffer = lines.pop() || "";
for (const block of lines) {
const eventMatch = block.match(/^event: (.+)$/m);
const dataMatch = block.match(/^data: (.+)$/m);
if (eventMatch && dataMatch) {
const eventType = eventMatch[1];
const data = JSON.parse(dataMatch[1]);
switch (eventType) {
case "text-delta":
setState((prev) => ({
...prev,
content: prev.content + data.delta,
}));
break;
case "tool-call":
setState((prev) => ({
...prev,
toolCalls: [
...prev.toolCalls,
{ name: data.name, arguments: data.arguments },
],
}));
break;
case "error":
setState((prev) => ({ ...prev, error: data.message }));
break;
}
}
}
}
} catch (error) {
setState((prev) => ({
...prev,
error: error instanceof Error ? error.message : "Stream failed",
}));
} finally {
setState((prev) => ({ ...prev, isStreaming: false }));
}
}, []);
return { ...state, stream };
}
// Usage in component
function ChatComponent() {
const { content, isStreaming, error, toolCalls, stream } = useStream();
return (
<div>
<button onClick={() => stream("Tell me a joke")} disabled={isStreaming}>
{isStreaming ? "Streaming..." : "Generate"}
</button>
{error && <div className="error">{error}</div>}
<div className="content">{content}</div>
{toolCalls.map((tool, i) => (
<div key={i} className="tool-call">
Tool: {tool.name}
</div>
))}
</div>
);
}
WebStreamWriter (Legacy)
For simple SSE streaming without the full Data Stream Protocol:
import { WebStreamWriter, formatSSEEvent } from "@juspay/neurolink";
const writer = new WebStreamWriter();
// Write events
writer.writeData({ message: "Hello" });
writer.writeEvent("custom-event", { data: "value" });
writer.writeDone();
writer.close();
// Use the stream
return new Response(writer.stream, {
headers: { "Content-Type": "text/event-stream" },
});
// Manual SSE formatting
const sseMessage = formatSSEEvent({
event: "message",
data: JSON.stringify({ content: "Hello" }),
id: "msg-1",
retry: 5000,
});
// Result: "id: msg-1\nevent: message\nretry: 5000\ndata: {...}\n\n"
Keep-Alive Configuration
Keep-alive signals prevent connection timeouts for long-running streams:
const streamResponse = new DataStreamResponse({
contentType: "text/event-stream",
keepAliveInterval: 15000, // Send ping every 15 seconds
});
SSE keep-alive format:
: keep-alive
NDJSON keep-alive format:
{ "type": "keep-alive" }
Best Practices
1. Always Handle Client Disconnection
// Check if stream is closed before writing
if (!streamResponse.isClosed()) {
await streamResponse.writeTextDelta(id, chunk);
}
2. Use Unique IDs for Text Blocks
const textId = `text-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
3. Set Appropriate Timeouts
const server = await createServer(neurolink, {
config: {
timeout: 120000, // 2 minutes for streaming endpoints
},
});
4. Enable Keep-Alive for Long Streams
const streamResponse = new DataStreamResponse({
keepAliveInterval: 15000, // 15 seconds
});
5. Include Usage Statistics in Finish Event
await streamResponse.finish({
reason: "stop",
usage: {
input: promptTokens,
output: completionTokens,
total: promptTokens + completionTokens,
},
});
6. Use AbortController for Cancellation
const controller = new AbortController();
const response = await fetch("/api/agent/stream", {
method: "POST",
body: JSON.stringify({ input }),
signal: controller.signal,
});
// Cancel the stream
controller.abort();
Troubleshooting
Stream Not Receiving Data
- Check
Content-Typeheader istext/event-streamorapplication/x-ndjson - Verify
Cache-Control: no-cacheis set - Ensure no proxy is buffering responses (check
X-Accel-Buffering: no)
Connection Dropping
- Enable keep-alive with appropriate interval
- Check server timeout configuration
- Verify load balancer timeout settings
Events Not Parsing Correctly
- Ensure each SSE event ends with double newline (
\n\n) - Verify JSON data is properly stringified
- Check for proper event type names
Related Documentation
- Server Adapters Overview - Getting started with server adapters
- Hono Adapter - Framework-specific streaming examples
- Configuration Reference - Full configuration options
- Security Best Practices - Securing streaming endpoints
Need Help? Join our GitHub Discussions or open an issue.