How to Stream JSON Element Responses to the Client Using FastMCP
Learn how to implement real-time streaming of JSON elements from FastMCP servers to clients using WebSockets, delimited parsing, and LLM streaming APIs for responsive AI applications.
Written by Chris 7/20/2025
Building responsive AI applications often requires streaming data from your FastMCP server to connected clients in real-time. Whether you're generating lesson content, processing large datasets, or providing live updates, streaming JSON elements can dramatically improve user experience by showing progress as it happens.
This guide shows how to implement JSON streaming from FastMCP servers using WebSockets, delimited parsing, and LLM streaming APIs.
Why Stream JSON Elements?
Traditional request-response patterns work well for simple interactions, but fall short when dealing with:
- Long-running AI generations that produce multiple discrete outputs
- Progressive content creation where users benefit from seeing partial results
- Real-time collaboration features requiring live updates
- Large datasets that need to be processed and delivered incrementally
Streaming JSON elements provides:
- ✅ Immediate feedback to users
- ✅ Perceived performance improvements
- ✅ Better error handling with partial results
- ✅ Scalable architecture for concurrent operations
Architecture Overview
Our streaming solution consists of four key components:
- FastMCP Server - Handles tool execution and coordinates streaming
- WebSocket Server - Manages real-time connections with clients
- Delimited JSON Parser - Extracts JSON objects from streamed text
- LLM Streaming Adapter - Interfaces with AI models for continuous generation
graph TD
A[Client] -->|WebSocket| B[WebSocket Server]
B --> C[FastMCP Server]
C --> D[LLM Adapter]
D --> E[AI Model]
E -->|Stream| D
D -->|Parsed JSON| C
C -->|Broadcast| B
B -->|Real-time Updates| A
Core Implementation
1. WebSocket Server Setup
First, let's establish a WebSocket server to handle real-time connections:
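import WebSocket, { WebSocketServer } from 'ws';
// WebSocket server for streaming updates.
// The named WebSocketServer export (ws 8+) works under both ESM and CommonJS.
const wss = new WebSocketServer({ port: 9000 });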
// Store active connections
const activeConnections = new Set<WebSocket>();
wss.on('connection', (ws) => {
activeConnections.add(ws);
console.log('Client connected to WebSocket');
ws.on('close', () => {
activeConnections.delete(ws);
console.log('Client disconnected from WebSocket');
});
});
// Broadcast to all connected clients
function broadcast(data: any) {
const message = JSON.stringify(data);
activeConnections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}
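The `broadcast` function above accepts `any`, so a typo in a message shape only shows up at runtime on the client. As a rough sketch, the message types used later in this guide could be written as a discriminated union. The names `StreamMessage`, `encodeMessage`, and `decodeMessage` are assumptions made for illustration, not part of FastMCP or `ws`.

```typescript
// Hypothetical message types; the `type` strings match those broadcast later in this guide.
interface LessonAction {
  id: string;
  name: string;
  value: string;
}

type StreamMessage =
  | { type: 'setLessonName'; lessonName: string }
  | { type: 'setLessonDescription'; lessonDescription: string }
  | { type: 'appendLessonAction'; lessonAction: LessonAction }
  | { type: 'generationComplete'; totalActions: number }
  | { type: 'error'; message: string };

const KNOWN_TYPES: string[] = [
  'setLessonName',
  'setLessonDescription',
  'appendLessonAction',
  'generationComplete',
  'error',
];

// Serialize a message for ws.send(); the compiler rejects unknown shapes.
function encodeMessage(message: StreamMessage): string {
  return JSON.stringify(message);
}

// Parse an incoming frame; returns null for invalid JSON or an unknown `type`.
// Only the `type` tag is checked here; field-level validation is left out for brevity.
function decodeMessage(raw: string): StreamMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const tag = (parsed as { type?: unknown }).type;
  if (typeof tag !== 'string' || KNOWN_TYPES.indexOf(tag) === -1) return null;
  return parsed as StreamMessage;
}
```

With these in place, `broadcast` could take a `StreamMessage` instead of `any`, and the client could call `decodeMessage(event.data)` before dispatching.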
2. Delimited JSON Parser
The heart of our streaming solution is a parser that can extract JSON objects from a continuous stream:
class DelimitedJSONParser {
private buffer = '';
private readonly startDelim = '<<<ACTION_START>>>';
private readonly endDelim = '<<<ACTION_END>>>';
parse(chunk: string): any[] {
this.buffer += chunk;
const results: any[] = [];
let startPos = this.buffer.indexOf(this.startDelim);
while (startPos !== -1) {
const endPos = this.buffer.indexOf(this.endDelim, startPos);
if (endPos !== -1) {
const jsonStart = startPos + this.startDelim.length;
const jsonStr = this.buffer.substring(jsonStart, endPos).trim();
try {
const action = JSON.parse(jsonStr);
results.push(action);
} catch (e) {
console.warn('Failed to parse action JSON:', jsonStr);
}
this.buffer = this.buffer.substring(endPos + this.endDelim.length);
startPos = this.buffer.indexOf(this.startDelim);
} else {
break;
}
}
return results;
}
}
Key Features:
- Buffer Management: Handles partial JSON objects across multiple chunks
- Delimiter Detection: Uses custom delimiters to identify JSON boundaries
- Error Handling: Gracefully handles malformed JSON without breaking the stream
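- Memory Management: Removes each processed action from the buffer so consumed data does not accumulate. Text that arrives outside the delimiters stays in the buffer until the next complete action is parsed.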
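To make the buffer-management claim concrete, here is a small self-contained check. It restates the parser in compact form (the class name `ChunkParser` and the helper `countSuccessfulSplits` are made up for this example) and feeds the same text in two pieces, split at every possible offset, including in the middle of a delimiter.

```typescript
const START = '<<<ACTION_START>>>';
const END = '<<<ACTION_END>>>';

// Compact restatement of the parser above so this snippet runs on its own.
class ChunkParser {
  private buffer = '';

  parse(chunk: string): unknown[] {
    this.buffer += chunk;
    const results: unknown[] = [];
    let start = this.buffer.indexOf(START);
    while (start !== -1) {
      const end = this.buffer.indexOf(END, start);
      if (end === -1) break; // action still incomplete; wait for more text
      const json = this.buffer.substring(start + START.length, end).trim();
      try {
        results.push(JSON.parse(json));
      } catch {
        // Malformed JSON is skipped so one bad action does not stop the stream.
      }
      this.buffer = this.buffer.substring(end + END.length);
      start = this.buffer.indexOf(START);
    }
    return results;
  }
}

// Feed `text` in two pieces, split at every possible offset, and count how
// many of those splits still yield exactly the expected actions.
function countSuccessfulSplits(text: string, expected: unknown[]): number {
  let ok = 0;
  for (let i = 0; i <= text.length; i++) {
    const parser = new ChunkParser();
    const actions = parser.parse(text.slice(0, i)).concat(parser.parse(text.slice(i)));
    if (JSON.stringify(actions) === JSON.stringify(expected)) ok++;
  }
  return ok;
}

const sample =
  'Some preamble ' +
  START + '{"name":"file-create","value":"Button.tsx"}' + END +
  START + '{"name":"author-speak-before","value":"Hi"}' + END;
const expectedActions = [
  { name: 'file-create', value: 'Button.tsx' },
  { name: 'author-speak-before', value: 'Hi' },
];
// Every split point, from 0 to sample.length inclusive, should succeed.
const successfulSplits = countSuccessfulSplits(sample, expectedActions);
```

Because the parser only consumes text once both delimiters are present, the result does not depend on where the chunk boundary falls.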
3. LLM Streaming Adapter
Create a flexible adapter that works with different LLM providers:
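import Anthropic from '@anthropic-ai/sdk';
interface LLMStreamChunk {
text: string;
isComplete: boolean;
}
interface LLMAdapter {
generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk>;
}
// Settings used by the adapter; the field names follow their use below
interface LLMConfig {
model?: string;
maxTokens?: number;
temperature?: number;
}
class AnthropicAdapter implements LLMAdapter {
private client: Anthropic;
private config: LLMConfig;
constructor(apiKey: string, config: LLMConfig) {
this.client = new Anthropic({ apiKey });
this.config = config;
}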
async* generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk> {
const stream = await this.client.messages.create({
model: this.config.model!,
messages: [{ role: 'user', content: prompt }],
max_tokens: this.config.maxTokens!,
temperature: this.config.temperature,
stream: true
});
for await (const chunk of stream) {
if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
yield {
text: chunk.delta.text,
isComplete: false
};
} else if (chunk.type === 'message_stop') {
yield {
text: '',
isComplete: true
};
}
}
}
}
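Because the rest of the pipeline only depends on the `LLMAdapter` interface, it can be exercised without an API key. The sketch below is one possible test double: `ReplayAdapter`, `splitIntoChunks`, and `collectText` are hypothetical names, and the interfaces are repeated so the snippet stands alone.

```typescript
interface LLMStreamChunk {
  text: string;
  isComplete: boolean;
}

interface LLMAdapter {
  generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk>;
}

// Split text into pieces of at most `size` characters.
function splitIntoChunks(text: string, size: number): string[] {
  if (size <= 0) throw new Error('size must be positive');
  const pieces: string[] = [];
  for (let i = 0; i < text.length; i += size) {
    pieces.push(text.slice(i, i + size));
  }
  return pieces;
}

// Hypothetical test double: replays a canned response in small chunks,
// then emits the same completion marker the real adapter yields.
class ReplayAdapter implements LLMAdapter {
  private readonly cannedResponse: string;
  private readonly chunkSize: number;

  constructor(cannedResponse: string, chunkSize: number = 7) {
    this.cannedResponse = cannedResponse;
    this.chunkSize = chunkSize;
  }

  async *generateContentStream(_prompt: string): AsyncIterable<LLMStreamChunk> {
    for (const text of splitIntoChunks(this.cannedResponse, this.chunkSize)) {
      yield { text, isComplete: false };
    }
    yield { text: '', isComplete: true };
  }
}

// Collect a whole stream into one string (handy in tests).
async function collectText(adapter: LLMAdapter, prompt: string): Promise<string> {
  let out = '';
  for await (const chunk of adapter.generateContentStream(prompt)) {
    out += chunk.text;
  }
  return out;
}
```

A small chunk size such as 7 is deliberate: it forces delimiters to be split across chunks, which is the case the parser has to survive.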
4. FastMCP Tool Implementation
Now let's create a FastMCP tool that orchestrates the streaming process:
import { FastMCP } from 'fastmcp';
import { z } from 'zod';
const mcp = new FastMCP({
name: 'streaming-server',
version: '1.0.0'
});
mcp.addTool({
name: 'generate-lesson',
description: 'Generate CodeVideo lesson with streaming updates',
parameters: z.object({
prompt: z.string().describe('Lesson description')
}),
execute: async ({ prompt }: { prompt: string }) => {
console.log('Starting lesson generation for:', prompt);
const parser = new DelimitedJSONParser();
let actionCount = 0;
// Send initial metadata
// (extractLessonMetadata is a helper defined elsewhere; it resolves to { name, description })
const lessonMetadata = await extractLessonMetadata(prompt);
broadcast({
type: 'setLessonName',
lessonName: lessonMetadata.name
});
broadcast({
type: 'setLessonDescription',
lessonDescription: lessonMetadata.description
});
// Build streaming prompt with delimiters
const streamingPrompt = `
Generate CodeVideo tutorial actions for: ${prompt}
IMPORTANT: Output each action immediately using this exact format:
<<<ACTION_START>>>
{"name": "author-speak-before", "value": "Let's create a React component"}
<<<ACTION_END>>>
<<<ACTION_START>>>
{"name": "file-create", "value": "Button.tsx"}
<<<ACTION_END>>>
Generate actions one at a time. Start generating now:
`;
try {
// llmAdapter is an LLMAdapter instance created at startup, for example an AnthropicAdapter
const stream = llmAdapter.generateContentStream(streamingPrompt);
for await (const chunk of stream) {
if (chunk.text && !chunk.isComplete) {
const actions = parser.parse(chunk.text);
actions.forEach(action => {
if (action.name && action.value) {
actionCount++;
// Broadcast action immediately
broadcast({
type: 'appendLessonAction',
lessonAction: {
id: `action-${actionCount}`,
name: action.name,
value: action.value
}
});
console.log(`Streamed action ${actionCount}:`, action.name);
}
});
}
}
// Send completion signal
broadcast({
type: 'generationComplete',
totalActions: actionCount
});
return `Generated lesson with ${actionCount} actions`;
} catch (error) {
console.error('Error generating lesson:', error);
broadcast({
type: 'error',
message: (error as Error).message
});
throw error;
}
}
});
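One detail in the tool above: the check `action.name && action.value` treats an empty string as missing, so an action whose value is `""` is silently dropped. If empty values are acceptable for your action types (an assumption worth checking against your own schema), a stricter type-based check could look like the following. `toLessonAction` and `toLessonActions` are hypothetical helper names.

```typescript
interface LessonAction {
  id: string;
  name: string;
  value: string;
}

// Hypothetical helper: turn one parsed object into a broadcast-ready action,
// or return null when the shape is wrong.
function toLessonAction(raw: unknown, sequence: number): LessonAction | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const candidate = raw as { name?: unknown; value?: unknown };
  if (typeof candidate.name !== 'string' || candidate.name.length === 0) return null;
  // An empty string is allowed as a value; only a missing or non-string value is rejected.
  if (typeof candidate.value !== 'string') return null;
  return { id: `action-${sequence}`, name: candidate.name, value: candidate.value };
}

// Convert a batch of parsed objects, numbering only the ones that pass validation.
// `alreadySent` is the number of actions broadcast so far.
function toLessonActions(rawActions: unknown[], alreadySent: number): LessonAction[] {
  const actions: LessonAction[] = [];
  for (const raw of rawActions) {
    const action = toLessonAction(raw, alreadySent + actions.length + 1);
    if (action !== null) actions.push(action);
  }
  return actions;
}
```

Inside the streaming loop, the result of `parser.parse(chunk.text)` could be passed through `toLessonActions` before broadcasting, with `actionCount` incremented by the length of the returned array.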
Client-Side Implementation
JavaScript/TypeScript Client
class StreamingClient {
// protected rather than private so a subclass can replace the socket when reconnecting
protected ws: WebSocket;
protected readonly wsUrl: string;
private onAction: (action: any) => void;
private onComplete: () => void;
private onError: (error: string) => void;
constructor(
wsUrl: string,
onAction: (action: any) => void,
onComplete: () => void,
onError: (error: string) => void
) {
this.wsUrl = wsUrl;
this.onAction = onAction;
this.onComplete = onComplete;
this.onError = onError;
this.ws = new WebSocket(wsUrl);
this.setupEventHandlers();
}
protected setupEventHandlers() {
this.ws.onopen = () => {
console.log('Connected to streaming server');
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
this.handleMessage(data);
} catch (error) {
console.error('Failed to parse message:', error);
}
};
this.ws.onclose = () => {
console.log('Disconnected from streaming server');
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
this.onError('Connection error');
};
}
private handleMessage(data: any) {
switch (data.type) {
case 'setLessonName':
console.log('Lesson name:', data.lessonName);
break;
case 'setLessonDescription':
console.log('Lesson description:', data.lessonDescription);
break;
case 'appendLessonAction':
this.onAction(data.lessonAction);
break;
case 'generationComplete':
console.log('Generation complete:', data.totalActions);
this.onComplete();
break;
case 'error':
this.onError(data.message);
break;
}
}
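public triggerGeneration(prompt: string) {
// Send HTTP request to trigger generation; results arrive over the WebSocket
fetch('/codevideo-mcp/agent', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ prompt }),
})
.then((response) => {
if (!response.ok) {
this.onError(`Request failed with status ${response.status}`);
}
})
.catch(() => this.onError('Failed to start generation'));
}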
public disconnect() {
this.ws.close();
}
}
React Integration
import React, { useState, useEffect, useCallback } from 'react';
interface Action {
id: string;
name: string;
value: string;
}
const StreamingLessonGenerator: React.FC = () => {
const [actions, setActions] = useState<Action[]>([]);
const [isGenerating, setIsGenerating] = useState(false);
const [prompt, setPrompt] = useState('');
const [client, setClient] = useState<StreamingClient | null>(null);
const handleAction = useCallback((action: Action) => {
setActions(prev => [...prev, action]);
}, []);
const handleComplete = useCallback(() => {
setIsGenerating(false);
console.log('Lesson generation completed');
}, []);
const handleError = useCallback((error: string) => {
setIsGenerating(false);
console.error('Generation error:', error);
}, []);
useEffect(() => {
const streamingClient = new StreamingClient(
'ws://localhost:9000',
handleAction,
handleComplete,
handleError
);
setClient(streamingClient);
return () => {
streamingClient.disconnect();
};
}, [handleAction, handleComplete, handleError]);
const startGeneration = () => {
if (client && prompt.trim()) {
setIsGenerating(true);
setActions([]);
client.triggerGeneration(prompt);
}
};
return (
<div className="p-6">
<div className="mb-4">
<input
type="text"
value={prompt}
onChange={(e) => setPrompt(e.target.value)}
placeholder="Enter lesson prompt..."
className="w-full p-2 border rounded"
disabled={isGenerating}
/>
<button
onClick={startGeneration}
disabled={isGenerating || !prompt.trim()}
className="mt-2 px-4 py-2 bg-blue-500 text-white rounded disabled:opacity-50"
>
{isGenerating ? 'Generating...' : 'Generate Lesson'}
</button>
</div>
<div className="space-y-2">
{actions.map((action) => (
<div key={action.id} className="p-3 bg-gray-100 rounded">
<div className="font-mono text-sm">
<span className="text-blue-600">{action.name}</span>
<span className="text-gray-500">: </span>
<span>{action.value}</span>
</div>
</div>
))}
</div>
{isGenerating && (
<div className="mt-4 text-center">
<div className="inline-block animate-spin rounded-full h-8 w-8 border-b-2 border-blue-500"></div>
<p className="mt-2 text-gray-600">Generating actions...</p>
</div>
)}
</div>
);
};
export default StreamingLessonGenerator;
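The component above keeps only the action list in state and logs the lesson name and description. One way to keep all streamed state together is a pure reducer that can be unit tested without React. This is a sketch; `LessonState`, `ServerMessage`, and `lessonReducer` are names chosen for the example.

```typescript
interface Action {
  id: string;
  name: string;
  value: string;
}

interface LessonState {
  lessonName: string;
  lessonDescription: string;
  actions: Action[];
  isGenerating: boolean;
  error: string | null;
}

// Message shapes as broadcast by the server-side tool in this guide.
type ServerMessage =
  | { type: 'setLessonName'; lessonName: string }
  | { type: 'setLessonDescription'; lessonDescription: string }
  | { type: 'appendLessonAction'; lessonAction: Action }
  | { type: 'generationComplete'; totalActions: number }
  | { type: 'error'; message: string };

const initialLessonState: LessonState = {
  lessonName: '',
  lessonDescription: '',
  actions: [],
  isGenerating: true,
  error: null,
};

// Pure function: returns a new state object and never mutates the previous one.
function lessonReducer(state: LessonState, message: ServerMessage): LessonState {
  switch (message.type) {
    case 'setLessonName':
      return { ...state, lessonName: message.lessonName };
    case 'setLessonDescription':
      return { ...state, lessonDescription: message.lessonDescription };
    case 'appendLessonAction':
      return { ...state, actions: [...state.actions, message.lessonAction] };
    case 'generationComplete':
      return { ...state, isGenerating: false };
    case 'error':
      return { ...state, isGenerating: false, error: message.message };
    default:
      return state;
  }
}
```

In a component this would pair with React's `useReducer(lessonReducer, initialLessonState)`, dispatching each parsed WebSocket message.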
Advanced Patterns
1. Error Recovery and Reconnection
// Assumes StreamingClient declares ws, wsUrl, and setupEventHandlers as protected.
class RobustStreamingClient extends StreamingClient {
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
protected setupEventHandlers() {
super.setupEventHandlers();
this.ws.onopen = () => {
console.log('Connected to streaming server');
// A successful connection resets the backoff state
this.reconnectAttempts = 0;
this.reconnectDelay = 1000;
};
this.ws.onclose = (event) => {
if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
setTimeout(() => {
this.reconnect();
}, this.reconnectDelay);
this.reconnectDelay *= 2; // Exponential backoff
this.reconnectAttempts++;
}
};
}
private reconnect() {
this.ws = new WebSocket(this.wsUrl);
this.setupEventHandlers();
}
}
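The doubling delay above grows without bound and makes every client retry on the same schedule. A common refinement is to cap the delay and add random jitter. The sketch below shows the arithmetic only; the 30 second cap and the jitter range are assumptions, not values from the original client.

```typescript
// Delay before reconnect attempt `attempt` (0-based).
// The exponential part is baseMs * 2^attempt, capped at maxMs. It is then scaled
// by a random factor in [0.5, 1) so that many clients do not reconnect in lockstep.
// `random` is injectable so the function can be tested deterministically.
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000,
  random: () => number = Math.random
): number {
  const exponential = Math.min(maxMs, baseMs * Math.pow(2, attempt));
  const jitterFactor = 0.5 + random() / 2;
  return Math.round(exponential * jitterFactor);
}
```

In the client above, `setTimeout(..., backoffDelay(this.reconnectAttempts))` could replace the manually doubled `reconnectDelay` field.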
2. Backpressure Handling
class BackpressureStreamingClient {
private messageQueue: any[] = [];
private processing = false;
private maxQueueSize = 100;
private async handleMessage(data: any) {
if (this.messageQueue.length >= this.maxQueueSize) {
console.warn('Message queue full, dropping oldest messages');
this.messageQueue.shift();
}
this.messageQueue.push(data);
if (!this.processing) {
this.processing = true;
await this.processQueue();
this.processing = false;
}
}
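private async processQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
await this.processMessage(message);
// Add small delay to prevent UI blocking
await new Promise(resolve => setTimeout(resolve, 10));
}
}
// Placeholder: replace with your own handling, for example the switch in StreamingClient.handleMessage
private async processMessage(message: any): Promise<void> {
console.log('Processing message:', message.type);
}
}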
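The drop-oldest policy above is easier to reason about, and to test, when it lives in its own small class. The following sketch also counts evictions, since a dropped `appendLessonAction` message means lost content that you may want to surface. `BoundedQueue` is a hypothetical name.

```typescript
// Fixed-capacity FIFO queue that evicts the oldest item when full.
class BoundedQueue<T> {
  private items: T[] = [];
  private dropped = 0;
  private readonly capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
    this.capacity = capacity;
  }

  // Add an item. If the queue is full, the oldest item is evicted first
  // and returned; otherwise the return value is undefined.
  push(item: T): T | undefined {
    let evicted: T | undefined;
    if (this.items.length >= this.capacity) {
      evicted = this.items.shift();
      this.dropped++;
    }
    this.items.push(item);
    return evicted;
  }

  // Remove and return the oldest item, or undefined when empty.
  shift(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }

  // Total number of items evicted so far.
  get droppedCount(): number {
    return this.dropped;
  }
}
```

A non-zero `droppedCount` after generation finishes is a signal that the client fell behind and the rendered lesson may be incomplete.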
3. Message Ordering and Deduplication
class OrderedStreamingClient {
private lastSequenceNumber = 0;
private messageBuffer = new Map<number, any>();
private handleMessage(data: any) {
const sequenceNumber = data.sequenceNumber;
if (sequenceNumber <= this.lastSequenceNumber) {
// Duplicate message, ignore
return;
}
this.messageBuffer.set(sequenceNumber, data);
this.processOrderedMessages();
}
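private processOrderedMessages() {
while (this.messageBuffer.has(this.lastSequenceNumber + 1)) {
const nextMessage = this.messageBuffer.get(this.lastSequenceNumber + 1);
this.messageBuffer.delete(this.lastSequenceNumber + 1);
this.lastSequenceNumber++;
this.processMessage(nextMessage);
}
}
// Placeholder: replace with your own handling, for example the switch in StreamingClient.handleMessage
private processMessage(message: any) {
console.log('Processing message:', message.type);
}
}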
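The client above expects a `sequenceNumber` on every message, but the `broadcast` function shown earlier does not add one. The sketch below pairs a server-side stamping function with the same reordering idea in a form that returns the deliverable payloads. `createSequencer` and `ReorderBuffer` are illustrative names, and numbering from 1 is an assumption that matches `lastSequenceNumber = 0` above.

```typescript
interface Sequenced<T> {
  sequenceNumber: number;
  payload: T;
}

// Server side: returns a function that stamps payloads with 1, 2, 3, ...
function createSequencer<T>(): (payload: T) => Sequenced<T> {
  let next = 0;
  return (payload: T) => ({ sequenceNumber: ++next, payload });
}

// Client side: buffers out-of-order messages and releases them in sequence.
class ReorderBuffer<T> {
  private last = 0;
  private pending = new Map<number, T>();

  // Accept one message and return every payload that is now deliverable, in order.
  accept(message: Sequenced<T>): T[] {
    if (message.sequenceNumber <= this.last) return []; // duplicate of a delivered message
    this.pending.set(message.sequenceNumber, message.payload);
    const ready: T[] = [];
    while (this.pending.has(this.last + 1)) {
      this.last++;
      ready.push(this.pending.get(this.last) as T);
      this.pending.delete(this.last);
    }
    return ready;
  }
}
```

On the server, each message would pass through the stamping function before `JSON.stringify`. A separate sequencer per generation run keeps numbering simple when clients connect between runs.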
Performance Optimization
1. Efficient JSON Parsing
class OptimizedDelimitedJSONParser {
private buffer = '';
private readonly startDelim = '<<<ACTION_START>>>';
private readonly endDelim = '<<<ACTION_END>>>';
private readonly maxBufferSize = 1024 * 1024; // 1MB limit
parse(chunk: string): any[] {
// Prevent buffer from growing too large
if (this.buffer.length + chunk.length > this.maxBufferSize) {
console.warn('Buffer size limit reached, resetting');
this.buffer = chunk;
} else {
this.buffer += chunk;
}
const results: any[] = [];
const startDelimLength = this.startDelim.length;
const endDelimLength = this.endDelim.length;
let startPos = this.buffer.indexOf(this.startDelim);
while (startPos !== -1) {
const endPos = this.buffer.indexOf(this.endDelim, startPos + startDelimLength);
if (endPos !== -1) {
const jsonStart = startPos + startDelimLength;
const jsonStr = this.buffer.substring(jsonStart, endPos).trim();
if (jsonStr) {
try {
const action = JSON.parse(jsonStr);
if (this.isValidAction(action)) {
results.push(action);
}
} catch (e) {
console.warn('Failed to parse action JSON:', jsonStr);
}
}
// Remove processed content more efficiently
this.buffer = this.buffer.substring(endPos + endDelimLength);
startPos = this.buffer.indexOf(this.startDelim);
} else {
break;
}
}
return results;
}
private isValidAction(action: any): boolean {
return action &&
typeof action.name === 'string' &&
typeof action.value === 'string' &&
action.name.length > 0;
}
}
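Both parser versions only shrink the buffer when a complete action is found, so text the model emits outside the delimiters keeps accumulating, and the 1MB reset above discards any action that happens to be in flight. A gentler option is to discard, after each `parse` call, everything that cannot belong to a future action. The helper below is a sketch of that idea; `trimBeforeAction` is a hypothetical name.

```typescript
// Return the part of `buffer` that may still matter to the parser:
// - from the first start delimiter onward, if one is present;
// - otherwise the longest tail of the buffer that is a prefix of the delimiter
//   (the delimiter may have been split across two chunks);
// - otherwise nothing.
function trimBeforeAction(buffer: string, startDelim: string): string {
  const start = buffer.indexOf(startDelim);
  if (start !== -1) return buffer.substring(start);

  const maxKeep = Math.min(buffer.length, startDelim.length - 1);
  for (let keep = maxKeep; keep > 0; keep--) {
    const tail = buffer.substring(buffer.length - keep);
    if (startDelim.substring(0, keep) === tail) return tail;
  }
  return '';
}
```

Calling `this.buffer = trimBeforeAction(this.buffer, this.startDelim)` after the `while` loop would bound the buffer to roughly the size of one in-flight action, so the hard reset becomes a last resort for a single oversized action.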
2. Batched Broadcasting
class BatchedBroadcaster {
private messageQueue: any[] = [];
private batchSize = 10;
private flushInterval = 100; // ms
private flushTimer: NodeJS.Timeout | null = null;
public queueMessage(data: any) {
this.messageQueue.push(data);
if (this.messageQueue.length >= this.batchSize) {
this.flush();
} else if (!this.flushTimer) {
this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
}
}
private flush() {
if (this.messageQueue.length === 0) return;
const batch = this.messageQueue.splice(0);
const message = JSON.stringify({
type: 'batch',
messages: batch
});
activeConnections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
}
}
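Batching changes the wire format: clients now receive `{ type: 'batch', messages: [...] }` envelopes, which the `handleMessage` switch shown earlier does not recognise. A small unwrapping step on the client keeps the rest of the handling unchanged. This is a sketch; `unbatch` and `isBatch` are names invented for the example.

```typescript
interface BatchMessage {
  type: 'batch';
  messages: unknown[];
}

// Type guard for the envelope produced by BatchedBroadcaster.flush().
function isBatch(data: unknown): data is BatchMessage {
  return (
    typeof data === 'object' &&
    data !== null &&
    (data as { type?: unknown }).type === 'batch' &&
    Array.isArray((data as { messages?: unknown }).messages)
  );
}

// Return the individual messages contained in one WebSocket frame:
// the inner messages of a batch envelope, or the frame itself when it
// is an ordinary single message. Throws if `raw` is not valid JSON.
function unbatch(raw: string): unknown[] {
  const data: unknown = JSON.parse(raw);
  return isBatch(data) ? data.messages : [data];
}
```

In `onmessage`, iterating over `unbatch(event.data)` and passing each element to `handleMessage` lets the same client work with both batched and unbatched servers.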
Testing Strategies
1. Unit Testing the Parser
import { describe, it, expect, beforeEach } from 'vitest';
describe('DelimitedJSONParser', () => {
let parser: DelimitedJSONParser;
beforeEach(() => {
parser = new DelimitedJSONParser();
});
it('should parse single complete action', () => {
const input = '<<<ACTION_START>>>{"name": "test", "value": "data"}<<<ACTION_END>>>';
const result = parser.parse(input);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({ name: 'test', value: 'data' });
});
it('should handle partial actions across multiple chunks', () => {
const chunk1 = '<<<ACTION_START>>>{"name": "test"';
const chunk2 = ', "value": "data"}<<<ACTION_END>>>';
let result = parser.parse(chunk1);
expect(result).toHaveLength(0);
result = parser.parse(chunk2);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({ name: 'test', value: 'data' });
});
it('should handle multiple actions in single chunk', () => {
const input = '<<<ACTION_START>>>{"name": "test1", "value": "data1"}<<<ACTION_END>>>' +
'<<<ACTION_START>>>{"name": "test2", "value": "data2"}<<<ACTION_END>>>';
const result = parser.parse(input);
expect(result).toHaveLength(2);
});
it('should handle malformed JSON gracefully', () => {
const input = '<<<ACTION_START>>>{invalid json}<<<ACTION_END>>>';
const result = parser.parse(input);
expect(result).toHaveLength(0);
});
});
2. Integration Testing
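import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import WebSocket, { WebSocketServer } from 'ws';
import type { AddressInfo } from 'net';
import { FastMCP } from 'fastmcp';
describe('Streaming Integration', () => {
let server: FastMCP;
let wsServer: WebSocketServer;
let client: WebSocket;
beforeEach(async () => {
// Setup test server
server = new FastMCP({ name: 'test-server', version: '1.0.0' });
wsServer = new WebSocketServer({ port: 0 });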
// Connect test client
const address = wsServer.address() as AddressInfo;
client = new WebSocket(`ws://localhost:${address.port}`);
await new Promise(resolve => client.on('open', resolve));
});
afterEach(() => {
client.close();
wsServer.close();
});
it('should stream actions in real-time', async () => {
const receivedMessages: any[] = [];
client.on('message', (data) => {
receivedMessages.push(JSON.parse(data.toString()));
});
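// Trigger generation.
// NOTE: handleRequest stands in for however your setup invokes the tool and may not
// exist on FastMCP itself; an MCP client connected to the server is the usual route.
// The tool must also broadcast through wsServer for this client to receive anything.
await server.handleRequest({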
method: 'tools/call',
params: {
name: 'generate-lesson',
arguments: { prompt: 'Create a simple component' }
}
});
// Wait for messages
await new Promise(resolve => setTimeout(resolve, 1000));
expect(receivedMessages.length).toBeGreaterThan(0);
expect(receivedMessages.some(msg => msg.type === 'appendLessonAction')).toBe(true);
});
});
Deployment Considerations
1. Docker Configuration
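FROM node:18-alpine
WORKDIR /app
# curl is not part of the Alpine base image, but the health checks use it
RUN apk add --no-cache curl
# Copy package files
COPY package*.json ./
# Install all dependencies; the build step needs devDependencies (assuming typescript is one)
RUN npm ci
# Copy source code
COPY . .
# Build TypeScript, then remove devDependencies from the image
RUN npm run build && npm prune --omit=dev
# Expose ports
EXPOSE 3000 9000
# Health check (assumes the app serves GET /health on port 3000)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:3000/health || exit 1
# Start the application
CMD ["npm", "start"]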
2. Environment Configuration
# docker-compose.yml
version: '3.8'
services:
  fastmcp-streaming:
    build: .
    ports:
      - "3000:3000"
      - "9000:9000"
    environment:
      - NODE_ENV=production
      - LLM_PROVIDER=anthropic
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - MAX_CONNECTIONS=100
      - MESSAGE_BATCH_SIZE=10
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    restart: unless-stopped
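The compose file sets `MAX_CONNECTIONS`, `MESSAGE_BATCH_SIZE`, and `LLM_PROVIDER`, but none of the server code shown so far reads them. A minimal sketch of a loader follows. The `StreamingConfig` shape and the fallback values (copied from the compose file) are assumptions.

```typescript
interface StreamingConfig {
  maxConnections: number;
  messageBatchSize: number;
  llmProvider: string;
}

// Parse a positive integer from an environment value, falling back when unset.
// Throws on values such as "abc", "0", or "2.5" so misconfiguration fails at startup.
function readPositiveInt(value: string | undefined, fallback: number, name: string): number {
  if (value === undefined || value === '') return fallback;
  const parsed = Number(value);
  if (!isFinite(parsed) || Math.floor(parsed) !== parsed || parsed <= 0) {
    throw new Error(`${name} must be a positive integer, got "${value}"`);
  }
  return parsed;
}

// `env` is passed in rather than read from a global so the function is easy to test.
function loadConfig(env: { [key: string]: string | undefined }): StreamingConfig {
  return {
    maxConnections: readPositiveInt(env['MAX_CONNECTIONS'], 100, 'MAX_CONNECTIONS'),
    messageBatchSize: readPositiveInt(env['MESSAGE_BATCH_SIZE'], 10, 'MESSAGE_BATCH_SIZE'),
    llmProvider: env['LLM_PROVIDER'] || 'anthropic',
  };
}
```

At startup the server would call `loadConfig(process.env)` once, then pass `messageBatchSize` to the broadcaster and enforce `maxConnections` in the `connection` handler.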
3. Production Monitoring
// Add monitoring and metrics
import { Counter, Gauge, Histogram } from 'prom-client';
const metrics = {
activeConnections: new Gauge({
name: 'websocket_active_connections',
help: 'Number of active WebSocket connections'
}),
messagesProcessed: new Counter({
name: 'messages_processed_total',
help: 'Total number of messages processed'
}),
generationDuration: new Histogram({
name: 'generation_duration_seconds',
help: 'Time taken to generate lessons'
})
};
// Update metrics in your code
wss.on('connection', (ws) => {
metrics.activeConnections.inc();
ws.on('close', () => {
metrics.activeConnections.dec();
});
});
Best Practices and Gotchas
✅ Do's
- Implement proper error boundaries to handle streaming failures gracefully
- Use connection pooling for multiple clients
- Add sequence numbers to messages for ordering guarantees
- Implement heartbeat/ping-pong for connection health monitoring
- Use efficient JSON parsing libraries for large payloads
- Add proper logging for debugging streaming issues
- Implement backpressure handling for slow clients
❌ Don'ts
- Don't ignore WebSocket connection limits - implement proper scaling
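- Don't assume every message arrives exactly once and in order (for example after a reconnect) - number messages on the server and reorder on the client
- Don't block the event loop by parsing very large JSON payloads synchronously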
- Don't forget memory cleanup for disconnected clients
- Don't send sensitive data without proper authentication
- Don't ignore browser connection limits - implement connection sharing
- Don't forget to handle network interruptions gracefully
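The heartbeat recommendation in the Do's list comes down to simple bookkeeping: remember when each connection last showed signs of life and periodically collect the ones that have gone quiet. The class below is a transport-agnostic sketch of that bookkeeping. `HeartbeatTracker` is a hypothetical name, and timestamps are passed in explicitly so the logic can be tested without timers.

```typescript
// Tracks the last time each connection was seen alive.
class HeartbeatTracker<C> {
  private lastSeen = new Map<C, number>();
  private readonly timeoutMs: number;

  constructor(timeoutMs: number) {
    this.timeoutMs = timeoutMs;
  }

  // Call on connect and whenever a pong (or any message) arrives.
  markAlive(connection: C, now: number): void {
    this.lastSeen.set(connection, now);
  }

  // Call when a connection closes so it is no longer tracked.
  remove(connection: C): void {
    this.lastSeen.delete(connection);
  }

  // Connections that have shown no sign of life for longer than timeoutMs.
  findStale(now: number): C[] {
    const stale: C[] = [];
    this.lastSeen.forEach((seenAt, connection) => {
      if (now - seenAt > this.timeoutMs) stale.push(connection);
    });
    return stale;
  }
}
```

With the `ws` library, one possible wiring is to call `markAlive(ws, Date.now())` in the `connection` and `pong` handlers, send `ws.ping()` on an interval, and call `ws.terminate()` on whatever `findStale(Date.now())` returns.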
Conclusion
Streaming JSON elements from FastMCP servers opens up powerful possibilities for building responsive, real-time AI applications. The combination of WebSockets, delimited parsing, and streaming LLM APIs provides a robust foundation for delivering immediate feedback to users while maintaining performance and reliability.
Key takeaways:
- Delimited parsing enables reliable extraction of JSON objects from continuous streams
- WebSocket broadcasting provides real-time delivery to multiple clients
- Proper error handling and reconnection logic ensure robust user experiences
- Performance optimization through batching and efficient parsing scales to production loads
Whether you're building educational platforms, collaborative tools, or real-time AI assistants, this streaming architecture pattern provides the foundation for creating engaging, responsive user experiences that keep pace with modern AI capabilities.
Resources
Have questions about implementing streaming in your FastMCP applications? Join our community discussions or check out more advanced patterns in our other tutorials.