blob: 29b2a9ada52a2e0bcf7260190e091c56eac2d689 [file] [log] [blame]
import 'dart:async';
/// Concatenates all of the specified stream sequences, as long as the
/// previous stream sequence terminated successfully.
///
/// It does this by subscribing to each stream one by one, emitting all items
/// and completing before subscribing to the next stream.
///
/// [Interactive marble diagram](http://rxmarbles.com/#concat)
///
/// ### Example
///
/// new ConcatStream([
/// new Stream.fromIterable([1]),
/// new TimerStream(2, new Duration(days: 1)),
/// new Stream.fromIterable([3])
/// ])
/// .listen(print); // prints 1, 2, 3
class ConcatStream<T> extends Stream<T> {
final StreamController<T> controller;
ConcatStream(Iterable<Stream<T>> streams)
: controller = _buildController(streams);
@override
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError}) {
return controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
static StreamController<T> _buildController<T>(Iterable<Stream<T>> streams) {
if (streams == null) {
throw ArgumentError('Streams cannot be null');
} else if (streams.isEmpty) {
throw ArgumentError('At least 1 stream needs to be provided');
} else if (streams.any((Stream<T> stream) => stream == null)) {
throw ArgumentError('One of the provided streams is null');
}
StreamController<T> controller;
StreamSubscription<T> subscription;
controller = StreamController<T>(
sync: true,
onListen: () {
final len = streams.length;
var index = 0;
void moveNext() {
var stream = streams.elementAt(index);
subscription?.cancel();
subscription = stream.listen(controller.add,
onError: controller.addError, onDone: () {
index++;
if (index == len)
controller.close();
else
moveNext();
});
}
moveNext();
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription?.pause(resumeSignal),
onResume: () => subscription?.resume(),
onCancel: () => subscription.cancel());
return controller;
}
}