npm version TypeScript License: MIT Bundle Size
Not just another streaming library.
A library that makes ReadableStream a first-class citizen with reactive extensions.
日本語 | English
// nagare: a Stream<T> is a real ReadableStream<T> that carries extra methods.
const nagareStream = stream.from(readableStream); // Zero overhead

// Because of that, the native type check holds:
nagareStream instanceof ReadableStream; // true!
// nagare: Minimal footprint, tree-shakeable import { stream } from '@aid-on/nagare'; // Native performance, no wrapper objects
// nagare: Native edge runtime support
export default {
  /**
   * Edge fetch handler: reads the `/api/chat` SSE feed, passes every event
   * through `processWithAI`, and returns the resulting stream as the response.
   *
   * @param request - The incoming request (not inspected by this example).
   */
  async fetch(request: Request) {
    return stream
      .fromSSE('/api/chat')
      .mapAsync(processWithAI)
      .toResponse(); // Direct to Response object!
  },
};
// Ten items are processed concurrently, yet the output keeps the input order.
const CONCURRENCY = 10;

// Doubles `n` after waiting a random amount of time (under one second).
const doubleAfterRandomDelay = async (n: number) => {
  await delay(Math.random() * 1000); // Random delays
  return n * 2;
};

const inputs = Array.from({ length: 10 }, (_, i) => i + 1); // 1 through 10

const results = await stream
  .array(inputs)
  .mapAsync(doubleAfterRandomDelay, CONCURRENCY)
  .collect();

console.log(results);
// ALWAYS [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
// Order preserved despite concurrent execution!
// A slow consumer automatically pauses the stream.
stream.subscribe({
  async next(value) {
    // The stream waits for this to finish:
    // no memory overflow, no lost data.
    await heavyProcessing(value);
  },
});
// Works with ANY server (Windows, Unix, Mac):
// \r\n, \n, and \r line endings are all handled automatically.
const EVENTS_ENDPOINT = '/api/events';
const events = await stream.fromSSE(EVENTS_ENDPOINT);
// Choose your style!

// Reactive (pull-based): derive a new stream from an existing source.
const doubledSource = stream.from(source).map(x => x * 2);
const s1 = doubledSource.filter(x => x > 10);

// Imperative (push-based): emit values by hand, then signal completion.
const s2 = stream.create((controller) => {
  for (const value of [1, 2]) {
    controller.next(value);
  }
  controller.complete();
});
export default {
  /**
   * Edge fetch handler that streams AI-generated tokens to the client as
   * Server-Sent Events.
   *
   * @param request - The incoming request (not inspected by this example).
   */
  async fetch(request: Request) {
    // The producer callback is `async` because it awaits the AI call; `await`
    // inside a non-async arrow function is a syntax error.
    const aiStream = stream.create<string>(async (controller) => {
      // Stream AI responses as they generate
      await ai.complete(prompt, {
        stream: true,
        onToken: (token) => controller.next(token),
      });
      // NOTE(review): nothing here calls controller.complete(); confirm
      // whether the stream should be closed once ai.complete() resolves.
    });

    return aiStream
      .tap(token => metrics.record(token))
      .throttle(50) // Prevent client overflow
      .toSSE()
      .toResponse({
        headers: {
          'Content-Type': 'text/event-stream',
          'X-Powered-By': 'nagare',
        },
      });
  },
};
// Live market feed delivered over SSE.
const marketFeed = stream.fromSSE('/api/market-data');

// Enrich each record; up to 5 records are processed concurrently and the
// original order is preserved.
const enrichedFeed = marketFeed.mapAsync(async data => {
  // Both lookups run in parallel for a single record.
  const enrichments = await Promise.all([
    analyzeMarket(data),
    predictTrend(data),
  ]);
  return { ...data, analysis: enrichments[0], prediction: enrichments[1] };
}, 5);

const pipeline = enrichedFeed
  .buffer(10) // Batch for efficiency
  .tap(batch => database.insert(batch))
  .debounce(100); // Prevent UI thrashing
npm install @aid-on/nagare
import { stream } from '@aid-on/nagare';

// Your first nagare stream: double every number, then keep those above 5.
const doubled = stream.array([1, 2, 3, 4, 5]).map(x => x * 2);
const result = await doubled.filter(x => x > 5).collect();

console.log(result); // [6, 8, 10]
- Streams are the primitive - Not observables, not promises
- Edge-first - Built for Cloudflare, Deno, Bun from day one
- Zero magic - What you see is what runs
- Type safety - Full TypeScript with no compromises
- Web standards - ReadableStream is the foundation
- Edge application developers - First-class edge runtime support
- Performance enthusiasts - Minimal overhead, maximum throughput
- Type-safety advocates - Full TypeScript with strict types
- Stream processing experts - Advanced operators with backpressure
- AI/ML engineers - Perfect for streaming LLM responses
MIT © Aid-On