blob: 70269230f7f180ef31d4c4beae99a88bcf19c0b1 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'from_handlers.dart';
/// Utilities to filter events.
extension Where<T> on Stream<T> {
/// Returns a stream which emits only the events which have type [S].
///
/// If the source stream is a broadcast stream the result will be as well.
///
/// Errors from the source stream are forwarded directly to the result stream.
///
/// [S] should be a subtype of the stream's generic type, otherwise nothing of
/// type [S] could possibly be emitted, however there is no static or runtime
/// checking that this is the case.
Stream<S> whereType<S>() => transform(_WhereType<S>());
/// Like [where] but allows the [test] to return a [Future].
///
/// Events on the result stream will be emitted in the order that [test]
/// completes which may not match the order of the original stream.
///
/// If the source stream is a broadcast stream the result will be as well. When
/// used with a broadcast stream behavior also differs from [Stream.where] in
/// that the [test] function is only called once per event, rather than once
/// per listener per event.
///
/// Errors from the source stream are forwarded directly to the result stream.
/// Errors from [test] are also forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and all
/// pending [test] calls have finished.
Stream<T> asyncWhere(FutureOr<bool> test(T element)) {
var valuesWaiting = 0;
var sourceDone = false;
return transform(fromHandlers(handleData: (element, sink) {
valuesWaiting++;
() async {
try {
if (await test(element)) sink.add(element);
} catch (e, st) {
sink.addError(e, st);
}
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
}, handleDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
}));
}
}
class _WhereType<R> extends StreamTransformerBase<Null, R> {
@override
Stream<R> bind(Stream<Object> source) {
var controller = source.isBroadcast
? StreamController<R>.broadcast(sync: true)
: StreamController<R>(sync: true);
StreamSubscription<Object> subscription;
controller.onListen = () {
assert(subscription == null);
subscription = source.listen(
(value) {
if (value is R) controller.add(value);
},
onError: controller.addError,
onDone: () {
subscription = null;
controller.close();
});
if (!source.isBroadcast) {
controller
..onPause = subscription.pause
..onResume = subscription.resume;
}
controller.onCancel = () {
subscription?.cancel();
subscription = null;
};
};
return controller.stream;
}
}