blob: 0e479bf73ba33f961d7e7ee0d8293f4cc6f6fad7 [file] [log] [blame]
import 'dart:async';
/// Applies an accumulator function over an observable sequence and returns
/// each intermediate result. The optional seed value is used as the initial
/// accumulator value.
///
/// ### Example
///
/// new Stream.fromIterable([1, 2, 3])
/// .transform(new ScanStreamTransformer((acc, curr, i) => acc + curr, 0))
/// .listen(print); // prints 1, 3, 6
class ScanStreamTransformer<T, S> extends StreamTransformerBase<T, S> {
final _ScanStreamTransformerAccumulator<T, S> accumulator;
final S seed;
ScanStreamTransformer(this.accumulator, [this.seed]);
@override
Stream<S> bind(Stream<T> stream) =>
_buildTransformer<T, S>(accumulator, seed).bind(stream);
static StreamTransformer<T, S> _buildTransformer<T, S>(
S accumulator(S accumulated, T value, int index),
[S seed]) {
var index = 0;
var acc = seed;
return StreamTransformer<T, S>.fromHandlers(
handleData: (T data, EventSink<S> sink) {
acc = accumulator(acc, data, index++);
sink.add(acc);
},
handleError: (Object error, StackTrace s, EventSink<S> sink) =>
sink.addError(error));
}
}
typedef S _ScanStreamTransformerAccumulator<T, S>(
S accumulated, T value, int index);