Combinators

all(...instructions: CallInstruction[]): Generator

all will accept an indefinite number of call(...) results and will produce a cancellablePromise which will resolve with an array of values that are produced by called routines. Usually you want to use it to mimic a Promise.all behavior on your routines. If any of the routines throws an error, others are cancelled if possible.

import { all, call } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* childGenerator1 () {
    yield delay(1000);
    return 1;
}

function* childGenerator2 () {
    yield delay(600);
    return 2;
}

function* rootGenerator () {
    const results: number[] = yield all(call(childGenerator1), call(childGenerator2))
    console.log(results);
}

go(rootGenerator)
// [1, 2]

Cancelling a cancellablePromise produced by an all function will result into a recursive cancellation of all combined routines and their children.

alts: (defs: ArrayOfDefinitions<Channel>, defaultValue?: unknown) => Promise<OperationResponse>

alts will take an array of definitions and try to peform at most one operation based on those definitions. Definition can be either a channel to take value from, or a tuple [channel, value-to-put-into-this-channel]. It will return a promise which will resolve with a result of a first successfull operation which is an object with value and ch properties. ch property will point at a channel for which operation is succeeded. value will contain a result of that operation.

alts will first try to immediately perform an action using either a poll (for taking a value) or pull (for putting the value) operator. If none of those operations succeeds and defaultValue is specified, it will resolve a promise with this defaultValue. If no defaultValue is specified, it will try to peform those operations using put and take operators respectively.

import { alts, take } from 'csp-coffee/operators'
import { OperationResponse } from 'csp-coffee/operators/combinators/alts';
import { go } from 'csp-coffee/go';
import { makeChannel } from 'csp-coffee/channel';

const ch1 = makeChannel();
const ch2 = makeChannel();

function* mainGenerator () {
    const result: OperationResponse<any> = yield alts([ch1, [ch2, 'test1']]);

    console.log(result.value); // true, since put is successful
    
    // true, since you cannot poll an empty ch1 channel
    // and put operation to ch2 was successful
    console.log(ch2.is(result.ch)); 
    const takeResult: string = yield take(ch2);
    console.log(takeResult); // 'test1'
}

go(mainGenerator)

Here is an example usage with defaultValue provided.

import { alts } from 'csp-coffee/operators'
import { OperationResponse } from 'csp-coffee/operators/combinators/alts';
import { go } from 'csp-coffee/go';
import { makeChannel } from 'csp-coffee/channel';

const ch1 = makeChannel();
const ch2 = makeChannel();

function* mainGenerator () {
    const result: OperationResponse<any> = yield alts([ch1, ch2], 'default value');
    // We can't poll ch1 since it is empty
    // We can't poll ch2 since it is empty as well
    // Since both of our operations failed and a default value was provided
    // It is returned from alts
    console.log(result.value); // 'default value'
    console.log(result.ch); // null
}

go(mainGenerator)

merge<Channels extends Channel[], AggregatedType = FlattenChannel<Flatten>>(channels: Channels, { bufferType, capacity }?: ChannelConfiguration): { ch: Channel; promise: Promise; }

merge will take an array of source channels and produce an object with a channel which will contain values from source channels and a promise which will resolve once all source channels are closed. The result channel will be closed once all values are taken from it. You can additionally pass a buffer type and capacity of a result channel. By default result channel will have an Unblocking buffer with unlimited capacity.

import { fromArray, iterate, merge } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const ch1 = fromArray([1, 2]);
const ch2 = fromArray([3, 4]);

function* mainGenerator () {
    const { ch: mergedCh, promise } = merge([ch1, ch2]);
    yield iterate((value: string | number) => {
        console.log(value);
    }, mergedCh);
}

go(mainGenerator)
// 1
// 2
// 3
// 4

pipe<T = unknown>(destinationChannel: Channel, sourceChannel: Channel, keepOpen?: boolean): { promise: Promise; }

pipe will take a source channel and put all of its values into a destination channel until it closes. Destination channel will be closed once a source channel is closed unless a keepOpen parameter equals to true.

import { fromArray, pipe, iterate } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';
import { makeChannel } from 'csp-coffee/channel';
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch1 = fromArray([1, 2]);
const ch2 = makeChannel(CreatableBufferType.FIXED);

function* mainGenerator () {
    const { promise } = pipe(ch2, ch1);
    yield iterate((value: string | number) => {
        console.log(value);
    }, ch2);
}

go(mainGenerator)
// 1
// 2

race(...instructions: CallInstruction[]): Generator

Race is a Promise.race port for routines. It will accept an indefinite number of call(...) results and will produce a cancellablePromise which will resolve with a value returned from a first completed routine. Other routines will be cancelled if possible.

import { race, call } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* childGenerator1 () {
    yield delay(1000);
    return 10;
}

function* childGenerator2 () {
    throw new Error('Oops!')
}

function* mainGenerator () {
    try {
        const result: number = yield race(call(childGenerator1), call(childGenerator2))
        console.log(result); // Will not be logged
    } catch (e) {
        console.log(e); // Error: Oops!
    }
}

go(mainGenerator)

raceToSuccess(...instructions: CallInstruction[]): Generator

Has the same interface as the simple race function but instead will resolve only after any routine is successfully resolved. If every routine failed, will reject with the first error that occured.

zip: (callback: (data: readonly any[]) => any, ...chs: Channel[]) => Promise

Accepts a callback and an indefinite number of source channels. It will call a callback with an array of values from each of the channels once each channel receives a value. zip will continue running until all source channels are closed. Returns a promise which will resolve once all source channels are closed.

import { putAsync, zip } from 'csp-coffeee/operators';
import { go } from 'csp-coffee/go';
import { makeChannel } from 'csp-coffee/channel';
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch1 = makeChannel(CreatableBufferType.UNBLOCKING);
const ch2 = makeChannel(CreatableBufferType.UNBLOCKING);

function* mainGenerator () {
    zip((values => {
        console.log(values)
    }), ch1, ch2);
    yield putAsync(ch1, 'left');
    yield putAsync(ch2, 'right');
}

go(mainGenerator)
// ['left', 'right']

Last updated