blob: a9455fbdac063aa564d116d33bfc757ddddfe458 [file] [log] [blame]
import 'dart:async';
/// Intercepts error events and switches to a recovery stream created by the
/// provided recoveryFn function.
///
/// The OnErrorResumeStreamTransformer intercepts an onError notification from
/// the source Stream. Instead of passing the error through to any
/// listeners, it replaces it with another Stream of items created by the
/// recoveryFn.
///
/// The recoveryFn receives the emitted error and returns a Stream. You can
/// perform logic in the recoveryFn to return different Streams based on the
/// type of error that was emitted.
///
/// ### Example
///
/// new Observable<int>.error(new Exception())
/// .onErrorResume((dynamic e) =>
/// new Observable.just(e is StateError ? 1 : 0)
/// .listen(print); // prints 0
class OnErrorResumeStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
OnErrorResumeStreamTransformer(Stream<T> Function(dynamic error) recoveryFn)
: transformer = _buildTransformer(recoveryFn);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(
Stream<T> Function(dynamic error) recoveryFn,
) {
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamSubscription<T> inputSubscription;
StreamSubscription<T> recoverySubscription;
StreamController<T> controller;
var shouldCloseController = true;
void safeClose() {
if (shouldCloseController) {
controller.close();
}
}
controller = StreamController<T>(
sync: true,
onListen: () {
inputSubscription = input.listen(
controller.add,
onError: (dynamic e, dynamic s) {
shouldCloseController = false;
recoverySubscription = recoveryFn(e).listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError,
);
inputSubscription.cancel();
},
onDone: safeClose,
cancelOnError: cancelOnError,
);
},
onPause: ([Future<dynamic> resumeSignal]) {
inputSubscription?.pause(resumeSignal);
recoverySubscription?.pause(resumeSignal);
},
onResume: () {
inputSubscription?.resume();
recoverySubscription?.resume();
},
onCancel: () {
return Future.wait<dynamic>(<Future<dynamic>>[
inputSubscription?.cancel(),
recoverySubscription?.cancel()
].where((Future<dynamic> future) => future != null));
});
return controller.stream.listen(null);
});
}
}