Collection

Overview

This section lists operators that treat a channel as a collection of values and provide ways to work with it from that point of view.

iterate<Channels extends Channel[]>(callback: (data: FlattenChannels) => any, ...chs: Channels): Generator

iterate is a function that returns a generator. Because it returns a generator, it must be yielded inside a generator running in the go function to be handled properly. It is used internally by all the other functions listed in this section. iterate takes each value from the channel or channels and calls the callback with it. Once a channel is closed, iterate stops running. Note that this function can accept any number of channels to iterate through simultaneously. In the example below, we pass two channels to it as additional arguments after the callback.

import { fromArray, iterate } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const ch = fromArray([1, 2, 3]);
const ch2 = fromArray([4, 5, 6]); 
// channel produced by fromArray function
// is automatically closed once all values are taken

function* testGenerator () {
    yield iterate((val) => {
        console.log(val);
    }, ch, ch2);
}

go(testGenerator)
// 1
// 2
// 3
// 4
// 5
// 6

drain<C extends Channel>(ch: C): Promise<FlattenChannel[]>

drain accepts a channel as an argument and returns a promise which resolves once the channel is closed. The result of the promise is an array of values collected from that channel. Because this function returns a native promise, it can be used outside of generators as well.

import { makeChannel, close } from 'csp-coffee/channel';
import { drain, putAsync } from 'csp-coffee/operators'
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch = makeChannel(CreatableBufferType.FIXED);

async function main () {
    const drainPromise = drain(ch);
    
    await putAsync(ch, 1);
    await putAsync(ch, 2);
    close(ch);

    const drainResult = await drainPromise;
    console.log(drainResult); // [1, 2]
}

main()
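
To make the "collect until closed" behavior concrete, here is a minimal, self-contained sketch of how a drain-style function could work. ToyChannel, Taken and toyDrain are hypothetical names invented for this illustration; this is not csp-coffee's implementation, and it ignores buffer types and capacity entirely.

```typescript
// Toy model only: NOT csp-coffee code. All names here are hypothetical.
type Taken<T> = { done: false; value: T } | { done: true };

class ToyChannel<T> {
    private buffer: T[] = [];
    private closed = false;
    private takers: Array<(result: Taken<T>) => void> = [];

    put(value: T): void {
        if (this.closed) throw new Error("cannot put to a closed channel");
        const taker = this.takers.shift();
        if (taker) taker({ done: false, value }); // hand over to a waiting taker
        else this.buffer.push(value);             // otherwise keep it for later
    }

    close(): void {
        this.closed = true;
        // Wake every pending taker so it can observe the close.
        for (const taker of this.takers.splice(0)) taker({ done: true });
    }

    take(): Promise<Taken<T>> {
        return new Promise<Taken<T>>((resolve) => {
            if (this.buffer.length > 0) {
                resolve({ done: false, value: this.buffer.shift() as T });
            } else if (this.closed) {
                resolve({ done: true });
            } else {
                this.takers.push(resolve); // wait for a put or a close
            }
        });
    }
}

// Collects values until the channel is closed and empty, then resolves.
async function toyDrain<T>(ch: ToyChannel<T>): Promise<T[]> {
    const collected: T[] = [];
    for (;;) {
        const next = await ch.take();
        if (next.done === true) return collected;
        collected.push(next.value);
    }
}

const toyCh = new ToyChannel<number>();
const drained = toyDrain(toyCh); // starts collecting immediately
toyCh.put(1);
toyCh.put(2);
toyCh.close();
drained.then((values) => console.log(values)); // logs the array [1, 2]
```

The point of the sketch is that the promise settles only after the close is observed, which is why the example above calls drain before putting values and awaits its result after close.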

map<Channels extends Channel[], M extends unknown = any>(mapFn: (data: FlattenChannels) => M, channels: Channels, { bufferType, capacity }?: ChannelConfiguration): ChannelTransformationResponse

map accepts an array of source channels along with a map function and returns an object containing a result channel and a promise. The result channel receives the values from the source channels, each transformed by the map function. The promise resolves once all values are mapped and put into the result channel and all source channels are closed. The result channel closes when all values are taken from it. You can additionally pass the buffer type and capacity of the result channel. By default, the result channel has an Unblocking buffer with unlimited capacity.

import { fromArray, map, iterate } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const ch = fromArray([1, 2, 3]);
const ch2 = fromArray([4, 5, 6]);

function* testGenerator () {
    const { ch: mappedCh, promise } = map((val: number) => {
        return val * 10
    }, [ch, ch2]);
    yield iterate((value) => {
        console.log(value);
    }, mappedCh);
}

go(testGenerator)
// 10
// 40
// 20
// 50
// 30
// 60

filter<Channels extends Channel[]>(filterFn: (data: FlattenChannels) => boolean, channels: Channels, { bufferType, capacity }?: ChannelConfiguration): ChannelTransformationResponse<FlattenChannels>

filter accepts an array of source channels and a filter function and returns an object containing a result channel and a promise. The result channel receives the values from the source channels for which the filter function returns true. The promise resolves once all filtered values are put into the result channel and all source channels are closed. The result channel closes when all values are taken from it. You can additionally pass the buffer type and capacity of the result channel. By default, the result channel has an Unblocking buffer with unlimited capacity.

import { fromArray, filter, iterate } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const ch = fromArray([1, 2, 3]);
const ch2 = fromArray([4, 5, 6]);

function* testGenerator () {
    const { ch: filteredCh, promise } = filter((val: number) => {
        return val % 2 === 0;
    }, [ch, ch2]);
    yield iterate((value) => {
        console.log(value);
    }, filteredCh);
}

go(testGenerator)
// 4
// 2
// 6

reduce<Channels extends Channel[], A = unknown>(reducer: (acc: A, data: FlattenChannels) => A, acc: A, channels: Channels, { bufferType, capacity }?: ChannelConfiguration): ChannelTransformationResponse

reduce accepts a reducer, an initial accumulator value, and an array of source channels, and returns an object containing a result channel and a promise. The result channel receives the single reduced value once all source channels are closed. The promise also resolves once all source channels are closed. The result channel closes when all values are taken from it. You can additionally pass the buffer type and capacity of the result channel. By default, the result channel has an Unblocking buffer with unlimited capacity.

import { fromArray, iterate, reduce } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go';

const ch = fromArray([1, 2, 3]);
const ch2 = fromArray([4, 5, 6]);

function* testGenerator () {
    const { ch: reducedCh, promise } = reduce((acc, nextValue) => {
        return acc + nextValue;
    }, 0, [ch, ch2]);
    yield iterate((value) => {
        console.log(value);
    }, reducedCh);
}

go(testGenerator)
// 21
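
The key property described above is that a reduced value becomes available only after every source is closed. The sketch below models that idea with async generators standing in for channels. toySource and toyReduce are hypothetical names, and this is a simplified illustration under that assumption, not how csp-coffee implements reduce.

```typescript
// Toy model only: async generators stand in for channels. Not csp-coffee code.
async function* toySource<T>(values: T[]): AsyncGenerator<T> {
    for (const value of values) yield value;
}

// Folds every value from every source into one accumulator and produces
// a result only after all sources are exhausted (that is, "closed").
async function toyReduce<T, A>(
    reducer: (acc: A, value: T) => A,
    initial: A,
    sources: AsyncIterable<T>[]
): Promise<A> {
    let acc = initial;
    // For simplicity the sources are read one after another here;
    // a real implementation may interleave them.
    for (const source of sources) {
        for await (const value of source) acc = reducer(acc, value);
    }
    return acc;
}

const toyTotal = toyReduce((acc: number, n: number) => acc + n, 0, [
    toySource([1, 2, 3]),
    toySource([4, 5, 6]),
]);
toyTotal.then((sum) => console.log(sum)); // 21
```

Because addition is commutative, the order in which values arrive from the two sources does not affect the result. With an order-sensitive reducer, the interleaving of the sources would matter.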
