import 'dart:async'; | |
import 'package:rxdart/futures.dart'; | |
import 'package:rxdart/samplers.dart'; | |
import 'package:rxdart/src/observables/connectable_observable.dart'; | |
import 'package:rxdart/src/observables/replay_observable.dart'; | |
import 'package:rxdart/src/observables/value_observable.dart'; | |
import 'package:rxdart/streams.dart'; | |
import 'package:rxdart/transformers.dart'; | |
/// A wrapper class that extends Stream. It combines all the Streams and | |
/// StreamTransformers contained in this library into a fluent api. | |
/// | |
/// ### Example | |
/// | |
/// new Observable(new Stream.fromIterable([1])) | |
/// .interval(new Duration(seconds: 1)) | |
/// .flatMap((i) => new Observable.just(2)) | |
/// .take(1) | |
/// .listen(print); // prints 2 | |
/// | |
/// ### Learning RxDart | |
/// | |
/// This library contains documentation and examples for each method. In | |
/// addition, more complex examples can be found in the | |
/// [RxDart github repo](https://github.com/ReactiveX/rxdart) demonstrating how | |
/// to use RxDart with web, command line, and Flutter applications. | |
/// | |
/// #### Additional Resources | |
/// | |
/// In addition to the RxDart documentation and examples, you can find many | |
/// more articles on Dart Streams that teach the fundamentals upon which | |
/// RxDart is built. | |
/// | |
/// - [Asynchronous Programming: Streams](https://www.dartlang.org/tutorials/language/streams) | |
/// - [Single-Subscription vs. Broadcast Streams](https://www.dartlang.org/articles/libraries/broadcast-streams) | |
/// - [Creating Streams in Dart](https://www.dartlang.org/articles/libraries/creating-streams) | |
/// - [Testing Streams: Stream Matchers](https://pub.dartlang.org/packages/test#stream-matchers) | |
/// | |
/// ### Dart Streams vs Observables | |
/// | |
/// In order to integrate fluently with the Dart ecosystem, the Observable class | |
/// extends the Dart `Stream` class. This provides several advantages: | |
/// | |
/// - Observables work with any API that expects a Dart Stream as an input. | |
/// - Inherit the many methods and properties from the core Stream API. | |
/// - Ability to create Streams with language-level syntax. | |
/// | |
/// Overall, we attempt to follow the Observable spec as closely as we can, but | |
/// prioritize fitting in with the Dart ecosystem when a trade-off must be made. | |
/// Therefore, there are some important differences to note between Dart's | |
/// `Stream` class and standard Rx `Observable`. | |
/// | |
/// First, Cold Observables in Dart are single-subscription. In other words, | |
/// you can only listen to Observables once, unless it is a hot (aka broadcast) | |
/// Stream. If you attempt to listen to a cold stream twice, a StateError will | |
/// be thrown. If you need to listen to a stream multiple times, you can simply | |
/// create a factory function that returns a new instance of the stream. | |
/// | |
/// Second, many methods contained within, such as `first` and `last` do not | |
/// return a `Single` nor an `Observable`, but rather must return a Dart Future. | |
/// Luckily, Dart Futures are easy to work with, and easily convert back to a | |
/// Stream using the `myFuture.asStream()` method if needed. | |
/// | |
/// Third, Streams in Dart do not close by default when an error occurs. In Rx, | |
/// an Error causes the Observable to terminate unless it is intercepted by | |
/// an operator. Dart has mechanisms for creating streams that close when an | |
/// error occurs, but the majority of Streams do not exhibit this behavior. | |
/// | |
/// Fourth, Dart streams are asynchronous by default, whereas Observables are | |
/// synchronous by default, unless you schedule work on a different Scheduler. | |
/// You can create synchronous Streams with Dart, but please be aware the the | |
/// default is simply different. | |
/// | |
/// Finally, when using Dart Broadcast Streams (similar to Hot Observables), | |
/// please know that `onListen` will only be called the first time the | |
/// broadcast stream is listened to. | |
class Observable<T> extends Stream<T> { | |
final Stream<T> _stream; | |
Observable(Stream<T> stream) : this._stream = stream; | |
@override | |
AsObservableFuture<bool> any(bool test(T element)) => | |
AsObservableFuture<bool>(_stream.any(test)); | |
/// Merges the given Streams into one Observable that emits a List of the | |
/// values emitted by the source Stream. This is helpful when you need to | |
/// combine a dynamic number of Streams. | |
/// | |
/// The Observable will not emit any lists of values until all of the source | |
/// streams have emitted at least one value. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatestList([ | |
/// Observable.just(1), | |
/// Observable.fromIterable([0, 1, 2]), | |
/// ]) | |
/// .listen(print); // prints [1, 0], [1, 1], [1, 2] | |
static Observable<R> combineLatest<T, R>( | |
Iterable<Stream<T>> streams, R combiner(List<T> values)) => | |
Observable<R>(CombineLatestStream<T, R>(streams, combiner)); | |
/// Merges the given Streams into one Observable that emits a List of the | |
/// values emitted by the source Stream. This is helpful when you need to | |
/// combine a dynamic number of Streams. | |
/// | |
/// The Observable will not emit any lists of values until all of the source | |
/// streams have emitted at least one value. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatestList([ | |
/// Observable.just(1), | |
/// Observable.fromIterable([0, 1, 2]), | |
/// ]) | |
/// .listen(print); // prints [1, 0], [1, 1], [1, 2] | |
static Observable<List<T>> combineLatestList<T>( | |
Iterable<Stream<T>> streams) => | |
Observable<List<T>>(CombineLatestStream.list<T>(streams)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest2( | |
/// new Observable.just(1), | |
/// new Observable.fromIterable([0, 1, 2]), | |
/// (a, b) => a + b) | |
/// .listen(print); //prints 1, 2, 3 | |
static Observable<T> combineLatest2<A, B, T>( | |
Stream<A> streamA, Stream<B> streamB, T combiner(A a, B b)) => | |
Observable<T>(CombineLatestStream.combine2(streamA, streamB, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest3( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.fromIterable(["c", "c"]), | |
/// (a, b, c) => a + b + c) | |
/// .listen(print); //prints "abc", "abc" | |
static Observable<T> combineLatest3<A, B, C, T>(Stream<A> streamA, | |
Stream<B> streamB, Stream<C> streamC, T combiner(A a, B b, C c)) => | |
Observable<T>( | |
CombineLatestStream.combine3(streamA, streamB, streamC, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest4( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.fromIterable(["d", "d"]), | |
/// (a, b, c, d) => a + b + c + d) | |
/// .listen(print); //prints "abcd", "abcd" | |
static Observable<T> combineLatest4<A, B, C, D, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
T combiner(A a, B b, C c, D d)) => | |
Observable<T>(CombineLatestStream.combine4( | |
streamA, streamB, streamC, streamD, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest5( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.fromIterable(["e", "e"]), | |
/// (a, b, c, d, e) => a + b + c + d + e) | |
/// .listen(print); //prints "abcde", "abcde" | |
static Observable<T> combineLatest5<A, B, C, D, E, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
T combiner(A a, B b, C c, D d, E e)) => | |
Observable<T>(CombineLatestStream.combine5( | |
streamA, streamB, streamC, streamD, streamE, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest6( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.fromIterable(["f", "f"]), | |
/// (a, b, c, d, e, f) => a + b + c + d + e + f) | |
/// .listen(print); //prints "abcdef", "abcdef" | |
static Observable<T> combineLatest6<A, B, C, D, E, F, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
T combiner(A a, B b, C c, D d, E e, F f)) => | |
Observable<T>(CombineLatestStream.combine6( | |
streamA, streamB, streamC, streamD, streamE, streamF, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest7( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.fromIterable(["g", "g"]), | |
/// (a, b, c, d, e, f, g) => a + b + c + d + e + f + g) | |
/// .listen(print); //prints "abcdefg", "abcdefg" | |
static Observable<T> combineLatest7<A, B, C, D, E, F, G, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
T combiner(A a, B b, C c, D d, E e, F f, G g)) => | |
Observable<T>(CombineLatestStream.combine7(streamA, streamB, streamC, | |
streamD, streamE, streamF, streamG, combiner)); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest8( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.just("g"), | |
/// new Observable.fromIterable(["h", "h"]), | |
/// (a, b, c, d, e, f, g, h) => a + b + c + d + e + f + g + h) | |
/// .listen(print); //prints "abcdefgh", "abcdefgh" | |
static Observable<T> combineLatest8<A, B, C, D, E, F, G, H, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
Stream<H> streamH, | |
T combiner(A a, B b, C c, D d, E e, F f, G g, H h)) => | |
Observable<T>( | |
CombineLatestStream.combine8( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
streamG, | |
streamH, | |
combiner, | |
), | |
); | |
/// Merges the given Streams into one Observable sequence by using the | |
/// [combiner] function whenever any of the observable sequences emits an | |
/// item. | |
/// | |
/// The Observable will not emit until all streams have emitted at least one | |
/// item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) | |
/// | |
/// ### Example | |
/// | |
/// Observable.combineLatest9( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.just("g"), | |
/// new Observable.just("h"), | |
/// new Observable.fromIterable(["i", "i"]), | |
/// (a, b, c, d, e, f, g, h, i) => a + b + c + d + e + f + g + h + i) | |
/// .listen(print); //prints "abcdefghi", "abcdefghi" | |
static Observable<T> combineLatest9<A, B, C, D, E, F, G, H, I, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
Stream<H> streamH, | |
Stream<I> streamI, | |
T combiner(A a, B b, C c, D d, E e, F f, G g, H h, I i)) => | |
Observable<T>( | |
CombineLatestStream.combine9( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
streamG, | |
streamH, | |
streamI, | |
combiner, | |
), | |
); | |
/// Concatenates all of the specified stream sequences, as long as the | |
/// previous stream sequence terminated successfully. | |
/// | |
/// It does this by subscribing to each stream one by one, emitting all items | |
/// and completing before subscribing to the next stream. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#concat) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.concat([ | |
/// new Observable.just(1), | |
/// new Observable.timer(2, new Duration(days: 1)), | |
/// new Observable.just(3) | |
/// ]) | |
/// .listen(print); // prints 1, 2, 3 | |
factory Observable.concat(Iterable<Stream<T>> streams) => | |
Observable<T>(ConcatStream<T>(streams)); | |
/// Concatenates all of the specified stream sequences, as long as the | |
/// previous stream sequence terminated successfully. | |
/// | |
/// In the case of concatEager, rather than subscribing to one stream after | |
/// the next, all streams are immediately subscribed to. The events are then | |
/// captured and emitted at the correct time, after the previous stream has | |
/// finished emitting items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#concat) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.concatEager([ | |
/// new Observable.just(1), | |
/// new Observable.timer(2, new Duration(days: 1)), | |
/// new Observable.just(3) | |
/// ]) | |
/// .listen(print); // prints 1, 2, 3 | |
factory Observable.concatEager(Iterable<Stream<T>> streams) => | |
Observable<T>(ConcatEagerStream<T>(streams)); | |
/// The defer factory waits until an observer subscribes to it, and then it | |
/// creates an Observable with the given factory function. | |
/// | |
/// In some circumstances, waiting until the last minute (that is, until | |
/// subscription time) to generate the Observable can ensure that this | |
/// Observable contains the freshest data. | |
/// | |
/// By default, DeferStreams are single-subscription. However, it's possible | |
/// to make them reusable. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.defer(() => new Observable.just(1)) | |
/// .listen(print); //prints 1 | |
factory Observable.defer(Stream<T> streamFactory(), | |
{bool reusable = false}) => | |
Observable<T>(DeferStream<T>(streamFactory, reusable: reusable)); | |
/// Returns an observable sequence that emits an [error], then immediately | |
/// completes. | |
/// | |
/// The error operator is one with very specific and limited behavior. It is | |
/// mostly useful for testing purposes. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.error(new ArgumentError()); | |
factory Observable.error(Object error) => | |
Observable<T>(ErrorStream<T>(error)); | |
/// Creates an Observable where all events of an existing stream are piped | |
/// through a sink-transformation. | |
/// | |
/// The given [mapSink] closure is invoked when the returned stream is | |
/// listened to. All events from the [source] are added into the event sink | |
/// that is returned from the invocation. The transformation puts all | |
/// transformed events into the sink the [mapSink] closure received during | |
/// its invocation. Conceptually the [mapSink] creates a transformation pipe | |
/// with the input sink being the returned [EventSink] and the output sink | |
/// being the sink it received. | |
factory Observable.eventTransformed( | |
Stream<T> source, EventSink<T> mapSink(EventSink<T> sink)) => | |
Observable<T>((Stream<T>.eventTransformed(source, mapSink))); | |
/// Creates an Observable from the future. | |
/// | |
/// When the future completes, the stream will fire one event, either | |
/// data or error, and then close with a done-event. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromFuture(new Future.value("Hello")) | |
/// .listen(print); // prints "Hello" | |
factory Observable.fromFuture(Future<T> future) => | |
Observable<T>((Stream<T>.fromFuture(future))); | |
/// Creates an Observable that gets its data from [data]. | |
/// | |
/// The iterable is iterated when the stream receives a listener, and stops | |
/// iterating if the listener cancels the subscription. | |
/// | |
/// If iterating [data] throws an error, the stream ends immediately with | |
/// that error. No done event will be sent (iteration is not complete), but no | |
/// further data events will be generated either, since iteration cannot | |
/// continue. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2]).listen(print); // prints 1, 2 | |
factory Observable.fromIterable(Iterable<T> data) => | |
Observable<T>((Stream<T>.fromIterable(data))); | |
/// Creates an Observable that contains a single value | |
/// | |
/// The value is emitted when the stream receives a listener. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1).listen(print); // prints 1 | |
factory Observable.just(T data) => | |
Observable<T>((Stream<T>.fromIterable(<T>[data]))); | |
/// Creates an Observable that contains no values. | |
/// | |
/// No items are emitted from the stream, and done is called upon listening. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.empty().listen( | |
/// (_) => print("data"), onDone: () => print("done")); // prints "done" | |
factory Observable.empty() => Observable<T>((Stream<T>.empty())); | |
/// Flattens the items emitted by the given [streams] into a single Observable | |
/// sequence. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#merge) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.merge([ | |
/// new Observable.timer(1, new Duration(days: 10)), | |
/// new Observable.just(2) | |
/// ]) | |
/// .listen(print); // prints 2, 1 | |
factory Observable.merge(Iterable<Stream<T>> streams) => | |
Observable<T>(MergeStream<T>(streams)); | |
/// Returns a non-terminating observable sequence, which can be used to denote | |
/// an infinite duration. | |
/// | |
/// The never operator is one with very specific and limited behavior. These | |
/// are useful for testing purposes, and sometimes also for combining with | |
/// other Observables or as parameters to operators that expect other | |
/// Observables as parameters. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.never().listen(print); // Neither prints nor terminates | |
factory Observable.never() => Observable<T>(NeverStream<T>()); | |
/// Creates an Observable that repeatedly emits events at [period] intervals. | |
/// | |
/// The event values are computed by invoking [computation]. The argument to | |
/// this callback is an integer that starts with 0 and is incremented for | |
/// every event. | |
/// | |
/// If [computation] is omitted the event values will all be `null`. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(new Duration(seconds: 1), (i) => i).take(3) | |
/// .listen(print); // prints 0, 1, 2 | |
factory Observable.periodic(Duration period, | |
[T computation(int computationCount)]) => | |
Observable<T>((Stream<T>.periodic(period, computation))); | |
/// Given two or more source [streams], emit all of the items from only | |
/// the first of these [streams] to emit an item or notification. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#amb) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.race([ | |
/// new Observable.timer(1, new Duration(days: 1)), | |
/// new Observable.timer(2, new Duration(days: 2)), | |
/// new Observable.timer(3, new Duration(seconds: 1)) | |
/// ]).listen(print); // prints 3 | |
factory Observable.race(Iterable<Stream<T>> streams) => | |
Observable<T>(RaceStream<T>(streams)); | |
/// Returns an Observable that emits a sequence of Integers within a specified | |
/// range. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(1, 3).listen((i) => print(i)); // Prints 1, 2, 3 | |
/// | |
/// Observable.range(3, 1).listen((i) => print(i)); // Prints 3, 2, 1 | |
static Observable<int> range(int startInclusive, int endInclusive) => | |
Observable<int>(RangeStream(startInclusive, endInclusive)); | |
/// Creates a [Stream] that will recreate and re-listen to the source | |
/// Stream the specified number of times until the [Stream] terminates | |
/// successfully. | |
/// | |
/// If [count] is not specified, it repeats indefinitely. | |
/// | |
/// ### Example | |
/// | |
/// new RepeatStream((int repeatCount) => | |
/// Observable.just('repeat index: $repeatCount'), 3) | |
/// .listen((i) => print(i)); // Prints 'repeat index: 0, repeat index: 1, repeat index: 2' | |
factory Observable.repeat(Stream<T> streamFactory(int repeatIndex), | |
[int count]) => | |
Observable(RepeatStream<T>(streamFactory, count)); | |
/// Creates an Observable that will recreate and re-listen to the source | |
/// Stream the specified number of times until the Stream terminates | |
/// successfully. | |
/// | |
/// If the retry count is not specified, it retries indefinitely. If the retry | |
/// count is met, but the Stream has not terminated successfully, a | |
/// [RetryError] will be thrown. The RetryError will contain all of the Errors | |
/// and StackTraces that caused the failure. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.retry(() { new Observable.just(1); }) | |
/// .listen((i) => print(i)); // Prints 1 | |
/// | |
/// new Observable | |
/// .retry(() { | |
/// new Observable.just(1).concatWith([new Observable.error(new Error())]); | |
/// }, 1) | |
/// .listen(print, onError: (e, s) => print(e)); // Prints 1, 1, RetryError | |
factory Observable.retry(Stream<T> streamFactory(), [int count]) { | |
return Observable<T>(RetryStream<T>(streamFactory, count)); | |
} | |
/// Creates a Stream that will recreate and re-listen to the source | |
/// Stream when the notifier emits a new value. If the source Stream | |
/// emits an error or it completes, the Stream terminates. | |
/// | |
/// If the [retryWhenFactory] emits an error a [RetryError] will be | |
/// thrown. The RetryError will contain all of the [Error]s and | |
/// [StackTrace]s that caused the failure. | |
/// | |
/// ### Basic Example | |
/// ```dart | |
/// new RetryWhenStream<int>( | |
/// () => new Stream<int>.fromIterable(<int>[1]), | |
/// (dynamic error, StackTrace s) => throw error, | |
/// ).listen(print); // Prints 1 | |
/// ``` | |
/// | |
/// ### Periodic Example | |
/// ```dart | |
/// new RetryWhenStream<int>( | |
/// () => new Observable<int> | |
/// .periodic(const Duration(seconds: 1), (int i) => i) | |
/// .map((int i) => i == 2 ? throw 'exception' : i), | |
/// (dynamic e, StackTrace s) { | |
/// return new Observable<String> | |
/// .timer('random value', const Duration(milliseconds: 200)); | |
/// }, | |
/// ).take(4).listen(print); // Prints 0, 1, 0, 1 | |
/// ``` | |
/// | |
/// ### Complex Example | |
/// ```dart | |
/// bool errorHappened = false; | |
/// new RetryWhenStream( | |
/// () => new Observable | |
/// .periodic(const Duration(seconds: 1), (i) => i) | |
/// .map((i) { | |
/// if (i == 3 && !errorHappened) { | |
/// throw 'We can take this. Please restart.'; | |
/// } else if (i == 4) { | |
/// throw 'It\'s enough.'; | |
/// } else { | |
/// return i; | |
/// } | |
/// }), | |
/// (e, s) { | |
/// errorHappened = true; | |
/// if (e == 'We can take this. Please restart.') { | |
/// return new Observable.just('Ok. Here you go!'); | |
/// } else { | |
/// return new Observable.error(e); | |
/// } | |
/// }, | |
/// ).listen( | |
/// print, | |
/// onError: (e, s) => print(e), | |
/// ); // Prints 0, 1, 2, 0, 1, 2, 3, RetryError | |
/// ``` | |
factory Observable.retryWhen(Stream<T> streamFactory(), | |
Stream<void> retryWhenFactory(dynamic error, StackTrace stack)) { | |
return Observable<T>(RetryWhenStream<T>(streamFactory, retryWhenFactory)); | |
} | |
/// Convert a Stream that emits Streams (aka a "Higher Order Stream") into a | |
/// single Observable that emits the items emitted by the | |
/// most-recently-emitted of those Streams. | |
/// | |
/// This Observable will unsubscribe from the previously-emitted Stream when | |
/// a new Stream is emitted from the source Stream and subscribe to the new | |
/// Stream. | |
/// | |
/// ### Example | |
/// | |
/// ```dart | |
/// final switchLatestStream = new SwitchLatestStream<String>( | |
/// new Stream.fromIterable(<Stream<String>>[ | |
/// new Observable.timer('A', new Duration(seconds: 2)), | |
/// new Observable.timer('B', new Duration(seconds: 1)), | |
/// new Observable.just('C'), | |
/// ]), | |
/// ); | |
/// | |
/// // Since the first two Streams do not emit data for 1-2 seconds, and the | |
/// // 3rd Stream will be emitted before that time, only data from the 3rd | |
/// // Stream will be emitted to the listener. | |
/// switchLatestStream.listen(print); // prints 'C' | |
/// ``` | |
factory Observable.switchLatest(Stream<Stream<T>> streams) => | |
Observable<T>(SwitchLatestStream<T>(streams)); | |
/// Emits the given value after a specified amount of time. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.timer("hi", new Duration(minutes: 1)) | |
/// .listen((i) => print(i)); // print "hi" after 1 minute | |
factory Observable.timer(T value, Duration duration) => | |
Observable<T>((TimerStream<T>(value, duration))); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip2( | |
/// new Observable.just("Hi "), | |
/// new Observable.fromIterable(["Friend", "Dropped"]), | |
/// (a, b) => a + b) | |
/// .listen(print); // prints "Hi Friend" | |
static Observable<T> zip2<A, B, T>( | |
Stream<A> streamA, Stream<B> streamB, T zipper(A a, B b)) => | |
Observable<T>(ZipStream.zip2(streamA, streamB, zipper)); | |
/// Merges the iterable streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip( | |
/// [ | |
/// Observable.just("Hi "), | |
/// Observable.fromIterable(["Friend", "Dropped"]), | |
/// ], | |
/// (values) => values.first + values.last | |
/// ) | |
/// .listen(print); // prints "Hi Friend" | |
static Observable<R> zip<T, R>( | |
Iterable<Stream<T>> streams, R zipper(List<T> values)) => | |
Observable<R>(ZipStream(streams, zipper)); | |
/// Merges the iterable streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zipList( | |
/// [ | |
/// Observable.just("Hi "), | |
/// Observable.fromIterable(["Friend", "Dropped"]), | |
/// ], | |
/// ) | |
/// .listen(print); // prints ['Hi ', 'Friend'] | |
static Observable<List<T>> zipList<T>(Iterable<Stream<T>> streams) => | |
Observable<List<T>>(ZipStream.list(streams)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip3( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.fromIterable(["c", "dropped"]), | |
/// (a, b, c) => a + b + c) | |
/// .listen(print); //prints "abc" | |
static Observable<T> zip3<A, B, C, T>(Stream<A> streamA, Stream<B> streamB, | |
Stream<C> streamC, T zipper(A a, B b, C c)) => | |
Observable<T>(ZipStream.zip3(streamA, streamB, streamC, zipper)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip4( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.fromIterable(["d", "dropped"]), | |
/// (a, b, c, d) => a + b + c + d) | |
/// .listen(print); //prints "abcd" | |
static Observable<T> zip4<A, B, C, D, T>(Stream<A> streamA, Stream<B> streamB, | |
Stream<C> streamC, Stream<D> streamD, T zipper(A a, B b, C c, D d)) => | |
Observable<T>(ZipStream.zip4(streamA, streamB, streamC, streamD, zipper)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip5( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.fromIterable(["e", "dropped"]), | |
/// (a, b, c, d, e) => a + b + c + d + e) | |
/// .listen(print); //prints "abcde" | |
static Observable<T> zip5<A, B, C, D, E, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
T zipper(A a, B b, C c, D d, E e)) => | |
Observable<T>( | |
ZipStream.zip5(streamA, streamB, streamC, streamD, streamE, zipper)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip6( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.fromIterable(["f", "dropped"]), | |
/// (a, b, c, d, e, f) => a + b + c + d + e + f) | |
/// .listen(print); //prints "abcdef" | |
static Observable<T> zip6<A, B, C, D, E, F, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
T zipper(A a, B b, C c, D d, E e, F f)) => | |
Observable<T>(ZipStream.zip6( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
zipper, | |
)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip7( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.fromIterable(["g", "dropped"]), | |
/// (a, b, c, d, e, f, g) => a + b + c + d + e + f + g) | |
/// .listen(print); //prints "abcdefg" | |
static Observable<T> zip7<A, B, C, D, E, F, G, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
T zipper(A a, B b, C c, D d, E e, F f, G g)) => | |
Observable<T>(ZipStream.zip7( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
streamG, | |
zipper, | |
)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip8( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.just("g"), | |
/// new Observable.fromIterable(["h", "dropped"]), | |
/// (a, b, c, d, e, f, g, h) => a + b + c + d + e + f + g + h) | |
/// .listen(print); //prints "abcdefgh" | |
static Observable<T> zip8<A, B, C, D, E, F, G, H, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
Stream<H> streamH, | |
T zipper(A a, B b, C c, D d, E e, F f, G g, H h)) => | |
Observable<T>(ZipStream.zip8( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
streamG, | |
streamH, | |
zipper, | |
)); | |
/// Merges the specified streams into one observable sequence using the given | |
/// zipper function whenever all of the observable sequences have produced | |
/// an element at a corresponding index. | |
/// | |
/// It applies this function in strict sequence, so the first item emitted by | |
/// the new Observable will be the result of the function applied to the first | |
/// item emitted by Observable #1 and the first item emitted by Observable #2; | |
/// the second item emitted by the new zip-Observable will be the result of | |
/// the function applied to the second item emitted by Observable #1 and the | |
/// second item emitted by Observable #2; and so forth. It will only emit as | |
/// many items as the number of items emitted by the source Observable that | |
/// emits the fewest items. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#zip) | |
/// | |
/// ### Example | |
/// | |
/// Observable.zip9( | |
/// new Observable.just("a"), | |
/// new Observable.just("b"), | |
/// new Observable.just("c"), | |
/// new Observable.just("d"), | |
/// new Observable.just("e"), | |
/// new Observable.just("f"), | |
/// new Observable.just("g"), | |
/// new Observable.just("h"), | |
/// new Observable.fromIterable(["i", "dropped"]), | |
/// (a, b, c, d, e, f, g, h, i) => a + b + c + d + e + f + g + h + i) | |
/// .listen(print); //prints "abcdefghi" | |
static Observable<T> zip9<A, B, C, D, E, F, G, H, I, T>( | |
Stream<A> streamA, | |
Stream<B> streamB, | |
Stream<C> streamC, | |
Stream<D> streamD, | |
Stream<E> streamE, | |
Stream<F> streamF, | |
Stream<G> streamG, | |
Stream<H> streamH, | |
Stream<I> streamI, | |
T zipper(A a, B b, C c, D d, E e, F f, G g, H h, I i)) => | |
Observable<T>(ZipStream.zip9( | |
streamA, | |
streamB, | |
streamC, | |
streamD, | |
streamE, | |
streamF, | |
streamG, | |
streamH, | |
streamI, | |
zipper, | |
)); | |
/// Returns a multi-subscription stream that produces the same events as this. | |
/// | |
/// The returned stream will subscribe to this stream when its first | |
/// subscriber is added, and will stay subscribed until this stream ends, or a | |
/// callback cancels the subscription. | |
/// | |
/// If onListen is provided, it is called with a subscription-like object that | |
/// represents the underlying subscription to this stream. It is possible to | |
/// pause, resume or cancel the subscription during the call to onListen. It | |
/// is not possible to change the event handlers, including using | |
/// StreamSubscription.asFuture. | |
/// | |
/// If onCancel is provided, it is called in a similar way to onListen when | |
/// the returned stream stops having listener. If it later gets a new | |
/// listener, the onListen function is called again. | |
/// | |
/// Use the callbacks, for example, for pausing the underlying subscription | |
/// while having no subscribers to prevent losing events, or canceling the | |
/// subscription when there are no listeners. | |
@override | |
Observable<T> asBroadcastStream( | |
{void onListen(StreamSubscription<T> subscription), | |
void onCancel(StreamSubscription<T> subscription)}) => | |
Observable<T>( | |
_stream.asBroadcastStream(onListen: onListen, onCancel: onCancel)); | |
/// Maps each emitted item to a new [Stream] using the given mapper, then | |
/// subscribes to each new stream one after the next until all values are | |
/// emitted. | |
/// | |
/// asyncExpand is similar to flatMap, but ensures order by guaranteeing that | |
/// all items from the created stream will be emitted before moving to the | |
/// next created stream. This process continues until all created streams have | |
/// completed. | |
/// | |
/// This is functionally equivalent to `concatMap`, which exists as an alias | |
/// for a more fluent Rx API. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(4, 1) | |
/// .asyncExpand((i) => | |
/// new Observable.timer(i, new Duration(minutes: i)) | |
/// .listen(print); // prints 4, 3, 2, 1 | |
@override | |
Observable<S> asyncExpand<S>(Stream<S> mapper(T value)) => | |
Observable<S>(_stream.asyncExpand(mapper)); | |
/// Creates an Observable with each data event of this stream asynchronously | |
/// mapped to a new event. | |
/// | |
/// This acts like map, except that convert may return a Future, and in that | |
/// case, the stream waits for that future to complete before continuing with | |
/// its result. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. | |
@override | |
Observable<S> asyncMap<S>(FutureOr<S> convert(T value)) => | |
Observable<S>(_stream.asyncMap(convert)); | |
/// Creates an Observable where each item is a [List] containing the items | |
/// from the source sequence, batched by the [sampler]. | |
/// | |
/// ### Example with [onCount] | |
/// | |
/// Observable.range(1, 4) | |
/// .buffer(onCount(2)) | |
/// .listen(print); // prints [1, 2], [3, 4] | |
/// | |
/// ### Example with [onFuture] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .buffer(onFuture(() => new Future.delayed(const Duration(milliseconds: 220)))) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
/// | |
/// ### Example with [onTest] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .buffer(onTest((i) => i % 2 == 0)) | |
/// .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ... | |
/// | |
/// ### Example with [onTime] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .buffer(onTime(const Duration(milliseconds: 220))) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
/// | |
/// ### Example with [onStream] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .buffer(onStream(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i))) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
/// | |
/// You can create your own sampler by extending [StreamView] | |
/// should the above samplers be insufficient for your use case. | |
Observable<List<T>> buffer(SamplerBuilder<T, List<T>> sampler) => | |
transform(BufferStreamTransformer<T>((Stream<T> stream, | |
OnDataTransform<T, List<T>> bufferHandler, | |
OnDataTransform<List<T>, List<T>> scheduleHandler) => | |
sampler(stream, bufferHandler, scheduleHandler))); | |
/// Buffers a number of values from the source Observable by [count] then | |
/// emits the buffer and clears it, and starts a new buffer each | |
/// [startBufferEvery] values. If [startBufferEvery] is not provided or is | |
/// null, then new buffers are started immediately at the start of the source | |
/// and when each buffer closes and is emitted. | |
/// | |
/// ### Example | |
/// [count] is the maximum size of the buffer emitted | |
/// | |
/// Observable.range(1, 4) | |
/// .bufferCount(2) | |
/// .listen(print); // prints [1, 2], [3, 4] done! | |
/// | |
/// ### Example | |
/// if [startBufferEvery] is 2, then a new buffer will be started | |
/// on every other value from the source. A new buffer is started at the | |
/// beginning of the source by default. | |
/// | |
/// Observable.range(1, 5) | |
/// .bufferCount(3, 2) | |
/// .listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done! | |
Observable<List<T>> bufferCount(int count, [int startBufferEvery = 0]) => | |
transform(BufferStreamTransformer<T>( | |
onCount<T, List<T>>(count, startBufferEvery))); | |
/// Creates an Observable where each item is a [List] containing the items | |
/// from the source sequence, batched whenever [onFutureHandler] completes. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .bufferFuture(() => new Future.delayed(const Duration(milliseconds: 220))) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
Observable<List<T>> bufferFuture<O>(Future<O> onFutureHandler()) => transform( | |
BufferStreamTransformer<T>(onFuture<T, List<T>, O>(onFutureHandler))); | |
/// Creates an Observable where each item is a [List] containing the items | |
/// from the source sequence, batched whenever [onTestHandler] passes. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .bufferTest((i) => i % 2 == 0) | |
/// .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ... | |
Observable<List<T>> bufferTest(bool onTestHandler(T event)) => | |
transform(BufferStreamTransformer<T>(onTest<T, List<T>>(onTestHandler))); | |
/// Creates an Observable where each item is a [List] containing the items | |
/// from the source sequence, sampled on a time frame with [duration]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .bufferTime(const Duration(milliseconds: 220)) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
Observable<List<T>> bufferTime(Duration duration) => | |
transform(BufferStreamTransformer<T>(onTime(duration))); | |
/// Creates an Observable where each item is a [List] containing the items | |
/// from the source sequence, sampled on [onStream]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .bufferWhen(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i)) | |
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ... | |
Observable<List<T>> bufferWhen<O>(Stream<O> other) => | |
transform(BufferStreamTransformer<T>(onStream(other))); | |
/// | |
/// Adapt this stream to be a `Stream<R>`. | |
/// | |
/// If this stream already has the desired type, its returned directly. | |
/// Otherwise it is wrapped as a `Stream<R>` which checks at run-time that | |
/// each data event emitted by this stream is also an instance of [R]. | |
/// | |
@override | |
Observable<R> cast<R>() => Observable<R>(_stream.cast<R>()); | |
/// Maps each emitted item to a new [Stream] using the given mapper, then | |
/// subscribes to each new stream one after the next until all values are | |
/// emitted. | |
/// | |
/// ConcatMap is similar to flatMap, but ensures order by guaranteeing that | |
/// all items from the created stream will be emitted before moving to the | |
/// next created stream. This process continues until all created streams have | |
/// completed. | |
/// | |
/// This is a simple alias for Dart Stream's `asyncExpand`, but is included to | |
/// ensure a more consistent Rx API. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(4, 1) | |
/// .concatMap((i) => | |
/// new Observable.timer(i, new Duration(minutes: i)) | |
/// .listen(print); // prints 4, 3, 2, 1 | |
Observable<S> concatMap<S>(Stream<S> mapper(T value)) => | |
Observable<S>(_stream.asyncExpand(mapper)); | |
/// Returns an Observable that emits all items from the current Observable, | |
/// then emits all items from the given observable, one after the next. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.timer(1, new Duration(seconds: 10)) | |
/// .concatWith([new Observable.just(2)]) | |
/// .listen(print); // prints 1, 2 | |
Observable<T> concatWith(Iterable<Stream<T>> other) => | |
Observable<T>(ConcatStream<T>(<Stream<T>>[_stream]..addAll(other))); | |
@override | |
AsObservableFuture<bool> contains(Object needle) => | |
AsObservableFuture<bool>(_stream.contains(needle)); | |
/// Creates an Observable that will only emit items from the source sequence | |
/// if a particular time span has passed without the source sequence emitting | |
/// another item. | |
/// | |
/// The Debounce operator filters out items emitted by the source Observable | |
/// that are rapidly followed by another emitted item. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#debounce) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.range(1, 100) | |
/// .debounce(new Duration(seconds: 1)) | |
/// .listen(print); // prints 100 | |
Observable<T> debounce(Duration duration) => | |
transform(DebounceStreamTransformer<T>(duration)); | |
/// Emit items from the source Stream, or a single default item if the source | |
/// Stream emits nothing. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.empty().defaultIfEmpty(10).listen(print); // prints 10 | |
Observable<T> defaultIfEmpty(T defaultValue) => | |
transform(DefaultIfEmptyStreamTransformer<T>(defaultValue)); | |
/// The Delay operator modifies its source Observable by pausing for | |
/// a particular increment of time (that you specify) before emitting | |
/// each of the source Observable’s items. | |
/// This has the effect of shifting the entire sequence of items emitted | |
/// by the Observable forward in time by that specified increment. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#delay) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3, 4]) | |
/// .delay(new Duration(seconds: 1)) | |
/// .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately | |
Observable<T> delay(Duration duration) => | |
transform(DelayStreamTransformer<T>(duration)); | |
/// Converts the onData, onDone, and onError [Notification] objects from a | |
/// materialized stream into normal onData, onDone, and onError events. | |
/// | |
/// When a stream has been materialized, it emits onData, onDone, and onError | |
/// events as [Notification] objects. Dematerialize simply reverses this by | |
/// transforming [Notification] objects back to a normal stream of events. | |
/// | |
/// ### Example | |
/// | |
/// new Observable<Notification<int>> | |
/// .fromIterable([new Notification.onData(1), new Notification.onDone()]) | |
/// .dematerialize() | |
/// .listen((i) => print(i)); // Prints 1 | |
/// | |
/// ### Error example | |
/// | |
/// new Observable<Notification<int>> | |
/// .just(new Notification.onError(new Exception(), null)) | |
/// .dematerialize() | |
/// .listen(null, onError: (e, s) { print(e) }); // Prints Exception | |
Observable<S> dematerialize<S>() { | |
return cast<Notification<S>>() | |
.transform(DematerializeStreamTransformer<S>()); | |
} | |
/// WARNING: More commonly known as distinctUntilChanged in other Rx | |
/// implementations. Creates an Observable where data events are skipped if | |
/// they are equal to the previous data event. | |
/// | |
/// The returned stream provides the same events as this stream, except that | |
/// it never provides two consecutive data events that are equal. | |
/// | |
/// Equality is determined by the provided equals method. If that is omitted, | |
/// the '==' operator on the last provided data element is used. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. If a | |
/// broadcast stream is listened to more than once, each subscription will | |
/// individually perform the equals test. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#distinctUntilChanged) | |
@override | |
Observable<T> distinct([bool equals(T previous, T next)]) => | |
Observable<T>(_stream.distinct(equals)); | |
/// WARNING: More commonly known as distinct in other Rx implementations. | |
/// Creates an Observable where data events are skipped if they have already | |
/// been emitted before. | |
/// | |
/// Equality is determined by the provided equals and hashCode methods. | |
/// If these are omitted, the '==' operator and hashCode on the last provided | |
/// data element are used. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. If a | |
/// broadcast stream is listened to more than once, each subscription will | |
/// individually perform the equals and hashCode tests. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#distinct) | |
Observable<T> distinctUnique({bool equals(T e1, T e2), int hashCode(T e)}) => | |
transform(DistinctUniqueStreamTransformer<T>( | |
equals: equals, hashCode: hashCode)); | |
/// Invokes the given callback function when the stream subscription is | |
/// cancelled. Often called doOnUnsubscribe or doOnDispose in other | |
/// implementations. | |
/// | |
/// ### Example | |
/// | |
/// final subscription = new Observable.timer(1, new Duration(minutes: 1)) | |
/// .doOnCancel(() => print("hi")); | |
/// .listen(null); | |
/// | |
/// subscription.cancel(); // prints "hi" | |
Observable<T> doOnCancel(void onCancel()) => | |
transform(DoStreamTransformer<T>(onCancel: onCancel)); | |
/// Invokes the given callback function when the stream emits an item. In | |
/// other implementations, this is called doOnNext. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .doOnData(print) | |
/// .listen(null); // prints 1, 2, 3 | |
Observable<T> doOnData(void onData(T event)) => | |
transform(DoStreamTransformer<T>(onData: onData)); | |
/// Invokes the given callback function when the stream finishes emitting | |
/// items. In other implementations, this is called doOnComplete(d). | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .doOnDone(() => print("all set")) | |
/// .listen(null); // prints "all set" | |
Observable<T> doOnDone(void onDone()) => | |
transform(DoStreamTransformer<T>(onDone: onDone)); | |
/// Invokes the given callback function when the stream emits data, emits | |
/// an error, or emits done. The callback receives a [Notification] object. | |
/// | |
/// The [Notification] object contains the [Kind] of event (OnData, onDone, | |
/// or OnError), and the item or error that was emitted. In the case of | |
/// onDone, no data is emitted as part of the [Notification]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1) | |
/// .doOnEach(print) | |
/// .listen(null); // prints Notification{kind: OnData, value: 1, errorAndStackTrace: null}, Notification{kind: OnDone, value: null, errorAndStackTrace: null} | |
Observable<T> doOnEach(void onEach(Notification<T> notification)) => | |
transform(DoStreamTransformer<T>(onEach: onEach)); | |
/// Invokes the given callback function when the stream emits an error. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.error(new Exception()) | |
/// .doOnError((error, stacktrace) => print("oh no")) | |
/// .listen(null); // prints "Oh no" | |
Observable<T> doOnError(Function onError) => | |
transform(DoStreamTransformer<T>(onError: onError)); | |
/// Invokes the given callback function when the stream is first listened to. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1) | |
/// .doOnListen(() => print("Is someone there?")) | |
/// .listen(null); // prints "Is someone there?" | |
Observable<T> doOnListen(void onListen()) => | |
transform(DoStreamTransformer<T>(onListen: onListen)); | |
/// Invokes the given callback function when the stream subscription is | |
/// paused. | |
/// | |
/// ### Example | |
/// | |
/// final subscription = new Observable.just(1) | |
/// .doOnPause(() => print("Gimme a minute please")) | |
/// .listen(null); | |
/// | |
/// subscription.pause(); // prints "Gimme a minute please" | |
Observable<T> doOnPause(void onPause(Future<dynamic> resumeSignal)) => | |
transform(DoStreamTransformer<T>(onPause: onPause)); | |
/// Invokes the given callback function when the stream subscription | |
/// resumes receiving items. | |
/// | |
/// ### Example | |
/// | |
/// final subscription = new Observable.just(1) | |
/// .doOnResume(() => print("Let's do this!")) | |
/// .listen(null); | |
/// | |
/// subscription.pause(); | |
/// subscription.resume(); "Let's do this!" | |
Observable<T> doOnResume(void onResume()) => | |
transform(DoStreamTransformer<T>(onResume: onResume)); | |
@override | |
AsObservableFuture<S> drain<S>([S futureValue]) => | |
AsObservableFuture<S>(_stream.drain(futureValue)); | |
@override | |
AsObservableFuture<T> elementAt(int index) => | |
AsObservableFuture<T>(_stream.elementAt(index)); | |
@override | |
AsObservableFuture<bool> every(bool test(T element)) => | |
AsObservableFuture<bool>(_stream.every(test)); | |
/// Converts items from the source stream into a new Stream using a given | |
/// mapper. It ignores all items from the source stream until the new stream | |
/// completes. | |
/// | |
/// Useful when you have a noisy source Stream and only want to respond once | |
/// the previous async operation is finished. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(0, 2).interval(new Duration(milliseconds: 50)) | |
/// .exhaustMap((i) => | |
/// new Observable.timer(i, new Duration(milliseconds: 75))) | |
/// .listen(print); // prints 0, 2 | |
Observable<S> exhaustMap<S>(Stream<S> mapper(T value)) => | |
transform(ExhaustMapStreamTransformer<T, S>(mapper)); | |
/// Creates an Observable from this stream that converts each element into | |
/// zero or more events. | |
/// | |
/// Each incoming event is converted to an Iterable of new events, and each of | |
/// these new events are then sent by the returned Observable in order. | |
/// | |
/// The returned Observable is a broadcast stream if this stream is. If a | |
/// broadcast stream is listened to more than once, each subscription will | |
/// individually call convert and expand the events. | |
@override | |
Observable<S> expand<S>(Iterable<S> convert(T value)) => | |
Observable<S>(_stream.expand(convert)); | |
@override | |
AsObservableFuture<T> get first => AsObservableFuture<T>(_stream.first); | |
@override | |
AsObservableFuture<T> firstWhere(bool test(T element), | |
{dynamic defaultValue(), T orElse()}) => | |
AsObservableFuture<T>(_stream.firstWhere(test, orElse: orElse)); | |
/// Converts each emitted item into a new Stream using the given mapper | |
/// function. The newly created Stream will be be listened to and begin | |
/// emitting items downstream. | |
/// | |
/// The items emitted by each of the new Streams are emitted downstream in the | |
/// same order they arrive. In other words, the sequences are merged | |
/// together. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(4, 1) | |
/// .flatMap((i) => | |
/// new Observable.timer(i, new Duration(minutes: i)) | |
/// .listen(print); // prints 1, 2, 3, 4 | |
Observable<S> flatMap<S>(Stream<S> mapper(T value)) => | |
transform(FlatMapStreamTransformer<T, S>(mapper)); | |
/// Converts each item into a new Stream. The Stream must return an | |
/// Iterable. Then, each item from the Iterable will be emitted one by one. | |
/// | |
/// Use case: you may have an API that returns a list of items, such as | |
/// a Stream<List<String>>. However, you might want to operate on the individual items | |
/// rather than the list itself. This is the job of `flatMapIterable`. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(1, 4) | |
/// .flatMapIterable((i) => | |
/// new Observable.just([i]) | |
/// .listen(print); // prints 1, 2, 3, 4 | |
Observable<S> flatMapIterable<S>(Stream<Iterable<S>> mapper(T value)) => | |
transform(FlatMapStreamTransformer<T, Iterable<S>>(mapper)) | |
.expand((Iterable<S> iterable) => iterable); | |
@override | |
AsObservableFuture<S> fold<S>( | |
S initialValue, S combine(S previous, T element)) => | |
AsObservableFuture<S>(_stream.fold(initialValue, combine)); | |
@override | |
AsObservableFuture<dynamic> forEach(void action(T element)) => | |
AsObservableFuture<dynamic>(_stream.forEach(action)); | |
/// The GroupBy operator divides an [Observable] that emits items into | |
/// an [Observable] that emits [GroupByObservable], | |
/// each one of which emits some subset of the items | |
/// from the original source [Observable]. | |
/// | |
/// [GroupByObservable] acts like a regular [Observable], yet | |
/// adding a 'key' property, which receives its [Type] and value from | |
/// the [grouper] Function. | |
/// | |
/// All items with the same key are emitted by the same [GroupByObservable]. | |
Observable<GroupByObservable<T, S>> groupBy<S>(S grouper(T value)) => | |
transform(GroupByStreamTransformer<T, S>(grouper)); | |
/// Creates a wrapper Stream that intercepts some errors from this stream. | |
/// | |
/// If this stream sends an error that matches test, then it is intercepted by | |
/// the handle function. | |
/// | |
/// The onError callback must be of type void onError(error) or void | |
/// onError(error, StackTrace stackTrace). Depending on the function type the | |
/// stream either invokes onError with or without a stack trace. The stack | |
/// trace argument might be null if the stream itself received an error | |
/// without stack trace. | |
/// | |
/// An asynchronous error e is matched by a test function if test(e) returns | |
/// true. If test is omitted, every error is considered matching. | |
/// | |
/// If the error is intercepted, the handle function can decide what to do | |
/// with it. It can throw if it wants to raise a new (or the same) error, or | |
/// simply return to make the stream forget the error. | |
/// | |
/// If you need to transform an error into a data event, use the more generic | |
/// Stream.transform to handle the event by writing a data event to the output | |
/// sink. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. If a | |
/// broadcast stream is listened to more than once, each subscription will | |
/// individually perform the test and handle the error. | |
@override | |
Observable<T> handleError(Function onError, {bool test(dynamic error)}) => | |
Observable<T>(_stream.handleError(onError, test: test)); | |
/// Creates an Observable where all emitted items are ignored, only the | |
/// error / completed notifications are passed | |
/// | |
/// ### Example | |
/// | |
/// new Observable.merge([ | |
/// new Observable.just(1), | |
/// new Observable.error(new Exception()) | |
/// ]) | |
/// .listen(print, onError: print); // prints Exception | |
Observable<T> ignoreElements() => | |
transform(IgnoreElementsStreamTransformer<T>()); | |
/// Creates an Observable that emits each item in the Stream after a given | |
/// duration. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .interval(new Duration(seconds: 1)) | |
/// .listen((i) => print("$i sec"); // prints 1 sec, 2 sec, 3 sec | |
Observable<T> interval(Duration duration) => | |
transform(IntervalStreamTransformer<T>(duration)); | |
@override | |
bool get isBroadcast { | |
return (_stream != null) ? _stream.isBroadcast : false; | |
} | |
@override | |
AsObservableFuture<bool> get isEmpty => | |
AsObservableFuture<bool>(_stream.isEmpty); | |
@override | |
AsObservableFuture<String> join([String separator = ""]) => | |
AsObservableFuture<String>(_stream.join(separator)); | |
@override | |
AsObservableFuture<T> get last => AsObservableFuture<T>(_stream.last); | |
@override | |
AsObservableFuture<T> lastWhere(bool test(T element), | |
{Object defaultValue(), T orElse()}) => | |
AsObservableFuture<T>(_stream.lastWhere(test, orElse: orElse)); | |
/// Adds a subscription to this stream. Returns a [StreamSubscription] which | |
/// handles events from the stream using the provided [onData], [onError] and | |
/// [onDone] handlers. | |
/// | |
/// The handlers can be changed on the subscription, but they start out | |
/// as the provided functions. | |
/// | |
/// On each data event from this stream, the subscriber's [onData] handler | |
/// is called. If [onData] is `null`, nothing happens. | |
/// | |
/// On errors from this stream, the [onError] handler is called with the | |
/// error object and possibly a stack trace. | |
/// | |
/// The [onError] callback must be of type `void onError(error)` or | |
/// `void onError(error, StackTrace stackTrace)`. If [onError] accepts | |
/// two arguments it is called with the error object and the stack trace | |
/// (which could be `null` if the stream itself received an error without | |
/// stack trace). | |
/// Otherwise it is called with just the error object. | |
/// If [onError] is omitted, any errors on the stream are considered unhandled, | |
/// and will be passed to the current [Zone]'s error handler. | |
/// By default unhandled async errors are treated | |
/// as if they were uncaught top-level errors. | |
/// | |
/// If this stream closes and sends a done event, the [onDone] handler is | |
/// called. If [onDone] is `null`, nothing happens. | |
/// | |
/// If [cancelOnError] is true, the subscription is automatically cancelled | |
/// when the first error event is delivered. The default is `false`. | |
/// | |
/// While a subscription is paused, or when it has been cancelled, | |
/// the subscription doesn't receive events and none of the | |
/// event handler functions are called. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1).listen(print); // prints 1 | |
@override | |
StreamSubscription<T> listen(void onData(T event), | |
{Function onError, void onDone(), bool cancelOnError}) { | |
return _stream.listen(onData, | |
onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
} | |
@override | |
AsObservableFuture<int> get length => AsObservableFuture<int>(_stream.length); | |
/// Maps values from a source sequence through a function and emits the | |
/// returned values. | |
/// | |
/// The returned sequence completes when the source sequence completes. | |
/// The returned sequence throws an error if the source sequence throws an | |
/// error. | |
@override | |
Observable<S> map<S>(S convert(T event)) => | |
Observable<S>(_stream.map(convert)); | |
/// Emits the given constant value on the output Observable every time the source Observable emits a value. | |
/// | |
/// ### Example | |
/// | |
/// Observable.fromIterable([1, 2, 3, 4]) | |
/// .mapTo(true) | |
/// .listen(print); // prints true, true, true, true | |
Observable<S> mapTo<S>(S value) => | |
transform(MapToStreamTransformer<T, S>(value)); | |
/// Converts the onData, on Done, and onError events into [Notification] | |
/// objects that are passed into the downstream onData listener. | |
/// | |
/// The [Notification] object contains the [Kind] of event (OnData, onDone, or | |
/// OnError), and the item or error that was emitted. In the case of onDone, | |
/// no data is emitted as part of the [Notification]. | |
/// | |
/// Example: | |
/// new Observable<int>.just(1) | |
/// .materialize() | |
/// .listen((i) => print(i)); // Prints onData & onDone Notification | |
/// | |
/// new Observable<int>.error(new Exception()) | |
/// .materialize() | |
/// .listen((i) => print(i)); // Prints onError Notification | |
Observable<Notification<T>> materialize() => | |
transform(MaterializeStreamTransformer<T>()); | |
/// Converts a Stream into a Future that completes with the largest item emitted | |
/// by the Stream. | |
/// | |
/// This is similar to finding the max value in a list, but the values are | |
/// asynchronous. | |
/// | |
/// ### Example | |
/// | |
/// final max = await new Observable.fromIterable([1, 2, 3]).max(); | |
/// | |
/// print(max); // prints 3 | |
/// | |
/// ### Example with custom [Comparator] | |
/// | |
/// final observable = new Observable.fromIterable("short", "looooooong"); | |
/// final max = await observable.max((a, b) => a.length - b.length); | |
/// | |
/// print(max); // prints "looooooong" | |
AsObservableFuture<T> max([Comparator<T> comparator]) => | |
AsObservableFuture<T>(StreamMaxFuture<T>(_stream, comparator)); | |
/// Combines the items emitted by multiple streams into a single stream of | |
/// items. The items are emitted in the order they are emitted by their | |
/// sources. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.timer(1, new Duration(seconds: 10)) | |
/// .mergeWith([new Observable.just(2)]) | |
/// .listen(print); // prints 2, 1 | |
Observable<T> mergeWith(Iterable<Stream<T>> streams) => | |
Observable<T>(MergeStream<T>(<Stream<T>>[_stream]..addAll(streams))); | |
/// Converts a Stream into a Future that completes with the smallest item | |
/// emitted by the Stream. | |
/// | |
/// This is similar to finding the min value in a list, but the values are | |
/// asynchronous! | |
/// | |
/// ### Example | |
/// | |
/// final min = await new Observable.fromIterable([1, 2, 3]).min(); | |
/// | |
/// print(min); // prints 1 | |
/// | |
/// ### Example with custom [Comparator] | |
/// | |
/// final observable = new Observable.fromIterable("short", "looooooong"); | |
/// final min = await observable.min((a, b) => a.length - b.length); | |
/// | |
/// print(min); // prints "short" | |
AsObservableFuture<T> min([Comparator<T> comparator]) => | |
AsObservableFuture<T>(StreamMinFuture<T>(_stream, comparator)); | |
/// Filters a sequence so that only events of a given type pass | |
/// | |
/// In order to capture the Type correctly, it needs to be wrapped | |
/// in a [TypeToken] as the generic parameter. | |
/// | |
/// Given the way Dart generics work, one cannot simply use the `is T` / `as T` | |
/// checks and castings with this method alone. Therefore, the | |
/// [TypeToken] class was introduced to capture the type of class you'd | |
/// like `ofType` to filter down to. | |
/// | |
/// ### Examples | |
/// | |
/// new Observable.fromIterable([1, "hi"]) | |
/// .ofType(new TypeToken<String>) | |
/// .listen(print); // prints "hi" | |
/// | |
/// As a shortcut, you can use some pre-defined constants to write the above | |
/// in the following way: | |
/// | |
/// new Observable.fromIterable([1, "hi"]) | |
/// .ofType(kString) | |
/// .listen(print); // prints "hi" | |
/// | |
/// If you'd like to create your own shortcuts like the example above, | |
/// simply create a constant: | |
/// | |
/// const TypeToken<Map<Int, String>> kMapIntString = | |
/// const TypeToken<Map<Int, String>>(); | |
Observable<S> ofType<S>(TypeToken<S> typeToken) => | |
transform(OfTypeStreamTransformer<T, S>(typeToken)); | |
/// Intercepts error events and switches to the given recovery stream in | |
/// that case | |
/// | |
/// The onErrorResumeNext operator intercepts an onError notification from | |
/// the source Observable. Instead of passing the error through to any | |
/// listeners, it replaces it with another Stream of items. | |
/// | |
/// If you need to perform logic based on the type of error that was emitted, | |
/// please consider using [onErrorResume]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.error(new Exception()) | |
/// .onErrorResumeNext(new Observable.fromIterable([1, 2, 3])) | |
/// .listen(print); // prints 1, 2, 3 | |
Observable<T> onErrorResumeNext(Stream<T> recoveryStream) => transform( | |
OnErrorResumeStreamTransformer<T>((dynamic e) => recoveryStream)); | |
/// Intercepts error events and switches to a recovery stream created by the | |
/// provided [recoveryFn]. | |
/// | |
/// The onErrorResume operator intercepts an onError notification from | |
/// the source Observable. Instead of passing the error through to any | |
/// listeners, it replaces it with another Stream of items created by the | |
/// [recoveryFn]. | |
/// | |
/// The [recoveryFn] receives the emitted error and returns a Stream. You can | |
/// perform logic in the [recoveryFn] to return different Streams based on the | |
/// type of error that was emitted. | |
/// | |
/// If you do not need to perform logic based on the type of error that was | |
/// emitted, please consider using [onErrorResumeNext] or [onErrorReturn]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable<int>.error(new Exception()) | |
/// .onErrorResume((dynamic e) => | |
/// new Observable.just(e is StateError ? 1 : 0) | |
/// .listen(print); // prints 0 | |
Observable<T> onErrorResume(Stream<T> Function(dynamic error) recoveryFn) => | |
transform(OnErrorResumeStreamTransformer<T>(recoveryFn)); | |
/// instructs an Observable to emit a particular item when it encounters an | |
/// error, and then terminate normally | |
/// | |
/// The onErrorReturn operator intercepts an onError notification from | |
/// the source Observable. Instead of passing it through to any observers, it | |
/// replaces it with a given item, and then terminates normally. | |
/// | |
/// If you need to perform logic based on the type of error that was emitted, | |
/// please consider using [onErrorReturnWith]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.error(new Exception()) | |
/// .onErrorReturn(1) | |
/// .listen(print); // prints 1 | |
Observable<T> onErrorReturn(T returnValue) => | |
transform(OnErrorResumeStreamTransformer<T>( | |
(dynamic e) => Observable<T>.just(returnValue))); | |
/// instructs an Observable to emit a particular item created by the | |
/// [returnFn] when it encounters an error, and then terminate normally. | |
/// | |
/// The onErrorReturnWith operator intercepts an onError notification from | |
/// the source Observable. Instead of passing it through to any observers, it | |
/// replaces it with a given item, and then terminates normally. | |
/// | |
/// The [returnFn] receives the emitted error and returns a Stream. You can | |
/// perform logic in the [returnFn] to return different Streams based on the | |
/// type of error that was emitted. | |
/// | |
/// If you do not need to perform logic based on the type of error that was | |
/// emitted, please consider using [onErrorReturn]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.error(new Exception()) | |
/// .onErrorReturnWith((e) => e is Exception ? 1 : 0) | |
/// .listen(print); // prints 1 | |
Observable<T> onErrorReturnWith(T Function(dynamic error) returnFn) => | |
transform(OnErrorResumeStreamTransformer<T>( | |
(dynamic e) => Observable<T>.just(returnFn(e)))); | |
/// Triggers on the second and subsequent triggerings of the input observable. | |
/// The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as a pair. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(1, 4) | |
/// .pairwise() | |
/// .listen(print); // prints [1, 2], [2, 3] | |
Observable<List<T>> pairwise() => | |
transform(BufferStreamTransformer<T>(onCount<T, List<T>>(2, 1), | |
exhaustBufferOnDone: false)); | |
@override | |
AsObservableFuture<dynamic> pipe(StreamConsumer<T> streamConsumer) => | |
AsObservableFuture<dynamic>(_stream.pipe(streamConsumer)); | |
@override | |
AsObservableFuture<T> reduce(T combine(T previous, T element)) => | |
AsObservableFuture<T>(_stream.reduce(combine)); | |
/// Returns an Observable that, when the specified sample stream emits | |
/// an item or completes, emits the most recently emitted item (if any) | |
/// emitted by the source stream since the previous emission from | |
/// the sample stream. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .sample(new Observable.timer(1, new Duration(seconds: 1)) | |
/// .listen(print); // prints 3 | |
Observable<T> sample(Stream<dynamic> sampleStream) => | |
transform(SampleStreamTransformer<T>(sampleStream)); | |
/// Applies an accumulator function over an observable sequence and returns | |
/// each intermediate result. The optional seed value is used as the initial | |
/// accumulator value. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .scan((acc, curr, i) => acc + curr, 0) | |
/// .listen(print); // prints 1, 3, 6 | |
Observable<S> scan<S>(S accumulator(S accumulated, T value, int index), | |
[S seed]) => | |
transform(ScanStreamTransformer<T, S>(accumulator, seed)); | |
@override | |
AsObservableFuture<T> get single => AsObservableFuture<T>(_stream.single); | |
@override | |
AsObservableFuture<T> singleWhere(bool test(T element), {T orElse()}) => | |
AsObservableFuture<T>(_stream.singleWhere(test, orElse: orElse)); | |
/// Skips the first count data events from this stream. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. For a | |
/// broadcast stream, the events are only counted from the time the returned | |
/// stream is listened to. | |
@override | |
Observable<T> skip(int count) => Observable<T>(_stream.skip(count)); | |
/// Starts emitting items only after the given stream emits an item. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.merge([ | |
/// new Observable.just(1), | |
/// new Observable.timer(2, new Duration(minutes: 2)) | |
/// ]) | |
/// .skipUntil(new Observable.timer(true, new Duration(minutes: 1))) | |
/// .listen(print); // prints 2; | |
Observable<T> skipUntil<S>(Stream<S> otherStream) => | |
transform(SkipUntilStreamTransformer<T, S>(otherStream)); | |
/// Skip data events from this stream while they are matched by test. | |
/// | |
/// Error and done events are provided by the returned stream unmodified. | |
/// | |
/// Starting with the first data event where test returns false for the event | |
/// data, the returned stream will have the same events as this stream. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. For a | |
/// broadcast stream, the events are only tested from the time the returned | |
/// stream is listened to. | |
@override | |
Observable<T> skipWhile(bool test(T element)) => | |
Observable<T>(_stream.skipWhile(test)); | |
/// Prepends a value to the source Observable. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(2).startWith(1).listen(print); // prints 1, 2 | |
Observable<T> startWith(T startValue) => | |
transform(StartWithStreamTransformer<T>(startValue)); | |
/// Prepends a sequence of values to the source Observable. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(3).startWithMany([1, 2]) | |
/// .listen(print); // prints 1, 2, 3 | |
Observable<T> startWithMany(List<T> startValues) => | |
transform(StartWithManyStreamTransformer<T>(startValues)); | |
/// When the original observable emits no items, this operator subscribes to | |
/// the given fallback stream and emits items from that observable instead. | |
/// | |
/// This can be particularly useful when consuming data from multiple sources. | |
/// For example, when using the Repository Pattern. Assuming you have some | |
/// data you need to load, you might want to start with the fastest access | |
/// point and keep falling back to the slowest point. For example, first query | |
/// an in-memory database, then a database on the file system, then a network | |
/// call if the data isn't on the local machine. | |
/// | |
/// This can be achieved quite simply with switchIfEmpty! | |
/// | |
/// ### Example | |
/// | |
/// // Let's pretend we have some Data sources that complete without emitting | |
/// // any items if they don't contain the data we're looking for | |
/// Observable<Data> memory; | |
/// Observable<Data> disk; | |
/// Observable<Data> network; | |
/// | |
/// // Start with memory, fallback to disk, then fallback to network. | |
/// // Simple as that! | |
/// Observable<Data> getThatData = | |
/// memory.switchIfEmpty(disk).switchIfEmpty(network); | |
Observable<T> switchIfEmpty(Stream<T> fallbackStream) => | |
transform(SwitchIfEmptyStreamTransformer<T>(fallbackStream)); | |
/// Converts each emitted item into a new Stream using the given mapper | |
/// function. The newly created Stream will be be listened to and begin | |
/// emitting items, and any previously created Stream will stop emitting. | |
/// | |
/// The switchMap operator is similar to the flatMap and concatMap methods, | |
/// but it only emits items from the most recently created Stream. | |
/// | |
/// This can be useful when you only want the very latest state from | |
/// asynchronous APIs, for example. | |
/// | |
/// ### Example | |
/// | |
/// Observable.range(4, 1) | |
/// .switchMap((i) => | |
/// new Observable.timer(i, new Duration(minutes: i)) | |
/// .listen(print); // prints 1 | |
Observable<S> switchMap<S>(Stream<S> mapper(T value)) => | |
transform(SwitchMapStreamTransformer<T, S>(mapper)); | |
/// Provides at most the first `n` values of this stream. | |
/// Forwards the first n data events of this stream, and all error events, to | |
/// the returned stream, and ends with a done event. | |
/// | |
/// If this stream produces fewer than count values before it's done, so will | |
/// the returned stream. | |
/// | |
/// Stops listening to the stream after the first n elements have been | |
/// received. | |
/// | |
/// Internally the method cancels its subscription after these elements. This | |
/// means that single-subscription (non-broadcast) streams are closed and | |
/// cannot be reused after a call to this method. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. For a | |
/// broadcast stream, the events are only counted from the time the returned | |
/// stream is listened to | |
@override | |
Observable<T> take(int count) => Observable<T>(_stream.take(count)); | |
/// Returns the values from the source observable sequence until the other | |
/// observable sequence produces a value. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.merge([ | |
/// new Observable.just(1), | |
/// new Observable.timer(2, new Duration(minutes: 1)) | |
/// ]) | |
/// .takeUntil(new Observable.timer(3, new Duration(seconds: 10))) | |
/// .listen(print); // prints 1 | |
Observable<T> takeUntil<S>(Stream<S> otherStream) => | |
transform(TakeUntilStreamTransformer<T, S>(otherStream)); | |
/// Forwards data events while test is successful. | |
/// | |
/// The returned stream provides the same events as this stream as long as | |
/// test returns true for the event data. The stream is done when either this | |
/// stream is done, or when this stream first provides a value that test | |
/// doesn't accept. | |
/// | |
/// Stops listening to the stream after the accepted elements. | |
/// | |
/// Internally the method cancels its subscription after these elements. This | |
/// means that single-subscription (non-broadcast) streams are closed and | |
/// cannot be reused after a call to this method. | |
/// | |
/// The returned stream is a broadcast stream if this stream is. For a | |
/// broadcast stream, the events are only tested from the time the returned | |
/// stream is listened to. | |
@override | |
Observable<T> takeWhile(bool test(T element)) => | |
Observable<T>(_stream.takeWhile(test)); | |
/// Returns an Observable that emits only the first item emitted by the source | |
/// Observable during sequential time windows of a specified duration. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2, 3]) | |
/// .throttle(new Duration(seconds: 1)) | |
/// .listen(print); // prints 1 | |
Observable<T> throttle(Duration duration) => | |
transform(ThrottleStreamTransformer<T>(duration)); | |
/// Records the time interval between consecutive values in an observable | |
/// sequence. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1) | |
/// .interval(new Duration(seconds: 1)) | |
/// .timeInterval() | |
/// .listen(print); // prints TimeInterval{interval: 0:00:01, value: 1} | |
Observable<TimeInterval<T>> timeInterval() => | |
transform(TimeIntervalStreamTransformer<T>()); | |
/// The Timeout operator allows you to abort an Observable with an onError | |
/// termination if that Observable fails to emit any items during a specified | |
/// duration. You may optionally provide a callback function to execute on | |
/// timeout. | |
@override | |
Observable<T> timeout(Duration timeLimit, | |
{void onTimeout(EventSink<T> sink)}) => | |
Observable<T>(_stream.timeout(timeLimit, onTimeout: onTimeout)); | |
/// Wraps each item emitted by the source Observable in a [Timestamped] object | |
/// that includes the emitted item and the time when the item was emitted. | |
/// | |
/// Example | |
/// | |
/// new Observable.just(1) | |
/// .timestamp() | |
/// .listen((i) => print(i)); // prints 'TimeStamp{timestamp: XXX, value: 1}'; | |
Observable<Timestamped<T>> timestamp() { | |
return transform(TimestampStreamTransformer<T>()); | |
} | |
@override | |
Observable<S> transform<S>(StreamTransformer<T, S> streamTransformer) => | |
Observable<S>(super.transform(streamTransformer)); | |
@override | |
AsObservableFuture<List<T>> toList() => | |
AsObservableFuture<List<T>>(_stream.toList()); | |
@override | |
AsObservableFuture<Set<T>> toSet() => | |
AsObservableFuture<Set<T>>(_stream.toSet()); | |
/// Filters the elements of an observable sequence based on the test. | |
@override | |
Observable<T> where(bool test(T event)) => Observable<T>(_stream.where(test)); | |
/// Creates an Observable where each item is a [Stream] containing the items | |
/// from the source sequence, batched by the [sampler]. | |
/// | |
/// ### Example with [onCount] | |
/// | |
/// Observable.range(1, 4) | |
/// .window(onCount(2)) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 1, 2, next window 3, 4 | |
/// | |
/// ### Example with [onFuture] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .window(onFuture(() => new Future.delayed(const Duration(milliseconds: 220)))) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
/// | |
/// ### Example with [onTest] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .window(onTest((i) => i % 2 == 0)) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, next window 1, 2 next window 3, 4, ... | |
/// | |
/// ### Example with [onTime] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .window(onTime(const Duration(milliseconds: 220))) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
/// | |
/// ### Example with [onStream] | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .window(onStream(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i))) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
/// | |
/// You can create your own sampler by extending [StreamView] | |
/// should the above samplers be insufficient for your use case. | |
Observable<Stream<T>> window(SamplerBuilder<T, Stream<T>> sampler) => | |
transform(WindowStreamTransformer<T>((Stream<T> stream, | |
OnDataTransform<T, Stream<T>> bufferHandler, | |
OnDataTransform<Stream<T>, Stream<T>> scheduleHandler) => | |
sampler(stream, bufferHandler, scheduleHandler))); | |
/// Buffers a number of values from the source Observable by [count] then | |
/// emits the values inside the buffer as a new [Stream], | |
/// and starts a new buffer each [startBufferEvery] values. | |
/// If [startBufferEvery] is not provided or is null, then new buffers are | |
/// started immediately at the start of the source and when each buffer | |
/// closes and is emitted. | |
/// | |
/// ### Example | |
/// [count] is the maximum size of the buffer emitted | |
/// | |
/// Observable.range(1, 4) | |
/// .windowCount(2) | |
/// .doOnData((_) => print('new Stream emitted)) | |
/// .flatMap((stream) => stream) | |
/// .listen(print); // prints new Stream emitted, 1, 2, new Stream emitted, 3, 4 done! | |
/// | |
/// ### Example | |
/// if [startBufferEvery] is 2, then a new buffer will be started | |
/// on every other value from the source. A new buffer is started at the | |
/// beginning of the source by default. | |
/// | |
/// Observable.range(1, 5) | |
/// .windowCount(3, 2) | |
/// .doOnData((_) => print('new Stream emitted)) | |
/// .flatMap((stream) => stream) | |
/// .listen(print); // prints new Stream emitted, 1, 2, 3 new Stream emitted 3, 4, 5 new Stream emitted 5 done! | |
Observable<Stream<T>> windowCount(int count, [int startBufferEvery = 0]) => | |
transform(WindowStreamTransformer<T>(onCount(count, startBufferEvery))); | |
/// Creates an Observable where each item is a [Stream] containing the items | |
/// from the source sequence, batched whenever [onFutureHandler] completes. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .windowFuture(() => new Future.delayed(const Duration(milliseconds: 220))) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
Observable<Stream<T>> windowFuture<O>(Future<O> onFutureHandler()) => | |
transform(WindowStreamTransformer<T>(onFuture(onFutureHandler))); | |
/// Creates an Observable where each item is a [Stream] containing the items | |
/// from the source sequence, batched whenever [onTestHandler] passes. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .windowTest((i) => i % 2 == 0) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, next window 1, 2 next window 3, 4, ... | |
Observable<Stream<T>> windowTest(bool onTestHandler(T event)) => | |
transform(WindowStreamTransformer<T>(onTest(onTestHandler))); | |
/// Creates an Observable where each item is a [Stream] containing the items | |
/// from the source sequence, sampled on a time frame with [duration]. | |
/// | |
/// ### Example | |
/// | |
//// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .windowTime(const Duration(milliseconds: 220)) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
Observable<Stream<T>> windowTime(Duration duration) => | |
transform(WindowStreamTransformer<T>(onTime(duration))); | |
/// Creates an Observable where each item is a [Stream] containing the items | |
/// from the source sequence, sampled on [onStream]. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i) | |
/// .windowWhen(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i)) | |
/// .doOnData((_) => print('next window')) | |
/// .flatMap((s) => s) | |
/// .listen(print); // prints next window 0, 1, next window 2, 3, ... | |
Observable<Stream<T>> windowWhen<O>(Stream<O> other) => | |
transform(WindowStreamTransformer<T>(onStream(other))); | |
/// Creates an Observable that emits when the source stream emits, combining | |
/// the latest values from the two streams using the provided function. | |
/// | |
/// If the latestFromStream has not emitted any values, this stream will not | |
/// emit either. | |
/// | |
/// [Interactive marble diagram](http://rxmarbles.com/#withLatestFrom) | |
/// | |
/// ### Example | |
/// | |
/// new Observable.fromIterable([1, 2]).withLatestFrom( | |
/// new Observable.fromIterable([2, 3]), (a, b) => a + b) | |
/// .listen(print); // prints 4 (due to the async nature of streams) | |
Observable<R> withLatestFrom<S, R>( | |
Stream<S> latestFromStream, R fn(T t, S s)) => | |
transform(WithLatestFromStreamTransformer<T, S, R>(latestFromStream, fn)); | |
/// Returns an Observable that combines the current stream together with | |
/// another stream using a given zipper function. | |
/// | |
/// ### Example | |
/// | |
/// new Observable.just(1) | |
/// .zipWith(new Observable.just(2), (one, two) => one + two) | |
/// .listen(print); // prints 3 | |
Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) => | |
Observable<R>(ZipStream.zip2(_stream, other, zipper)); | |
/// Convert the current Observable into a [ConnectableObservable] | |
/// that can be listened to multiple times. It will not begin emitting items | |
/// from the original Observable until the `connect` method is invoked. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// final source = Observable.fromIterable([1, 2, 3]); | |
/// final connectable = source.publish(); | |
/// | |
/// // Does not print anything at first | |
/// connectable.listen(print); | |
/// | |
/// // Start listening to the source Observable. Will cause the previous | |
/// // line to start printing 1, 2, 3 | |
/// final subscription = connectable.connect(); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // Subject | |
/// subscription.cancel(); | |
/// ``` | |
ConnectableObservable<T> publish() => PublishConnectableObservable<T>(this); | |
/// Convert the current Observable into a [ValueConnectableObservable] | |
/// that can be listened to multiple times. It will not begin emitting items | |
/// from the original Observable until the `connect` method is invoked. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream that replays the latest emitted value to any new | |
/// listener. It also provides access to the latest value synchronously. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// final source = Observable.fromIterable([1, 2, 3]); | |
/// final connectable = source.publishValue(); | |
/// | |
/// // Does not print anything at first | |
/// connectable.listen(print); | |
/// | |
/// // Start listening to the source Observable. Will cause the previous | |
/// // line to start printing 1, 2, 3 | |
/// final subscription = connectable.connect(); | |
/// | |
/// // Late subscribers will receive the last emitted value | |
/// connectable.listen(print); // Prints 3 | |
/// | |
/// // Can access the latest emitted value synchronously. Prints 3 | |
/// print(connectable.value); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // BehaviorSubject | |
/// subscription.cancel(); | |
/// ``` | |
ValueConnectableObservable<T> publishValue() => | |
ValueConnectableObservable<T>(this); | |
/// Convert the current Observable into a [ValueConnectableObservable] | |
/// that can be listened to multiple times, providing an initial seeded value. | |
/// It will not begin emitting items from the original Observable | |
/// until the `connect` method is invoked. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream that replays the latest emitted value to any new | |
/// listener. It also provides access to the latest value synchronously. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// final source = Observable.fromIterable([1, 2, 3]); | |
/// final connectable = source.publishValueSeeded(0); | |
/// | |
/// // Does not print anything at first | |
/// connectable.listen(print); | |
/// | |
/// // Start listening to the source Observable. Will cause the previous | |
/// // line to start printing 0, 1, 2, 3 | |
/// final subscription = connectable.connect(); | |
/// | |
/// // Late subscribers will receive the last emitted value | |
/// connectable.listen(print); // Prints 3 | |
/// | |
/// // Can access the latest emitted value synchronously. Prints 3 | |
/// print(connectable.value); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // BehaviorSubject | |
/// subscription.cancel(); | |
/// ``` | |
ValueConnectableObservable<T> publishValueSeeded(T seedValue) => | |
ValueConnectableObservable<T>.seeded(this, seedValue); | |
/// Convert the current Observable into a [ReplayConnectableObservable] | |
/// that can be listened to multiple times. It will not begin emitting items | |
/// from the original Observable until the `connect` method is invoked. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream that replays a given number of items to any new | |
/// listener. It also provides access to the emitted values synchronously. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// final source = Observable.fromIterable([1, 2, 3]); | |
/// final connectable = source.publishReplay(); | |
/// | |
/// // Does not print anything at first | |
/// connectable.listen(print); | |
/// | |
/// // Start listening to the source Observable. Will cause the previous | |
/// // line to start printing 1, 2, 3 | |
/// final subscription = connectable.connect(); | |
/// | |
/// // Late subscribers will receive the emitted value, up to a specified | |
/// // maxSize | |
/// connectable.listen(print); // Prints 1, 2, 3 | |
/// | |
/// // Can access a list of the emitted values synchronously. Prints [1, 2, 3] | |
/// print(connectable.values); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // ReplaySubject | |
/// subscription.cancel(); | |
/// ``` | |
ReplayConnectableObservable<T> publishReplay({int maxSize}) => | |
ReplayConnectableObservable<T>(this, maxSize: maxSize); | |
/// Convert the current Observable into a new Observable that can be listened | |
/// to multiple times. It will automatically begin emitting items when first | |
/// listened to, and shut down when no listeners remain. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// // Convert a single-subscription fromIterable stream into a broadcast | |
/// // stream | |
/// final observable = Observable.fromIterable([1, 2, 3]).share(); | |
/// | |
/// // Start listening to the source Observable. Will start printing 1, 2, 3 | |
/// final subscription = observable.listen(print); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // PublishSubject | |
/// subscription.cancel(); | |
/// ``` | |
Observable<T> share() => publish().refCount(); | |
/// Convert the current Observable into a new [ValueObservable] that can | |
/// be listened to multiple times. It will automatically begin emitting items | |
/// when first listened to, and shut down when no listeners remain. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream. It's also useful for providing sync access to the latest | |
/// emitted value. | |
/// | |
/// It will replay the latest emitted value to any new listener. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// // Convert a single-subscription fromIterable stream into a broadcast | |
/// // stream that will emit the latest value to any new listeners | |
/// final observable = Observable.fromIterable([1, 2, 3]).shareValue(); | |
/// | |
/// // Start listening to the source Observable. Will start printing 1, 2, 3 | |
/// final subscription = observable.listen(print); | |
/// | |
/// // Synchronously print the latest value | |
/// print(observable.value); | |
/// | |
/// // Subscribe again later. This will print 3 because it receives the last | |
/// // emitted value. | |
/// final subscription2 = observable.listen(print); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // BehaviorSubject by cancelling all subscriptions. | |
/// subscription.cancel(); | |
/// subscription2.cancel(); | |
/// ``` | |
ValueObservable<T> shareValue() => publishValue().refCount(); | |
/// Convert the current Observable into a new [ValueObservable] that can | |
/// be listened to multiple times, providing an initial value. | |
/// It will automatically begin emitting items when first listened to, | |
/// and shut down when no listeners remain. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream. It's also useful for providing sync access to the latest | |
/// emitted value. | |
/// | |
/// It will replay the latest emitted value to any new listener. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// // Convert a single-subscription fromIterable stream into a broadcast | |
/// // stream that will emit the latest value to any new listeners | |
/// final observable = Observable.fromIterable([1, 2, 3]).shareValueSeeded(0); | |
/// | |
/// // Start listening to the source Observable. Will start printing 0, 1, 2, 3 | |
/// final subscription = observable.listen(print); | |
/// | |
/// // Synchronously print the latest value | |
/// print(observable.value); | |
/// | |
/// // Subscribe again later. This will print 3 because it receives the last | |
/// // emitted value. | |
/// final subscription2 = observable.listen(print); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // BehaviorSubject by cancelling all subscriptions. | |
/// subscription.cancel(); | |
/// subscription2.cancel(); | |
/// ``` | |
ValueObservable<T> shareValueSeeded(T seedValue) => | |
publishValueSeeded(seedValue).refCount(); | |
/// Convert the current Observable into a new [ReplayObservable] that can | |
/// be listened to multiple times. It will automatically begin emitting items | |
/// when first listened to, and shut down when no listeners remain. | |
/// | |
/// This is useful for converting a single-subscription stream into a | |
/// broadcast Stream. It's also useful for gaining access to the l | |
/// | |
/// It will replay the emitted values to any new listener, up to a given | |
/// [maxSize]. | |
/// | |
/// ### Example | |
/// | |
/// ``` | |
/// // Convert a single-subscription fromIterable stream into a broadcast | |
/// // stream that will emit the latest value to any new listeners | |
/// final observable = Observable.fromIterable([1, 2, 3]).shareReplay(); | |
/// | |
/// // Start listening to the source Observable. Will start printing 1, 2, 3 | |
/// final subscription = observable.listen(print); | |
/// | |
/// // Synchronously print the emitted values up to a given maxSize | |
/// // Prints [1, 2, 3] | |
/// print(observable.values); | |
/// | |
/// // Subscribe again later. This will print 1, 2, 3 because it receives the | |
/// // last emitted value. | |
/// final subscription2 = observable.listen(print); | |
/// | |
/// // Stop emitting items from the source stream and close the underlying | |
/// // ReplaySubject by cancelling all subscriptions. | |
/// subscription.cancel(); | |
/// subscription2.cancel(); | |
/// ``` | |
ReplayObservable<T> shareReplay({int maxSize}) => | |
publishReplay(maxSize: maxSize).refCount(); | |
} |