# Streams

In order to facilitate the conception of custom processing pipelines, components rely on a reactive programming paradigm to generate or react to particular event streams. The reactive programming is well-suited for the development of such event-driven and interactive applications. It facilitates the management of asynchronous data streams, their transformation and the propagation of change to the relevant dependents. Each component exposes a set of data streams containing the various events produced by the component. These data streams can easily be manipulated (filtered, transformed, combined) and plugged into other components to define pipelines.

# Elements of reactive programming

From André Staltz's The introduction to Reactive Programming you've been missing (opens new window):

Reactive programming is programming with asynchronous data streams.

In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed would be a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.

On top of that, you are given an amazing toolbox of functions to combine, create and filter any of those streams.

That's where the "functional" magic kicks in. A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream. You can merge two streams. You can filter a stream to get another one that has only those events you are interested in. You can map data values from one stream to another new one.

Schematically, a stream looks like this:

Schematic representation of a data stream

A stream is sequence of ongoing events ordered in time. Streams can be finite or infinite. A Stream's event can be a value, an error or an end signal that indicates the streams has ended.

Marcelle relies on a reactive programming library called Most.js (opens new window). While RxJS is certainly the most popular JavaScript reactive programing library, Most.js offers high performance and explicit time representation.

All Most.js operators are documented online: https://mostcore.readthedocs.io/en/latest/ (opens new window)

# Stream

Marcelle's main Stream class is a wrapper around Most.js (opens new window) streams, designed to:

  • facilitate the integration with Svelte components
  • allow for impartively pushing events (like RxJs's Subject)
  • allow for holding values (like RxJs's BehaviorSubject)
  • offer a fluent API using @most/fluent (opens new window)

The following factory function creates and returns a Marcelle Stream from a Most.js Stream:

createStream<T>(s: MostStream<T> | T, hold: boolean): Stream<T>

# Parameters

Option Type Description Required
s MostStream<T> | T A stream from the most library, or a value
hold boolean If true, the last event is stored and delivered to each new oserver. This uses @most/hold (opens new window), and is similar to RxJs's BehaviorSubject

# Example

const $timer = marcelle.createStream(mostCore.periodic(500));
const $rnd = $timer
  .map(() => Math.random())
  .filter((x) => x > 0.5)
  .map((x) => (x - 0.5) * 1000));
$rnd.subscribe(console.log);

# .hold()

hold(): Stream<T>

Hold the stream values. When called, all new subscribers will receive the latest value at the time of subscription.

# .start()

start(): void

Start the stream processing, even if no subscriber has been registered. This method is called automatically on subscribe.

# .stop()

stop(): void

Imperatively stop the stream processing. Calling stop will result in an end event being emitted on the stream.

# .subscribe()

subscribe(run: (value: T) => void = dummySubscriber, invalidate = () => {}): () => void

The subscribe method must accept as its argument a subscription function. All of a streams's active subscription functions are synchronously called whenever a new event is emitted on the stream. If a stream is held, this subscription function must be immediately and synchronously called with the stream's current value upon calling subscribe.

# .thru()

thru<B>(f: (s: Stream<T>) => MostStream<B>): Stream<B>

Apply functions fluently to a Stream, wrapping the result in a new Stream. Use thru when you want to continue dot-chaining other Stream operations.

# .ap()

ap<B>(fs: Stream<(a: T) => B>): Stream<B>

Apply the latest function in a Stream of functions to the latest value of
another Stream. In effect, ap applies a time-varying function to a
time-varying value.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
fs Stream<(a: T) => B> Function stream

# .awaitPromises()

awaitPromises<A>(): Stream<A>

Turn a Stream of promises into a Stream containing the promises’ values.

See on Most Docs (opens new window)

# .chain()

chain<B>(f: (value: T) => Stream<B>): Stream<B>

Transform each event in stream into a new Stream, and then merge each into
the resulting Stream. Note that f must return a Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (value: T) => Stream<B> function returning a stream

# .combine()

combine<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>

Apply a function to the most recent event from each Stream when a new event
arrives on any Stream.Note that combine waits for at least one event to arrive on all input
Streams before it produces any events.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: A, b: T) => R Combinator function
stream1 Stream<A> Event stream 1

# .concatMap()

concatMap<B>(f: (a: T) => Stream<B>): Stream<B>

Transform each event in stream into a Stream, and then concatenate each
onto the end of the resulting Stream. Note that f must return a Stream.The mapping function f is applied lazily. That is, f is called only once
it is time to concatenate a new stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: T) => Stream<B> Function returning a stream

# .constant()

constant<B>(x: B): Stream<B>

Replace each event value with x.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
x B event data

