Streams

In order to facilitate the conception of custom processing pipelines, components rely on a reactive programming paradigm to generate or react to particular event streams. The reactive programming is well-suited for the development of such event-driven and interactive applications. It facilitates the management of asynchronous data streams, their transformation and the propagation of change to the relevant dependents. Each component exposes a set of data streams containing the various events produced by the component. These data streams can easily be manipulated (filtered, transformed, combined) and plugged into other components to define pipelines.

Elements of reactive programming

From André Staltz's The introduction to Reactive Programming you've been missingopen in new window:

Reactive programming is programming with asynchronous data streams.

In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed would be a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.

On top of that, you are given an amazing toolbox of functions to combine, create and filter any of those streams.

That's where the "functional" magic kicks in. A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream. You can merge two streams. You can filter a stream to get another one that has only those events you are interested in. You can map data values from one stream to another new one.

Schematically, a stream looks like this:

Schematic representation of a data stream

A stream is sequence of ongoing events ordered in time. Streams can be finite or infinite. A Stream's event can be a value, an error or an end signal that indicates the streams has ended.

Marcelle relies on a reactive programming library called Most.jsopen in new window. While RxJS is certainly the most popular JavaScript reactive programing library, Most.js offers high performance and explicit time representation.

All Most.js operators are documented online: https://mostcore.readthedocs.io/en/latest/open in new window

Stream

Marcelle's main Stream class is a wrapper around Most.jsopen in new window streams, designed to:

  • facilitate the integration with Svelte components
  • allow for impartively pushing events (like RxJs's Subject)
  • allow for holding values (like RxJs's BehaviorSubject)
  • offer a fluent API using @most/fluentopen in new window

The following factory function creates and returns a Marcelle Stream from a Most.js Stream:

createStream<T>(s: MostStream<T> | T, hold: boolean): Stream<T>

Parameters

OptionTypeDescriptionRequired
sMostStream<T> | TA stream from the most library, or a value
holdbooleanIf true, the last event is stored and delivered to each new oserver. This uses @most/holdopen in new window, and is similar to RxJs's BehaviorSubject

Example

const $timer = marcelle.createStream(mostCore.periodic(500));
const $rnd = $timer
  .map(() => Math.random())
  .filter((x) => x > 0.5)
  .map((x) => (x - 0.5) * 1000));
$rnd.subscribe(console.log);

.hold()

hold(h: boolean = true): Stream<T>

Hold the stream values. When called, all new subscribers will receive the latest value at the time of subscription.

.start()

start(): void

Start the stream processing, even if no subscriber has been registered. This method is called automatically on subscribe.

.stop()

stop(): void

Imperatively stop the stream processing. Calling stop will result in an end event being emitted on the stream.

.subscribe()

subscribe(run: (value: T) => void = dummySubscriber, invalidate = () => {}): () => void

The subscribe method must accept as its argument a subscription function. All of a streams's active subscription functions are synchronously called whenever a new event is emitted on the stream. If a stream is held, this subscription function must be immediately and synchronously called with the stream's current value upon calling subscribe.

.get()

Get the value of the latest event on the stream, if it was created with hold=true. Otherwise, an error is thrown.

.set()

set(value: T) => void;

Imperatively push an event on the stream with the given value

.thru()

thru<B>(f: (s: Stream<T>) => MostStream<B>): Stream<B>

Apply functions fluently to a Stream, wrapping the result in a new Stream. Use thru when you want to continue dot-chaining other Stream operations.

.ap()

ap<B>(fs: Stream<(a: T) => B>): Stream<B>

Apply the latest function in a Stream of functions to the latest value of
another Stream. In effect, ap applies a time-varying function to a
time-varying value.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
fsStream<(a: T) => B>Function stream

.awaitPromises()

awaitPromises<A>(): Stream<A>

Turn a Stream of promises into a Stream containing the promises’ values.

See on Most Docsopen in new window

.chain()

chain<B>(f: (value: T) => Stream<B>): Stream<B>

Transform each event in stream into a new Stream, and then merge each into
the resulting Stream. Note that f must return a Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(value: T) => Stream<B>function returning a stream

.combine()

combine<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>

Apply a function to the most recent event from each Stream when a new event
arrives on any Stream.Note that combine waits for at least one event to arrive on all input
Streams before it produces any events.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: A, b: T) => RCombinator function
stream1Stream<A>Event stream 1

.concatMap()

concatMap<B>(f: (a: T) => Stream<B>): Stream<B>

Transform each event in stream into a Stream, and then concatenate each
onto the end of the resulting Stream. Note that f must return a Stream.The mapping function f is applied lazily. That is, f is called only once
it is time to concatenate a new stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: T) => Stream<B>Function returning a stream

