Core

Overview

This section will present a set of what I consider to be core operators e.g those that present a core functionality to work with channels and routines.

call<O extends (...a1: readonly any[]) => any, Params extends Parameters>(fn: O, ...args: Params): CallInstruction

This is the most basic operator and on a surface it just applies a function to whatever arguments you supply to it. If a function returns a generator, promise or an operator's result, it still will be handled properly by a generator runner. Its main purpose is to register a function call as an operation inside a generator so when using testGeneratorRunner this call would show up and you would be able to test against it.

import { call } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go' 

const delay = (ms: number) => {
    return new Promise((resolve, reject) => {
        setTimeout(resolve, ms);
    })
}

function* testGenerator () {
    yield call(delay, 1000);
}

go(testGenerator)

put<C extends Channel>(ch: C, data: FlattenChannel): Generator<Generator<undefined, void, unknown>, boolean, unknown>

Is a blocking operation. This operator will attempt to put a value to the channel. It will return true if operation is successfull, or false if operation is failed due to closed channel. It will throw an error if you try to put a null into the channel. Once the put operation is complete the generator resumes its execution. Put operation can be blocked if the channel's buffer is not able to receive new values, for example if the channel contains a fixed buffer and it is full. Has a promise variant which can be used outside of generators called putAsync.

import { makeChannel } from 'csp-coffee/channel';
import { put } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go' 

const ch = makeChannel();

function* testGenerator () {
    const result = yield put(ch, 'test value')
    console.log(result) // true
}

go(testGenerator)
import { close, makeChannel } from 'csp-coffee/channel';
import { put } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go' 

const ch = makeChannel();
close(ch)

function* testGenerator () {
    const result = yield put(ch, 'test value')
    console.log(result); // false
}

go(testGenerator)

take<C extends Channel>(ch: C): Generator<Generator<undefined, any, unknown>, any, FlattenChannel | null>

Is a blocking operation. Will attempt to get a value from the channel. Will throw an error if an operation is performed against a closed channel. An error can be checked with isChannelClosedError . The return value is the value taken from the channel. Has a promise variant which can be used outside of generators called takeAsync.

import { makeChannel } from 'csp-coffee/channel';
import { put, take } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch = makeChannel(CreatableBufferType.UNBLOCKING);

function* testGenerator () {
    yield put(ch, 'test value');
    const result: string = yield take(ch);
    console.log(result); // 'test value'
}

go(testGenerator)

offer<C extends Channel>(ch: C, data: FlattenChannel): Generator<undefined, boolean | null, unknown>

Attempts to immediately insert a value into the channel. Returns either true or false. Returns false if the channel is closed or the channel is not ready to accept an incoming put. Returns true is operations succeeded.

import { makeChannel } from 'csp-coffee/channel';
import { offer } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch = makeChannel(CreatableBufferType.FIXED, 1);


function* testGenerator () {
    const result1: boolean = yield offer(ch, 'this will work');
    const result2: boolean = yield offer(ch, 'this will not work');
    console.log(result1); // true
    console.log(result2); // false, cause the channel capacity is 1
}

go(testGenerator)

poll<C extends Channel>(ch: C): Generator<undefined, any, unknown>

Attempts to immediately retrieve value from the channel. Returns null if the channel is closed or channel has no values. Returns a value if an operation is successful.

import { makeChannel } from 'csp-coffee/channel';
import { offer, poll } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CreatableBufferType } from 'csp-coffee/buffer';

const ch = makeChannel(CreatableBufferType.FIXED, 1);


function* testGenerator () {
    yield offer(ch, 'this will work');
    const result1: string | null = yield poll(ch);
    const result2: string | null = yield poll(ch);
    console.log(result1); // 'this will work'
    console.log(result2); // null cause no values in the channel
}

go(testGenerator)

fork<GenFn extends (...a1: readonly any[]) => Generator>(genFn: GenFn, ...args: Parameters): ForkInstruction

Is a non-blocking operation. Accepts a generator function and launches it as a forked child routine relative to the parent generator. Parent generator will not be done unless all forked routines are done executing. Cancelling parent generator will cause a recursive cancellation of forked routines. Errors from forked routines will propagate to the parent generator.

import { fork } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CancellablePromise } from 'csp-coffee/cancellablePromise'

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* forkedGenerator () {
    yield delay(1000);
    console.log('I am forked generator');
}

function* testGenerator () {
    const forkedPromise: CancellablePromise<any> = yield fork(forkedGenerator);
    console.log('I have forked a generator', forkedPromise) // Will be logged first
    // Since fork is a non-blocking operation
}

go(testGenerator)
// I have forked a generator { Cancellable Promise }
// I am forked generator

Cancelling forked routine

Inside a generator a value returned from yielding a fork operation is a cancellable promise which resolves once a forked generator is done. So you can just simply cancel this promise to cancel a forked generator by using .cancel method.

import { fork } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CancellablePromise } from 'csp-coffee/cancellablePromise'

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* forkedGenerator () {
    yield delay(1000);
    console.log('I am forked generator');
}

function* testGenerator () {
    const forkedPromise: CancellablePromise<any> = yield fork(forkedGenerator);
    yield forkedPromise.cancel() // here we are cancelling our forkedGenerator
    console.log('I have forked a generator', forkedPromise)
}

go(testGenerator)
// I have forked a generator { Cancellable Promise }

Error propagation

If a forked routine throws an error while the parent routine is blocked, the error will be propagated into a try..catch block inside a parent generator. Parent routine will be cancelled immediately if possible. In this case delay(1000) started running before an error was thrown. And since we can't cancel a native promise, parent generator will wait for it to complete before cancelling itself. We can imagine a case when a generator call could be instead of our delay, and since we can indeed cancel a generator execution mid air, parent routine would terminate as soon as a generator running is cancelled.

import { fork } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'
import { CancellablePromise } from 'csp-coffee/cancellablePromise'

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* forkedGenerator () {
    yield;
    throw new Error('Oops!')
}

function* testGenerator () {
    try {
        yield fork(forkedGenerator);
        yield delay(1000);
        console.log('I have forked a generator') // This will not be loggged
    } catch (e) {
        // an error 'Oops!' will be caught here
        // since parent generator is blocked on delay(1000) call
    }
}

go(testGenerator)

If parent routine does not have a try..catch block the whole parent routine will be rejected with an error and can be caught by using a .catch method on cancellablePromise returned by go function.

import { fork } from 'csp-coffee/operators'
import { go } from 'csp-coffee/go'

const delay = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    })
}

function* forkedGenerator () {
    yield;
    throw new Error('Oops!')
}

function* testGenerator () {
    yield fork(forkedGenerator);
    yield delay(1000);
    console.log('I have forked a generator') // This will not be logged
}

const { cancellablePromise } = go(testGenerator)

cancellablePromise.then(value => {
    console.log('Value', value)
}).catch(() => {
    console.log('Error caught'); // We catch an unhandled error here
})

spawn<GenFn extends (...a1: readonly any[]) => Generator>(genFn: GenFn, ...args: Parameters): SpawnInstruction

Is a non-blocking operation. Accepts a generator function to spawn. Spawned routines in comparison with forked ones will not propagate errors to parent routine. Spawned routines also will not be "waited" for by a parent routine once it completes. This operators also returns a cancellablePromise which can be cancelled in a same manner as with forked routines.

Last updated