# .continueWith()

continueWith<U>(f: () => Stream<U>): Stream<T | U>

Replace the end of a Stream with another Stream. When stream ends, f
will be called and must return a Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f () => Stream<U> Function that returns a stream

# .debounce()

debounce(period: number): Stream<T>

Wait for a burst of events to subside and keep only the last event in the
burst.If the Stream ends while there is a pending debounced event (e.g., via
until), the pending event will occur just before the Stream ends.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
period number Debounce period

# .delay()

delay(delayTime: number): Stream<T>

Timeshift a Stream by the specified Delay.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
delayTime number Delay time (ms)

# .during()

during(timeWindow: Stream<Stream<unknown>>): Stream<T>

Keep events that occur during a time window defined by a higher-order Stream.

# Parameters

Parameter Type Default Description
timeWindow Stream<Stream<unknown>> Higher order stream defining a time window

# .filter()

filter(p: (a: T) => boolean): Stream<T>

Retain only events for which a predicate is truthy.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
p (a: T) => boolean Predicate

# Example

a = periodic(200)
  .rand()
  .filter((x) => x > 0.8)
  .tap(log);

# .join()

join<U>(): Stream<U>

Given a higher-order Stream, return a new Stream that merges all the inner
Streams as they arrive.

See on Most Docs (opens new window)

# .loop()

loop<B, S>(stepper: (seed: S, a: T) => SeedValue<S, B>, seed: S): Stream<B>

Accumulate results using a feedback loop that emits one value and feeds back
another to be used in the next iteration.It allows you to maintain and update a “state” (a.k.a. feedback, a.k.a. seed
for the next iteration) while emitting a different value. In contrast, scan
feeds back and produces the same value.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
stepper (seed: S, a: T) => SeedValue<S, B> Stepper function
seed S Seed

# .map()

map<U>(f: (a: T) => U): Stream<U>

Apply a function to each event value.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: T) => U Unary function

# Example

// Apply a function (in this example, double) to all events in a stream
f = (x) => 2 * x;
a = periodic(500).constant(1).accum().map(f).tap(log);

# .merge()

merge<A>(stream1: Stream<A>): Stream<A | T>

Create a new Stream containing events from two Streams.Merging creates a new Stream containing all events from the two original
Streams without affecting the time of the events. You can think of the
events from the input Streams simply being interleaved into the new, merged
Stream. A merged Stream ends when all of its input Streams have ended.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
stream1 Stream<A> Event stream 1

# Example

a = periodic(500).take(3).constant('a');
b = periodic(100).take(3).constant(2);
c = a.merge(b).tap(log);

# .mergeConcurrently()

mergeConcurrently<U>(concurrency: number): Stream<U>

Given a higher-order Stream, return a new Stream that merges inner Streams
as they arrive up to the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
concurrency number concurrency level

# .mergeMapConcurrently()

mergeMapConcurrently<B>(f: (a: T) => Stream<B>, concurrency: number): Stream<B>

Lazily apply a function f to each event in a Stream, merging them into the
resulting Stream at the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: T) => Stream<B> Unary function
concurrency number concurrency level

# .recoverWith()

recoverWith<A, E extends Error>(f: (error: E) => Stream<A>): Stream<T | A>

Recover from a stream failure by calling a function to create a new Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (error: E) => Stream<A> Function returning a new stream from an error

# .resample()

resample<B>(sampler: Stream<B>): Stream<T>

Like sample, but the value stream and sampler streams are switched

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
sampler Stream<B> Sampler stream

# Example

// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = noise.resample(click(doc)).tap(log);

# .sample()

sample<A>(values: Stream<A>): Stream<A>

For each event in the current Stream, replace the event value with the latest
value in another Stream. The resulting Stream will contain the same number
of events as the sampler Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
values Stream<A> value stream

# Example

// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = click(doc).sample(noise).tap(log);

# .scan()

scan<B>(f: (b: B, a: T) => B, initial: B): Stream<B>

Incrementally accumulate results, starting with the provided initial value.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (b: B, a: T) => B Scanning reducer
initial B Initial Value

# Example

// Accumulate the values of a constant stream
a = periodic(500)
  .constant(2)
  .scan((s, x) => s + x, 0)
  .tap(log);

# .since()

since(startSignal: Stream<unknown>): Stream<T>

Discard all events in one Stream until the first event occurs in another.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
startSignal Stream<unknown> Start signal

# .skip()

skip(n: number): Stream<T>

Discard the first n events from stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
n number Number of events

# .skipAfter()

skipAfter(p: (a: T) => boolean): Stream<T>

Discard all events after the first event for which predicate returns true.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
p (a: T) => boolean Predicate

