# Streaming Example
bxn has first-class support for streaming responses, perfect for Server-Sent Events (SSE), real-time data feeds, and large file transfers.
## Server-Sent Events (SSE)

Create a real-time event stream that pushes updates to clients:
### Basic SSE Endpoint

```ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';

export default route().handle(() => {
  const readable = new Readable({
    read() {
      // This method is called when the client is ready for more data
    },
  });

  // Send an event every second
  const interval = setInterval(() => {
    const data = {
      timestamp: Date.now(),
      message: 'Hello from server!',
    };

    readable.push(`data: ${JSON.stringify(data)}\n\n`);
  }, 1000);

  // Clean up when the client disconnects
  readable.on('close', () => {
    clearInterval(interval);
  });

  return stream(readable, 'text/event-stream', {
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
  });
});
```
### Real-Time Updates Example

```ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { eventBus } from '../../lib/events';

export default route().handle(() => {
  const readable = new Readable({
    read() {},
  });

  // Listen to application events
  const onUpdate = (data: any) => {
    readable.push(`event: update\n`);
    readable.push(`data: ${JSON.stringify(data)}\n\n`);
  };

  const onMessage = (data: any) => {
    readable.push(`event: message\n`);
    readable.push(`data: ${JSON.stringify(data)}\n\n`);
  };

  eventBus.on('update', onUpdate);
  eventBus.on('message', onMessage);

  // Send initial connection event
  readable.push(`event: connected\n`);
  readable.push(`data: ${JSON.stringify({ status: 'connected' })}\n\n`);

  // Clean up when the client disconnects
  readable.on('close', () => {
    eventBus.off('update', onUpdate);
    eventBus.off('message', onMessage);
  });

  return stream(readable, 'text/event-stream', {
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable buffering in nginx
  });
});
```
### Client Usage (Browser)

```html
<!DOCTYPE html>
<html>
  <head>
    <title>SSE Example</title>
  </head>
  <body>
    <div id="events"></div>

    <script>
      const eventsDiv = document.getElementById('events');
      const eventSource = new EventSource('http://localhost:3000/events');

      eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        const p = document.createElement('p');
        p.textContent = `${data.timestamp}: ${data.message}`;
        eventsDiv.appendChild(p);
      };

      // Handle custom event types
      eventSource.addEventListener('update', (event) => {
        console.log('Update:', JSON.parse(event.data));
      });

      eventSource.addEventListener('connected', (event) => {
        console.log('Connected:', JSON.parse(event.data));
      });

      eventSource.onerror = (error) => {
        console.error('EventSource error:', error);
      };
    </script>
  </body>
</html>
```
## File Streaming

Stream large files efficiently:

```ts
// src/routes/files/$fileId/get.ts
import { route, stream, notFound } from '@buildxn/http';
import { Type } from '@sinclair/typebox';
import { createReadStream, existsSync, statSync } from 'node:fs';
import { join } from 'node:path';

export default route()
  .params(Type.Object({ fileId: Type.String() }))
  .handle((req) => {
    const { fileId } = req.params;
    const filePath = join('/uploads', fileId);

    if (!existsSync(filePath)) {
      return notFound({ error: 'File not found' });
    }

    const stats = statSync(filePath);
    const fileStream = createReadStream(filePath);

    return stream(fileStream, 'application/octet-stream', {
      'Content-Length': stats.size.toString(),
      'Content-Disposition': `attachment; filename="${fileId}"`,
    });
  });
```
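One caveat worth noting: `join('/uploads', fileId)` by itself does not stop path traversal, since a `fileId` like `../etc/passwd` resolves outside the uploads directory. A minimal guard sketch (the `safeUploadPath` helper and `UPLOADS_ROOT` constant are illustrative, not part of @buildxn/http):

```typescript
import { join, resolve, sep } from 'node:path';

// Hypothetical guard: resolve the requested id against the uploads root
// and reject anything that escapes it.
const UPLOADS_ROOT = resolve('/uploads');

function safeUploadPath(fileId: string): string | null {
  const candidate = resolve(join(UPLOADS_ROOT, fileId));
  // Only accept paths strictly inside the uploads root
  return candidate.startsWith(UPLOADS_ROOT + sep) ? candidate : null;
}
```

In the route above, a `null` return would map to the same `notFound()` response as a missing file.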
## CSV Streaming

Generate and stream CSV data:

```ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

export default route().handle(() => {
  const users = db.users.getAll();

  const readable = new Readable({
    read() {},
  });

  // Send CSV header
  readable.push('ID,Name,Email,Created At\n');

  // Stream users one by one
  for (const user of users) {
    const row = `${user.id},"${user.name}","${user.email}",${user.createdAt}\n`;
    readable.push(row);
  }

  // End the stream
  readable.push(null);

  return stream(readable, 'text/csv', {
    'Content-Disposition': 'attachment; filename="users.csv"',
  });
});
```
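Note that the row template above produces malformed CSV if a name or email itself contains a double quote. A small escaping helper sketch, following RFC 4180's doubled-quote convention (`csvField` is an assumed helper, not a framework API):

```typescript
// Quote a CSV field, doubling any embedded double quotes so the
// field survives values containing commas, quotes, or newlines.
function csvField(value: string): string {
  return `"${value.replace(/"/g, '""')}"`;
}
```

With this, the row would be built as `` `${user.id},${csvField(user.name)},${csvField(user.email)},${user.createdAt}\n` ``.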
## JSON Lines Streaming

Stream JSON objects line by line:

```ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

export default route().handle(() => {
  const readable = new Readable({
    read() {},
  });

  // Stream logs as JSON Lines (JSONL)
  const logs = db.logs.getAll();

  for (const log of logs) {
    readable.push(JSON.stringify(log) + '\n');
  }

  readable.push(null);

  return stream(readable, 'application/x-ndjson', {
    'Cache-Control': 'no-cache',
  });
});
```
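On the consuming side, network chunks rarely align with line boundaries, so a JSONL client has to carry the trailing partial line over to the next chunk. A minimal buffering sketch (the `splitLines` helper is illustrative):

```typescript
// Combine the carried-over partial line with a new chunk and return
// the complete lines plus the new trailing remainder.
function splitLines(carry: string, chunk: string): { lines: string[]; rest: string } {
  const parts = (carry + chunk).split('\n');
  // The last element may be an incomplete line; keep it for the next chunk
  const rest = parts.pop() ?? '';
  return { lines: parts.filter((line) => line.length > 0), rest };
}
```

Each returned line can then be passed to `JSON.parse` safely, since only complete lines are emitted.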
## Progress Updates

Stream progress updates for long-running tasks:

```ts
// src/routes/process/$taskId/get.ts
import { route, stream, notFound } from '@buildxn/http';
import { Type } from '@sinclair/typebox';
import { Readable } from 'node:stream';
import { taskManager } from '../../../lib/tasks';

export default route()
  .params(Type.Object({ taskId: Type.String() }))
  .handle((req) => {
    const { taskId } = req.params;
    const task = taskManager.get(taskId);

    if (!task) {
      return notFound({ error: 'Task not found' });
    }

    const readable = new Readable({
      read() {},
    });

    // Send initial status
    readable.push(
      `data: ${JSON.stringify({ status: task.status, progress: 0 })}\n\n`,
    );

    // Listen for progress updates
    task.on('progress', (progress: number) => {
      readable.push(
        `data: ${JSON.stringify({ status: 'processing', progress })}\n\n`,
      );
    });

    task.on('complete', (result: any) => {
      readable.push(
        `data: ${JSON.stringify({ status: 'complete', progress: 100, result })}\n\n`,
      );
      readable.push(null);
    });

    task.on('error', (error: Error) => {
      readable.push(
        `data: ${JSON.stringify({ status: 'error', error: error.message })}\n\n`,
      );
      readable.push(null);
    });

    return stream(readable, 'text/event-stream', {
      'Cache-Control': 'no-cache',
    });
  });
```
## Backpressure Handling

Handle backpressure when streaming large datasets:

```ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

export default route().handle(() => {
  let cursor = 0;
  const batchSize = 100;

  const readable = new Readable({
    objectMode: false,

    async read() {
      try {
        // Fetch a batch of records
        const records = await db.records.getBatch(cursor, batchSize);

        if (records.length === 0) {
          // No more data
          this.push(null);
          return;
        }

        // Push records, respecting backpressure: push() returns false
        // when the internal buffer is full, so we stop and resume from
        // the same position on the next read() call
        let pushed = 0;
        for (const record of records) {
          pushed++;
          if (!this.push(JSON.stringify(record) + '\n')) {
            break;
          }
        }

        // Advance only past the records actually pushed
        cursor += pushed;
      } catch (error) {
        this.destroy(error as Error);
      }
    },
  });

  return stream(readable, 'application/x-ndjson');
});
```
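The `push()` return value that the `read()` implementation checks can be observed in isolation with a bare `Readable` and a tiny `highWaterMark`:

```typescript
import { Readable } from 'node:stream';

// push() returns true while the internal buffer is below highWaterMark,
// and false once it fills up -- the signal for the producer to pause.
const demo = new Readable({ read() {}, highWaterMark: 4 });

const canContinue = demo.push('ab');   // 2 bytes buffered: below the limit
const shouldPause = demo.push('cdef'); // 6 bytes buffered: over the limit
```

Here `canContinue` is `true` and `shouldPause` is `false`; a real producer would wait until the consumer drains the buffer before pushing more.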
## Testing Streaming Endpoints

### Using curl

```sh
# SSE endpoint
curl -N http://localhost:3000/events

# File download
curl http://localhost:3000/files/document.pdf -o document.pdf

# CSV export
curl http://localhost:3000/export/users > users.csv

# JSON Lines
curl http://localhost:3000/stream/logs
```
### Using Node.js

```ts
// Node 18+ ships a global fetch whose response body is a web
// ReadableStream, so no extra dependency is needed
async function consumeSSE() {
  const response = await fetch('http://localhost:3000/events');
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    const chunk = decoder.decode(value);
    console.log('Received:', chunk);
  }
}

consumeSSE();
```
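The chunks logged by a consumer like this are raw SSE text; turning them into events takes a little buffering, since frames are separated by a blank line. A minimal parser sketch (`parseSSE` is illustrative, not part of @buildxn/http):

```typescript
interface SSEEvent {
  event: string;
  data: string;
}

// Split buffered SSE text into complete frames (separated by "\n\n"),
// returning any trailing partial frame to prepend to the next chunk.
function parseSSE(buffer: string): { events: SSEEvent[]; rest: string } {
  const frames = buffer.split('\n\n');
  // The last element may be an incomplete frame; carry it over
  const rest = frames.pop() ?? '';
  const events: SSEEvent[] = [];

  for (const frame of frames) {
    let event = 'message'; // the default SSE event type
    const data: string[] = [];
    for (const line of frame.split('\n')) {
      if (line.startsWith('event: ')) event = line.slice(7);
      else if (line.startsWith('data: ')) data.push(line.slice(6));
    }
    events.push({ event, data: data.join('\n') });
  }

  return { events, rest };
}
```

Feeding each decoded chunk (plus the previous `rest`) through this function yields the same `event`/`data` pairs the browser's `EventSource` dispatches.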
## Best Practices

- Set Appropriate Headers: Always set `Cache-Control: no-cache` for SSE
- Handle Cleanup: Clean up intervals, listeners, and resources when streams close
- Handle Backpressure: Respect the `push()` return value for large datasets
- Error Handling: Handle errors and destroy streams when they occur
- Connection Management: Monitor and handle client disconnections
- Buffering: Disable proxy buffering with the `X-Accel-Buffering: no` header
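The cleanup and error-handling practices can be folded into one small factory. A sketch (the `sseReadable` helper is an assumption, not a framework API) that guarantees the cleanup callback runs exactly once, whether the stream closes normally or errors:

```typescript
import { Readable } from 'node:stream';

// Create a push-style Readable that runs onClose exactly once when the
// stream closes or errors, so timers and listeners never leak.
function sseReadable(onClose: () => void): Readable {
  const readable = new Readable({ read() {} });
  let cleaned = false;
  const cleanup = () => {
    if (!cleaned) {
      cleaned = true;
      onClose();
    }
  };
  readable.on('close', cleanup);
  readable.on('error', cleanup);
  return readable;
}
```

The interval timers and event-bus listeners from the earlier examples would go inside the callback.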
## Common Use Cases

- Real-time dashboards: Live metrics and analytics
- Chat applications: Real-time messages
- Progress tracking: Long-running task updates
- Live notifications: Push notifications to users
- Data exports: Large CSV/JSON exports
- Log streaming: Real-time log viewing
- File downloads: Large file transfers
## Next Steps

- Build a complete REST API
- Learn about Response Helpers
- Explore Request Handlers