blob: 82049467ec9627b3f1b5708001cb236531bacc44 [file] [log] [blame]
import 'dart:async';
/// When the original observable emits no items, this operator subscribes to
/// the given fallback stream and emits items from that observable instead.
///
/// This can be particularly useful when consuming data from multiple sources.
/// For example, when using the Repository Pattern. Assuming you have some
/// data you need to load, you might want to start with the fastest access
/// point and keep falling back to the slowest point. For example, first query
/// an in-memory database, then a database on the file system, then a network
/// call if the data isn't on the local machine.
///
/// This can be achieved quite simply with switchIfEmpty!
///
/// ### Example
///
/// // Let's pretend we have some Data sources that complete without emitting
/// // any items if they don't contain the data we're looking for
/// Stream<Data> memory;
/// Stream<Data> disk;
/// Stream<Data> network;
///
/// // Start with memory, fallback to disk, then fallback to network.
/// // Simple as that!
/// Stream<Data> getThatData =
/// memory.switchIfEmpty(disk).switchIfEmpty(network);
class SwitchIfEmptyStreamTransformer<T> extends StreamTransformerBase<T, T> {
final StreamTransformer<T, T> transformer;
SwitchIfEmptyStreamTransformer(Stream<T> fallbackStream)
: transformer = _buildTransformer(fallbackStream);
@override
Stream<T> bind(Stream<T> stream) => transformer.bind(stream);
static StreamTransformer<T, T> _buildTransformer<T>(
Stream<T> fallbackStream) {
if (fallbackStream == null) {
throw ArgumentError('fallbackStream cannot be null');
}
return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
StreamController<T> controller;
StreamSubscription<T> defaultSubscription;
StreamSubscription<T> switchSubscription;
var hasEvent = false;
void onDone() {
if (controller.isClosed) return;
controller.close();
switchSubscription?.cancel();
}
controller = StreamController<T>(
sync: true,
onListen: () {
defaultSubscription = input.listen(
(T value) {
hasEvent = true;
controller.add(value);
},
onError: controller.addError,
onDone: () {
if (hasEvent) {
controller.close();
} else {
switchSubscription = fallbackStream.listen(
controller.add,
onError: controller.addError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
},
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) {
defaultSubscription?.pause(resumeSignal);
switchSubscription?.pause(resumeSignal);
},
onResume: () {
defaultSubscription?.resume();
switchSubscription?.resume();
},
onCancel: () => defaultSubscription?.cancel());
return controller.stream.listen(null);
});
}
}