Install
npm install node-streams
transform stream, buffers all chunks until wait function calls. Pushes buffered chunks array.
(options: TransformOptions) => (wait: WaitFn) => Transform
WaitFn = (callback: () => void) => CancelFn
bufferedstream will callwaitfunction for timeout, so it is possible to provide different period each time
import { buffer } from 'node-streams' // we have the streams with `objectMode` set to true declare var inStream: ReadableStream declare var outStream: WritableStream const wait100ms = (callback: () => void) => { const id = setTimeout(callback, 100) return () => clearTimeout(id) } const buffered = buffer({ objectMode: true // set up standard TranformOptions })( wait100ms // buffer for 100ms then push accumulated data ) inStream .pipe(buffered) .pipe(outStream)
buffers all chunks for provided time period. Pushes buffered chunks array.
(options: TransformOptions) => (ms: number) => Transform
import { bufferTime } from 'node-streams' // we have the streams with `objectMode` set to true declare var inStream: ReadableStream declare var outStream: WritableStream const buffered = buffer({ objectMode: true // set up standard TranformOptions })( 100 // buffer for 100ms then push accumulated data ) inStream .pipe(buffered) .pipe(outStream)
delivers latest values from all streams as an array, ends when any of streams does
(options: ReadableOptions) => (...streams: ReadableStream[]): ReadableStream
import { combine } from 'node-streams' // we have the streams declare var stream0: ReadableStream [..1..2..3..4..] declare var stream1: ReadableStream ['a'...'b'] // create combined readable const combined = combine({ objectMode: true // provide standard ReadableOptions })(stream0, stream1) combined .on('data', ([value0, value1]) => { // [undefined, 'a']..[1, 'a']..[2, 'a'].[2, 'b'] }) .on('end', () => {})
concatenates all streams, subscribing to next on previous has ended
(options: ReadableOptions) => (...streams: ReadableStream[]): ReadableStream
import { concat } from 'node-streams' // we have the streams declare var stream0: ReadableStream [..1..2] declare var stream1: ReadableStream [..'a'..'b'] // create combined readable const combined = concat({ objectMode: true // provide standard ReadableOptions })(stream0, stream1) combined .on('data', ([value0, value1]) => { // [..1..2..'a'..'b'] }) .on('end', () => {})
skips all fast arriving data, until idle period, after that pushes last received chunk.
(options: TransformOptions) => (wait: WaitFn) => Transform
debouncedstream will cancel and rearmwaitfunction repeatedly on every chunk received.
WaitFn = (cb: () => void) => CancelFn
providedwaitfunction should properly release timeout resources onCancelFncall.
import { debounce } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream // we have `setTimeout` function with baked in 10ms time // CAUTION: wait function should be able to cancel timeout const wait10ms = (callback: () => void) => { const id = setTimeout(callback, 10) return () => clearTimeout(id) } const debounced = debounce({ objectMode: true // set up standard TranformOptions })( wait10ms // debounce stream for 10ms ) inStream .pipe(debounced) .pipe(outStream)
skips all fast arriving data, until idle period, after that pushes last received chunk.
(options: TransformOptions) => (ms: number) => Transform
import { debounceTime } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream const debounced = debounceTime({ objectMode: true // set up standard TranformOptions })( 10 // debounce stream for 10ms ) inStream .pipe(debounced) .pipe(outStream)
(options: TransformOptions) => (ms: number) => Transform
import { delay } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream const delayed = delay({ objectMode: true // set up standard TransformOptions })( 100 // delay stream events up to 100ms ) inStream .pipe(delayed) .pipe(outStream)
(options: TransformOptions) => (isEqual: (a: T, b: T) => boolean) => Transform
import { distinct } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..2..3..3..2..1] declare var outStream: WritableStream const unique = distinct({ objectMode: true // set up standard TransformOptions })( (a, b) => a === b // provide compare function ) inStream .pipe(unique) .pipe(outStream) // [1..2....3....2..1]
(options: TransformOptions) => Transform
import { distinctUntilChanged } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..2..3..3..2..1] declare var outStream: WritableStream const unique = distinctUntilChanged({ objectMode: true // set up standard TransformOptions }) inStream .pipe(unique) .pipe(outStream) // [1..2....3....2..1]
(options: ReadableOptions) => Readable
import { empty } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream (myCondition ? inStream : empty({}) ).pipe(outStream)
(options: TransformOptions) => (predicate: (arg: T) => boolean) => Transform
import { filter } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..3..4..5..6..7] declare var outStream: WritableStream const filtered = filter({ objectMode: true // provide standard TransformOptions })( a => a % 2 === 0 // is even ) inStream .pipe(filtered) .pipe(outStream) // [..2....4....6..]
(options: TransformOptions) => Transform
import { first } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..3..4..5..6..7] declare var outStream: WritableStream const firstTransform = first({ objectMode: true // provide standard TransformOptions }) inStream .pipe(firstTransform) .pipe(outStream) // [1]
(options: ReadableOptions) => (iterable: Iterable<T>) => Readable
import { from } from 'node-streams' const myStream = from({ objectMode: true // provide standard ReadableOptions })( [1, 2, 3, 4, 5] // provide Iterable ) myStream .on('data', () => {}) // subscribe and get the data .on('end', () => {})
(options: TransformOptions) => Transform
import { last } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..3..4..5..6..7] declare var outStream: WritableStream const lastTransform = last({ objectMode: true // provide standard TransformOptions }) inStream .pipe(lastTransform) .pipe(outStream) // [7]
(options: TransformOptions) => (xf: (value: T) => R) => Transform
import { map } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream const mapped = map({ objectMode: true // provide standard TransformOptions })( x => x * 2 ) inStream .pipe(mapped) .pipe(outStream) // [2..4..6..8]
(options: ReadableOptions) => (...streams: ReadableStreams[]) => Readable
import { merge } from 'node-streams' // we have the streams in `objectMode` declare var stream0: ReadableStream // [1...2...3...4] declare var stream1: ReadableStream // [..'a'.....'b'...] const merged = merge({ objectMode: true // provide standard ReadableOptions })( stream0, stream1 ) merged .on('data' () => {}) // [1..'a'..2..3..'b'..4..] .on('end', () => {})
(options: ReadableOptions) => (...values: T[]) => Readable
import { of } from 'node-streams' const myStream = of({ objectMode: true // provide standard ReadableOptions })( 1, 2, 3, 4 ) myStream .on('data', () => {}) // [1.2.3.4] .on('end', () => {})
(options: ReadableOptions) => (wait: WaitFn) => (...values: T[]) => Readable
WaitFn = (callback: () => void) => UnsubscribeFn
import { ofAsync } from 'node-streams' // we have custom wait function const wait100ms = (cb: () => void) => { const id = seTimeout(cb, 100) return () => clearTimeout(id) } const myStream = ofAsync({ objectMode: true // provide standard ReadableOptions })( wait100ms // set WaitFn )( 1, 2, 3, 4, 5 // provide values ) myStream .on('data', () => {}) // [1...2...3...4...5] .on('end', () => {})
(options: ReadableOptions) => (ms: number) => (...values: T[]) => Readable
import { ofTime } from 'node-streams' const myStream = ofTime({ objectMode: true // provide standard ReadableOptions })( 100 // provide time in milliseconds )( 1, 2, 3, 4 // values to stream ) myStream .on('data', () => {}) // [1...2...3...4] .on('end', () => {})
(...streams: Array<ReadWriteStream | ReadWriteStream[]>) => ReadWriteStream[]
import { pipe } from 'node-streams' // we have the following streams declare var inStream: ReadableStream declare var outStream: WritableStream declare var tripleValues: TransformStream declare var isEvenValues: TransformStream declare var takeFirstValue: TransformStream const tripleEven = pipe( tripleValues, isEvenValues ) const firstTripleEven = pipe( tripleEven, takeFirstValue ) pipe( inStream, firstTripleEven, outStream )
(opts: TransformOptions) => (propName: string) => Transform
import { pluck } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream declare var outStream: WritableStream const pluckMyProp = pluck({ objectMode: true // provide standard TransformOptions })( 'my-prop' // property name ) inStream .pipe(pluckMyProp) .pipe(outStream)
(options: TransformOptions) => (reducer: (state: S, value: T) => S) => Transform
import { reduce } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream // we have the following reducer const addAll = (acc = 0, value: number) => acc + value const reduceTransform = reduce({ objectMode: true // provide standard TransformOptions })( addAll // set the reducer ) inStream .pipe(reduceTransform) .pipe(outStream) // [............10]
(options: TransformOptions) => (reducer: (state: S, value: T) => S) => Transform
import { scan } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream // we have the following reducer const addAll = (acc = 0, value: number) => acc + value const scanTransform = reduce({ objectMode: true //provide standard TransformOptions })( addAll ) inStream .pipe(scanTransform) .pipe(outStream) // [1..3..6..10]
(options: TransformOptions) => (sideEffect: (value: T) => void) => Transform
import { side } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream const sideEffect = side({ objectMode: true // provide standard TransformOptions })( console.log ) inStream .pipe(sideEffect) .pipe(outStream)
(options: TransformOptions) => (numSkip: number) => Transform
import { skip } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream const skipTransform = skip({ objectMode: true // provide standard TransformOptions })( 2 // skip 2 chunks ) inStream .pipe(skipTransform) .pipe(outStream) // [....3..4]
(options: ReadableOptions) => (...values: T[]) => (readable: ReadableStream) => Readable
import { startWith } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream const prependedReadable = startWith({ objectMode: true // provide standard ReadableOptions })( 'a', 'b', 'c' )( inStream // provide the stream to prepend ) prependedReadable.pipe(outStream) // ['a'.'b'.'c'.1..2..3..4]
(options: TransformOptions) => (numTake: number) => Transform
import { take } from 'node-streams' // we have the following streams in "objectMode" declare var inStream: ReadableStream // [1..2..3..4] declare var outStream: WritableStream const takeTransform = take({ objectMode: true // provide standard TransformOptions })( 2 // take 2 chunks ) inStream .pipe(takeTransform) .pipe(outStream) // [1..2]
(options: TransformOptions) => (wait: WaitFn) => Transform
import { throttle } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream // we have `setTimeout` function with baked in 10ms time // CAUTION: wait function should be able to cancel timeout const wait10ms = (callback: () => void) => { const id = setTimeout(callback, 10) return () => clearTimeout(id) } const throttled = throttle({ objectMode: true // set up standard TranformOptions })( wait10ms // debounce stream for 10ms ) inStream .pipe(throttled) .pipe(outStream)
(options: TransformOptions) => (ms: number) => Transform
import { throttleTime } from 'node-streams' // we have the streams in `objectMode` declare var inStream: ReadableStream declare var outStream: WritableStream const throttled = debounceTime({ objectMode: true // set up standard TranformOptions })( 10 // debounce stream for 10ms ) inStream .pipe(throttled) .pipe(outStream)
(options: ReadableOptions) => (...streams: ReadableStream[]) => (mainStream: ReadableStream) => Readable
import { withLatest } from 'node-streams' // we have the streams in `objectMode` declare var mainStream: ReadableStream // [1..2..3] declare var stream0: ReadableStream // ['a'..'b'] declare var stream1: ReadableStream // [true............false] const combined = withLatest({ objectMode: true // standard ReadableOptions })( stream0, stream1 // streams to take the latest values )( mainStream // mainStream to sync with ) combined .on('data', () => {}) // [[1, 'a', true]..[2, 'a', true]..[3, 'b', true]] .on('end', () => {})
(options: ReadableOptions) => (...streams: ReadableStream[]) => Readable
import { zip } from 'node-streams' // we have the streams in `objectMode` declare var stream0: ReadableStream // ['a'..'b'] declare var stream1: ReadableStream // [true............false] const combined = zip({ objectMode: true // standard ReadableOptions })( stream0, stream1 // streams to combine ) combined .on('data', () => {}) // [['a', true]..........['b', false]] .on('end', () => {})
({ next, error?, complete? }: IObserver) => (...streams: ReadableStream[]) => UnsubscribeFn
type IObserver = { next: (value: T) => void, error?: (e: Error) => void, complete?: () => void } type UnsubscribeFn = () => void
import { subscribe } from 'node-streams' // we have the following streams declare var stream0: ReadableStream // [1..2..3] declare var stream1: ReadableStream // [..'a'..'b'..] const unsub = subscribe({ next: console.log // [1..'a'..2..3..'b'..] })( stream0, stream1 )
({ next, error?, complete? }: IObserverEx) => (...streams: ReadableStream[]) => UnsubscribeFn
type EmitterValue = { value: T, index: number, emitter: EventEmitter, emitterIndex: number, event: string } type IObserverEx = { next: (value: EmitterValue) => void, error?: (e: Error) => void, complete: () => void } type UnsubscribeFn = () => void
import { subscribeEx } from 'node-streams' // we have the following streams declare var stream0: ReadableStream // [1..2..3] declare var stream1: ReadableStream // [..'a'..'b'..] const unsub = subscribeEx({ next: ({value, index, emitter, emitterIndex, event}) => console.log(`value ${value} from stream ${emitterIndex}`) })( stream0, stream1 )
({ next, error?, complete? }: IObserver) => (...streams: ReadableStream[]) => UnsubscribeFn
import { subscribeReadable } from 'node-streams' // we have the following streams declare var stream0: ReadableStream // [1..2..3] declare var stream1: ReadableStream // [..'a'..'b'..] const unsub = subscribeReadable({ next: console.log // [1..'a'..2..3..'b'..] })( stream0, stream1 )
({ next, error?, complete? }: IObserverEx) => (...streams: ReadableStream[]) => UnsubscribeFn
import { subscribeReadableEx } from 'node-streams' // we have the following streams declare var stream0: ReadableStream // [1..2..3] declare var stream1: ReadableStream // [..'a'..'b'..] const unsub = subscribeReadableEx({ next: ({value, index, emitter, emitterIndex, event}) => console.log(`value ${value} from stream ${emitterIndex}`) })( stream0, stream1 )