blob: 81f42e4bbe0b68b3271c285ea06105ab5db61d3d [file] [log] [blame]
import 'dart:async';
/// Returns the values from the source observable sequence until the other
/// stream sequence produces a value.
///
/// ### Example
///
/// new MergeStream([
/// new Stream.fromIterable([1]),
/// new TimerStream(2, new Duration(minutes: 1))
/// ])
/// .transform(new TakeUntilStreamTransformer(
/// new TimerStream(3, new Duration(seconds: 10))))
/// .listen(print); // prints 1
class TakeUntilStreamTransformer<T, S> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
TakeUntilStreamTransformer(Stream<S> otherStream)
: transformer = _buildTransformer(otherStream);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T, S>(
Stream<S> otherStream) {
if (otherStream == null) {
throw ArgumentError("take until stream cannot be null");
}
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamController<T> controller;
StreamSubscription<T> subscription;
StreamSubscription<S> otherSubscription;
void onDone() {
if (controller.isClosed) return;
controller.close();
}
controller = StreamController<T>(
sync: true,
onListen: () {
subscription = input.listen(controller.add,
onError: controller.addError,
onDone: onDone,
cancelOnError: cancelOnError);
otherSubscription = otherStream.listen((_) => onDone(),
onError: controller.addError,
cancelOnError: cancelOnError,
onDone: onDone);
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () async {
await otherSubscription?.cancel();
await subscription?.cancel();
});
return controller.stream.listen(null);
});
}
}