Skip to main content

Channel

Class

class Channel<T> implements PipeSource<T>, PipeTarget<T>

Where:

T: type of messages in Channel, defaults to unknown. Note void or undefined is allowed but this will make return values from tryReceive indistinguishable from message itself.

PipeSource<T>: it has the method pipe to bind a target to write values to.

PipeTarget<T>: it has an internal method that accepts messages from PipeSource.

Properties

capacity

get capacity(): number

Returns the capacity of the channel.

size

get size(): number

Inspects current number of elements queued in the channel.

closed

get closed(): boolean

Check if the channel is closed.

Methods

constructor

constructor(capacity?: number = Infinity);

Create a new channel with specified capacity. By default, the capacity will be Infinite, meaning the channel can buffer unlimited items within its internal queue (but array size may not exceed runtime).

Example:

import { Channel } from 'flowp'

//create an unbounded channel
const ch = new Channel()
// create a bounded channel which transmit strings
const ch = new Channel<string>(50)
// passing invalid capacity will result in error
const ch = new Channel(-1) // => throw RangeError

send

send(value: T): Promise<void>;

Sends a value to channel, and returns a promise which is resolved when the value is pushed to channel's internal buffer and available for consumers.

If the channel has reached its capacity, then call to send will be blocked until any message is consumed.

receive

receive(): Promise<T>;

Retrieves a value from channel. The returned promise will never resolve if pipe is enabled, and may race with stream. It's suggested to use only one style at the same time.

trySend

trySend(value: T): void;

Synchronosly sends a value to channel, and may throw ChannelFullError if not able to push to queue or ClosedChannelError if the channel is closed.

tryReceive

tryReceive(): T | undefined;

Synchronosly receives a value from channel, returns value or undefined if no message is available.

sendAsync

sendAsync(value: Promise<T>): Promise<void>

Sends a promise to the channel, message is queued once the promise is resolved.

note

There is no corresponding receiveAsync method since it would be identical to receive.

Examples:

const ch = new Channel<Response>()

// somewhere
ch.send(response)
try {
ch.trySend(response)
} catch (err) {
// channel is full or closed
}
ch.sendAsync(fetch('https://example.com'))

// elsewhere
const res = await ch.receive()
// or
const res = ch.tryReceive()
if (res) {
// message available in the channel
}

pipe

pipe(target: PipeTarget<T>, options?: ChannelPipeOptions): void

interface ChannelPipeOptions {
/**
* Called when `target[read]` throws e.g. pipe a closed target channel.
*
* param will be called immediately every time the read throws an error.
*/
onPipeError?: (err: unknown) => any
}

Sets channel's output mode to pipe and all messages will be directly written to target (unless frozen). You can pass an extra options param to setup an error handler. Capacity related checks are skipped so this is faster than receive and stream.

You can only have one pipe target at a time, but you can use ChannelHub if you want multiple readers

Some pipe helpers are available and exported under pipe namespace.

unpipe

unpipe(): void

Unsets pipe and subsequent messages will be queued.

Examples:

import { pipe } from 'flowp'

const ch1 = new Channel<Response>()
const ch2 = new Channel<User>()

ch1.pipe(
pipe.to((res) => ch2.sendAsync(res.json())),
{
onPipeError(err) {
ch1.pause()
},
}
)

stream

ES2018

stream(): ChannelStream<T>

interface ChannelStream<T> extends AsyncIterable<T> {
next: () => Promise<T>
}

Creates a stream that can be used in for await...of syntax.

It is suggested not to use only receive and stream at the same time.

Examples:

for await (const v of channel.stream()) {
console.log(v)
}

pause

pause(): void

Pauses the channel, blocking receive | tryReceive | stream | pipe from retriving messages. This is useful when downstream consumes messages at a limited rate.

resume

pause(): void

Resumes the channel so receive | tryReceive | stream | pipe can continue to handle new messages.

Examples:

const channel = new Channel()
const socket = net.connect(host, port)

channel.pipe(pipe.to((buffer) => {
if (socket.write(buffer)) {
pipe.pause()
socket.once('drain', () => channel.resume())
}
}))

close

close(): void

Closes the channel and subsequent calls to send will throw an error.

Existing messages can still be consumed until the last message in queue has been received, then call to receive will return a rejected promise.

ChannelHub

Class

class class ChannelHub<T = unknown> implements PipeTarget<T>

Where:

T: type of messages in ChannelHub, defaults to unknown. Note void or undefined is allowed but this will make return values from tryReceive indistinguishable from message itself.

PipeTarget<T>: it has an internal method that accepts messages from PipeSource.

Methods

from

static from<T>(writers?: Channel<T>[], readers?: Channel<T>[])

equivalent to ChannelHub.constructor

reader

reader(): Channel<T>

Get a reader channel that receives messages from the channel hub.

Example:

addEventListener(listener: (e: T) => void) {
hub.reader().pipe(pipe.to(listener))
}

writer

reader(): Channel<T>

Get a writer channel that send messages to the channel hub.

broadcast

broadcast(value: T): void

Broadcast a message to all readers.

disconnect

disconnect(ch: Channel<T>): void

Diconnect a channel from the hub, could be a reader or a writer

note

Disconnected channel will NOT be closed automatically, they can still be used to send and receive messages

close

close(): void

Close the hub and all readers/writers connected to it.