blob: 5280870abfb04ef5587c0127c84bdbe9a25a0be2 [file] [log] [blame]
import 'dart:async';
/// Convert a Stream that emits Streams (aka a "Higher Order Stream") into a
/// single Stream that emits the items emitted by the most-recently-emitted of
/// those Streams.
///
/// This stream will unsubscribe from the previously-emitted Stream when a new
/// Stream is emitted from the source Stream.
///
/// ### Example
///
/// ```dart
/// final switchLatestStream = new SwitchLatestStream<String>(
/// new Stream.fromIterable(<Stream<String>>[
/// new Observable.timer('A', new Duration(seconds: 2)),
/// new Observable.timer('B', new Duration(seconds: 1)),
/// new Observable.just('C'),
/// ]),
/// );
///
/// // Since the first two Streams do not emit data for 1-2 seconds, and the 3rd
/// // Stream will be emitted before that time, only data from the 3rd Stream
/// // will be emitted to the listener.
/// switchLatestStream.listen(print); // prints 'C'
/// ```
class SwitchLatestStream<T> extends Stream<T> {
final Stream<Stream<T>> streams;
// ignore: close_sinks
StreamController<T> _controller;
SwitchLatestStream(this.streams);
@override
StreamSubscription<T> listen(
void onData(T event), {
Function onError,
void onDone(),
bool cancelOnError,
}) {
if (_controller == null) {
_controller = _buildController(streams, cancelOnError);
}
return _controller.stream.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
static StreamController<T> _buildController<T>(
Stream<Stream<T>> streams,
bool cancelOnError,
) {
StreamController<T> controller;
StreamSubscription<Stream<T>> subscription;
StreamSubscription<T> otherSubscription;
var leftClosed = false, rightClosed = false, hasMainEvent = false;
controller = StreamController<T>(
sync: true,
onListen: () {
subscription = streams.listen(
(Stream<T> value) {
try {
otherSubscription?.cancel();
hasMainEvent = true;
otherSubscription = value.listen(
controller.add,
onError: controller.addError,
onDone: () {
rightClosed = true;
if (leftClosed) controller.close();
},
);
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: () {
leftClosed = true;
if (rightClosed || !hasMainEvent) {
controller.close();
}
},
cancelOnError: cancelOnError,
);
},
onPause: ([Future<dynamic> resumeSignal]) {
subscription.pause(resumeSignal);
otherSubscription?.pause(resumeSignal);
},
onResume: () {
subscription.resume();
otherSubscription?.resume();
},
onCancel: () async {
await subscription.cancel();
if (hasMainEvent) await otherSubscription.cancel();
});
return controller;
}
}