Combinators

all(...instructions: CallInstruction[]): Generator

all will accept an indefinite number of call(...) results and will produce a cancellablePromise which will resolve with an array of values that are produced by called routines. Usually you want to use it to mimic a Promise.all behavior on your routines. If any of the routines throws an error, others are cancelled if possible.

import { all, call } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* childGenerator1 () {
    yield delay(1000);
    return 1;
}

function* childGenerator2 () {
    yield delay(600);
    return 2;
}

function* rootGenerator () {
    const results: number[] = yield all(call(childGenerator1), call(childGenerator2))
    console.log(results);
}

go(rootGenerator)
// [1, 2]

Cancelling a cancellablePromise produced by an all function will result into a recursive cancellation of all combined routines and their children.

alts: (defs: ArrayOfDefinitions<Channel>, defaultValue?: unknown) => Promise<OperationResponse>

alts will take an array of definitions and try to peform at most one operation based on those definitions. Definition can be either a channel to take value from, or a tuple [channel, value-to-put-into-this-channel]. It will return a promise which will resolve with a result of a first successfull operation which is an object with value and ch properties. ch property will point at a channel for which operation is succeeded. value will contain a result of that operation.

alts will first try to immediately perform an action using either a poll (for taking a value) or pull (for putting the value) operator. If none of those operations succeeds and defaultValue is specified, it will resolve a promise with this defaultValue. If no defaultValue is specified, it will try to peform those operations using put and take operators respectively.

Here is an example usage with defaultValue provided.

merge<Channels extends Channel[], AggregatedType = FlattenChannel<Flatten>>(channels: Channels, { bufferType, capacity }?: ChannelConfiguration): { ch: Channel; promise: Promise; }

merge will take an array of source channels and produce an object with a channel which will contain values from source channels and a promise which will resolve once all source channels are closed. The result channel will be closed once all values are taken from it. You can additionally pass a buffer type and capacity of a result channel. By default result channel will have an Unblocking buffer with unlimited capacity.

pipe<T = unknown>(destinationChannel: Channel, sourceChannel: Channel, keepOpen?: boolean): { promise: Promise; }

pipe will take a source channel and put all of its values into a destination channel until it closes. Destination channel will be closed once a source channel is closed unless a keepOpen parameter equals to true.

race(...instructions: CallInstruction[]): Generator

Race is a Promise.race port for routines. It will accept an indefinite number of call(...) results and will produce a cancellablePromise which will resolve with a value returned from a first completed routine. Other routines will be cancelled if possible.

raceToSuccess(...instructions: CallInstruction[]): Generator

Has the same interface as the simple race function but instead will resolve only after any routine is successfully resolved. If every routine failed, will reject with the first error that occured.

zip: (callback: (data: readonly any[]) => any, ...chs: Channel[]) => Promise

Accepts a callback and an indefinite number of source channels. It will call a callback with an array of values from each of the channels once each channel receives a value. zip will continue running until all source channels are closed. Returns a promise which will resolve once all source channels are closed.

Last updated