blob: c3bdd3e903075f856e81f4aa9eeec3c1af723a95 [file] [log] [blame]
import 'dart:async';
/// Merges the specified streams into one observable sequence using the given
/// zipper function whenever all of the observable sequences have produced
/// an element at a corresponding index.
///
/// It applies this function in strict sequence, so the first item emitted by
/// the new Observable will be the result of the function applied to the first
/// item emitted by Observable #1 and the first item emitted by Observable #2;
/// the second item emitted by the new zip-Observable will be the result of
/// the function applied to the second item emitted by Observable #1 and the
/// second item emitted by Observable #2; and so forth. It will only emit as
/// many items as the number of items emitted by the source Observable that
/// emits the fewest items.
///
/// [Interactive marble diagram](http://rxmarbles.com/#zip)
///
/// ### Example
///
/// new ZipStream([
/// new Stream.fromIterable([1]),
/// new Stream.fromIterable([2, 3])
/// ], (a, b) => a + b)
/// .listen(print); // prints 3
class ZipStream<T, R> extends StreamView<R> {
ZipStream(
Iterable<Stream<T>> streams,
R zipper(List<T> values),
) : assert(streams != null && streams.every((s) => s != null),
'streams cannot be null'),
assert(streams.length > 1, 'provide at least 2 streams'),
assert(zipper != null, 'must provide a zipper function'),
super(_buildController(streams, zipper).stream);
static ZipStream<T, List<T>> list<T>(Iterable<Stream<T>> streams) {
return ZipStream<T, List<T>>(
streams,
(List<T> values) => values,
);
}
static ZipStream<dynamic, R> zip2<A, B, R>(
Stream<A> streamOne,
Stream<B> streamTwo,
R zipper(A a, B b),
) {
return ZipStream<dynamic, R>(
[streamOne, streamTwo],
(List<dynamic> values) => zipper(values[0] as A, values[1] as B),
);
}
static ZipStream<dynamic, R> zip3<A, B, C, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
R zipper(A a, B b, C c),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
);
},
);
}
static ZipStream<dynamic, R> zip4<A, B, C, D, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
R zipper(A a, B b, C c, D d),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC, streamD],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
);
},
);
}
static ZipStream<dynamic, R> zip5<A, B, C, D, E, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
Stream<E> streamE,
R zipper(A a, B b, C c, D d, E e),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC, streamD, streamE],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
values[4] as E,
);
},
);
}
static ZipStream<dynamic, R> zip6<A, B, C, D, E, F, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
Stream<E> streamE,
Stream<F> streamF,
R zipper(A a, B b, C c, D d, E e, F f),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC, streamD, streamE, streamF],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
values[4] as E,
values[5] as F,
);
},
);
}
static ZipStream<dynamic, R> zip7<A, B, C, D, E, F, G, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
Stream<E> streamE,
Stream<F> streamF,
Stream<G> streamG,
R zipper(A a, B b, C c, D d, E e, F f, G g),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC, streamD, streamE, streamF, streamG],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
values[4] as E,
values[5] as F,
values[6] as G,
);
},
);
}
static ZipStream<dynamic, R> zip8<A, B, C, D, E, F, G, H, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
Stream<E> streamE,
Stream<F> streamF,
Stream<G> streamG,
Stream<H> streamH,
R zipper(A a, B b, C c, D d, E e, F f, G g, H h),
) {
return ZipStream<dynamic, R>(
[streamA, streamB, streamC, streamD, streamE, streamF, streamG, streamH],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
values[4] as E,
values[5] as F,
values[6] as G,
values[7] as H,
);
},
);
}
static ZipStream<dynamic, R> zip9<A, B, C, D, E, F, G, H, I, R>(
Stream<A> streamA,
Stream<B> streamB,
Stream<C> streamC,
Stream<D> streamD,
Stream<E> streamE,
Stream<F> streamF,
Stream<G> streamG,
Stream<H> streamH,
Stream<I> streamI,
R zipper(A a, B b, C c, D d, E e, F f, G g, H h, I i),
) {
return ZipStream<dynamic, R>(
[
streamA,
streamB,
streamC,
streamD,
streamE,
streamF,
streamG,
streamH,
streamI
],
(List<dynamic> values) {
return zipper(
values[0] as A,
values[1] as B,
values[2] as C,
values[3] as D,
values[4] as E,
values[5] as F,
values[6] as G,
values[7] as H,
values[8] as I,
);
},
);
}
static StreamController<R> _buildController<T, R>(
Iterable<Stream<T>> streams,
R zipper(List<T> values),
) {
{
StreamController<R> controller;
final subscriptions = List<StreamSubscription<T>>(streams.length);
controller = StreamController<R>(
sync: true,
onListen: () {
try {
final values = List<List<T>>.generate(streams.length, (_) => []);
final completedStatus =
List.generate(streams.length, (_) => false);
void doUpdate(int index, T value) {
values[index].add(value);
if (values.every((v) => v.isNotEmpty)) {
try {
controller.add(zipper(
values.fold([], (prev, vals) => prev..add(vals[0]))));
} catch (e, s) {
controller.addError(e, s);
}
values.forEach((v) => v..removeAt(0));
}
}
void markDone(int i) {
completedStatus[i] = true;
if (completedStatus.reduce((bool a, bool b) => a && b))
controller.close();
}
for (var i = 0, len = streams.length; i < len; i++) {
var stream = streams.elementAt(i);
subscriptions[i] = stream.listen(
(T value) => doUpdate(i, value),
onError: controller.addError,
onDone: () => markDone(i));
}
} catch (e, s) {
controller.addError(e, s);
}
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscriptions.where((StreamSubscription<dynamic> subscription) => subscription != null).forEach(
(StreamSubscription<dynamic> subscription) =>
subscription.pause(resumeSignal)),
onResume: () =>
subscriptions.where((StreamSubscription<dynamic> subscription) => subscription != null).forEach(
(StreamSubscription<dynamic> subscription) =>
subscription.resume()),
onCancel: () => Future.wait<dynamic>(subscriptions
.map((StreamSubscription<dynamic> subscription) => subscription.cancel())
.where((Future<dynamic> cancelFuture) => cancelFuture != null)));
return controller;
}
}
}