blob: f2a5fbea601c2f36e888514f8dd95b0c47ee0d6d [file] [log] [blame]
import 'dart:async';
/// Converts items from the source stream into a new Stream using a given
/// mapper. It ignores all items from the source stream until the new stream
/// completes.
///
/// Useful when you have a noisy source Stream and only want to respond once
/// the previous async operation is finished.
///
/// ### Example
/// // Emits 0, 1, 2
/// new Stream.periodic(new Duration(milliseconds: 200), (i) => i).take(3)
/// .transform(new ExhaustMapStreamTransformer(
/// // Emits the value it's given after 200ms
/// (i) => new Observable.timer(i, new Duration(milliseconds: 200)),
/// ))
/// .listen(print); // prints 0, 2
class ExhaustMapStreamTransformer<T, S> extends StreamTransformerBase<T, S> {
final StreamTransformer<T, S> transformer;
ExhaustMapStreamTransformer(Stream<S> mapper(T value))
: transformer = _buildTransformer(mapper);
@override
Stream<S> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, S> _buildTransformer<T, S>(
Stream<S> mapper(T value)) {
return StreamTransformer<T, S>((Stream<T> input, bool cancelOnError) {
StreamController<S> controller;
StreamSubscription<T> inputSubscription;
StreamSubscription<S> outputSubscription;
var inputClosed = false, outputIsEmitting = false;
controller = StreamController<S>(
sync: true,
onListen: () {
inputSubscription = input.listen(
(T value) {
try {
if (!outputIsEmitting) {
outputIsEmitting = true;
outputSubscription = mapper(value).listen(
controller.add,
onError: controller.addError,
onDone: () {
outputIsEmitting = false;
if (inputClosed) controller.close();
},
);
}
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: () {
inputClosed = true;
if (!outputIsEmitting) controller.close();
},
cancelOnError: cancelOnError,
);
},
onPause: ([Future<dynamic> resumeSignal]) {
inputSubscription.pause(resumeSignal);
outputSubscription?.pause(resumeSignal);
},
onResume: () {
inputSubscription.resume();
outputSubscription?.resume();
},
onCancel: () async {
await inputSubscription.cancel();
if (outputIsEmitting) await outputSubscription.cancel();
},
);
return controller.stream.listen(null);
});
}
}