blob: dcf5af250c29afbc7857c1169d9a7e1611eea11a [file] [log] [blame]
import 'dart:async';
/// Prepends a value to the source Stream.
///
/// ### Example
///
/// new Stream.fromIterable([2])
/// .transform(new StartWithStreamTransformer(1))
/// .listen(print); // prints 1, 2
class StartWithStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
StartWithStreamTransformer(T startValue)
: transformer = _buildTransformer(startValue);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(T startValue) {
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamController<T> controller;
StreamSubscription<T> subscription;
controller = StreamController<T>(
sync: true,
onListen: () {
try {
controller.add(startValue);
} catch (e, s) {
controller.addError(e, s);
}
subscription = input.listen(controller.add,
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
return controller.stream.listen(null);
});
}
}