blob: b0946bb22bda0424458cec09d1ac14d238f76ade [file] [log] [blame]
import 'dart:async';
/// Converts each emitted item into a new Stream using the given mapper
/// function. The newly created Stream will be be listened to and begin
/// emitting items downstream.
///
/// The items emitted by each of the new Streams are emitted downstream in the
/// same order they arrive. In other words, the sequences are merged
/// together.
///
/// ### Example
///
/// new Stream.fromIterable([4, 3, 2, 1])
/// .transform(new FlatMapStreamTransformer((i) =>
/// new Stream.fromFuture(
/// new Future.delayed(new Duration(minutes: i), () => i))
/// .listen(print); // prints 1, 2, 3, 4
class FlatMapStreamTransformer<T, S> extends StreamTransformerBase<T, S> {
final StreamTransformer<T, S> transformer;
FlatMapStreamTransformer(Stream<S> mapper(T value))
: transformer = _buildTransformer(mapper);
@override
Stream<S> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, S> _buildTransformer<T, S>(
Stream<S> mapper(T value)) {
return StreamTransformer<T, S>((Stream<T> input, bool cancelOnError) {
final subscriptions = <StreamSubscription<S>>[];
StreamController<S> controller;
StreamSubscription<T> subscription;
StreamSubscription<S> otherSubscription;
var closeAfterNextEvent = false, hasMainEvent = false, openStreams = 0;
controller = StreamController<S>(
sync: true,
onListen: () {
subscription = input.listen(
(T value) {
try {
var otherStream = mapper(value);
hasMainEvent = true;
openStreams++;
otherSubscription = otherStream.listen(controller.add,
onError: controller.addError, onDone: () {
openStreams--;
subscriptions.remove(otherSubscription);
if (closeAfterNextEvent && openStreams == 0)
controller.close();
});
subscriptions.add(otherSubscription);
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: () {
if (!hasMainEvent || openStreams == 0)
controller.close();
else
closeAfterNextEvent = true;
},
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) {
subscription.pause(resumeSignal);
subscriptions.forEach((StreamSubscription<S> otherSubscription) =>
otherSubscription.pause(resumeSignal));
},
onResume: () {
subscription.resume();
subscriptions.forEach((StreamSubscription<S> otherSubscription) =>
otherSubscription.resume());
},
onCancel: () {
final list = List<StreamSubscription<dynamic>>.from(subscriptions)
..add(subscription);
return Future.wait<dynamic>(list
.map((StreamSubscription<dynamic> subscription) =>
subscription.cancel())
.where((Future<dynamic> cancelFuture) => cancelFuture != null));
});
return controller.stream.listen(null);
});
}
}