.constant()

constant<B>(x: B): Stream<B>

Replace each event value with x.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
xBevent data

.continueWith()

continueWith<U>(f: () => Stream<U>): Stream<T | U>

Replace the end of a Stream with another Stream. When stream ends, f
will be called and must return a Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f() => Stream<U>Function that returns a stream

.debounce()

debounce(period: number): Stream<T>

Wait for a burst of events to subside and keep only the last event in the
burst.If the Stream ends while there is a pending debounced event (e.g., via
until), the pending event will occur just before the Stream ends.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
periodnumberDebounce period

.delay()

delay(delayTime: number): Stream<T>

Timeshift a Stream by the specified Delay.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
delayTimenumberDelay time (ms)

.during()

during(timeWindow: Stream<Stream<unknown>>): Stream<T>

Keep events that occur during a time window defined by a higher-order Stream.

Parameters

ParameterTypeDefaultDescription
timeWindowStream<Stream<unknown>>Higher order stream defining a time window

.filter()

filter(p: (a: T) => boolean): Stream<T>

Retain only events for which a predicate is truthy.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
p(a: T) => booleanPredicate

Example

a = periodic(200)
  .rand()
  .filter((x) => x > 0.8)
  .tap(log);

.join()

join<U>(): Stream<U>

Given a higher-order Stream, return a new Stream that merges all the inner
Streams as they arrive.

See on Most Docsopen in new window

.loop()

loop<B, S>(stepper: (seed: S, a: T) => SeedValue<S, B>, seed: S): Stream<B>

Accumulate results using a feedback loop that emits one value and feeds back
another to be used in the next iteration.It allows you to maintain and update a “state” (a.k.a. feedback, a.k.a. seed
for the next iteration) while emitting a different value. In contrast, scan
feeds back and produces the same value.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
stepper(seed: S, a: T) => SeedValue<S, B>Stepper function
seedSSeed

.map()

map<U>(f: (a: T) => U): Stream<U>

Apply a function to each event value.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: T) => UUnary function

Example

// Apply a function (in this example, double) to all events in a stream
f = (x) => 2 * x;
a = periodic(500).constant(1).accum().map(f).tap(log);

.merge()

merge<A>(stream1: Stream<A>): Stream<A | T>

Create a new Stream containing events from two Streams.Merging creates a new Stream containing all events from the two original
Streams without affecting the time of the events. You can think of the
events from the input Streams simply being interleaved into the new, merged
Stream. A merged Stream ends when all of its input Streams have ended.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
stream1Stream<A>Event stream 1

Example

a = periodic(500).take(3).constant('a');
b = periodic(100).take(3).constant(2);
c = a.merge(b).tap(log);

.mergeConcurrently()

mergeConcurrently<U>(concurrency: number): Stream<U>

Given a higher-order Stream, return a new Stream that merges inner Streams
as they arrive up to the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
concurrencynumberconcurrency level

.mergeMapConcurrently()

mergeMapConcurrently<B>(f: (a: T) => Stream<B>, concurrency: number): Stream<B>

Lazily apply a function f to each event in a Stream, merging them into the
resulting Stream at the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: T) => Stream<B>Unary function
concurrencynumberconcurrency level

.recoverWith()

recoverWith<A, E extends Error>(f: (error: E) => Stream<A>): Stream<T | A>

Recover from a stream failure by calling a function to create a new Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(error: E) => Stream<A>Function returning a new stream from an error

.resample()

resample<B>(sampler: Stream<B>): Stream<T>

Like sample, but the value stream and sampler streams are switched

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
samplerStream<B>Sampler stream

Example

// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = noise.resample(click(doc)).tap(log);

.sample()

sample<A>(values: Stream<A>): Stream<A>

For each event in the current Stream, replace the event value with the latest
value in another Stream. The resulting Stream will contain the same number
of events as the sampler Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
valuesStream<A>value stream

Example

// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = click(doc).sample(noise).tap(log);

.scan()

scan<B>(f: (b: B, a: T) => B, initial: B): Stream<B>

Incrementally accumulate results, starting with the provided initial value.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(b: B, a: T) => BScanning reducer
initialBInitial Value

Example

// Accumulate the values of a constant stream
a = periodic(500)
  .constant(2)
  .scan((s, x) => s + x, 0)
  .tap(log);

.since()

since(startSignal: Stream<unknown>): Stream<T>

Discard all events in one Stream until the first event occurs in another.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
startSignalStream<unknown>Start signal

.skip()

skip(n: number): Stream<T>

Discard the first n events from stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
nnumberNumber of events

.skipAfter()

skipAfter(p: (a: T) => boolean): Stream<T>

