blob: 303b16a4a68a318f090bed8ee81a3fded3d61919 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Utilities to combine events from multiple streams through a callback or into
/// a list.
extension CombineLatest<T> on Stream<T> {
/// Returns a stream which combines the latest value from the source stream
/// with the latest value from [other] using [combine].
///
/// No event will be emitted until both the source stream and [other] have
/// each emitted at least one event. If either the source stream or [other]
/// emit multiple events before the other emits the first event, all but the
/// last value will be discarded. Once both streams have emitted at least
/// once, the result stream will emit any time either input stream emits.
///
/// The result stream will not close until both the source stream and [other]
/// have closed.
///
/// For example:
///
/// source.combineLatest(other, (a, b) => a + b);
///
/// source: --1--2--------4--|
/// other: -------3--|
/// result: -------5------7--|
///
/// Errors thrown by [combine], along with any errors on the source stream or
/// [other], are forwarded to the result stream.
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of [other]'s type. If a single subscription stream is
/// combined with a broadcast stream it may never be canceled.
Stream<S> combineLatest<T2, S>(
Stream<T2> other, FutureOr<S> Function(T, T2) combine) =>
transform(_CombineLatest(other, combine));
/// Combine the latest value emitted from the source stream with the latest
/// values emitted from [others].
///
/// [combineLatestAll] subscribes to the source stream and [others] and when
/// any one of the streams emits, the result stream will emit a [List<T>] of
/// the latest values emitted from all streams.
///
/// No event will be emitted until all source streams emit at least once. If a
/// source stream emits multiple values before another starts emitting, all
/// but the last value will be discarded. Once all source streams have emitted
/// at least once, the result stream will emit any time any source stream
/// emits.
///
/// The result stream will not close until all source streams have closed. When
/// a source stream closes, the result stream will continue to emit the last
/// value from the closed stream when the other source streams emit until the
/// result stream has closed. If a source stream closes without emitting any
/// value, the result stream will close as well.
///
/// For example:
///
/// final combined = first
/// .combineLatestAll([second, third])
/// .map((data) => data.join());
///
/// first: a----b------------------c--------d---|
/// second: --1---------2-----------------|
/// third: -------&----------%---|
/// combined: -------b1&--b2&---b2%---c2%------d2%-|
///
/// Errors thrown by any source stream will be forwarded to the result stream.
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the types of [others]. If a single subscription stream
/// is combined with a broadcast source stream, it may never be canceled.
Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) =>
transform(_CombineLatestAll<T>(others));
}
class _CombineLatest<S, T, R> extends StreamTransformerBase<S, R> {
final Stream<T> _other;
final FutureOr<R> Function(S, T) _combine;
_CombineLatest(this._other, this._combine);
@override
Stream<R> bind(Stream<S> source) {
final controller = source.isBroadcast
? StreamController<R>.broadcast(sync: true)
: StreamController<R>(sync: true);
final other = (source.isBroadcast && !_other.isBroadcast)
? _other.asBroadcastStream()
: _other;
StreamSubscription<S> sourceSubscription;
StreamSubscription<T> otherSubscription;
var sourceDone = false;
var otherDone = false;
S latestSource;
T latestOther;
var sourceStarted = false;
var otherStarted = false;
void emitCombined() {
if (!sourceStarted || !otherStarted) return;
FutureOr<R> result;
try {
result = _combine(latestSource, latestOther);
} catch (e, s) {
controller.addError(e, s);
return;
}
if (result is Future<R>) {
sourceSubscription.pause();
otherSubscription.pause();
result
.then(controller.add, onError: controller.addError)
.whenComplete(() {
sourceSubscription.resume();
otherSubscription.resume();
});
} else {
controller.add(result as R);
}
}
controller.onListen = () {
assert(sourceSubscription == null);
sourceSubscription = source.listen(
(s) {
sourceStarted = true;
latestSource = s;
emitCombined();
},
onError: controller.addError,
onDone: () {
sourceDone = true;
if (otherDone) {
controller.close();
} else if (!sourceStarted) {
// Nothing can ever be emitted
otherSubscription.cancel();
controller.close();
}
});
otherSubscription = other.listen(
(o) {
otherStarted = true;
latestOther = o;
emitCombined();
},
onError: controller.addError,
onDone: () {
otherDone = true;
if (sourceDone) {
controller.close();
} else if (!otherStarted) {
// Nothing can ever be emitted
sourceSubscription.cancel();
controller.close();
}
});
if (!source.isBroadcast) {
controller
..onPause = () {
sourceSubscription.pause();
otherSubscription.pause();
}
..onResume = () {
sourceSubscription.resume();
otherSubscription.resume();
};
}
controller.onCancel = () {
var cancels = [sourceSubscription.cancel(), otherSubscription.cancel()]
.where((f) => f != null);
sourceSubscription = null;
otherSubscription = null;
return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
}
}
class _CombineLatestAll<T> extends StreamTransformerBase<T, List<T>> {
final Iterable<Stream<T>> _others;
_CombineLatestAll(this._others);
@override
Stream<List<T>> bind(Stream<T> first) {
final controller = first.isBroadcast
? StreamController<List<T>>.broadcast(sync: true)
: StreamController<List<T>>(sync: true);
final allStreams = [
first,
for (final other in _others)
!first.isBroadcast || other.isBroadcast
? other
: other.asBroadcastStream(),
];
controller.onListen = () {
final subscriptions = <StreamSubscription<T>>[];
final latestData = List<T>(allStreams.length);
final hasEmitted = <int>{};
void handleData(int index, T data) {
latestData[index] = data;
hasEmitted.add(index);
if (hasEmitted.length == allStreams.length) {
controller.add(List.from(latestData));
}
}
var streamId = 0;
for (final stream in allStreams) {
final index = streamId;
final subscription = stream.listen((data) => handleData(index, data),
onError: controller.addError);
subscription.onDone(() {
assert(subscriptions.contains(subscription));
subscriptions.remove(subscription);
if (subscriptions.isEmpty || !hasEmitted.contains(index)) {
controller.close();
}
});
subscriptions.add(subscription);
streamId++;
}
if (!first.isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
var cancels = subscriptions
.map((s) => s.cancel())
.where((f) => f != null)
.toList();
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
}
}