blob: 1f83681b0daeabbe784ebc2adce0ac9ebac27219 [file] [log] [blame]
import 'dart:async';
/// The Delay operator modifies its source Observable by pausing for
/// a particular increment of time (that you specify) before emitting
/// each of the source Observable’s items.
/// This has the effect of shifting the entire sequence of items emitted
/// by the Observable forward in time by that specified increment.
///
/// [Interactive marble diagram](http://rxmarbles.com/#delay)
///
/// ### Example
///
/// new Observable.fromIterable([1, 2, 3, 4])
/// .delay(new Duration(seconds: 1))
/// .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
class DelayStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
DelayStreamTransformer(Duration duration)
: transformer = _buildTransformer(duration);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(Duration duration) {
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
var onDoneCalled = false, hasNoEvents = true;
var timers = <Timer>[];
StreamController<T> controller;
StreamSubscription<T> subscription;
controller = StreamController<T>(
sync: true,
onListen: () {
subscription = input.listen(
(T value) {
hasNoEvents = false;
try {
Timer timer;
timer = Timer(duration, () {
controller.add(value);
timers.remove(timer);
if (onDoneCalled && timers.isEmpty) {
controller.close();
}
});
timers.add(timer);
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: () {
if (hasNoEvents) controller.close();
onDoneCalled = true;
},
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () {
timers.forEach(_cancelTimerIfActive);
return subscription.cancel();
});
return controller.stream.listen(null);
});
}
static void _cancelTimerIfActive(Timer _timer) {
if (_timer != null && _timer.isActive) {
_timer.cancel();
}
}
}