Skip to content

Streaming Example

bxn has first-class support for streaming responses, making it well suited to Server-Sent Events (SSE), real-time data feeds, and large file transfers.

Create a real-time event stream that pushes updates to clients:

src/routes/events/get.ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';

/**
 * GET /events — Server-Sent Events endpoint that emits a timestamped
 * message once per second until the client disconnects.
 */
export default route().handle(() => {
  const readable = new Readable({
    read() {
      // Called when the consumer is ready for more data; actual pushes
      // are driven by the timer below.
    },
  });

  // Send an event every second.
  const interval = setInterval(() => {
    // Guard: the timer can fire once more between the client
    // disconnecting and cleanup running; pushing into a destroyed
    // stream raises an error.
    if (readable.destroyed) return;
    const data = {
      timestamp: Date.now(),
      message: 'Hello from server!',
    };
    readable.push(`data: ${JSON.stringify(data)}\n\n`);
  }, 1000);

  // Stop the timer on disconnect AND on stream errors, so the interval
  // can never outlive the response.
  const cleanup = () => clearInterval(interval);
  readable.on('close', cleanup);
  readable.on('error', cleanup);

  return stream(readable, 'text/event-stream', {
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
  });
});
src/routes/updates/get.ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { eventBus } from '../../lib/events';

/**
 * GET /updates — SSE endpoint that forwards application events from the
 * shared event bus to the connected client.
 */
export default route().handle(() => {
  const readable = new Readable({
    read() {},
  });

  // Serialize one SSE frame (event name + JSON payload) onto the stream.
  const emit = (event: string, payload: unknown) => {
    readable.push(`event: ${event}\n`);
    readable.push(`data: ${JSON.stringify(payload)}\n\n`);
  };

  // Bridge bus events onto the SSE stream.
  const onUpdate = (data: any) => emit('update', data);
  const onMessage = (data: any) => emit('message', data);
  eventBus.on('update', onUpdate);
  eventBus.on('message', onMessage);

  // Tell the client the subscription is live.
  emit('connected', { status: 'connected' });

  // Detach bus listeners once the client goes away.
  readable.on('close', () => {
    eventBus.off('update', onUpdate);
    eventBus.off('message', onMessage);
  });

  return stream(readable, 'text/event-stream', {
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable buffering in nginx
  });
});
<!DOCTYPE html>
<html>
<head>
<title>SSE Example</title>
</head>
<body>
<!-- Accumulates one <p> per received default message -->
<div id="events"></div>
<script>
const container = document.getElementById('events');
const source = new EventSource('http://localhost:3000/events');

// Default (unnamed) SSE messages: render "timestamp: message".
source.onmessage = (event) => {
  const payload = JSON.parse(event.data);
  const line = document.createElement('p');
  line.textContent = `${payload.timestamp}: ${payload.message}`;
  container.appendChild(line);
};

// Named event types are received via addEventListener, not onmessage.
source.addEventListener('update', (event) => {
  console.log('Update:', JSON.parse(event.data));
});
source.addEventListener('connected', (event) => {
  console.log('Connected:', JSON.parse(event.data));
});

source.onerror = (error) => {
  console.error('EventSource error:', error);
};
</script>
</body>
</html>

Stream large files efficiently:

// src/routes/files/$fileId/get.ts
import { route, stream, notFound } from '@buildxn/http';
import { Type } from '@sinclair/typebox';
import { createReadStream, existsSync, statSync } from 'node:fs';
import { basename, join } from 'node:path';

/**
 * GET /files/:fileId — stream a stored file from /uploads as an
 * attachment download with an accurate Content-Length.
 */
export default route()
  .params(Type.Object({ fileId: Type.String() }))
  .handle((req) => {
    const { fileId } = req.params;

    // SECURITY: fileId is untrusted. basename() strips any directory
    // components ("../../etc/passwd" -> "passwd"), so the lookup can
    // never escape /uploads (the original join() allowed traversal).
    const safeName = basename(fileId);
    const filePath = join('/uploads', safeName);

    if (!existsSync(filePath)) {
      return notFound({ error: 'File not found' });
    }

    const stats = statSync(filePath);
    const fileStream = createReadStream(filePath);

    return stream(fileStream, 'application/octet-stream', {
      'Content-Length': stats.size.toString(),
      // Strip quotes so a '"' in the name cannot break out of the
      // quoted filename parameter in the header.
      'Content-Disposition': `attachment; filename="${safeName.replace(/"/g, '')}"`,
    });
  });

Generate and stream CSV data:

src/routes/export/users/get.ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

// RFC 4180 quoting: wrap every field in quotes and double any embedded
// quote, so values containing '"', ',' or newlines stay a single field.
// The original wrapped name/email in quotes but never escaped inner
// quotes, which corrupts the CSV for names like: Ada "The Countess".
const csvField = (value: unknown): string =>
  `"${String(value ?? '').replace(/"/g, '""')}"`;

/**
 * GET /export/users — stream all users as a CSV download.
 */
export default route().handle(() => {
  const users = db.users.getAll();

  const readable = new Readable({
    read() {},
  });

  // Send CSV header
  readable.push('ID,Name,Email,Created At\n');

  // Stream users one row at a time, fully quoted.
  for (const user of users) {
    const row = [
      csvField(user.id),
      csvField(user.name),
      csvField(user.email),
      csvField(user.createdAt),
    ].join(',');
    readable.push(row + '\n');
  }

  // End the stream
  readable.push(null);

  return stream(readable, 'text/csv', {
    'Content-Disposition': 'attachment; filename="users.csv"',
  });
});

Stream JSON objects line by line:

src/routes/stream/logs/get.ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

/**
 * GET /stream/logs — emit every stored log entry as JSON Lines
 * (one JSON document per line, application/x-ndjson).
 */