# .skipRepeats()

skipRepeats(): Stream<T>

Remove adjacent repeated events.

See on Most Docs (opens new window)

# .skipRepeatsWith()

skipRepeatsWith(equals: (a1: T, a2: T) => boolean): Stream<T>

Remove adjacent repeated events, using the provided equality function to
compare adjacent events.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
equals (a1: T, a2: T) => boolean Equality function

# .skipWhile()

skipWhile(p: (a: T) => boolean): Stream<T>

Discard all events until predicate returns false, and keep the rest.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
p (a: T) => boolean Predicate

# .slice()

slice(start: number, end: number): Stream<T>

Keep only events in a range, where start <= index < end, and index is the
ordinal index of an event in stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
start number start index
end number end index

# .snapshot()

snapshot<A, C>(f: (a: A, b: T) => C, values: Stream<A>): Stream<C>

For each event in a sampler Stream, apply a function to combine its value
with the most recent event value in another Stream. The resulting Stream
will contain the same number of events as the sampler Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: A, b: T) => C Snapshot function
values Stream<A> Value stream

# .startWith()

startWith(x: T): Stream<T>

Prepend an event at time 0.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
x T Event data

# .switchLatest()

switchLatest<U>(): Stream<U>

Given a higher-order Stream, return a new Stream that adopts the behavior of
(i.e., emits the events of) the most recent inner Stream.

See on Most Docs (opens new window)

# .take()

take(n: number): Stream<T>

Keep at most the first n events from stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
n number Number of events

# .takeWhile()

takeWhile(p: (a: T) => boolean): Stream<T>

Keep all events until predicate returns false, and discard the rest.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
p (a: T) => boolean Predicate

# .tap()

tap(f: (a: T) => void): Stream<T>

Perform a side effect for each event in a Stream. For each event in stream,
f is called, but the value of its result is ignored. If f fails (i.e.,
throws an error), then the returned Stream will also fail. The Stream
returned by tap will contain the same events as the original Stream.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: T) => void Tap function

# Example

// Apply a function with side effects, to log the values to the console
a = periodic(500).rand().tap(log);

# .throttle()

throttle(period: number): Stream<T>

Limit the rate of events by suppressing events that occur too often

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
period number Throttle period

# .until()

until(endSignal: Stream<unknown>): Stream<T>

Keep all events in one Stream until the first event occurs in another.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
endSignal Stream<unknown> End signal

# .withItems()

withItems<A>(items: A[]): Stream<A>

Replace each event value with the array item at the respective index. The
resulting Stream will contain the same number of events as the input Stream,
or array.length events, whichever is less.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
items A[] Items array

# .withLocalTime()

withLocalTime(origin: Time): Stream<T>

Create a Stream with localized Time values, whose origin (i.e., time 0) is
at the specified Time on the Scheduler provided when the Stream is observed
with runEffects or run.When implementing custom higher-order Stream combinators, such as chain, you
should use withLocalTime to localize “inner” Streams before running them.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
origin Time origin time value

# .zip()

zip<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>

Apply a function to corresponding pairs of events from the inputs Streams.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: A, b: T) => R Combinator function
stream1 Stream<A> Event stream 1

# .zipItems()

zipItems<A, C>(f: (a: A, b: T) => C, items: A[]): Stream<C>

Apply a function to the latest event and the array value at the respective
index. The resulting Stream will contain the same number of events as the
input Stream, or array.length events, whichever is less.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
f (a: A, b: T) => C Combinator function
items A[] Items

# Stream sources (static methods)

# empty()

static empty(): Stream<never>

Create a Stream containing no events and ends immediately.

See on Most Docs (opens new window)

# never()

static never(): Stream<never>

Create a Stream containing no events and never ends.

See on Most Docs (opens new window)

# now()

static now<A>(x: A): Stream<A>

Create a Stream containing a single event at time 0.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
x A event value

# at()

static at<A>(t: Time, x: A): Stream<A>

Create a Stream containing a single event at a specific time.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
t Time event time (ms)
x A event value

# periodic()

static periodic(period: number): Stream<void>

Create an infinite Stream containing events that occur at a specified Period. The first event occurs at time 0, and the event values are undefined.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
period number stream period (ms)

# Example

const myPeriodicRandomStream = Stream.periodic(500)
  .map(() => Math.random())
  .subscribe(console.log);

# throwError()

static throwError(e: Error): Stream<never>

Create a Stream that fails with the provided Error at time 0. This can be useful for functions that need to return a Stream and also need to propagate an error.

See on Most Docs (opens new window)

# Parameters

Parameter Type Default Description
e Error error