Streams
In order to facilitate the conception of custom processing pipelines, components rely on a reactive programming paradigm to generate or react to particular event streams. The reactive programming is well-suited for the development of such event-driven and interactive applications. It facilitates the management of asynchronous data streams, their transformation and the propagation of change to the relevant dependents. Each component exposes a set of data streams containing the various events produced by the component. These data streams can easily be manipulated (filtered, transformed, combined) and plugged into other components to define pipelines.
Elements of reactive programming
From André Staltz's The introduction to Reactive Programming you've been missing:
Reactive programming is programming with asynchronous data streams.
In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed would be a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.
On top of that, you are given an amazing toolbox of functions to combine, create and filter any of those streams.
That's where the "functional" magic kicks in. A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream. You can merge two streams. You can filter a stream to get another one that has only those events you are interested in. You can map data values from one stream to another new one.
Schematically, a stream looks like this:
A stream is sequence of ongoing events ordered in time. Streams can be finite or infinite. A Stream's event can be a value, an error or an end signal that indicates the streams has ended.
Marcelle relies on a reactive programming library called Most.js. While RxJS is certainly the most popular JavaScript reactive programing library, Most.js offers high performance and explicit time representation.
All Most.js operators are documented online: https://mostcore.readthedocs.io/en/latest/
Stream
Marcelle's main Stream class is a wrapper around Most.js streams, designed to:
- facilitate the integration with Svelte components
- allow for impartively pushing events (like RxJs's
Subject
) - allow for holding values (like RxJs's
BehaviorSubject
) - offer a fluent API using @most/fluent
The following factory function creates and returns a Marcelle Stream from a Most.js Stream:
createStream<T>(s: MostStream<T> | T, hold: boolean): Stream<T>
Parameters
Option | Type | Description | Required |
---|---|---|---|
s | MostStream<T> | T | A stream from the most library, or a value | ✓ |
hold | boolean | If true, the last event is stored and delivered to each new oserver. This uses @most/hold, and is similar to RxJs's BehaviorSubject |
Example
const $timer = marcelle.createStream(mostCore.periodic(500));
const $rnd = $timer
.map(() => Math.random())
.filter((x) => x > 0.5)
.map((x) => (x - 0.5) * 1000));
$rnd.subscribe(console.log);
.hold()
hold(h: boolean = true): Stream<T>
Hold the stream values. When called, all new subscribers will receive the latest value at the time of subscription.
.start()
start(): void
Start the stream processing, even if no subscriber has been registered. This method is called automatically on subscribe
.
.stop()
stop(): void
Imperatively stop the stream processing. Calling stop
will result in an end
event being emitted on the stream.
.subscribe()
subscribe(run: (value: T) => void = dummySubscriber, invalidate = () => {}): () => void
The subscribe
method must accept as its argument a subscription function. All of a streams's active subscription functions are synchronously called whenever a new event is emitted on the stream. If a stream is held, this subscription function must be immediately and synchronously called with the stream's current value upon calling subscribe
.
.get()
Get the value of the latest event on the stream, if it was created with hold=true
. Otherwise, an error is thrown.
.set()
set(value: T) => void;
Imperatively push an event on the stream with the given value
.thru()
thru<B>(f: (s: Stream<T>) => MostStream<B>): Stream<B>
Apply functions fluently to a Stream, wrapping the result in a new Stream. Use thru when you want to continue dot-chaining other Stream operations.
.ap()
ap<B>(fs: Stream<(a: T) => B>): Stream<B>
Apply the latest function in a Stream of functions to the latest value of
another Stream. In effect, ap applies a time-varying function to a
time-varying value.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
fs | Stream<(a: T) => B> | Function stream |
.awaitPromises()
awaitPromises<A>(): Stream<A>
Turn a Stream of promises into a Stream containing the promises’ values.
.chain()
chain<B>(f: (value: T) => Stream<B>): Stream<B>
Transform each event in stream
into a new Stream, and then merge each into
the resulting Stream. Note that f
must return a Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (value: T) => Stream<B> | function returning a stream |
.combine()
combine<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>
Apply a function to the most recent event from each Stream when a new event
arrives on any Stream.Note that combine
waits for at least one event to arrive on all input
Streams before it produces any events.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: A, b: T) => R | Combinator function | |
stream1 | Stream<A> | Event stream 1 |
.concatMap()
concatMap<B>(f: (a: T) => Stream<B>): Stream<B>
Transform each event in stream
into a Stream, and then concatenate each
onto the end of the resulting Stream. Note that f
must return a Stream.The mapping function f
is applied lazily. That is, f
is called only once
it is time to concatenate a new stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: T) => Stream<B> | Function returning a stream |
.constant()
constant<B>(x: B): Stream<B>
Replace each event value with x.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
x | B | event data |
.continueWith()
continueWith<U>(f: () => Stream<U>): Stream<T | U>
Replace the end of a Stream with another Stream. When stream
ends, f
will be called and must return a Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | () => Stream<U> | Function that returns a stream |
.debounce()
debounce(period: number): Stream<T>
Wait for a burst of events to subside and keep only the last event in the
burst.If the Stream ends while there is a pending debounced event (e.g., via
until), the pending event will occur just before the Stream ends.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
period | number | Debounce period |
.delay()
delay(delayTime: number): Stream<T>
Timeshift a Stream by the specified Delay.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
delayTime | number | Delay time (ms) |
.during()
during(timeWindow: Stream<Stream<unknown>>): Stream<T>
Keep events that occur during a time window defined by a higher-order Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
timeWindow | Stream<Stream<unknown>> | Higher order stream defining a time window |
.filter()
filter(p: (a: T) => boolean): Stream<T>
Retain only events for which a predicate is truthy.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
p | (a: T) => boolean | Predicate |
Example
a = periodic(200)
.rand()
.filter((x) => x > 0.8)
.tap(log);
.join()
join<U>(): Stream<U>
Given a higher-order Stream, return a new Stream that merges all the inner
Streams as they arrive.
.loop()
loop<B, S>(stepper: (seed: S, a: T) => SeedValue<S, B>, seed: S): Stream<B>
Accumulate results using a feedback loop that emits one value and feeds back
another to be used in the next iteration.It allows you to maintain and update a “state” (a.k.a. feedback, a.k.a. seed
for the next iteration) while emitting a different value. In contrast, scan
feeds back and produces the same value.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
stepper | (seed: S, a: T) => SeedValue<S, B> | Stepper function | |
seed | S | Seed |
.map()
map<U>(f: (a: T) => U): Stream<U>
Apply a function to each event value.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: T) => U | Unary function |
Example
// Apply a function (in this example, double) to all events in a stream
f = (x) => 2 * x;
a = periodic(500).constant(1).accum().map(f).tap(log);
.merge()
merge<A>(stream1: Stream<A>): Stream<A | T>
Create a new Stream containing events from two Streams.Merging creates a new Stream containing all events from the two original
Streams without affecting the time of the events. You can think of the
events from the input Streams simply being interleaved into the new, merged
Stream. A merged Stream ends when all of its input Streams have ended.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
stream1 | Stream<A> | Event stream 1 |
Example
a = periodic(500).take(3).constant('a');
b = periodic(100).take(3).constant(2);
c = a.merge(b).tap(log);
.mergeConcurrently()
mergeConcurrently<U>(concurrency: number): Stream<U>
Given a higher-order Stream, return a new Stream that merges inner Streams
as they arrive up to the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
concurrency | number | concurrency level |
.mergeMapConcurrently()
mergeMapConcurrently<B>(f: (a: T) => Stream<B>, concurrency: number): Stream<B>
Lazily apply a function f
to each event in a Stream, merging them into the
resulting Stream at the specified concurrency. Once concurrency number of
Streams are being merged, newly arriving Streams will be merged after an
existing one ends.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: T) => Stream<B> | Unary function | |
concurrency | number | concurrency level |
.recoverWith()
recoverWith<A, E extends Error>(f: (error: E) => Stream<A>): Stream<T | A>
Recover from a stream failure by calling a function to create a new Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (error: E) => Stream<A> | Function returning a new stream from an error |
.resample()
resample<B>(sampler: Stream<B>): Stream<T>
Like sample
, but the value stream and sampler streams are switched
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
sampler | Stream<B> | Sampler stream |
Example
// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = noise.resample(click(doc)).tap(log);
.sample()
sample<A>(values: Stream<A>): Stream<A>
For each event in the current Stream, replace the event value with the latest
value in another Stream. The resulting Stream will contain the same number
of events as the sampler Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
values | Stream<A> | value stream |
Example
// Sample a noise signal from a stream of click events
noise = periodic(20).rand().plot({ legend: 'noise' });
click = click(doc).sample(noise).tap(log);
.scan()
scan<B>(f: (b: B, a: T) => B, initial: B): Stream<B>
Incrementally accumulate results, starting with the provided initial value.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (b: B, a: T) => B | Scanning reducer | |
initial | B | Initial Value |
Example
// Accumulate the values of a constant stream
a = periodic(500)
.constant(2)
.scan((s, x) => s + x, 0)
.tap(log);
.since()
since(startSignal: Stream<unknown>): Stream<T>
Discard all events in one Stream until the first event occurs in another.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
startSignal | Stream<unknown> | Start signal |
.skip()
skip(n: number): Stream<T>
Discard the first n events from stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
n | number | Number of events |
.skipAfter()
skipAfter(p: (a: T) => boolean): Stream<T>
Discard all events after the first event for which predicate returns true.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
p | (a: T) => boolean | Predicate |
.skipRepeats()
skipRepeats(): Stream<T>
Remove adjacent repeated events.
.skipRepeatsWith()
skipRepeatsWith(equals: (a1: T, a2: T) => boolean): Stream<T>
Remove adjacent repeated events, using the provided equality function to
compare adjacent events.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
equals | (a1: T, a2: T) => boolean | Equality function |
.skipWhile()
skipWhile(p: (a: T) => boolean): Stream<T>
Discard all events until predicate returns false
, and keep the rest.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
p | (a: T) => boolean | Predicate |
.slice()
slice(start: number, end: number): Stream<T>
Keep only events in a range, where start <= index < end, and index is the
ordinal index of an event in stream
.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
start | number | start index | |
end | number | end index |
.snapshot()
snapshot<A, C>(f: (a: A, b: T) => C, values: Stream<A>): Stream<C>
For each event in a sampler Stream, apply a function to combine its value
with the most recent event value in another Stream. The resulting Stream
will contain the same number of events as the sampler Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: A, b: T) => C | Snapshot function | |
values | Stream<A> | Value stream |
.startWith()
startWith(x: T): Stream<T>
Prepend an event at time 0.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
x | T | Event data |
.switchLatest()
switchLatest<U>(): Stream<U>
Given a higher-order Stream, return a new Stream that adopts the behavior of
(i.e., emits the events of) the most recent inner Stream.
.take()
take(n: number): Stream<T>
Keep at most the first n events from stream
.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
n | number | Number of events |
.takeWhile()
takeWhile(p: (a: T) => boolean): Stream<T>
Keep all events until predicate returns false
, and discard the rest.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
p | (a: T) => boolean | Predicate |
.tap()
tap(f: (a: T) => void): Stream<T>
Perform a side effect for each event in a Stream. For each event in stream,f
is called, but the value of its result is ignored. If f
fails (i.e.,
throws an error), then the returned Stream will also fail. The Stream
returned by tap will contain the same events as the original Stream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: T) => void | Tap function |
Example
// Apply a function with side effects, to log the values to the console
a = periodic(500).rand().tap(log);
.throttle()
throttle(period: number): Stream<T>
Limit the rate of events by suppressing events that occur too often
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
period | number | Throttle period |
.until()
until(endSignal: Stream<unknown>): Stream<T>
Keep all events in one Stream until the first event occurs in another.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
endSignal | Stream<unknown> | End signal |
.withItems()
withItems<A>(items: A[]): Stream<A>
Replace each event value with the array item at the respective index. The
resulting Stream will contain the same number of events as the input Stream,
or array.length events, whichever is less.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
items | A[] | Items array |
.withLocalTime()
withLocalTime(origin: Time): Stream<T>
Create a Stream with localized Time values, whose origin (i.e., time 0) is
at the specified Time on the Scheduler provided when the Stream is observed
with runEffects or run.When implementing custom higher-order Stream combinators, such as chain, you
should use withLocalTime
to localize “inner” Streams before running them.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
origin | Time | origin time value |
.zip()
zip<A, R>(f: (a: A, b: T) => R, stream1: Stream<A>): Stream<R>
Apply a function to corresponding pairs of events from the inputs Streams.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: A, b: T) => R | Combinator function | |
stream1 | Stream<A> | Event stream 1 |
.zipItems()
zipItems<A, C>(f: (a: A, b: T) => C, items: A[]): Stream<C>
Apply a function to the latest event and the array value at the respective
index. The resulting Stream will contain the same number of events as the
input Stream, or array.length events, whichever is less.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
f | (a: A, b: T) => C | Combinator function | |
items | A[] | Items |
Stream sources (static methods)
empty()
static empty(): Stream<never>
Create a Stream containing no events and ends immediately.
never()
static never(): Stream<never>
Create a Stream containing no events and never ends.
now()
static now<A>(x: A): Stream<A>
Create a Stream containing a single event at time 0.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
x | A | event value |
at()
static at<A>(t: Time, x: A): Stream<A>
Create a Stream containing a single event at a specific time.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
t | Time | event time (ms) | |
x | A | event value |
periodic()
static periodic(period: number): Stream<void>
Create an infinite Stream containing events that occur at a specified Period. The first event occurs at time 0, and the event values are undefined
.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
period | number | stream period (ms) |
Example
const myPeriodicRandomStream = Stream.periodic(500)
.map(() => Math.random())
.subscribe(console.log);
throwError()
static throwError(e: Error): Stream<never>
Create a Stream that fails with the provided Error
at time 0. This can be useful for functions that need to return a Stream and also need to propagate an error.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
e | Error | error |