Stream.broadcastProtocol


  • 值:Symbol.for('Stream.broadcastProtocol')

该值必须是一个函数。当被 Broadcast.from() 调用时,它会接收到传递给 Broadcast.from() 的选项,并且必须返回符合 Broadcast 接口的对象。实现完全自定义 —— 它可以以任意方式管理消费者、缓冲和背压。

🌐 The value must be a function. When called by Broadcast.from(), it receives the options passed to Broadcast.from() and must return an object conforming to the Broadcast interface. The implementation is fully custom -- it can manage consumers, buffering, and backpressure however it wants.

import { Broadcast, text } from 'node:stream/iter';
// This example defers to the built-in Broadcast, but a custom
// implementation could use any mechanism.
class MessageBus {
 #broadcast;
 #writer;
 constructor() {
 const { writer, broadcast } = Broadcast();
 this.#writer = writer;
 this.#broadcast = broadcast;
 }
 [Symbol.for('Stream.broadcastProtocol')](options) {
 return this.#broadcast;
 }
 send(data) {
 this.#writer.write(new TextEncoder().encode(data));
 }
 close() {
 this.#writer.end();
 }
}
const bus = new MessageBus();
const { broadcast } = Broadcast.from(bus);
const consumer = broadcast.push();
bus.send('hello');
bus.close();
console.log(await text(consumer)); // 'hello'const { Broadcast, text } = require('node:stream/iter');
// This example defers to the built-in Broadcast, but a custom
// implementation could use any mechanism.
class MessageBus {
 #broadcast;
 #writer;
 constructor() {
 const { writer, broadcast } = Broadcast();
 this.#writer = writer;
 this.#broadcast = broadcast;
 }
 [Symbol.for('Stream.broadcastProtocol')](options) {
 return this.#broadcast;
 }
 send(data) {
 this.#writer.write(new TextEncoder().encode(data));
 }
 close() {
 this.#writer.end();
 }
}
const bus = new MessageBus();
const { broadcast } = Broadcast.from(bus);
const consumer = broadcast.push();
bus.send('hello');
bus.close();
text(consumer).then(console.log); // 'hello'

AltStyle によって変換されたページ (->オリジナル) /