Pipe Architecture

Every generate() and stream() call travels the same six-stage pipe. Understanding the pipe is understanding NeuroLink.

The Six Stages

1. Context Building

Before any tokens are generated, the pipe assembles the full context:

  • RAG retrieval — If rag: { files: [...] } is set, documents are chunked, embedded, and a search_knowledge_base tool is registered
  • Memory lookup — Conversation history fetched from Redis or in-memory store
  • File processing — Attached files (images, PDFs, code, CSV) processed by ProcessorRegistry into provider-appropriate formats
  • System prompt injection — Custom system prompts merged with NeuroLink defaults
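The context inputs above can be sketched as a single options object. Only `rag: { files }` appears in this document; the other field names (`conversationId`, `attachments`, `systemPrompt`) are illustrative assumptions, not the confirmed NeuroLink API:

```typescript
// Hypothetical options sketch; only `rag.files` is documented above,
// the remaining field names are assumptions for illustration.
interface GenerateOptions {
  input: { text: string };
  rag?: { files: string[] }; // triggers chunk -> embed -> search_knowledge_base
  conversationId?: string; // keys the Redis / in-memory history lookup
  attachments?: string[]; // routed through ProcessorRegistry
  systemPrompt?: string; // merged with NeuroLink defaults
}

const options: GenerateOptions = {
  input: { text: "Summarize the attached report" },
  rag: { files: ["./docs/report.pdf"] },
  conversationId: "conv-42",
  systemPrompt: "You are a concise analyst.",
};

console.log(options.rag?.files.length); // 1
```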

2. Budget Check

BudgetChecker validates the assembled context fits within the model's context window before every LLM call.

  • Threshold: triggers at 80% of context window
  • If over budget: runs ContextCompactor (4-stage pipeline):
    1. Tool output pruning — replaces old tool results with placeholders
    2. File read deduplication — keeps only latest read of each file
    3. LLM summarization — structured 9-section summary of oldest messages
    4. Sliding window truncation — non-destructive tagging of oldest messages

Context windows are tracked per-provider, per-model in src/lib/constants/contextWindows.ts.
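The 80% rule and the compaction order can be sketched as follows; the function and stage names are illustrative, not the actual `BudgetChecker` API:

```typescript
// Sketch of the 80% budget rule described above.
const BUDGET_THRESHOLD = 0.8;

function isOverBudget(promptTokens: number, contextWindow: number): boolean {
  return promptTokens > contextWindow * BUDGET_THRESHOLD;
}

// Compaction stages run in this order until the context fits again.
const compactionStages = [
  "tool-output-pruning",
  "file-read-deduplication",
  "llm-summarization",
  "sliding-window-truncation",
] as const;

console.log(isOverBudget(170_000, 200_000)); // true: over the 160k threshold
console.log(isOverBudget(100_000, 200_000)); // false
```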

3. Provider Dispatch

ProviderRegistry resolves the provider name to a concrete implementation:

```typescript
// All providers loaded via dynamic import to prevent circular dependencies
ProviderFactory.registerProvider(
  AIProviderName.ANTHROPIC,
  async (modelName?) => {
    const { AnthropicProvider } = await import("../providers/anthropic.js");
    return new AnthropicProvider(modelName);
  },
  AnthropicModels.CLAUDE_SONNET_4_5,
  ["claude", "sonnet", "haiku"],
);
```

Switching providers requires one line — the rest of the pipe is unchanged.
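A minimal sketch of that one-line switch, assuming `provider` and `model` option names (they are inferred from the registry above, not confirmed API):

```typescript
// Illustrative: `provider` / `model` field names are assumptions.
// Only the provider line differs between the two calls; the pipe is identical.
const base = { input: { text: "Explain vector clocks" } };

const withAnthropic = { ...base, provider: "anthropic", model: "claude-sonnet-4-5" };
const withOpenAI = { ...base, provider: "openai" }; // the one-line change

console.log(withAnthropic.input === withOpenAI.input); // true: same request, different dispatch
```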

4. Stream Emission

Tokens arrive as an async iterable. generate() is stream() collected — there is only stream().

```typescript
// This is what generate() does internally
let text = "";
for await (const token of this.stream(options)) {
  text += token;
}
return { text };
```

The stream handles multiple event types: text deltas, tool calls, thinking blocks, usage statistics.
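The event types listed above can be modeled as a discriminated union; the exact shape of NeuroLink's stream events is an assumption here, sketched for illustration:

```typescript
// Sketch of the stream's event types as a discriminated union (shapes assumed).
type StreamEvent =
  | { type: "text-delta"; text: string }
  | { type: "tool-call"; name: string; args: unknown }
  | { type: "thinking"; text: string }
  | { type: "usage"; inputTokens: number; outputTokens: number };

// Collecting only text deltas, the way generate() collects stream().
function collectText(events: StreamEvent[]): string {
  let text = "";
  for (const e of events) {
    if (e.type === "text-delta") text += e.text;
  }
  return text;
}

console.log(
  collectText([
    { type: "text-delta", text: "Hello, " },
    { type: "usage", inputTokens: 12, outputTokens: 4 },
    { type: "text-delta", text: "world" },
  ]),
); // "Hello, world"
```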

5. Tool Interception

When the model emits a tool call, the stream pauses:

  1. Tool call extracted from the stream
  2. MCPToolRegistry dispatches to the correct tool (built-in or external MCP server)
  3. Tool result injected back into the conversation
  4. Model resumes generating from the tool result
  5. Stream continues

MCP transports supported: stdio (local), http (remote), sse, websocket.
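The pause, dispatch, inject, resume cycle can be sketched with a simplified registry; the message shapes and registry API here are stand-ins for `MCPToolRegistry`, not its real interface:

```typescript
// Minimal sketch of the tool-interception loop (types are simplified stand-ins).
type Message = { role: "assistant" | "tool"; content: string };

const toolRegistry: Record<string, (args: string) => Promise<string>> = {
  get_time: async () => "12:00", // hypothetical built-in tool
};

async function runToolTurn(toolName: string, args: string, history: Message[]) {
  const result = await toolRegistry[toolName](args); // 2. dispatch to the tool
  history.push({ role: "tool", content: result }); // 3. inject result into conversation
  // 4-5. the model would now resume generating from this history
  return history;
}

runToolTurn("get_time", "{}", [{ role: "assistant", content: "calling get_time" }]).then(
  (h) => console.log(h.length), // 2
);
```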

6. Observability

Every stage emits OpenTelemetry spans. The full trace covers:

  • Context build duration
  • Token counts (input + output)
  • Tool execution times
  • Memory read/write latency
  • Provider-specific attributes (model, temperature, finish reason)

Exporters: Langfuse, OTLP, Jaeger, Zipkin, Prometheus, Datadog, NewRelic, Honeycomb, Console.
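A hand-rolled span recorder can stand in for the real OpenTelemetry SDK to show the attributes a stage would carry; the attribute keys follow OTel GenAI semantic conventions, but their use here is illustrative, not NeuroLink's confirmed span schema:

```typescript
// Toy span recorder (not the OpenTelemetry SDK) to illustrate stage attributes.
interface Span {
  name: string;
  attributes: Record<string, string | number>;
  durationMs: number;
}

function traceStage(
  name: string,
  attributes: Record<string, string | number>,
  work: () => void,
): Span {
  const start = Date.now();
  work(); // the stage being traced, e.g. the provider call
  return { name, attributes, durationMs: Date.now() - start };
}

const span = traceStage(
  "provider.generate",
  {
    "gen_ai.request.model": "claude-sonnet-4-5",
    "gen_ai.request.temperature": 0.7,
    "gen_ai.usage.input_tokens": 1234,
  },
  () => {
    /* LLM call here */
  },
);

console.log(span.name); // "provider.generate"
```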

Key Files

| File | Purpose |
| --- | --- |
| src/lib/neurolink.ts | Main SDK class — orchestrates all six stages |
| src/lib/factories/providerRegistry.ts | Provider registration with dynamic imports |
| src/lib/context/budgetChecker.ts | Pre-generation context budget validation |
| src/lib/context/contextCompactor.ts | Multi-stage compaction orchestrator |
| src/lib/mcp/toolRegistry.ts | Tool management and MCP integration |
| src/lib/rag/ragIntegration.ts | RAG auto-pipeline setup |
| src/lib/telemetry/ | OpenTelemetry instrumentation |

Design Invariants

These never change regardless of provider, model, or connector:

  1. Dynamic imports only — No static imports of providers in the registry (prevents circular deps)
  2. Stream is the primitive — generate() is always stream() collected; nothing bypasses stream()
  3. Budget checked before every call — No LLM call without a budget check
  4. Tools are always external — MCP protocol for all tool integrations, including built-ins
  5. Memory is scoped — Each conversation has isolated memory; no cross-contamination
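Invariant 5 can be sketched as a per-conversation map, so one conversation can never read another's history; the store API here is illustrative, not NeuroLink's memory interface:

```typescript
// Sketch of scoped memory: each conversation id keys its own isolated history.
class ScopedMemory {
  private store = new Map<string, string[]>();

  append(conversationId: string, message: string): void {
    const history = this.store.get(conversationId) ?? [];
    history.push(message);
    this.store.set(conversationId, history);
  }

  read(conversationId: string): string[] {
    // Only this conversation's messages are ever returned.
    return this.store.get(conversationId) ?? [];
  }
}

const memory = new ScopedMemory();
memory.append("conv-a", "hello");
memory.append("conv-b", "hi");
console.log(memory.read("conv-a")); // ["hello"]: no cross-contamination
```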