blob: 5c0c3578f3c10c5eabe323dbc0194507ad442980 [file] [log] [blame]
import 'dart:async';
import 'package:rxdart/src/utils/notification.dart';
typedef void VoidFunc();
typedef Future<dynamic> FutureFunc();
/// Invokes the given callback at the corresponding point the the stream
/// lifecycle. For example, if you pass in an onDone callback, it will
/// be invoked when the stream finishes emitting items.
///
/// This transformer can be used for debugging, logging, etc. by intercepting
/// the stream at different points to run arbitrary actions.
///
/// It is possible to hook onto the following parts of the stream lifecycle:
///
/// - onCancel
/// - onData
/// - onDone
/// - onError
/// - onListen
/// - onPause
/// - onResume
///
/// In addition, the `onEach` argument is called at `onData`, `onDone`, and
/// `onError` with a [Notification] passed in. The [Notification] argument
/// contains the [Kind] of event (OnData, OnDone, OnError), and the item or
/// error that was emitted. In the case of onDone, no data is emitted as part
/// of the [Notification].
///
/// If no callbacks are passed in, a runtime error will be thrown in dev mode
/// in order to "fail fast" and alert the developer that the transformer should
/// be used or safely removed.
///
/// ### Example
///
/// new Stream.fromIterable([1])
/// .transform(new DoStreamTransformer(
/// onData: print,
/// onError: (e, s) => print("Oh no!"),
/// onDone: () => print("Done")))
/// .listen(null); // Prints: 1, "Done"
class DoStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
DoStreamTransformer(
{void onCancel(),
void onData(T event),
void onDone(),
void onEach(Notification<T> notification),
Function onError,
void onListen(),
void onPause(Future<dynamic> resumeSignal),
void onResume()})
: transformer = _buildTransformer(
onCancel: onCancel,
onData: onData,
onDone: onDone,
onEach: onEach,
onError: onError,
onListen: onListen,
onPause: onPause,
onResume: onResume);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(
{dynamic onCancel(),
void onData(T event),
void onDone(),
void onEach(Notification<T> notification),
Function onError,
void onListen(),
void onPause(Future<dynamic> resumeSignal),
void onResume()}) {
if (onCancel == null &&
onData == null &&
onDone == null &&
onEach == null &&
onError == null &&
onListen == null &&
onPause == null &&
onResume == null) {
throw ArgumentError("Must provide at least one handler");
}
final subscriptions = <Stream<dynamic>, StreamSubscription<dynamic>>{};
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamController<T> controller;
final VoidFunc onListenLocal = () {
if (onListen != null) {
try {
onListen();
} catch (e, s) {
controller.addError(e, s);
}
}
subscriptions.putIfAbsent(
input,
() {
return input.listen(
(T value) {
if (onData != null) {
try {
onData(value);
} catch (e, s) {
controller.addError(e, s);
}
}
if (onEach != null) {
try {
onEach(Notification<T>.onData(value));
} catch (e, s) {
controller.addError(e, s);
}
}
controller.add(value);
},
onError: (dynamic e, StackTrace s) {
if (onError != null) {
try {
onError(e, s);
} catch (e2, s2) {
controller.addError(e2, s2);
}
}
if (onEach != null) {
try {
onEach(Notification<T>.onError(e, s));
} catch (e, s) {
controller.addError(e, s);
}
}
controller.addError(e, s);
},
onDone: () {
if (onDone != null) {
try {
onDone();
} catch (e, s) {
controller.addError(e, s);
}
}
if (onEach != null) {
try {
onEach(Notification<T>.onDone());
} catch (e, s) {
controller.addError(e, s);
}
}
controller.close();
},
cancelOnError: cancelOnError,
);
},
);
};
final FutureFunc onCancelLocal = () {
dynamic onCancelResult;
if (onCancel != null) {
try {
onCancelResult = onCancel();
} catch (e, s) {
if (!controller.isClosed) {
controller.addError(e, s);
} else {
Zone.current.handleUncaughtError(e, s);
}
}
}
final cancelResultFuture = onCancelResult is Future
? onCancelResult
: Future<dynamic>.value(onCancelResult);
final cancelFuture =
subscriptions[input].cancel() ?? Future<dynamic>.value();
return Future.wait<dynamic>([cancelFuture, cancelResultFuture])
.whenComplete(() => subscriptions.remove(input));
};
if (input.isBroadcast) {
controller = StreamController<T>.broadcast(
sync: true,
onListen: onListenLocal,
onCancel: onCancelLocal,
);
} else {
controller = StreamController<T>(
sync: true,
onListen: onListenLocal,
onCancel: onCancelLocal,
onPause: ([Future<dynamic> resumeSignal]) {
if (onPause != null) {
try {
onPause(resumeSignal);
} catch (e, s) {
controller.addError(e, s);
}
}
subscriptions[input].pause(resumeSignal);
},
onResume: () {
if (onResume != null) {
try {
onResume();
} catch (e, s) {
controller.addError(e, s);
}
}
subscriptions[input].resume();
},
);
}
return controller.stream.listen(null);
});
}
}