Channel
Class
class Channel<T> implements PipeSource<T>, PipeTarget<T>
Where:
T: type of messages in the Channel, defaults to unknown. Note that void or undefined is allowed, but this makes the return value of tryReceive indistinguishable from a message itself.
PipeSource<T>: it has the method pipe to bind a target to write values to.
PipeTarget<T>: it has an internal method that accepts messages from PipeSource.
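The ambiguity around void or undefined message types can be shown without the library. The sketch below uses a hypothetical stand-in function, tryReceiveFrom, that only mimics the documented return contract of tryReceive (a value, or undefined when nothing is queued); it is not flowp code. Wrapping messages in an object (the Boxed type, also an assumption made for this example) is one possible way to keep the two cases apart.

```typescript
// Hypothetical stand-in mimicking the documented contract of tryReceive:
// returns the next queued value, or undefined when nothing is queued.
function tryReceiveFrom<T>(queue: T[]): T | undefined {
  return queue.shift()
}

// If undefined is itself a valid message, both results below look the same.
const plainQueue: Array<string | undefined> = [undefined]
const realMessage = tryReceiveFrom(plainQueue) // an actual queued message
const emptyResult = tryReceiveFrom(plainQueue) // the queue was empty

// Wrapping every message keeps "no message" distinct from "undefined message".
type Boxed<T> = { value: T }
const boxedQueue: Array<Boxed<string | undefined>> = [{ value: undefined }]
const boxedMessage = tryReceiveFrom(boxedQueue) // { value: undefined }
const boxedEmpty = tryReceiveFrom(boxedQueue) // undefined
```

With the boxed form, a consumer can check the result against undefined first and only then read the value field.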
Properties
capacity
get capacity(): number
Returns the capacity of the channel.
size
get size(): number
Inspects current number of elements queued in the channel.
closed
get closed(): boolean
Check if the channel is closed.
Methods
constructor
constructor(capacity: number = Infinity);
Creates a new channel with the specified capacity. By default, the capacity is Infinity, meaning the channel can buffer an unlimited number of items in its internal queue (in practice, the queue is still bounded by the maximum array size the runtime supports).
Example:
import { Channel } from 'flowp'
// create an unbounded channel
const unbounded = new Channel()
// create a bounded channel which transmits strings
const bounded = new Channel<string>(50)
// passing an invalid capacity results in an error
const invalid = new Channel(-1) // => throws RangeError
send
send(value: T): Promise<void>;
Sends a value to the channel and returns a promise that resolves once the value has been pushed to the channel's internal buffer and is available to consumers.
If the channel has reached its capacity, the call to send blocks (the returned promise stays pending) until a message is consumed.
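The blocking behaviour of send on a full channel can be pictured with a small model. The following is a simplified sketch and not flowp's implementation: the BoundedBuffer class, its take method and the blockedSenders getter are all hypothetical names introduced here for illustration only.

```typescript
// Simplified model of a bounded buffer; NOT flowp's implementation.
class BoundedBuffer<T> {
  private readonly capacity: number
  private readonly items: T[] = []
  // Senders that arrived while the buffer was full, in arrival order.
  private readonly blocked: Array<{ value: T; admit: () => void }> = []

  constructor(capacity: number) {
    this.capacity = capacity
  }

  get size(): number {
    return this.items.length
  }

  get blockedSenders(): number {
    return this.blocked.length
  }

  // Resolves immediately when there is room, otherwise when a slot frees up.
  send(value: T): Promise<void> {
    if (this.items.length < this.capacity) {
      this.items.push(value)
      return Promise.resolve()
    }
    return new Promise<void>((resolve) => {
      this.blocked.push({ value, admit: resolve })
    })
  }

  // Removes the oldest item and admits the longest-waiting blocked sender.
  take(): T | undefined {
    const head = this.items.shift()
    const next = this.blocked.shift()
    if (next !== undefined) {
      this.items.push(next.value)
      next.admit()
    }
    return head
  }
}

const bounded = new BoundedBuffer<string>(1)
void bounded.send('a') // fits: the promise is already resolved
void bounded.send('b') // buffer full: the promise stays pending
const blockedBeforeTake = bounded.blockedSenders // 1
const taken = bounded.take() // 'a'; this also admits 'b'
const blockedAfterTake = bounded.blockedSenders // 0
```

In this model the second sender is released only when a consumer takes a message, which mirrors the back-pressure described above.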
receive
receive(): Promise<T>;
Retrieves a value from the channel. The returned promise will never resolve if pipe is enabled, and it may race with stream. It is suggested to use only one consumption style at a time.
trySend
trySend(value: T): void;
Synchronously sends a value to the channel. May throw ChannelFullError if the value cannot be pushed to the queue, or ClosedChannelError if the channel is closed.
tryReceive
tryReceive(): T | undefined;
Synchronously receives a value from the channel. Returns the value, or undefined if no message is available.
sendAsync
sendAsync(value: Promise<T>): Promise<void>
Sends a promise to the channel; the message is queued once the promise resolves.
There is no corresponding receiveAsync method, since it would be identical to receive.
Examples:
const ch = new Channel<Response>()
// somewhere
ch.send(response)
try {
ch.trySend(response)
} catch (err) {
// channel is full or closed
}
ch.sendAsync(fetch('https://example.com'))
// elsewhere
const res = await ch.receive()
// or
const maybeRes = ch.tryReceive()
if (maybeRes !== undefined) {
  // message available in the channel
}
pipe
pipe(target: PipeTarget<T>, options?: ChannelPipeOptions): void
interface ChannelPipeOptions {
/**
* Called when `target[read]` throws, e.g. when piping to a closed target channel.
*
* The handler is called immediately every time the read throws an error.
*/
onPipeError?: (err: unknown) => any
}
Sets the channel's output mode to pipe, so all messages are written directly to target (unless frozen). You can pass an extra options param to set up an error handler. Capacity-related checks are skipped, so this is faster than receive and stream.
You can only have one pipe target at a time, but you can use ChannelHub if you want multiple readers.
Some pipe helpers are available, exported under the pipe namespace.
unpipe
unpipe(): void
Unsets the pipe target; subsequent messages will be queued.
Examples:
import { Channel, pipe } from 'flowp'
const ch1 = new Channel<Response>()
const ch2 = new Channel<User>()
ch1.pipe(
pipe.to((res) => ch2.sendAsync(res.json())),
{
onPipeError(err) {
ch1.pause()
},
}
)
stream
stream(): ChannelStream<T>
interface ChannelStream<T> extends AsyncIterable<T> {
next: () => Promise<T>
}
Creates a stream that can be used in for await...of syntax.
It is suggested not to use receive and stream at the same time.
Examples:
for await (const v of channel.stream()) {
console.log(v)
}
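Because ChannelStream is an AsyncIterable, a for await...of loop can stop early with break. The sketch below illustrates this with a hypothetical stand-in, streamOf, which yields from a fixed array instead of a real channel; the helper collect is likewise an assumption for this example, and a real stream would wait for new messages rather than end.

```typescript
// Shape copied from the interface above.
interface ChannelStream<T> extends AsyncIterable<T> {
  next: () => Promise<T>
}

// Hypothetical stand-in that yields from a fixed array; NOT flowp code.
function streamOf<T>(items: T[]): ChannelStream<T> {
  const pending = [...items]
  return {
    next: async () => {
      if (pending.length === 0) throw new Error('no more messages')
      return pending.shift() as T
    },
    async *[Symbol.asyncIterator]() {
      while (pending.length > 0) yield pending.shift() as T
    },
  }
}

// Collects at most `limit` messages, then leaves the loop.
async function collect<T>(stream: ChannelStream<T>, limit: number): Promise<T[]> {
  const out: T[] = []
  for await (const v of stream) {
    out.push(v)
    if (out.length >= limit) break // stop consuming after `limit` messages
  }
  return out
}
```

Calling collect(streamOf([1, 2, 3]), 2) in this model resolves to the first two items and leaves the third unread.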
pause
pause(): void
Pauses the channel, blocking receive | tryReceive | stream | pipe from retrieving messages. This is useful when the downstream consumes messages at a limited rate.
resume
resume(): void
Resumes the channel so receive | tryReceive | stream | pipe can continue to handle new messages.
Examples:
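import * as net from 'net'
import { Channel, pipe } from 'flowp'
const channel = new Channel<Buffer>()
// host and port are assumed to be defined elsewhere
const socket = net.connect(port, host)
channel.pipe(pipe.to((buffer) => {
  // write() returns false once the socket's internal buffer is full
  if (!socket.write(buffer)) {
    channel.pause()
    socket.once('drain', () => channel.resume())
  }
}))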
close
close(): void
Closes the channel; subsequent calls to send will throw an error.
Existing messages can still be consumed until the last message in the queue has been received; after that, calls to receive return a rejected promise.
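The ordering of these close semantics (reject new sends, drain what is queued, then fail receives) can be sketched with a small synchronous model. This is not flowp's implementation: the ClosableQueue class and its take method are hypothetical, and plain Error objects stand in for the library's own error classes.

```typescript
// Simplified model of close semantics; NOT flowp's implementation.
class ClosableQueue<T> {
  private readonly items: T[] = []
  private isClosed = false

  // Rejects new messages once the queue has been closed.
  send(value: T): void {
    if (this.isClosed) throw new Error('send on closed queue')
    this.items.push(value)
  }

  // Returns queued items even after close; throws once closed and drained.
  take(): T {
    if (this.items.length > 0) return this.items.shift() as T
    if (this.isClosed) throw new Error('queue closed and drained')
    throw new Error('queue empty') // a real receive would wait instead
  }

  close(): void {
    this.isClosed = true
  }
}

const closable = new ClosableQueue<number>()
closable.send(1)
closable.close()
const drained = closable.take() // 1: queued before close, still delivered

let sendRejected = false
try {
  closable.send(2)
} catch {
  sendRejected = true // sending after close fails
}

let takeRejected = false
try {
  closable.take()
} catch {
  takeRejected = true // nothing left to drain, so taking fails
}
```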
ChannelHub
Class
class ChannelHub<T = unknown> implements PipeTarget<T>
Where:
T: type of messages in the ChannelHub, defaults to unknown. Note that void or undefined is allowed, but this makes the return value of tryReceive indistinguishable from a message itself.
PipeTarget<T>: it has an internal method that accepts messages from PipeSource.
Methods
from
static from<T>(writers?: Channel<T>[], readers?: Channel<T>[])
Equivalent to the ChannelHub constructor.
reader
reader(): Channel<T>
Get a reader channel that receives messages from the channel hub.
Example:
addEventListener(listener: (e: T) => void) {
hub.reader().pipe(pipe.to(listener))
}
writer
writer(): Channel<T>
Get a writer channel that sends messages to the channel hub.
broadcast
broadcast(value: T): void
Broadcast a message to all readers.
disconnect
disconnect(ch: Channel<T>): void
Disconnects a channel from the hub; it can be a reader or a writer.
Disconnected channels will NOT be closed automatically; they can still be used to send and receive messages.
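The interaction between broadcast and disconnect can be pictured with a simplified fan-out model. The sketch below is not flowp's ChannelHub: SketchHub is a hypothetical class, and plain arrays stand in for reader channels.

```typescript
// Simplified fan-out model; NOT flowp's ChannelHub implementation.
class SketchHub<T> {
  private readonly readers = new Set<T[]>()

  // Each reader gets its own queue (a plain array stands in for a Channel).
  reader(): T[] {
    const inbox: T[] = []
    this.readers.add(inbox)
    return inbox
  }

  // Delivers one copy of the message to every connected reader.
  broadcast(value: T): void {
    this.readers.forEach((inbox) => {
      inbox.push(value)
    })
  }

  // Stops delivery to the reader but leaves its queued messages intact.
  disconnect(inbox: T[]): void {
    this.readers.delete(inbox)
  }
}

const hub = new SketchHub<string>()
const readerA = hub.reader()
const readerB = hub.reader()
hub.broadcast('hello') // both readers receive it
hub.disconnect(readerB)
hub.broadcast('world') // only readerA is still connected
// readerA now holds ['hello', 'world']; readerB still holds ['hello']
```

As in the description above, the disconnected reader keeps whatever it had already received and remains usable on its own.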
close
close(): void
Close the hub and all readers/writers connected to it.