How to Stream JSON Element Responses to the Client Using FastMCP

Learn how to implement real-time streaming of JSON elements from FastMCP servers to clients using WebSockets, delimited parsing, and LLM streaming APIs for responsive AI applications.

Written by Chris 7/20/2025


Building responsive AI applications often requires streaming data from your FastMCP server to connected clients in real time. Whether you're generating lesson content, processing large datasets, or providing live updates, streaming JSON elements can dramatically improve user experience by showing progress as it happens.

In this comprehensive guide, we'll explore how to implement robust JSON streaming from FastMCP servers using WebSockets, delimited parsing, and LLM streaming APIs.

Why Stream JSON Elements?

Traditional request-response patterns work well for simple interactions, but fall short when dealing with:

  • Long-running AI generations that produce multiple discrete outputs
  • Progressive content creation where users benefit from seeing partial results
  • Real-time collaboration features requiring live updates
  • Large datasets that need to be processed and delivered incrementally

Streaming JSON elements provides:

  • Immediate feedback to users
  • Perceived performance improvements
  • Better error handling with partial results
  • Scalable architecture for concurrent operations

Architecture Overview

Our streaming solution consists of four key components:

  1. FastMCP Server - Handles tool execution and coordinates streaming
  2. WebSocket Server - Manages real-time connections with clients
  3. Delimited JSON Parser - Extracts JSON objects from streamed text
  4. LLM Streaming Adapter - Interfaces with AI models for continuous generation
graph TD
    A[Client] -->|WebSocket| B[WebSocket Server]
    B --> C[FastMCP Server]
    C --> D[LLM Adapter]
    D --> E[AI Model]
    E -->|Stream| D
    D -->|Parsed JSON| C
    C -->|Broadcast| B
    B -->|Real-time Updates| A
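Everything that flows from the server to the client in this architecture is one of a handful of JSON message shapes. Below is a minimal sketch of those shapes as a TypeScript discriminated union, plus a runtime type guard for the client. The names `StreamMessage`, `LessonAction` and `isStreamMessage` are illustrative. The `type` values and fields match what the server code in this article broadcasts.

```typescript
// Illustrative type names; the message shapes mirror what the server broadcasts.
interface LessonAction {
    id: string;
    name: string;
    value: string;
}

type StreamMessage =
    | { type: 'setLessonName'; lessonName: string }
    | { type: 'setLessonDescription'; lessonDescription: string }
    | { type: 'appendLessonAction'; lessonAction: LessonAction }
    | { type: 'generationComplete'; totalActions: number }
    | { type: 'error'; message: string };

// Runtime check for data coming out of JSON.parse, which is untyped.
function isStreamMessage(data: unknown): data is StreamMessage {
    if (typeof data !== 'object' || data === null) return false;
    const msg = data as Record<string, unknown>;
    switch (msg.type) {
        case 'setLessonName':
            return typeof msg.lessonName === 'string';
        case 'setLessonDescription':
            return typeof msg.lessonDescription === 'string';
        case 'appendLessonAction': {
            const a = msg.lessonAction as Record<string, unknown> | null;
            return typeof a === 'object' && a !== null &&
                typeof a.id === 'string' &&
                typeof a.name === 'string' &&
                typeof a.value === 'string';
        }
        case 'generationComplete':
            return typeof msg.totalActions === 'number';
        case 'error':
            return typeof msg.message === 'string';
        default:
            return false;
    }
}
```

On the client, running the result of `JSON.parse(event.data)` through `isStreamMessage` before dispatching drops malformed or unexpected frames before they reach the UI.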

Core Implementation

1. WebSocket Server Setup

First, let's establish a WebSocket server to handle real-time connections:

import WebSocket from 'ws';

// WebSocket server for streaming updates
const wss = new WebSocket.Server({ port: 9000 });

// Store active connections
const activeConnections = new Set<WebSocket>();

wss.on('connection', (ws) => {
    activeConnections.add(ws);
    console.log('Client connected to WebSocket');

    ws.on('close', () => {
        activeConnections.delete(ws);
        console.log('Client disconnected from WebSocket');
    });
});

// Broadcast to all connected clients
function broadcast(data: any) {
    const message = JSON.stringify(data);
    activeConnections.forEach(ws => {
        if (ws.readyState === WebSocket.OPEN) {
            ws.send(message);
        }
    });
}

2. Delimited JSON Parser

The heart of our streaming solution is a parser that can extract JSON objects from a continuous stream:

class DelimitedJSONParser {
    private buffer = '';
    private readonly startDelim = '<<<ACTION_START>>>';
    private readonly endDelim = '<<<ACTION_END>>>';

    parse(chunk: string): any[] {
        this.buffer += chunk;
        const results: any[] = [];

        let startPos = this.buffer.indexOf(this.startDelim);
        while (startPos !== -1) {
            const endPos = this.buffer.indexOf(this.endDelim, startPos);
            if (endPos !== -1) {
                const jsonStart = startPos + this.startDelim.length;
                const jsonStr = this.buffer.substring(jsonStart, endPos).trim();

                try {
                    const action = JSON.parse(jsonStr);
                    results.push(action);
                } catch (e) {
                    console.warn('Failed to parse action JSON:', jsonStr);
                }

                this.buffer = this.buffer.substring(endPos + this.endDelim.length);
                startPos = this.buffer.indexOf(this.startDelim);
            } else {
                break;
            }
        }

        return results;
    }
}

Key Features:

  • Buffer Management: Handles partial JSON objects across multiple chunks
  • Delimiter Detection: Uses custom delimiters to identify JSON boundaries
  • Error Handling: Gracefully handles malformed JSON without breaking the stream
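  • Memory Efficient: Discards everything up to and including each completed end delimiter, so processed data does not accumulate (text outside any delimiters is only released when the next object completes)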
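The same delimiter technique can also be written as an async generator. It consumes any async stream of text chunks and yields each parsed object as soon as its end delimiter arrives. The sketch below is an illustrative alternative to the class above, not part of the original code, and the function name `parseDelimited` is hypothetical. It also trims text that cannot belong to an action, so the buffer stays small even when the model writes long commentary between actions.

```typescript
// Hypothetical helper: same delimiters as DelimitedJSONParser, in generator form.
async function* parseDelimited(
    chunks: AsyncIterable<string>,
    startDelim = '<<<ACTION_START>>>',
    endDelim = '<<<ACTION_END>>>'
): AsyncGenerator<unknown> {
    let buffer = '';
    for await (const chunk of chunks) {
        buffer += chunk;
        let start = buffer.indexOf(startDelim);
        while (start !== -1) {
            const end = buffer.indexOf(endDelim, start + startDelim.length);
            if (end === -1) break; // Object still incomplete; wait for more chunks
            const jsonStr = buffer.substring(start + startDelim.length, end).trim();
            buffer = buffer.substring(end + endDelim.length);
            start = buffer.indexOf(startDelim);

            let value: unknown;
            try {
                value = JSON.parse(jsonStr);
            } catch {
                console.warn('Skipping malformed action JSON:', jsonStr);
                continue;
            }
            yield value;
        }
        // Keep only text that can still become part of an action: an unfinished
        // action, or a tail that may be the first half of a split start delimiter.
        buffer = start !== -1
            ? buffer.substring(start)
            : buffer.slice(-(startDelim.length - 1));
    }
}
```

Because it accepts any `AsyncIterable<string>`, it can sit directly on top of an LLM stream once each chunk is mapped to its text.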

3. LLM Streaming Adapter

Create a flexible adapter that works with different LLM providers:

import Anthropic from '@anthropic-ai/sdk';

interface LLMStreamChunk {
    text: string;
    isComplete: boolean;
}

interface LLMAdapter {
    generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk>;
}

interface LLMConfig {
    model: string;
    maxTokens: number;
    temperature?: number;
}

class AnthropicAdapter implements LLMAdapter {
    private client: Anthropic;
    private config: LLMConfig;

    // new Anthropic() reads the API key from ANTHROPIC_API_KEY by default
    constructor(config: LLMConfig, client = new Anthropic()) {
        this.config = config;
        this.client = client;
    }

    async* generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk> {
        const stream = await this.client.messages.create({
            model: this.config.model,
            messages: [{ role: 'user', content: prompt }],
            max_tokens: this.config.maxTokens,
            temperature: this.config.temperature,
            stream: true
        });

        // Translate Anthropic stream events into provider-neutral chunks
        for await (const chunk of stream) {
            if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
                yield {
                    text: chunk.delta.text,
                    isComplete: false
                };
            } else if (chunk.type === 'message_stop') {
                yield {
                    text: '',
                    isComplete: true
                };
            }
        }
    }
}
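The tool only depends on the `LLMAdapter` interface, so a scripted adapter can replace a real provider in tests and local development. Here is a minimal sketch that assumes you want to replay a fixed response in small pieces. `MockAdapter` and its `chunkSize` parameter are hypothetical names, and the interfaces are repeated so the block is self-contained.

```typescript
interface LLMStreamChunk {
    text: string;
    isComplete: boolean;
}

interface LLMAdapter {
    generateContentStream(prompt: string): AsyncIterable<LLMStreamChunk>;
}

// Hypothetical test double: replays a canned response in fixed-size pieces,
// which also exercises delimiters that get split across chunks.
class MockAdapter implements LLMAdapter {
    constructor(private readonly response: string, private readonly chunkSize = 7) {}

    async *generateContentStream(_prompt: string): AsyncIterable<LLMStreamChunk> {
        for (let i = 0; i < this.response.length; i += this.chunkSize) {
            yield { text: this.response.slice(i, i + this.chunkSize), isComplete: false };
        }
        yield { text: '', isComplete: true };
    }
}
```

If you feed it a response in the `<<<ACTION_START>>>`/`<<<ACTION_END>>>` format, you can exercise the full parse-and-broadcast path with no network access and no API costs.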

4. FastMCP Tool Implementation

Now let's create a FastMCP tool that orchestrates the streaming process:

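import { FastMCP } from 'fastmcp';
import { z } from 'zod';

// Assumed to exist elsewhere in your project (not shown in this article):
// an LLMAdapter instance (e.g. the AnthropicAdapter above) and a helper that
// derives a lesson name and description from the prompt.
declare const llmAdapter: LLMAdapter;
declare function extractLessonMetadata(
    prompt: string
): Promise<{ name: string; description: string }>;

const mcp = new FastMCP({
    name: 'streaming-server',
    version: '1.0.0'
});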

mcp.addTool({
    name: 'generate-lesson',
    description: 'Generate CodeVideo lesson with streaming updates',
    parameters: z.object({
        prompt: z.string().describe('Lesson description')
    }),
    execute: async ({ prompt }: { prompt: string }) => {
        console.log('Starting lesson generation for:', prompt);

        const parser = new DelimitedJSONParser();
        let actionCount = 0;

        // Send initial metadata
        const lessonMetadata = await extractLessonMetadata(prompt);

        broadcast({
            type: 'setLessonName',
            lessonName: lessonMetadata.name
        });

        broadcast({
            type: 'setLessonDescription',
            lessonDescription: lessonMetadata.description
        });

        // Build streaming prompt with delimiters
        const streamingPrompt = `
Generate CodeVideo tutorial actions for: ${prompt}

IMPORTANT: Output each action immediately using this exact format:

<<<ACTION_START>>>
{"name": "author-speak-before", "value": "Let's create a React component"}
<<<ACTION_END>>>

<<<ACTION_START>>>
{"name": "file-create", "value": "Button.tsx"}
<<<ACTION_END>>>

Generate actions one at a time. Start generating now:
`;

        try {
            const stream = llmAdapter.generateContentStream(streamingPrompt);

            for await (const chunk of stream) {
                if (chunk.text && !chunk.isComplete) {
                    const actions = parser.parse(chunk.text);

                    actions.forEach(action => {
                        if (action.name && action.value) {
                            actionCount++;

                            // Broadcast action immediately
                            broadcast({
                                type: 'appendLessonAction',
                                lessonAction: {
                                    id: `action-${actionCount}`,
                                    name: action.name,
                                    value: action.value
                                }
                            });

                            console.log(`Streamed action ${actionCount}:`, action.name);
                        }
                    });
                }
            }

            // Send completion signal
            broadcast({
                type: 'generationComplete',
                totalActions: actionCount
            });

            return `Generated lesson with ${actionCount} actions`;
        } catch (error) {
            console.error('Error generating lesson:', error);
            broadcast({
                type: 'error',
                message: (error as Error).message
            });
            throw error;
        }
    }
});
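The loop inside `execute` does not depend on FastMCP. You can factor it into a plain function that takes the text stream, a parser, and a send callback, and then test it on its own. The sketch below makes those assumptions. `streamActions` and its parameter names are hypothetical, and the message shapes match the ones broadcast above.

```typescript
interface ParsedAction {
    name?: unknown;
    value?: unknown;
}

// Hypothetical refactor of the execute() loop: parse each chunk, keep only
// actions with string name/value, number them, and hand them to `send`.
async function streamActions(
    chunks: AsyncIterable<string>,
    parse: (chunk: string) => ParsedAction[],
    send: (message: object) => void
): Promise<number> {
    let actionCount = 0;
    for await (const chunk of chunks) {
        for (const action of parse(chunk)) {
            if (typeof action.name !== 'string' || typeof action.value !== 'string') continue;
            actionCount++;
            send({
                type: 'appendLessonAction',
                lessonAction: { id: `action-${actionCount}`, name: action.name, value: action.value }
            });
        }
    }
    send({ type: 'generationComplete', totalActions: actionCount });
    return actionCount;
}
```

Inside `execute`, you would map the adapter stream to plain strings (for example with a small async generator that yields `chunk.text`), pass `parser.parse.bind(parser)` as `parse`, and pass `broadcast` as `send`.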

Client-Side Implementation

JavaScript/TypeScript Client

class StreamingClient {
    // Protected so subclasses (such as a reconnecting client) can replace the socket
    protected ws: WebSocket;
    protected readonly wsUrl: string;
    private onAction: (action: any) => void;
    private onComplete: () => void;
    private onError: (error: string) => void;

    constructor(
        wsUrl: string,
        onAction: (action: any) => void,
        onComplete: () => void,
        onError: (error: string) => void
    ) {
        this.wsUrl = wsUrl;
        this.onAction = onAction;
        this.onComplete = onComplete;
        this.onError = onError;

        this.ws = new WebSocket(wsUrl);
        this.setupEventHandlers();
    }

    protected setupEventHandlers() {
        this.ws.onopen = () => {
            console.log('Connected to streaming server');
        };

        this.ws.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                this.handleMessage(data);
            } catch (error) {
                console.error('Failed to parse message:', error);
            }
        };

        this.ws.onclose = () => {
            console.log('Disconnected from streaming server');
        };

        this.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
            this.onError('Connection error');
        };
    }

    private handleMessage(data: any) {
        switch (data.type) {
            case 'setLessonName':
                console.log('Lesson name:', data.lessonName);
                break;
            case 'setLessonDescription':
                console.log('Lesson description:', data.lessonDescription);
                break;
            case 'appendLessonAction':
                this.onAction(data.lessonAction);
                break;
            case 'generationComplete':
                console.log('Generation complete:', data.totalActions);
                this.onComplete();
                break;
            case 'error':
                this.onError(data.message);
                break;
        }
    }

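    public async triggerGeneration(prompt: string) {
        // Assumes an HTTP endpoint on your server that invokes the generate-lesson
        // tool; the route is app-specific and is not shown in this article.
        try {
            const response = await fetch('/codevideo-mcp/agent', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ prompt }),
            });
            if (!response.ok) {
                this.onError(`Generation request failed: ${response.status}`);
            }
        } catch (error) {
            // Without this, network failures become unhandled promise rejections
            this.onError((error as Error).message);
        }
    }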

    public disconnect() {
        this.ws.close();
    }
}

React Integration

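import React, { useState, useEffect, useCallback } from 'react';
// StreamingClient is the class defined above; adjust the path to where it lives
import { StreamingClient } from './StreamingClient';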

interface Action {
    id: string;
    name: string;
    value: string;
}

const StreamingLessonGenerator: React.FC = () => {
    const [actions, setActions] = useState<Action[]>([]);
    const [isGenerating, setIsGenerating] = useState(false);
    const [prompt, setPrompt] = useState('');
    const [client, setClient] = useState<StreamingClient | null>(null);

    const handleAction = useCallback((action: Action) => {
        setActions(prev => [...prev, action]);
    }, []);

    const handleComplete = useCallback(() => {
        setIsGenerating(false);
        console.log('Lesson generation completed');
    }, []);

    const handleError = useCallback((error: string) => {
        setIsGenerating(false);
        console.error('Generation error:', error);
    }, []);

    useEffect(() => {
        const streamingClient = new StreamingClient(
            'ws://localhost:9000',
            handleAction,
            handleComplete,
            handleError
        );
        
        setClient(streamingClient);

        return () => {
            streamingClient.disconnect();
        };
    }, [handleAction, handleComplete, handleError]);

    const startGeneration = () => {
        if (client && prompt.trim()) {
            setIsGenerating(true);
            setActions([]);
            client.triggerGeneration(prompt);
        }
    };

    return (
        <div className="p-6">
            <div className="mb-4">
                <input
                    type="text"
                    value={prompt}
                    onChange={(e) => setPrompt(e.target.value)}
                    placeholder="Enter lesson prompt..."
                    className="w-full p-2 border rounded"
                    disabled={isGenerating}
                />
                <button
                    onClick={startGeneration}
                    disabled={isGenerating || !prompt.trim()}
                    className="mt-2 px-4 py-2 bg-blue-500 text-white rounded disabled:opacity-50"
                >
                    {isGenerating ? 'Generating...' : 'Generate Lesson'}
                </button>
            </div>

            <div className="space-y-2">
                {actions.map((action) => (
                    <div key={action.id} className="p-3 bg-gray-100 rounded">
                        <div className="font-mono text-sm">
                            <span className="text-blue-600">{action.name}</span>
                            <span className="text-gray-500">: </span>
                            <span>{action.value}</span>
                        </div>
                    </div>
                ))}
            </div>

            {isGenerating && (
                <div className="mt-4 text-center">
                    <div className="inline-block animate-spin rounded-full h-8 w-8 border-b-2 border-blue-500"></div>
                    <p className="mt-2 text-gray-600">Generating actions...</p>
                </div>
            )}
        </div>
    );
};

export default StreamingLessonGenerator;

Advanced Patterns

1. Error Recovery and Reconnection

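// Requires StreamingClient to expose ws, wsUrl and setupEventHandlers as protected
class RobustStreamingClient extends StreamingClient {
    private reconnectAttempts = 0;
    private readonly maxReconnectAttempts = 5;
    private readonly baseReconnectDelay = 1000;

    protected setupEventHandlers() {
        super.setupEventHandlers();

        // A successful connection resets the backoff
        this.ws.addEventListener('open', () => {
            this.reconnectAttempts = 0;
        });

        // Replaces the base onclose handler with one that retries
        this.ws.onclose = (event) => {
            if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
                // Exponential backoff: 1s, 2s, 4s, 8s, 16s
                const delay = this.baseReconnectDelay * 2 ** this.reconnectAttempts;
                this.reconnectAttempts++;
                console.log(`Reconnecting in ${delay}ms...`);
                setTimeout(() => this.reconnect(), delay);
            }
        };
    }

    private reconnect() {
        this.ws = new WebSocket(this.wsUrl);
        this.setupEventHandlers();
    }
}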
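Doubling the delay on every attempt can still cause many clients to reconnect in lockstep after a server restart. A common refinement is to cap the delay and add random jitter. Below is a minimal sketch of a "full jitter" delay calculation. The function name and default values are illustrative assumptions and are not part of the original client.

```typescript
// Illustrative helper: exponential backoff with a cap and "full jitter".
// attempt is 0 for the first retry, 1 for the second, and so on.
function reconnectDelay(
    attempt: number,
    baseMs = 1000,
    maxMs = 30_000,
    random: () => number = Math.random
): number {
    const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
    // Pick a delay uniformly in [0, ceiling) so reconnecting clients spread out
    return Math.floor(random() * ceiling);
}
```

In `RobustStreamingClient`, scheduling `setTimeout(() => this.reconnect(), reconnectDelay(this.reconnectAttempts))` would take the place of the plain doubling.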

2. Backpressure Handling

class BackpressureStreamingClient {
    private messageQueue: any[] = [];
    private processing = false;
    private maxQueueSize = 100;

    // processMessage is supplied by the caller and may be async (e.g. a UI update)
    constructor(private readonly processMessage: (message: any) => Promise<void> | void) {}

    async handleMessage(data: any) {
        if (this.messageQueue.length >= this.maxQueueSize) {
            console.warn('Message queue full, dropping oldest message');
            this.messageQueue.shift();
        }

        this.messageQueue.push(data);

        // Only one drain loop runs at a time; later messages join its queue
        if (!this.processing) {
            this.processing = true;
            try {
                await this.processQueue();
            } finally {
                this.processing = false;
            }
        }
    }

    private async processQueue() {
        while (this.messageQueue.length > 0) {
            const message = this.messageQueue.shift();
            await this.processMessage(message);

            // Yield to the event loop between messages so rendering is not starved
            await new Promise(resolve => setTimeout(resolve, 10));
        }
    }
}
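The queue above smooths bursts on the client, but real backpressure starts on the server, where unsent data for a slow client piles up in its socket buffer. The `ws` library exposes this as `bufferedAmount` on each connection. The sketch below shows a broadcast that skips clients whose buffer is over a threshold. The threshold, the function name, and the decision to skip rather than disconnect are all assumptions. A minimal socket interface keeps the example independent of `ws` while matching the members it uses.

```typescript
// Only the members of a ws WebSocket that this sketch relies on.
interface SocketLike {
    readyState: number;
    bufferedAmount: number;
    send(data: string): void;
}

const OPEN = 1; // Same value as WebSocket.OPEN
const MAX_BUFFERED_BYTES = 1024 * 1024; // Illustrative 1 MB threshold

// Sends to every open client whose outgoing buffer is below the threshold.
// Returns the skipped clients so the caller can log, retry, or close them.
function broadcastWithBackpressure(clients: Iterable<SocketLike>, data: unknown): SocketLike[] {
    const message = JSON.stringify(data);
    const skipped: SocketLike[] = [];
    for (const ws of clients) {
        if (ws.readyState !== OPEN) continue;
        if (ws.bufferedAmount > MAX_BUFFERED_BYTES) {
            skipped.push(ws);
            continue;
        }
        ws.send(message);
    }
    return skipped;
}
```

With lesson actions, skipping a message leaves the client with a gap. In that case it may be safer to close the slow connection and let the client reconnect.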

3. Message Ordering and Deduplication

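class OrderedStreamingClient {
    private lastSequenceNumber = 0;
    private messageBuffer = new Map<number, any>();

    // processMessage is supplied by the caller (e.g. the StreamingClient dispatcher)
    constructor(private readonly processMessage: (message: any) => void) {}

    handleMessage(data: any) {
        const sequenceNumber = data.sequenceNumber;

        if (sequenceNumber <= this.lastSequenceNumber || this.messageBuffer.has(sequenceNumber)) {
            // Duplicate message, ignore
            return;
        }

        // Hold out-of-order messages until the gap before them is filled
        this.messageBuffer.set(sequenceNumber, data);
        this.processOrderedMessages();
    }

    private processOrderedMessages() {
        // Deliver every consecutive message starting at lastSequenceNumber + 1
        while (this.messageBuffer.has(this.lastSequenceNumber + 1)) {
            const next = this.lastSequenceNumber + 1;
            const nextMessage = this.messageBuffer.get(next);
            this.messageBuffer.delete(next);
            this.lastSequenceNumber = next;

            this.processMessage(nextMessage);
        }
    }
}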
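`OrderedStreamingClient` expects every message to carry a `sequenceNumber` starting at 1, but the `broadcast` function shown earlier never adds one. Here is a minimal sketch of the server side, assuming a single counter shared by all clients. `createSequencer` is a hypothetical name.

```typescript
// Hypothetical server-side helper: wraps a send function so every outgoing
// message carries a monotonically increasing sequenceNumber (1, 2, 3, ...).
function createSequencer(send: (message: object) => void) {
    let sequenceNumber = 0;
    return (message: object): void => {
        sequenceNumber++;
        send({ ...message, sequenceNumber });
    };
}
```

Passing `broadcast` as `send` gives a drop-in replacement. With one shared counter, a client that connects mid-generation sees a first number above 1 and would wait forever for the missing ones. You would need to either reset the counter per generation or let the client start from the first number it receives.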

Performance Optimization

1. Efficient JSON Parsing

class OptimizedDelimitedJSONParser {
    private buffer = '';
    private readonly startDelim = '<<<ACTION_START>>>';
    private readonly endDelim = '<<<ACTION_END>>>';
    private readonly maxBufferSize = 1024 * 1024; // ~1M characters

    parse(chunk: string): any[] {
        // Prevent buffer from growing too large (e.g. an end delimiter never arrives)
        if (this.buffer.length + chunk.length > this.maxBufferSize) {
            console.warn('Buffer size limit reached, resetting');
            this.buffer = chunk;
        } else {
            this.buffer += chunk;
        }

        const results: any[] = [];
        const startDelimLength = this.startDelim.length;
        const endDelimLength = this.endDelim.length;

        let startPos = this.buffer.indexOf(this.startDelim);
        while (startPos !== -1) {
            // Search for the end delimiter only after the start delimiter
            const endPos = this.buffer.indexOf(this.endDelim, startPos + startDelimLength);
            if (endPos === -1) {
                break; // Object not complete yet; wait for more chunks
            }

            const jsonStr = this.buffer.substring(startPos + startDelimLength, endPos).trim();
            if (jsonStr) {
                try {
                    const action = JSON.parse(jsonStr);
                    if (this.isValidAction(action)) {
                        results.push(action);
                    }
                } catch (e) {
                    console.warn('Failed to parse action JSON:', jsonStr);
                }
            }

            // Drop everything up to and including the end delimiter
            this.buffer = this.buffer.substring(endPos + endDelimLength);
            startPos = this.buffer.indexOf(this.startDelim);
        }

        // Discard text that cannot belong to an action: anything before an
        // unfinished start delimiter or, if there is none, all but a tail long
        // enough to hold a start delimiter split across chunks
        this.buffer = startPos !== -1
            ? this.buffer.substring(startPos)
            : this.buffer.slice(-(startDelimLength - 1));

        return results;
    }

    private isValidAction(action: any): boolean {
        return typeof action === 'object' &&
               action !== null &&
               typeof action.name === 'string' &&
               typeof action.value === 'string' &&
               action.name.length > 0;
    }
}

2. Batched Broadcasting

class BatchedBroadcaster {
    private messageQueue: any[] = [];
    private batchSize = 10;
    private flushInterval = 100; // ms
    private flushTimer: NodeJS.Timeout | null = null;

    public queueMessage(data: any) {
        this.messageQueue.push(data);

        if (this.messageQueue.length >= this.batchSize) {
            this.flush();
        } else if (!this.flushTimer) {
            this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
        }
    }

    private flush() {
        if (this.messageQueue.length === 0) return;

        const batch = this.messageQueue.splice(0);
        const message = JSON.stringify({
            type: 'batch',
            messages: batch
        });

        activeConnections.forEach(ws => {
            if (ws.readyState === WebSocket.OPEN) {
                ws.send(message);
            }
        });

        if (this.flushTimer) {
            clearTimeout(this.flushTimer);
            this.flushTimer = null;
        }
    }
}
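Once the server switches to `BatchedBroadcaster`, clients start receiving `{ type: 'batch', messages: [...] }` envelopes. The client's `handleMessage` switch does not recognize these. Below is a small sketch of unwrapping them before dispatch. The helper name is hypothetical.

```typescript
// Hypothetical client helper: flattens batch envelopes so existing
// per-message handlers can stay unchanged.
function unwrapMessages(data: unknown): unknown[] {
    if (
        typeof data === 'object' && data !== null &&
        (data as { type?: unknown }).type === 'batch' &&
        Array.isArray((data as { messages?: unknown }).messages)
    ) {
        return (data as { messages: unknown[] }).messages;
    }
    return [data];
}
```

In `onmessage`, calling `unwrapMessages(JSON.parse(event.data)).forEach(msg => this.handleMessage(msg))` then covers both batched and single messages.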

Testing Strategies

1. Unit Testing the Parser

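import { describe, it, expect, beforeEach } from 'vitest';
// Adjust the path to wherever DelimitedJSONParser is exported from
import { DelimitedJSONParser } from './DelimitedJSONParser';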

describe('DelimitedJSONParser', () => {
    let parser: DelimitedJSONParser;

    beforeEach(() => {
        parser = new DelimitedJSONParser();
    });

    it('should parse single complete action', () => {
        const input = '<<<ACTION_START>>>{"name": "test", "value": "data"}<<<ACTION_END>>>';
        const result = parser.parse(input);
        
        expect(result).toHaveLength(1);
        expect(result[0]).toEqual({ name: 'test', value: 'data' });
    });

    it('should handle partial actions across multiple chunks', () => {
        const chunk1 = '<<<ACTION_START>>>{"name": "test"';
        const chunk2 = ', "value": "data"}<<<ACTION_END>>>';
        
        let result = parser.parse(chunk1);
        expect(result).toHaveLength(0);
        
        result = parser.parse(chunk2);
        expect(result).toHaveLength(1);
        expect(result[0]).toEqual({ name: 'test', value: 'data' });
    });

    it('should handle multiple actions in single chunk', () => {
        const input = '<<<ACTION_START>>>{"name": "test1", "value": "data1"}<<<ACTION_END>>>' +
                     '<<<ACTION_START>>>{"name": "test2", "value": "data2"}<<<ACTION_END>>>';
        
        const result = parser.parse(input);
        expect(result).toHaveLength(2);
    });

    it('should handle malformed JSON gracefully', () => {
        const input = '<<<ACTION_START>>>{invalid json}<<<ACTION_END>>>';
        const result = parser.parse(input);
        
        expect(result).toHaveLength(0);
    });
});

2. Integration Testing

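import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import WebSocket from 'ws';
import type { AddressInfo } from 'net';

// Hypothetical helper: invokes a FastMCP tool by name, e.g. through an MCP
// client connected to the server under test. Its implementation depends on
// your harness. `activeConnections` is the Set used by broadcast().
declare function callTool(name: string, args: Record<string, unknown>): Promise<unknown>;

describe('Streaming Integration', () => {
    let wsServer: WebSocket.Server;
    let client: WebSocket;

    beforeEach(async () => {
        // Port 0 picks a free port; address() is only valid once listening
        wsServer = new WebSocket.Server({ port: 0 });
        await new Promise<void>(resolve => wsServer.once('listening', () => resolve()));

        // Route broadcast() output to clients of this test server
        wsServer.on('connection', ws => activeConnections.add(ws));

        const address = wsServer.address() as AddressInfo;
        client = new WebSocket(`ws://localhost:${address.port}`);
        await new Promise<void>(resolve => client.once('open', () => resolve()));
    });

    afterEach(() => {
        client.close();
        wsServer.close();
    });

    it('should stream actions in real-time', async () => {
        const receivedMessages: any[] = [];

        // Resolve when the server reports completion instead of sleeping a fixed time
        const completed = new Promise<void>(resolve => {
            client.on('message', (data) => {
                const message = JSON.parse(data.toString());
                receivedMessages.push(message);
                if (message.type === 'generationComplete') resolve();
            });
        });

        await callTool('generate-lesson', { prompt: 'Create a simple component' });
        await completed;

        expect(receivedMessages.length).toBeGreaterThan(0);
        expect(receivedMessages.some(msg => msg.type === 'appendLessonAction')).toBe(true);
    });
});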

Deployment Considerations

1. Docker Configuration

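# Build stage: devDependencies (e.g. TypeScript) are needed to compile
FROM node:22-alpine AS build

WORKDIR /app

COPY package*.json ./
RUN npm ci

COPY . .
RUN npm run build

# Runtime stage: production dependencies only
FROM node:22-alpine

WORKDIR /app
ENV NODE_ENV=production

COPY package*.json ./
RUN npm ci --omit=dev

# Assumes `npm run build` writes its output to dist/
COPY --from=build /app/dist ./dist

# Expose the HTTP (3000) and WebSocket (9000) ports
EXPOSE 3000 9000

# Alpine images include BusyBox wget but not curl
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD wget -qO- http://localhost:3000/health || exit 1

# Start the application
CMD ["npm", "start"]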

2. Environment Configuration

# docker-compose.yml
version: '3.8'

services:
  fastmcp-streaming:
    build: .
    ports:
      - "3000:3000"
      - "9000:9000"
    environment:
      - NODE_ENV=production
      - LLM_PROVIDER=anthropic
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - MAX_CONNECTIONS=100
      - MESSAGE_BATCH_SIZE=10
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:3000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    restart: unless-stopped

3. Production Monitoring

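// Add monitoring and metrics with prom-client
import { Gauge, Counter, Histogram } from 'prom-client';

const metrics = {
    activeConnections: new Gauge({
        name: 'websocket_active_connections',
        help: 'Number of active WebSocket connections'
    }),
    messagesProcessed: new Counter({
        name: 'messages_processed_total',
        help: 'Total number of messages processed'
    }),
    generationDuration: new Histogram({
        name: 'generation_duration_seconds',
        help: 'Time taken to generate lessons'
    })
};

// Update metrics in your code: track connections on the WebSocket server
wss.on('connection', (ws) => {
    metrics.activeConnections.inc();
    ws.on('close', () => {
        metrics.activeConnections.dec();
    });
});

// ...and inside the tool's execute(), time each generation and count broadcasts:
//   const endTimer = metrics.generationDuration.startTimer();
//   metrics.messagesProcessed.inc(); // once per broadcast message
//   endTimer(); // records elapsed seconds when generation finishes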

Best Practices and Gotchas

✅ Do's

  1. Implement proper error boundaries to handle streaming failures gracefully
  2. Use connection pooling for multiple clients
  3. Add sequence numbers to messages for ordering guarantees
  4. Implement heartbeat/ping-pong for connection health monitoring
  5. Use efficient JSON parsing libraries for large payloads
  6. Add proper logging for debugging streaming issues
  7. Implement backpressure handling for slow clients

❌ Don'ts

  1. Don't ignore WebSocket connection limits - implement proper scaling
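  2. Don't rely on arrival order alone across reconnects - assign sequence numbers on the server and reorder on the client
  3. Don't block the event loop by parsing very large payloads in a single synchronous JSON.parse call - keep each streamed element small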
  4. Don't forget memory cleanup for disconnected clients
  5. Don't send sensitive data without proper authentication
  6. Don't ignore browser connection limits - implement connection sharing
  7. Don't forget to handle network interruptions gracefully

Conclusion

Streaming JSON elements from FastMCP servers opens up powerful possibilities for building responsive, real-time AI applications. The combination of WebSockets, delimited parsing, and streaming LLM APIs provides a robust foundation for delivering immediate feedback to users while maintaining performance and reliability.

Key takeaways:

  • Delimited parsing enables reliable extraction of JSON objects from continuous streams
  • WebSocket broadcasting provides real-time delivery to multiple clients
  • Proper error handling and reconnection logic ensure robust user experiences
  • Performance optimization through batching and efficient parsing scales to production loads

Whether you're building educational platforms, collaborative tools, or real-time AI assistants, this streaming architecture pattern provides the foundation for creating engaging, responsive user experiences that keep pace with modern AI capabilities.

Resources

Have questions about implementing streaming in your FastMCP applications? Join our community discussions or check out more advanced patterns in our other tutorials.


© 2025 👨‍💻 with ❤️ by Full Stack Craft
"Any sufficiently advanced technology is indistinguishable from magic."