Discard all events after the first event for which predicate returns true.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
p(a: T) => booleanPredicate

.skipRepeats()

skipRepeats(): Stream<T>

Remove adjacent repeated events.

See on Most Docsopen in new window

.skipRepeatsWith()

skipRepeatsWith(equals: (a1: T, a2: T) => boolean): Stream<T>

Remove adjacent repeated events, using the provided equality function to
compare adjacent events.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
equals(a1: T, a2: T) => booleanEquality function

.skipWhile()

skipWhile(p: (a: T) => boolean): Stream<T>

Discard all events until predicate returns false, and keep the rest.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
p(a: T) => booleanPredicate

.slice()

slice(start: number, end: number): Stream<T>

Keep only events in a range, where start <= index < end, and index is the
ordinal index of an event in stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
startnumberstart index
endnumberend index

.snapshot()

snapshot<A, C>(f: (a: A, b: T) => C, values: Stream<A>): Stream<C>

For each event in a sampler Stream, apply a function to combine its value
with the most recent event value in another Stream. The resulting Stream
will contain the same number of events as the sampler Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: A, b: T) => CSnapshot function
valuesStream<A>Value stream

.startWith()

startWith(x: T): Stream<T>

Prepend an event at time 0.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
xTEvent data

.switchLatest()

switchLatest<U>(): Stream<U>

Given a higher-order Stream, return a new Stream that adopts the behavior of
(i.e., emits the events of) the most recent inner Stream.

See on Most Docsopen in new window

.take()

take(n: number): Stream<T>

Keep at most the first n events from stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
nnumberNumber of events

.takeWhile()

takeWhile(p: (a: T) => boolean): Stream<T>

Keep all events until predicate returns false, and discard the rest.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
p(a: T) => booleanPredicate

.tap()

tap(f: (a: T) => void): Stream<T>

Perform a side effect for each event in a Stream. For each event in stream,
f is called, but the value of its result is ignored. If f fails (i.e.,
throws an error), then the returned Stream will also fail. The Stream
returned by tap will contain the same events as the original Stream.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: T) => voidTap function

Example

// Apply a function with side effects, to log the values to the console
a = periodic(500).rand().tap(log);

.throttle()

throttle(period: number): Stream<T>

Limit the rate of events by suppressing events that occur too often

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
periodnumberThrottle period

.until()

until(endSignal: Stream<unknown>): Stream<T>

Keep all events in one Stream until the first event occurs in another.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
endSignalStream<unknown>End signal

.withItems()

withItems<A>(items: A[]): Stream<A>

Replace each event value with the array item at the respective index. The
resulting Stream will contain the same number of events as the input Stream,
or array.length events, whichever is less.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
itemsA[]Items array

.withLocalTime()

withLocalTime(origin: Time): Stream<T>

Create a Stream with localized Time values, whose origin (i.e., time 0) is
at the specified Time on the Scheduler provided when the Stream is observed
with runEffects or run.When implementing custom higher-order Stream combinators, such as chain, you
should use withLocalTime to localize “inner” Streams before running them.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
originTimeorigin time value

.zip()

zip<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>

Apply a function to corresponding pairs of events from the inputs Streams.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: A, b: T) => RCombinator function
stream1Stream<A>Event stream 1

.zipItems()

zipItems<A, C>(f: (a: A, b: T) => C, items: A[]): Stream<C>

Apply a function to the latest event and the array value at the respective
index. The resulting Stream will contain the same number of events as the
input Stream, or array.length events, whichever is less.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
f(a: A, b: T) => CCombinator function
itemsA[]Items

Stream sources (static methods)

empty()

static empty(): Stream<never>

Create a Stream containing no events and ends immediately.

See on Most Docsopen in new window

never()

static never(): Stream<never>

Create a Stream containing no events and never ends.

See on Most Docsopen in new window

now()

static now<A>(x: A): Stream<A>

Create a Stream containing a single event at time 0.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
xAevent value

at()

static at<A>(t: Time, x: A): Stream<A>

Create a Stream containing a single event at a specific time.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
tTimeevent time (ms)
xAevent value

periodic()

static periodic(period: number): Stream<void>

Create an infinite Stream containing events that occur at a specified Period. The first event occurs at time 0, and the event values are undefined.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
periodnumberstream period (ms)

Example

const myPeriodicRandomStream = Stream.periodic(500)
  .map(() => Math.random())
  .subscribe(console.log);

throwError()

static throwError(e: Error): Stream<never>

Create a Stream that fails with the provided Error at time 0. This can be useful for functions that need to return a Stream and also need to propagate an error.

See on Most Docsopen in new window

Parameters

ParameterTypeDefaultDescription
eErrorerror