blob: 1657356ce76fda1ff5d656388fd53997db15c83c [file] [log] [blame]
import 'dart:async';
/// Creates a Stream that emits each item in the Stream after a given
/// duration.
///
/// ### Example
///
/// new Stream.fromIterable([1, 2, 3])
/// .transform(new IntervalStreamTransformer(seconds: 1))
/// .listen((i) => print("$i sec"); // prints 1 sec, 2 sec, 3 sec
class IntervalStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
IntervalStreamTransformer(Duration duration)
: transformer = _buildTransformer(duration);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(Duration duration) {
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamController<T> controller;
StreamSubscription<T> subscription;
controller = StreamController<T>(
sync: true,
onListen: () {
subscription = input.listen((T value) {
try {
final completer = Completer<T>();
Timer(duration, () => completer.complete(value));
subscription.pause(completer.future.then<T>((T event) {
controller.add(event);
return event;
}));
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
},
onPause: () => subscription.pause(),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
return controller.stream.listen(null);
});
}
}