Overview

A promise-based multi-producer, single-consumer channel that can serve as an in-memory message queue.

buffered message queue

A channel can be created with a maximum capacity; messages are buffered in a queue until they are consumed.

import { Channel } from 'flowp'

const ch = new Channel<string>(10)

send / receive message transmit

Send a message on one side and receive it on the other. Some synchronous variants are also available; see the API.

// somewhere
ch.send('hello')
// elsewhere
await ch.receive()
// or, synchronously
try {
  ch.tryReceive()
} catch (err) {
  // no message available in channel
}
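To make the buffering and send / receive behaviour above concrete, here is a minimal toy channel. It is not flowp's implementation: `MiniChannel` and `demo` are hypothetical names, and the sketch assumes a capacity of at least 1 and that `send` waits while the buffer is full, which may differ from the real `Channel`.

```typescript
// Toy single-consumer channel for illustration only; NOT flowp's implementation.
class MiniChannel<T> {
  private queue: T[] = []
  private waitingReceivers: Array<(value: T) => void> = []
  private waitingSenders: Array<() => void> = []

  // Assumption: capacity >= 1.
  constructor(private capacity: number = Infinity) {}

  // Resolves once the message is handed to a waiting receiver or buffered.
  async send(value: T): Promise<void> {
    while (this.waitingReceivers.length === 0 && this.queue.length >= this.capacity) {
      // Buffer is full and nobody is waiting: sleep until a slot is freed.
      await new Promise<void>((resolve) => this.waitingSenders.push(resolve))
    }
    const receiver = this.waitingReceivers.shift()
    if (receiver) receiver(value)
    else this.queue.push(value)
  }

  // Resolves with the next message, waiting if the buffer is empty.
  receive(): Promise<T> {
    if (this.queue.length > 0) return Promise.resolve(this.takeFromQueue())
    return new Promise<T>((resolve) => this.waitingReceivers.push(resolve))
  }

  // Synchronous variant: throws instead of waiting.
  tryReceive(): T {
    if (this.queue.length === 0) throw new Error('no message available')
    return this.takeFromQueue()
  }

  private takeFromQueue(): T {
    const value = this.queue.shift() as T
    const sender = this.waitingSenders.shift()
    if (sender) sender() // wake one blocked sender, a slot is free now
    return value
  }
}

async function demo(): Promise<string[]> {
  const ch = new MiniChannel<string>(1)
  await ch.send('a') // fits in the buffer
  const pending = ch.send('b') // buffer is full, so this stays pending
  const first = await ch.receive() // frees a slot; 'b' can now be buffered
  await pending
  const second = ch.tryReceive()
  return [first, second] // ['a', 'b']
}
```

The point of the sketch is the ordering: a bounded buffer turns a fast producer's `send` into a pending promise, while `tryReceive` never waits and throws when nothing is buffered.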

piping to other channels (or use pipe.to())

import { pipe } from 'flowp' // if you need to pipe to custom target

const ch2 = new Channel<string>()

// pipe to another channel
ch.pipe(ch2, {
  // triggered when writing message to target causes an error, e.g. ChannelClosedError
  onPipeError: () => {},
})

ch.pipe(
  pipe.to(() => {
    // custom consumer
  })
)

// a small helper
ch.pipe(pipe.to.console('error'))

consume with async iterator syntax

Use an async iterator to consume messages.

const stream = ch.stream()

for await (const v of stream) {
  handle(v)
}

temporarily block consumer

Pause the channel to stop receive / tryReceive / stream / pipe from retrieving new messages. This is useful when the target consumes at a limited rate, such as a Node.js net.Socket.

channel.pipe(
  pipe.to((buf) => {
    if (!socket.write(buf)) {
      channel.pause()
      socket.once('drain', () => channel.resume())
    }
  })
)
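The pause / resume pattern above can be sketched without a socket. The toy below is not flowp code: `PausableQueue` and its `sink` callback are hypothetical names used only to show how a consumer that pauses itself leaves later messages buffered until it resumes.

```typescript
// Toy model of backpressure; NOT flowp's implementation.
class PausableQueue<T> {
  private items: T[] = []
  private paused = false

  constructor(private sink: (item: T) => void) {}

  push(item: T): void {
    this.items.push(item)
    this.flush()
  }

  pause(): void {
    this.paused = true
  }

  resume(): void {
    this.paused = false
    this.flush()
  }

  // Deliver buffered items until the queue is empty or the sink pauses us.
  private flush(): void {
    while (!this.paused && this.items.length > 0) {
      this.sink(this.items.shift() as T)
    }
  }
}

const written: number[] = []
const queue = new PausableQueue<number>((n) => {
  written.push(n)
  // Pretend the target is saturated after every second item,
  // like socket.write() returning false.
  if (written.length % 2 === 0) queue.pause()
})

queue.push(1)
queue.push(2) // the sink pauses the queue here
queue.push(3) // stays buffered: written is still [1, 2]
queue.resume() // like the 'drain' event: written becomes [1, 2, 3]
```

Because the pause flag is checked before each delivery, the item that triggered the pause is still delivered, and everything after it waits for `resume()`.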

multiple consumers

A channel can have only one consumer at a time, but some use cases need many consumers (e.g. an EventEmitter-style API). For these cases, flowp provides ChannelHub.

class EventChannel<T> {
  private hub: ChannelHub<T>
  constructor() {
    this.hub = new ChannelHub<T>()
  }
  event(handler: (v: T) => any) {
    const reader = this.hub.reader()
    reader.pipe(pipe.to(handler))
    return () => reader.unpipe()
  }
  fire(v: T) {
    this.hub.broadcast(v)
  }
}
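As a rough mental model of the fan-out above, the sketch below gives every reader its own buffer and copies each broadcast message into all of them. It is not how ChannelHub is implemented: `MiniHub`, `MiniReader`, `take` and `close` are hypothetical names chosen for illustration.

```typescript
// Toy fan-out hub; NOT flowp's ChannelHub.
interface MiniReader<T> {
  take(): T | undefined // next buffered message, or undefined if none
  close(): void // stop receiving further broadcasts
}

class MiniHub<T> {
  private buffers: T[][] = []

  // Each reader owns a private buffer, so readers never steal from each other.
  reader(): MiniReader<T> {
    const buffer: T[] = []
    this.buffers.push(buffer)
    return {
      take: () => buffer.shift(),
      close: () => {
        this.buffers = this.buffers.filter((b) => b !== buffer)
      },
    }
  }

  // Copy the message into every open reader's buffer.
  broadcast(value: T): void {
    for (const buffer of this.buffers) buffer.push(value)
  }
}

const hub = new MiniHub<string>()
const a = hub.reader()
const b = hub.reader()

hub.broadcast('hello') // both a and b now hold 'hello'
a.close()
hub.broadcast('world') // only b receives 'world'
```

Per-reader buffers are what distinguish this from a single shared channel, where two consumers would compete for the same message.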