blob: 2cc941e35c0f4c7eeb49f387722bf5d693feca44 [file] [log] [blame]
import 'dart:async';
/// A StreamTransformer that, when the specified sample stream emits
/// an item or completes, emits the most recently emitted item (if any)
/// emitted by the source stream since the previous emission from
/// the sample stream.
///
/// A value that is still pending when the source stream completes is
/// emitted as well, after which the resulting stream is closed.
///
/// ### Example
///
///     Stream.fromIterable([1, 2, 3])
///         .transform(SampleStreamTransformer(
///             TimerStream(1, Duration(seconds: 1))))
///         .listen(print); // prints 3
class SampleStreamTransformer<T> extends StreamTransformerBase<T, T> {
  /// The underlying transformer that [bind] delegates to.
  final StreamTransformer<T, T> transformer;

  /// Creates a transformer that samples its source whenever [sampleStream]
  /// emits.
  ///
  /// While [sampleOnValueOnly] is `true` (the default), a sample event is
  /// ignored unless the source produced a value since the previous emission.
  /// When it is `false`, every sample event emits; if the source produced
  /// nothing new in the meantime, the emitted value is `null`.
  SampleStreamTransformer(Stream<dynamic> sampleStream,
      {bool sampleOnValueOnly = true})
      : transformer = _buildTransformer(sampleStream,
            sampleOnValueOnly: sampleOnValueOnly);

  @override
  Stream<T> bind(Stream<T> stream) => transformer.bind(stream);

  /// Builds the transformer that remembers the latest source value and
  /// releases it when [sampleStream] emits or when either stream completes.
  ///
  /// Errors from both the source and [sampleStream] are forwarded to the
  /// resulting stream.
  static StreamTransformer<T, T> _buildTransformer<T>(
      Stream<dynamic> sampleStream,
      {bool sampleOnValueOnly = true}) {
    return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
      StreamController<T> controller;
      StreamSubscription<T> subscription;
      StreamSubscription<dynamic> sampleSubscription;
      T currentValue;
      var hasValue = false;

      // Runs when either the source or the sample stream completes: flushes
      // the pending value (if any) and closes the output exactly once.
      void onDone() {
        if (controller.isClosed) return;
        if (hasValue) {
          hasValue = false;
          controller.add(currentValue);
        }
        controller.close();
      }

      void onSample(dynamic _) {
        // A sample may still arrive after the output was closed but before
        // the subscriptions have been cancelled.
        if (controller.isClosed) return;
        if (hasValue || !sampleOnValueOnly) {
          // The controller is synchronous, so listeners run inside `add`.
          // Reset the state first: a value the source emits re-entrantly
          // during delivery must be kept for the next sample rather than
          // being wiped by a reset that happens after the `add`.
          final sampled = currentValue;
          hasValue = false;
          currentValue = null;
          controller.add(sampled);
        }
      }

      controller = StreamController<T>(
          sync: true,
          onListen: () {
            try {
              subscription = input.listen((T value) {
                hasValue = true;
                currentValue = value;
              },
                  onError: controller.addError,
                  onDone: onDone,
                  cancelOnError: cancelOnError);
              sampleSubscription = sampleStream.listen(onSample,
                  onError: controller.addError,
                  onDone: onDone,
                  cancelOnError: cancelOnError);
            } catch (e, s) {
              // A failing `listen` leaves one or both subscriptions null;
              // the callbacks below therefore access them null-aware.
              controller.addError(e, s);
            }
          },
          onPause: ([Future<dynamic> resumeSignal]) =>
              subscription?.pause(resumeSignal),
          onResume: () => subscription?.resume(),
          onCancel: () {
            // Request both cancellations before waiting on either, so the
            // source cannot deliver into the output while the sample
            // stream is still cleaning up.
            final cancellations = <Future<dynamic>>[];
            final active = <StreamSubscription<dynamic>>[
              sampleSubscription,
              subscription
            ];
            for (final current in active) {
              final pending = current?.cancel();
              if (pending != null) cancellations.add(pending);
            }
            return Future.wait<dynamic>(cancellations);
          });

      return controller.stream.listen(null);
    });
  }
}