// blob: 4ff5af42dc06b1d2081c26d649ccd1ef91d07435 [file] [log] [blame]
import 'dart:async';
import 'package:rxdart/src/observables/observable.dart';
/// The base for all Subjects. If you'd like to create a new Subject,
/// extend from this class.
///
/// It handles all of the nitty-gritty details that conform to the
/// StreamController spec and don't need to be repeated over and
/// over.
///
/// Please see `PublishSubject` for the simplest example of how to
/// extend from this class, or `BehaviorSubject` for a slightly more
/// complex example.
abstract class Subject<T> extends Observable<T> implements StreamController<T> {
  /// The wrapped controller that every event, error and lifecycle query is
  /// forwarded to.
  final StreamController<T> controller;

  /// Whether an [addStream] call is currently in progress.
  ///
  /// While this is `true`, [add], [addError], [addStream] and [close] throw a
  /// [StateError].
  bool _isAddingStreamItems = false;

  /// Creates a subject that forwards to [controller] and exposes
  /// [observable] as its stream (it is handed to the superclass constructor).
  Subject(this.controller, Observable<T> observable) : super(observable);

  /// A [StreamSink] view of this subject. A new wrapper is created on every
  /// access.
  @override
  StreamSink<T> get sink => _StreamSinkWrapper<T>(this);

  /// The listen callback currently registered on [controller].
  @override
  ControllerCallback get onListen => controller.onListen;

  @override
  set onListen(void onListenHandler()) {
    controller.onListen = onListenHandler;
  }

  /// The subject itself is the stream.
  @override
  Observable<T> get stream => this;

  /// Always throws [UnsupportedError].
  @override
  ControllerCallback get onPause =>
      throw UnsupportedError("Subjects do not support pause callbacks");

  /// Always throws [UnsupportedError].
  @override
  set onPause(void onPauseHandler()) =>
      throw UnsupportedError("Subjects do not support pause callbacks");

  /// Always throws [UnsupportedError].
  @override
  ControllerCallback get onResume =>
      throw UnsupportedError("Subjects do not support resume callbacks");

  /// Always throws [UnsupportedError].
  @override
  set onResume(void onResumeHandler()) =>
      throw UnsupportedError("Subjects do not support resume callbacks");

  /// The cancel callback currently registered on [controller].
  @override
  ControllerCancelCallback get onCancel => controller.onCancel;

  @override
  set onCancel(void onCancelHandler()) {
    controller.onCancel = onCancelHandler;
  }

  @override
  bool get isClosed => controller.isClosed;

  @override
  bool get isPaused => controller.isPaused;

  @override
  bool get hasListener => controller.hasListener;

  @override
  Future<dynamic> get done => controller.done;

  /// Reports [error] (and optional [stackTrace]) to the subject.
  ///
  /// Throws a [StateError] if an [addStream] call is still in progress.
  @override
  void addError(Object error, [StackTrace stackTrace]) {
    if (_isAddingStreamItems) {
      throw StateError(
          "You cannot add an error while items are being added from addStream");
    }
    _addError(error, stackTrace);
  }

  // Runs the subclass hook first, then forwards to the wrapped controller.
  // Shared by [addError] and by errors arriving through [addStream].
  void _addError(Object error, [StackTrace stackTrace]) {
    onAddError(error, stackTrace);
    controller.addError(error, stackTrace);
  }

  /// An extension point for sub-classes. Perform any side-effect / state
  /// management you need to here, rather than overriding the `addError`
  /// method directly.
  void onAddError(Object error, [StackTrace stackTrace]) {}

  /// Listens to [source] and forwards its events and errors to this subject.
  ///
  /// Data events pass through [onAdd] and errors pass through [onAddError]
  /// before reaching [controller], so sub-class hooks observe everything that
  /// arrives via this method.
  ///
  /// The returned future completes when [source] is done. If [cancelOnError]
  /// is `true`, the first error from [source] is forwarded and then completes
  /// the returned future with that error and its stack trace.
  ///
  /// Throws a [StateError] if another [addStream] call is still in progress.
  @override
  Future<dynamic> addStream(Stream<T> source, {bool cancelOnError = true}) {
    if (_isAddingStreamItems) {
      throw StateError(
          "You cannot add items while items are being added from addStream");
    }

    // The future carries no value; it only signals completion or failure.
    final completer = Completer<void>();
    _isAddingStreamItems = true;

    source.listen(
      _add,
      onError: (dynamic e, StackTrace s) {
        // Go through _addError (not controller.addError) so that the
        // onAddError hook runs, mirroring how data events go through _add.
        _addError(e, s);
        if (cancelOnError) {
          _isAddingStreamItems = false;
          completer.completeError(e, s);
        }
      },
      onDone: () {
        _isAddingStreamItems = false;
        completer.complete();
      },
      cancelOnError: cancelOnError,
    );

    return completer.future;
  }

  /// Adds [event] to the subject.
  ///
  /// Throws a [StateError] if an [addStream] call is still in progress.
  @override
  void add(T event) {
    if (_isAddingStreamItems) {
      throw StateError(
          "You cannot add items while items are being added from addStream");
    }
    _add(event);
  }

  // Runs the subclass hook first, then forwards to the wrapped controller.
  // Shared by [add] and by data events arriving through [addStream].
  void _add(T event) {
    onAdd(event);
    controller.add(event);
  }

  /// An extension point for sub-classes. Perform any side-effect / state
  /// management you need to here, rather than overriding the `add` method
  /// directly.
  void onAdd(T event) {}

  /// Closes the wrapped [controller] and returns its close future.
  ///
  /// Throws a [StateError] if an [addStream] call is still in progress.
  @override
  Future<dynamic> close() {
    if (_isAddingStreamItems) {
      throw StateError(
          "You cannot close the subject while items are being added from addStream");
    }
    return controller.close();
  }
}
/// A [StreamSink] facade over a [StreamController].
///
/// Every sink operation is handed straight to the wrapped controller; the
/// wrapper holds no state of its own beyond the reference to it.
class _StreamSinkWrapper<T> implements StreamSink<T> {
  /// The controller that receives every forwarded call.
  final StreamController<T> _delegate;

  _StreamSinkWrapper(this._delegate);

  /// Forwards [data] to the wrapped controller.
  @override
  void add(T data) => _delegate.add(data);

  /// Forwards [error] and the optional [stackTrace] to the wrapped controller.
  @override
  void addError(Object error, [StackTrace stackTrace]) =>
      _delegate.addError(error, stackTrace);

  /// Closes the wrapped controller and returns the future it produces.
  @override
  Future<dynamic> close() {
    return _delegate.close();
  }

  /// Asks the wrapped controller to consume [source] and returns its future.
  @override
  Future<dynamic> addStream(Stream<T> source) {
    return _delegate.addStream(source);
  }

  /// The `done` future of the wrapped controller.
  @override
  Future<dynamic> get done {
    return _delegate.done;
  }
}