🌐 Real-time Services Guide

Enterprise WebSocket Infrastructure for NeuroLink

📋 Overview

NeuroLink provides real-time services built on a WebSocket server: enhanced chat with SSE and WebSocket modes, room-based broadcasting, and tunable response streaming. Together these features support real-time, bidirectional communication between AI applications and their clients.

🚀 Key Features

  • 🌐 WebSocket Infrastructure - Professional-grade server with connection management
  • 💬 Enhanced Chat Services - Dual-mode SSE + WebSocket support
  • 🏠 Room Management - Group chat and broadcasting capabilities
  • 📡 Streaming Channels - Real-time AI response streaming
  • 🔧 Performance Optimization - Compression, buffering, and latency control
  • 🛡️ Production Ready - Connection pooling, heartbeat monitoring, error handling

🌐 WebSocket Infrastructure

Basic WebSocket Server

import { NeuroLinkWebSocketServer } from "@juspay/neurolink";

const wsServer = new NeuroLinkWebSocketServer({
  port: 8080,
  maxConnections: 1000,
  enableCompression: true,
  heartbeatInterval: 30000, // 30 seconds
});

// Handle connections
wsServer.on("connection", ({ connectionId, userAgent, remoteAddress }) => {
  console.log(`New connection: ${connectionId} from ${remoteAddress}`);

  // Join default room
  wsServer.joinRoom(connectionId, "general");
});

// Handle disconnections
wsServer.on("disconnect", ({ connectionId, reason }) => {
  console.log(`Connection ${connectionId} disconnected: ${reason}`);
});

// Start server
await wsServer.start();
console.log("WebSocket server running on port 8080");

Connection Management

// Advanced connection handling
wsServer.on("connection", ({ connectionId, userAgent, headers }) => {
  // Authenticate connection. validateToken, extractUserId, and
  // getUserPermissions are assumed to be application-defined helpers.
  const token = headers["authorization"];
  if (!token || !validateToken(token)) {
    wsServer.closeConnection(connectionId, "Authentication failed");
    return;
  }

  // Store connection metadata
  wsServer.setConnectionData(connectionId, {
    userId: extractUserId(token),
    joinedAt: new Date(),
    permissions: getUserPermissions(token),
  });

  // Send welcome message
  wsServer.sendMessage(connectionId, {
    type: "welcome",
    data: { message: "Connected to NeuroLink AI" },
  });
});

// Monitor connection health
wsServer.on("heartbeat", ({ connectionId, latency }) => {
  // Warn when latency exceeds 5 seconds
  if (latency > 5000) {
    console.warn(`High latency detected: ${connectionId} (${latency}ms)`);
  }
});
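
The connection handler above calls validateToken, extractUserId, and getUserPermissions without defining them. The following is a minimal sketch of what such helpers might look like, assuming opaque bearer tokens looked up in an in-memory table. The TOKENS table, the parseBearer helper, and the sample users are hypothetical; a real deployment would more likely verify a signed token or query an auth service.

```javascript
// Hypothetical in-memory token table, for illustration only.
const TOKENS = new Map([
  ["token-alice", { userId: "alice", permissions: ["chat", "broadcast"] }],
  ["token-bob", { userId: "bob", permissions: ["chat"] }],
]);

// Strip an optional "Bearer " prefix from an Authorization header value.
function parseBearer(headerValue) {
  if (typeof headerValue !== "string") return null;
  const trimmed = headerValue.trim();
  const match = /^Bearer\s+(.+)$/i.exec(trimmed);
  return match ? match[1] : trimmed || null;
}

// True only when the header carries a token present in the table.
function validateToken(headerValue) {
  const token = parseBearer(headerValue);
  return token !== null && TOKENS.has(token);
}

// Returns the user id for a known token, or null.
function extractUserId(headerValue) {
  const entry = TOKENS.get(parseBearer(headerValue));
  return entry ? entry.userId : null;
}

// Returns a copy of the permission list, or an empty list for unknown tokens.
function getUserPermissions(headerValue) {
  const entry = TOKENS.get(parseBearer(headerValue));
  return entry ? [...entry.permissions] : [];
}
```

With these definitions, a missing or unknown Authorization header fails validation, so the handler closes the connection before storing any metadata.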

🏠 Room Management

Creating and Managing Rooms

// Join users to rooms
wsServer.joinRoom(connectionId, "ai-support-room");
wsServer.joinRoom(connectionId, "project-alpha");

// Leave rooms
wsServer.leaveRoom(connectionId, "general");

// Get room information
const roomInfo = wsServer.getRoomInfo("ai-support-room");
console.log(`Room has ${roomInfo.memberCount} members`);

// List all rooms for a connection
const userRooms = wsServer.getUserRooms(connectionId);
console.log("User is in rooms:", userRooms);

Broadcasting to Rooms

// Broadcast AI responses to room
wsServer.broadcastToRoom("ai-support-room", {
  type: "ai-response",
  data: {
    text: "How can I help you today?",
    timestamp: new Date().toISOString(),
    provider: "openai",
  },
});

// Broadcast to multiple rooms
wsServer.broadcastToRooms(["room1", "room2"], {
  type: "announcement",
  data: { message: "System maintenance in 10 minutes" },
});

// Broadcast to all connections
wsServer.broadcast({
  type: "global-message",
  data: { message: "Welcome to NeuroLink AI" },
});

📡 Streaming Channels

Creating Streaming Channels

// Create streaming channel for AI responses
const channel = wsServer.createStreamingChannel(connectionId, "ai-stream");

// Configure channel options
channel.setOptions({
  bufferSize: 4096,
  compressionEnabled: true,
  maxChunkSize: 1024,
});

// Handle streaming data
channel.onData = (chunk) => {
  console.log("Received chunk:", chunk);
};

channel.onComplete = () => {
  console.log("Streaming complete");
};

channel.onError = (error) => {
  console.error("Streaming error:", error);
};

AI Response Streaming

import { createBestAIProvider } from "@juspay/neurolink";

// Handle chat messages with streaming
wsServer.on("chat-message", async ({ connectionId, message }) => {
  const channel = wsServer.createStreamingChannel(
    connectionId,
    `chat-${Date.now()}`,
  );
  const provider = await createBestAIProvider();

  try {
    // Start streaming AI response (NEW: Primary method)
    const result = await provider.stream({
      input: { text: message.data.prompt },
      temperature: 0.7,
    });

    // Stream chunks to client
    for await (const chunk of result.stream) {
      channel.send({
        type: "text-chunk",
        data: { chunk: chunk.content, provider: result.provider },
      });
    }

    // Signal completion
    channel.complete({
      type: "stream-complete",
      data: {
        provider: result.provider,
        model: result.model,
        totalChunks: channel.getChunkCount(),
      },
    });
  } catch (error) {
    channel.error({
      type: "stream-error",
      data: { error: error.message },
    });
  }
});
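
On the receiving side, a client has to stitch the streamed chunks back together. The sketch below shows one way to do that, assuming the messages reach the client as the same JSON objects passed to channel.send, channel.complete, and channel.error above. The applyStreamMessage function, the state shape, and the sample messages are hypothetical.

```javascript
// Starting state for one streamed response.
const initialState = { text: "", chunks: 0, done: false, model: null, error: null };

// Fold one incoming message into the accumulated stream state.
function applyStreamMessage(state, message) {
  switch (message.type) {
    case "text-chunk":
      return { ...state, text: state.text + message.data.chunk, chunks: state.chunks + 1 };
    case "stream-complete":
      return { ...state, done: true, model: message.data.model };
    case "stream-error":
      return { ...state, done: true, error: message.data.error };
    default:
      return state; // ignore unrelated message types
  }
}

// Example: replay three messages in arrival order.
const messages = [
  { type: "text-chunk", data: { chunk: "Hello, ", provider: "openai" } },
  { type: "text-chunk", data: { chunk: "world", provider: "openai" } },
  { type: "stream-complete", data: { provider: "openai", model: "example-model", totalChunks: 2 } },
];
const finalState = messages.reduce(applyStreamMessage, initialState);
console.log(finalState.text); // prints "Hello, world"
```

In a browser, the same function could be called from ws.onmessage after JSON.parse, with the returned state driving the UI.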

💬 Enhanced Chat Services

Dual-Mode Chat (SSE + WebSocket)

import {
  createEnhancedChatService,
  createBestAIProvider,
} from "@juspay/neurolink";

const provider = await createBestAIProvider();
const chatService = createEnhancedChatService({
  provider,
  enableSSE: true, // Server-Sent Events for simple streaming
  enableWebSocket: true, // WebSocket for real-time bidirectional
  streamingConfig: {
    bufferSize: 8192,
    compressionEnabled: true,
    latencyTarget: 100, // Target 100ms latency
  },
});

// Handle streaming responses
await chatService.streamChat({
  prompt: "Generate a story about AI and humanity",
  onChunk: (chunk) => {
    console.log("Chunk:", chunk);
    // Send to WebSocket clients
    wsServer.broadcast({
      type: "story-chunk",
      data: { chunk },
    });
  },
  onComplete: (result) => {
    console.log("Story complete:", result.text);
    wsServer.broadcast({
      type: "story-complete",
      data: result,
    });
  },
  onError: (error) => {
    console.error("Story generation error:", error);
    wsServer.broadcast({
      type: "story-error",
      data: { error: error.message },
    });
  },
});

Chat Session Management

// Create persistent chat sessions
const sessionId = "user-123-session";
const chatSession = chatService.createSession(sessionId, {
  maxHistory: 50, // Keep last 50 messages
  persistToDisk: true,
  sessionTimeout: 3600000, // 1 hour timeout
});

// Add message to session history
chatSession.addMessage({
  role: "user",
  content: "Hello, AI!",
  timestamp: new Date(),
});

// Generate response with session context
const response = await chatSession.generateResponse({
  temperature: 0.7,
  maxTokens: 500,
});

// Session automatically maintains conversation history
console.log("Session history:", chatSession.getHistory());
console.log("Token usage:", chatSession.getTokenUsage());
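
The maxHistory option is described as keeping the last 50 messages. The sketch below illustrates that kind of sliding window on a plain array. It is not NeuroLink's implementation; trimHistory and the sample messages are hypothetical.

```javascript
// Keep only the most recent `maxHistory` messages (hypothetical helper).
function trimHistory(history, maxHistory) {
  if (maxHistory <= 0) return [];
  return history.length > maxHistory ? history.slice(-maxHistory) : history;
}

// Build 60 alternating user/assistant messages, then trim to the last 50.
const history = [];
for (let i = 1; i <= 60; i++) {
  history.push({ role: i % 2 ? "user" : "assistant", content: `message ${i}` });
}
const trimmed = trimHistory(history, 50);
console.log(trimmed.length); // prints 50
console.log(trimmed[0].content); // prints "message 11"
```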

🔧 Performance Optimization

Connection Pooling

const wsServer = new NeuroLinkWebSocketServer({
  port: 8080,
  maxConnections: 5000,

  // Connection pooling
  connectionPool: {
    enabled: true,
    maxIdleTime: 300000, // 5 minutes
    cleanupInterval: 60000, // 1 minute
  },

  // Performance tuning
  performance: {
    enableCompression: true,
    compressionLevel: 6, // 1-9, 6 is balanced
    maxPayloadSize: 16777216, // 16MB
    pingInterval: 30000, // 30 seconds
    pongTimeout: 5000, // 5 seconds
  },
});

Load Balancing

// Multiple server instances with load balancing
const servers = [];
const ports = [8080, 8081, 8082];

for (const port of ports) {
  const server = new NeuroLinkWebSocketServer({ port });

  // Shared Redis for cross-server communication
  server.setMessageBroker({
    type: "redis",
    url: "redis://localhost:6379",
    prefix: "neurolink:ws",
  });

  servers.push(server);
  await server.start();
}

console.log(`Started ${servers.length} WebSocket servers`);

Streaming Optimization

// Configure optimal streaming for different use cases
const streamingConfigs = {
  // Low latency for chat
  chat: {
    bufferSize: 1024,
    compressionEnabled: false, // Disable for speed
    latencyTarget: 50,
  },

  // High throughput for content generation
  content: {
    bufferSize: 16384,
    compressionEnabled: true,
    latencyTarget: 200,
  },

  // Balanced for general use
  general: {
    bufferSize: 4096,
    compressionEnabled: true,
    latencyTarget: 100,
  },
};

// Apply configuration based on use case
const chatService = createEnhancedChatService({
  provider: await createBestAIProvider(),
  enableWebSocket: true,
  streamingConfig: streamingConfigs.chat, // Use chat optimization
});

🛡️ Production Deployment

Docker Configuration

# Dockerfile for WebSocket service
FROM node:18-alpine

WORKDIR /app
COPY package*.json ./
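# Install all dependencies; the build step usually needs devDependencies
RUN npm ci

COPY . .
# Build, then remove devDependencies from the image
RUN npm run build && npm prune --omit=dev

# WebSocket port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD node healthcheck.js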

CMD ["node", "dist/server.js"]
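
The HEALTHCHECK instruction runs healthcheck.js, which this guide does not show. A minimal sketch of such a script follows. It assumes the server answers HTTP GET /health on port 8080 with a 2xx status when healthy, and that the Node.js runtime provides a global fetch (Node 18 and later). The checkHealth function and its parameters are hypothetical.

```javascript
// healthcheck.js (hypothetical sketch; not shipped with NeuroLink)
// Resolves to a process exit code: 0 = healthy, 1 = unhealthy.
async function checkHealth(url, fetchImpl = globalThis.fetch, timeoutMs = 2000) {
  try {
    const response = await fetchImpl(url, {
      // Give up before Docker's own 3-second healthcheck timeout
      signal: AbortSignal.timeout(timeoutMs),
    });
    return response.ok ? 0 : 1;
  } catch {
    return 1; // connection refused, timeout, or other network failure
  }
}

// As the last line of healthcheck.js, run the probe and exit with its result:
// checkHealth("http://localhost:8080/health").then((code) => process.exit(code));
```

Docker treats exit code 0 as healthy and 1 as unhealthy, so the script only needs to map the HTTP result onto those two values. The fetchImpl parameter exists so the function can be exercised with a stub instead of a live server.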

Docker Compose with Redis

# docker-compose.yml
version: "3.8"
services:
  neurolink-ws:
    build: .
    # With more than one replica, a fixed host port can collide under plain
    # `docker compose up`; drop this mapping if clients connect through nginx.
    ports:
      - "8080:8080"
    environment:
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
        reservations:
          memory: 256M

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - neurolink-ws

volumes:
  redis_data:
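
The compose file mounts ./nginx.conf but the guide does not include it. The fragment below is an illustrative sketch of a reverse-proxy configuration that forwards WebSocket upgrades to the neurolink-ws service. It uses standard nginx proxy directives, but the overall layout, the timeout value, and the choice to proxy every path are assumptions rather than NeuroLink's documented setup.

```nginx
# nginx.conf (illustrative sketch; not from the NeuroLink docs)
events {}

http {
  # Send "Connection: upgrade" only when the client asked for an upgrade
  map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
  }

  server {
    listen 80;

    location / {
      proxy_pass http://neurolink-ws:8080;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection $connection_upgrade;
      proxy_set_header Host $host;
      # Close proxied connections that stay silent for longer than this
      proxy_read_timeout 120s;
    }
  }
}
```

The Upgrade and Connection headers are hop-by-hop, so nginx does not forward them unless they are set explicitly as shown.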

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: neurolink-websocket
spec:
  replicas: 3
  selector:
    matchLabels:
      app: neurolink-websocket
  template:
    metadata:
      labels:
        app: neurolink-websocket
    spec:
      containers:
        - name: websocket
          image: neurolink/websocket:latest
          ports:
            - containerPort: 8080
          env:
            - name: REDIS_URL
              valueFrom:
                configMapKeyRef:
                  name: neurolink-config
                  key: redis-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: neurolink-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: neurolink-websocket-service
spec:
  selector:
    app: neurolink-websocket
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: LoadBalancer

📊 Monitoring and Health Checks

Built-in Metrics

// Enable metrics collection
wsServer.enableMetrics({
  collectConnectionStats: true,
  collectMessageStats: true,
  collectPerformanceStats: true,
  exportPrometheus: true,
  metricsEndpoint: "/metrics",
});

// Get real-time statistics
const stats = wsServer.getStats();
console.log("Active connections:", stats.activeConnections);
console.log("Messages per second:", stats.messagesPerSecond);
console.log("Average latency:", stats.averageLatency);
console.log("Memory usage:", stats.memoryUsage);

Health Check Endpoint

// Health check implementation
wsServer.addHealthCheck("aiProviders", async () => {
  try {
    const provider = await createBestAIProvider();
    await provider.generate({ input: { text: "test" }, maxTokens: 1 });
    return { status: "healthy", message: "AI providers operational" };
  } catch (error) {
    return { status: "unhealthy", message: error.message };
  }
});

// `redis` is assumed to be a Redis client created elsewhere in the application
wsServer.addHealthCheck("redis", async () => {
  try {
    await redis.ping();
    return { status: "healthy", message: "Redis connection active" };
  } catch (error) {
    return { status: "unhealthy", message: "Redis connection failed" };
  }
});

// Health endpoint available at /health
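
The guide does not describe how the /health endpoint combines the registered checks. One plausible approach, shown here as a sketch and not as NeuroLink's confirmed behavior, is to run every check, treat a thrown error as unhealthy, and report an overall status with a matching HTTP status code. The runHealthChecks function, the httpStatus field, and the sample checks are hypothetical.

```javascript
// Run every registered check and combine the results into one report.
async function runHealthChecks(checks) {
  const entries = await Promise.all(
    Object.entries(checks).map(async ([name, check]) => {
      try {
        return [name, await check()];
      } catch (error) {
        // A check that throws is reported as unhealthy instead of crashing
        return [name, { status: "unhealthy", message: error.message }];
      }
    }),
  );
  const healthy = entries.every(([, result]) => result.status === "healthy");
  return {
    status: healthy ? "healthy" : "unhealthy",
    httpStatus: healthy ? 200 : 503, // 503 lets probes detect the failure
    checks: Object.fromEntries(entries),
  };
}

// Example with one passing and one failing check.
runHealthChecks({
  redis: async () => ({ status: "healthy", message: "Redis connection active" }),
  aiProviders: async () => {
    throw new Error("provider unreachable");
  },
}).then((report) => {
  console.log(report.status, report.httpStatus); // prints "unhealthy 503"
});
```

Returning a non-2xx status for any failed check is what allows an HTTP liveness or readiness probe to react to it.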

🚀 Getting Started

Quick Setup

# Install NeuroLink with real-time features
npm install @juspay/neurolink

# Set up environment
echo "OPENAI_API_KEY=your-key" > .env
echo "REDIS_URL=redis://localhost:6379" >> .env

# Start Redis (if not already running)
docker run -d -p 6379:6379 redis:alpine

Minimal Server Example

// server.js (ES module syntax: requires "type": "module" in package.json)
import {
  NeuroLinkWebSocketServer,
  createEnhancedChatService,
  createBestAIProvider,
} from "@juspay/neurolink";

async function startServer() {
  // Initialize WebSocket server
  const wsServer = new NeuroLinkWebSocketServer({ port: 8080 });

  // Initialize enhanced chat
  const provider = await createBestAIProvider();
  const chatService = createEnhancedChatService({
    provider,
    enableWebSocket: true,
  });

  // Handle chat messages
  wsServer.on("chat-message", async ({ connectionId, message }) => {
    await chatService.streamChat({
      prompt: message.data.prompt,
      onChunk: (chunk) => {
        wsServer.sendMessage(connectionId, {
          type: "ai-chunk",
          data: { chunk },
        });
      },
      onComplete: (result) => {
        wsServer.sendMessage(connectionId, {
          type: "ai-complete",
          data: result,
        });
      },
    });
  });

  // Start server
  await wsServer.start();
  console.log("🚀 NeuroLink WebSocket server running on port 8080");
}

startServer().catch(console.error);

# Run the server
node server.js

Client Example

<!-- client.html -->
<!DOCTYPE html>
<html>
  <head>
    <title>NeuroLink Real-time Chat</title>
  </head>
  <body>
    <div id="chat"></div>
    <input id="message" type="text" placeholder="Type your message..." />
    <button onclick="sendMessage()">Send</button>

    <script>
      const ws = new WebSocket("ws://localhost:8080");
      const chat = document.getElementById("chat");
      const messageInput = document.getElementById("message");

      // Render "\n" in appended text as line breaks
      chat.style.whiteSpace = "pre-wrap";

      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);

        if (data.type === "ai-chunk") {
          appendToChat(data.data.chunk, false);
        } else if (data.type === "ai-complete") {
          appendToChat("\n\n", false);
        }
      };

      function sendMessage() {
        const message = messageInput.value;
        if (message) {
          appendToChat(`You: ${message}\n`, true);

          ws.send(
            JSON.stringify({
              type: "chat-message",
              data: { prompt: message },
            }),
          );

          messageInput.value = "";
          appendToChat("AI: ", true);
        }
      }

      function appendToChat(text, isNewLine) {
        // Append as a text node instead of using innerHTML, so user input
        // and model output are never parsed as HTML. isNewLine is kept for
        // the existing callers; both cases append the text unchanged.
        chat.appendChild(document.createTextNode(text));
        chat.scrollTop = chat.scrollHeight;
      }

      messageInput.addEventListener("keypress", (e) => {
        if (e.key === "Enter") sendMessage();
      });
    </script>
  </body>
</html>

You are now ready to build real-time AI applications with NeuroLink. 🚀