blob: cdb3a41c21ba7c357a78e007c4e7a0828a16d485 [file] [log] [blame]
import 'dart:async';
/// A StreamTransformer that emits when the source stream emits, combining
/// the latest values from the two streams using the provided function.
///
/// If the latestFromStream has not emitted any values, this stream will not
/// emit either.
///
/// [Interactive marble diagram](http://rxmarbles.com/#withLatestFrom)
///
/// ### Example
///
/// new Stream.fromIterable([1, 2]).transform(
/// new WithLatestFromStreamTransformer(
/// new Stream.fromIterable([2, 3]), (a, b) => a + b)
/// .listen(print); // prints 4 (due to the async nature of streams)
class WithLatestFromStreamTransformer<T, S, R>
extends StreamTransformerBase<T, R> {
final StreamTransformer<T, R> transformer;
WithLatestFromStreamTransformer(Stream<S> latestFromStream, R fn(T t, S s))
: transformer = _buildTransformer(latestFromStream, fn);
@override
Stream<R> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, R> _buildTransformer<T, S, R>(
Stream<S> latestFromStream, R fn(T t, S s)) {
if (latestFromStream == null) {
throw ArgumentError('latestFromStream cannot be null');
} else if (fn == null) {
throw ArgumentError('combiner cannot be null');
}
return StreamTransformer<T, R>((Stream<T> input, bool cancelOnError) {
StreamController<R> controller;
StreamSubscription<T> subscription;
StreamSubscription<S> latestFromSubscription;
S latestValue;
controller = StreamController<R>(
sync: true,
onListen: () {
subscription = input.listen((T value) {
if (latestValue != null) {
try {
controller.add(fn(value, latestValue));
} catch (e, s) {
controller.addError(e, s);
}
}
}, onError: controller.addError);
latestFromSubscription = latestFromStream.listen((S latest) {
latestValue = latest;
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () {
return Future.wait<dynamic>(<Future<dynamic>>[
subscription.cancel(),
latestFromSubscription.cancel()
].where((Future<dynamic> cancelFuture) => cancelFuture != null));
});
return controller.stream.listen(null);
});
}
}