export default route().handle(() => {
  const readable = new Readable({
    read() {},
  });

  // Serialize each log record onto its own line, then terminate.
  db.logs.getAll().forEach((entry) => {
    readable.push(`${JSON.stringify(entry)}\n`);
  });
  readable.push(null);

  return stream(readable, 'application/x-ndjson', {
    'Cache-Control': 'no-cache',
  });
});

Stream progress updates for long-running tasks:

// src/routes/process/$taskId/get.ts
import { route, stream, notFound } from '@buildxn/http';
import { Type } from '@sinclair/typebox';
import { Readable } from 'node:stream';
import { taskManager } from '../../../lib/tasks';

/**
 * GET /process/:taskId — SSE stream of progress for a long-running task.
 * Emits { status, progress } frames until the task completes or errors,
 * then ends the stream.
 */
export default route()
  .params(Type.Object({ taskId: Type.String() }))
  .handle((req) => {
    const { taskId } = req.params;
    const task = taskManager.get(taskId);
    if (!task) {
      return notFound({ error: 'Task not found' });
    }

    const readable = new Readable({
      read() {},
    });

    // One SSE data frame; skipped once the client has disconnected,
    // since pushing into a destroyed stream raises an error.
    const send = (payload: Record<string, unknown>) => {
      if (!readable.destroyed) {
        readable.push(`data: ${JSON.stringify(payload)}\n\n`);
      }
    };

    // Initial snapshot so the client sees state immediately.
    send({ status: task.status, progress: 0 });

    const onProgress = (progress: number) => {
      send({ status: 'processing', progress });
    };
    const onComplete = (result: any) => {
      send({ status: 'complete', progress: 100, result });
      readable.push(null);
    };
    const onError = (error: Error) => {
      send({ status: 'error', error: error.message });
      readable.push(null);
    };

    task.on('progress', onProgress);
    task.on('complete', onComplete);
    task.on('error', onError);

    // FIX: the original never detached these listeners, so every client
    // that disconnected mid-task leaked three listeners on the
    // (long-lived) task object. Assumes task supports off(); it exposes
    // on(), so an EventEmitter-style API is expected — confirm.
    readable.on('close', () => {
      task.off('progress', onProgress);
      task.off('complete', onComplete);
      task.off('error', onError);
    });

    return stream(readable, 'text/event-stream', {
      'Cache-Control': 'no-cache',
    });
  });

Handle backpressure when streaming large datasets:

src/routes/stream/data/get.ts
import { route, stream } from '@buildxn/http';
import { Readable } from 'node:stream';
import { db } from '../../../db';

/**
 * GET /stream/data — stream the records table as NDJSON, fetching in
 * batches and honoring backpressure from the consumer.
 */
export default route().handle(() => {
  let cursor = 0; // index of the next record to fetch
  const batchSize = 100;
  let inFlight = false; // true while a batch fetch is pending

  const readable = new Readable({
    objectMode: false,
    read() {
      // FIX: the original declared read() as async. Node does not await
      // read(), so it could be re-entered while a fetch was pending,
      // double-reading the same cursor range. The flag serializes reads.
      if (inFlight) return;
      inFlight = true;

      void (async () => {
        try {
          // Fetch the next batch of records.
          const records = await db.records.getBatch(cursor, batchSize);
          if (records.length === 0) {
            this.push(null); // no more data
            return;
          }

          // FIX: advance the cursor per record actually pushed. The
          // original added the full batch length even after breaking on
          // backpressure, silently dropping the unpushed tail.
          for (const record of records) {
            cursor += 1;
            if (!this.push(JSON.stringify(record) + '\n')) {
              break; // consumer is full; resume on the next read()
            }
          }
        } catch (error) {
          this.destroy(error as Error);
        } finally {
          inFlight = false;
        }
      })();
    },
  });

  return stream(readable, 'application/x-ndjson');
});
Try the endpoints from a terminal:
# SSE endpoint
curl -N http://localhost:3000/events
# File download
curl http://localhost:3000/files/document.pdf -o document.pdf
# CSV export
curl http://localhost:3000/export/users > users.csv
# JSON Lines
curl http://localhost:3000/stream/logs
import fetch from 'node-fetch';

/**
 * Consume the SSE endpoint from Node.
 *
 * FIX: node-fetch exposes `response.body` as a Node.js Readable stream,
 * which has no getReader() — that is the WHATWG ReadableStream API used
 * by browsers and Node's built-in fetch. Iterating with for-await works
 * for both stream flavors.
 */
async function consumeSSE() {
  const response = await fetch('http://localhost:3000/events');
  const decoder = new TextDecoder();

  for await (const value of response.body!) {
    // { stream: true } keeps multi-byte characters split across chunk
    // boundaries intact.
    const chunk = decoder.decode(value as Uint8Array, { stream: true });
    console.log('Received:', chunk);
  }
}

// Surface connection/stream failures instead of an unhandled rejection.
consumeSSE().catch((error) => {
  console.error('SSE consumer failed:', error);
});
  1. Set Appropriate Headers: Always set Cache-Control: no-cache for SSE
  2. Handle Cleanup: Clean up intervals, listeners, and resources when streams close
  3. Handle Backpressure: Respect the push() return value for large datasets
  4. Error Handling: Handle and destroy streams on errors
  5. Connection Management: Monitor and handle client disconnections
  6. Buffering: Disable proxy buffering with X-Accel-Buffering: no header
  • Real-time dashboards: Live metrics and analytics
  • Chat applications: Real-time messages
  • Progress tracking: Long-running task updates
  • Live notifications: Push notifications to users
  • Data exports: Large CSV/JSON exports
  • Log streaming: Real-time log viewing
  • File downloads: Large file transfers