Stream.broadcastProtocol
- 值: Symbol.for('Stream.broadcastProtocol')
该值必须是一个函数。当被 Broadcast.from() 调用时，它会接收到传递给 Broadcast.from() 的选项，并且必须返回符合 Broadcast 接口的对象。实现完全自定义 —— 它可以以任意方式管理消费者、缓冲和背压。
🌐 The value must be a function. When called by Broadcast.from(), it receives
the options passed to Broadcast.from() and must return an object conforming
to the Broadcast interface. The implementation is fully custom -- it can
manage consumers, buffering, and backpressure however it wants.
```mjs
import { Broadcast, text } from 'node:stream/iter';

// This example defers to the built-in Broadcast, but a custom
// implementation could use any mechanism.
class MessageBus {
  #broadcast;
  #writer;

  constructor() {
    const { writer, broadcast } = Broadcast();
    this.#writer = writer;
    this.#broadcast = broadcast;
  }

  [Symbol.for('Stream.broadcastProtocol')](options) {
    return this.#broadcast;
  }

  send(data) {
    this.#writer.write(new TextEncoder().encode(data));
  }

  close() {
    this.#writer.end();
  }
}

const bus = new MessageBus();
const { broadcast } = Broadcast.from(bus);
const consumer = broadcast.push();
bus.send('hello');
bus.close();
console.log(await text(consumer)); // 'hello'
```

```cjs
const { Broadcast, text } = require('node:stream/iter');

// This example defers to the built-in Broadcast, but a custom
// implementation could use any mechanism.
class MessageBus {
  #broadcast;
  #writer;

  constructor() {
    const { writer, broadcast } = Broadcast();
    this.#writer = writer;
    this.#broadcast = broadcast;
  }

  [Symbol.for('Stream.broadcastProtocol')](options) {
    return this.#broadcast;
  }

  send(data) {
    this.#writer.write(new TextEncoder().encode(data));
  }

  close() {
    this.#writer.end();
  }
}

const bus = new MessageBus();
const { broadcast } = Broadcast.from(bus);
const consumer = broadcast.push();
bus.send('hello');
bus.close();
text(consumer).then(console.log); // 'hello'
```