// blob: 18be27c42558e896959f84646e1dde592c7895cd [file] [log] [blame]
import 'dart:async';
import 'package:rxdart/src/streams/utils.dart';
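/// Creates a Stream that will recreate and re-listen to the source
/// Stream when the notifier emits a new value. The notifier is the
/// Stream returned by [retryWhenFactory], which is called each time the
/// source Stream emits an error. If the source Stream completes, the
/// Stream terminates.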
///
/// If the Stream returned by [retryWhenFactory] emits an error, a
/// [RetryError] is emitted and the Stream terminates. The RetryError
/// contains the errors and [StackTrace]s that caused the failure.
///
/// ### Basic Example
/// ```dart
/// new RetryWhenStream<int>(
/// () => new Stream<int>.fromIterable(<int>[1]),
/// (dynamic error, StackTrace s) => throw error,
/// ).listen(print); // Prints 1
/// ```
///
/// ### Periodic Example
/// ```dart
/// new RetryWhenStream<int>(
/// () => new Observable<int>
/// .periodic(const Duration(seconds: 1), (int i) => i)
/// .map((int i) => i == 2 ? throw 'exception' : i),
/// (dynamic e, StackTrace s) {
/// return new Observable<String>
/// .timer('random value', const Duration(milliseconds: 200));
/// },
/// ).take(4).listen(print); // Prints 0, 1, 0, 1
/// ```
///
/// ### Complex Example
/// ```dart
/// bool errorHappened = false;
/// new RetryWhenStream(
/// () => new Observable
/// .periodic(const Duration(seconds: 1), (i) => i)
/// .map((i) {
/// if (i == 3 && !errorHappened) {
/// throw 'We can take this. Please restart.';
/// } else if (i == 4) {
/// throw 'It\'s enough.';
/// } else {
/// return i;
/// }
/// }),
/// (e, s) {
/// errorHappened = true;
/// if (e == 'We can take this. Please restart.') {
/// return new Observable.just('Ok. Here you go!');
/// } else {
/// return new Observable.error(e);
/// }
/// },
/// ).listen(
/// print,
/// onError: (e, s) => print(e),
/// ); // Prints 0, 1, 2, 0, 1, 2, 3, RetryError
/// ```
class RetryWhenStream<T> extends Stream<T> {
  /// Creates the source Stream. Invoked on the first listen and again on
  /// every retry.
  final StreamFactory<T> streamFactory;

  /// Called with the error and stack trace emitted by the source Stream.
  /// The Stream it returns (the notifier) triggers a retry when it emits a
  /// value and terminates this Stream when it emits an error.
  final RetryWhenStreamFactory retryWhenFactory;

  /// Controller backing the Stream handed to the listener. Created in
  /// [listen].
  StreamController<T> controller;

  /// Subscription to the most recently created source Stream.
  StreamSubscription<T> subscription;

  /// Subscription to the notifier while a retry decision is pending,
  /// otherwise `null`.
  StreamSubscription<void> _retrySubscription;

  /// Whether [listen] has already been called; this Stream is single-use.
  bool _isUsed = false;

  /// Source errors that led to a retry, plus the notifier error that ended
  /// the Stream (if any). Handed to [RetryError].
  final List<ErrorAndStacktrace> _errors = <ErrorAndStacktrace>[];

  RetryWhenStream(this.streamFactory, this.retryWhenFactory);

  /// Subscribes to the source Stream and forwards its events.
  ///
  /// Throws a [StateError] if this Stream has already been listened to.
  @override
  StreamSubscription<T> listen(
    void onData(T event), {
    Function onError,
    void onDone(),
    bool cancelOnError,
  }) {
    if (_isUsed) throw StateError('Stream has already been listened to.');
    _isUsed = true;

    controller = StreamController<T>(
      sync: true,
      onListen: retry,
      // Pause and resume are forwarded to the notifier as well, so that a
      // paused listener does not trigger a retry behind its back.
      onPause: ([Future<dynamic> resumeSignal]) {
        subscription.pause(resumeSignal);
        _retrySubscription?.pause(resumeSignal);
      },
      onResume: () {
        subscription.resume();
        _retrySubscription?.resume();
      },
      onCancel: _cancelSubscriptions,
    );

    return controller.stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }

  /// (Re-)creates the source Stream and starts forwarding its events.
  ///
  /// A source error cancels the source subscription and hands the error to
  /// [retryWhenFactory]. The first value from the returned notifier calls
  /// [retry] again; an error from it is reported as a [RetryError] and
  /// closes the Stream.
  void retry() {
    subscription = streamFactory().listen(
      controller.add,
      onError: (dynamic sourceError, StackTrace sourceTrace) {
        subscription.cancel();

        // NOTE(review): no onDone handler is registered here, so a notifier
        // that completes without emitting leaves the output Stream open.
        _retrySubscription = retryWhenFactory(sourceError, sourceTrace).listen(
          (dynamic _) {
            _stopWaitingForNotifier();
            _errors.add(ErrorAndStacktrace(sourceError, sourceTrace));
            retry();
          },
          onError: (dynamic notifierError, StackTrace notifierTrace) {
            _stopWaitingForNotifier();
            controller.addError(RetryError(
              'Received an error after attempting to retry.',
              _errors..add(ErrorAndStacktrace(notifierError, notifierTrace)),
            ));
            controller.close();
          },
        );
      },
      onDone: controller.close,
      cancelOnError: false,
    );
  }

  /// Cancels the notifier subscription, if any, and forgets it.
  void _stopWaitingForNotifier() {
    _retrySubscription?.cancel();
    _retrySubscription = null;
  }

  /// Cancels the source subscription and any pending notifier subscription.
  ///
  /// Without cancelling the notifier, a listener that cancels while a retry
  /// is pending would leak that subscription and the source would be
  /// re-created once the notifier fires.
  Future<void> _cancelSubscriptions() {
    final pending = <Future<dynamic>>[];

    final sourceDone = subscription?.cancel();
    if (sourceDone != null) pending.add(sourceDone);

    final notifierDone = _retrySubscription?.cancel();
    if (notifierDone != null) pending.add(notifierDone);
    _retrySubscription = null;

    return Future.wait<dynamic>(pending);
  }
}