blob: d69175e68ca908967b5f0ee62bd347664cb0f3bd [file] [log] [blame]
import 'dart:async';
import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/observables/replay_observable.dart';
/// A ConnectableObservable resembles an ordinary Observable, except that it
/// can be listened to multiple times and does not begin emitting items when
/// it is listened to, but only when its [connect] method is called.
///
/// This class can be used to broadcast a single-subscription Stream, and
/// can be used to wait for all intended Observers to [listen] to the
/// Observable before it begins emitting items.
abstract class ConnectableObservable<T> extends Observable<T> {
ConnectableObservable(Stream<T> stream) : super(stream);
/// Returns an Observable that automatically connects (at most once) to this
/// ConnectableObservable when the first Observer subscribes.
///
/// To disconnect from the source Stream, provide a [connection] callback and
/// cancel the `subscription` at the appropriate time.
Observable<T> autoConnect({
void Function(StreamSubscription<T> subscription) connection,
});
/// Instructs the [ConnectableObservable] to begin emitting items from the
/// source Stream. To disconnect from the source stream, cancel the
/// subscription.
StreamSubscription<T> connect();
/// Returns an Observable that stays connected to this ConnectableObservable
/// as long as there is at least one subscription to this
/// ConnectableObservable.
Observable<T> refCount();
}
/// A [ConnectableObservable] that converts a single-subscription Stream into
/// a broadcast Stream.
class PublishConnectableObservable<T> extends ConnectableObservable<T> {
final Stream<T> _source;
final PublishSubject<T> _subject;
factory PublishConnectableObservable(Stream<T> source) {
return PublishConnectableObservable<T>._(source, PublishSubject<T>());
}
PublishConnectableObservable._(this._source, this._subject) : super(_subject);
@override
Observable<T> autoConnect({
void Function(StreamSubscription<T> subscription) connection,
}) {
_subject.onListen = () {
if (connection != null) {
connection(connect());
} else {
connect();
}
};
return _subject;
}
@override
StreamSubscription<T> connect() {
return ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
}
@override
Observable<T> refCount() {
ConnectableObservableStreamSubscription<T> subscription;
_subject.onListen = () {
subscription = ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
};
_subject.onCancel = () {
subscription.cancel();
};
return _subject;
}
}
/// A [ConnectableObservable] that converts a single-subscription Stream into
/// a broadcast Stream that replays the latest value to any new listener, and
/// provides synchronous access to the latest emitted value.
class ValueConnectableObservable<T> extends ConnectableObservable<T>
implements ValueObservable<T> {
final Stream<T> _source;
final BehaviorSubject<T> _subject;
ValueConnectableObservable._(this._source, this._subject) : super(_subject);
factory ValueConnectableObservable(Stream<T> source) =>
ValueConnectableObservable<T>._(
source,
BehaviorSubject<T>(),
);
factory ValueConnectableObservable.seeded(
Stream<T> source,
T seedValue,
) =>
ValueConnectableObservable<T>._(
source,
BehaviorSubject<T>.seeded(seedValue),
);
@override
ValueObservable<T> autoConnect({
void Function(StreamSubscription<T> subscription) connection,
}) {
_subject.onListen = () {
if (connection != null) {
connection(connect());
} else {
connect();
}
};
return _subject;
}
@override
StreamSubscription<T> connect() {
return ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
}
@override
ValueObservable<T> refCount() {
ConnectableObservableStreamSubscription<T> subscription;
_subject.onListen = () {
subscription = ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
};
_subject.onCancel = () {
subscription.cancel();
};
return _subject;
}
@override
T get value => _subject.value;
@override
bool get hasValue => _subject.hasValue;
}
/// A [ConnectableObservable] that converts a single-subscription Stream into
/// a broadcast Stream that replays emitted items to any new listener, and
/// provides synchronous access to the list of emitted values.
class ReplayConnectableObservable<T> extends ConnectableObservable<T>
implements ReplayObservable<T> {
final Stream<T> _source;
final ReplaySubject<T> _subject;
factory ReplayConnectableObservable(Stream<T> stream, {int maxSize}) {
return ReplayConnectableObservable<T>._(
stream,
ReplaySubject<T>(maxSize: maxSize),
);
}
ReplayConnectableObservable._(this._source, this._subject) : super(_subject);
@override
ReplayObservable<T> autoConnect({
void Function(StreamSubscription<T> subscription) connection,
}) {
_subject.onListen = () {
if (connection != null) {
connection(connect());
} else {
connect();
}
};
return _subject;
}
@override
StreamSubscription<T> connect() {
return ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
}
@override
ReplayObservable<T> refCount() {
ConnectableObservableStreamSubscription<T> subscription;
_subject.onListen = () {
subscription = ConnectableObservableStreamSubscription<T>(
_source.listen(_subject.add, onError: _subject.addError),
_subject,
);
};
_subject.onCancel = () {
subscription.cancel();
};
return _subject;
}
@override
List<T> get values => _subject.values;
}
/// A special [StreamSubscription] that not only cancels the connection to
/// the source [Stream], but also closes down a subject that drives the Stream.
class ConnectableObservableStreamSubscription<T> extends StreamSubscription<T> {
final StreamSubscription<T> _source;
final Subject<T> _subject;
ConnectableObservableStreamSubscription(this._source, this._subject);
@override
Future<dynamic> cancel() {
_subject.close();
return _source.cancel();
}
@override
Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
@override
bool get isPaused => _source.isPaused;
@override
void onData(void Function(T data) handleData) => _source.onData(handleData);
@override
void onDone(void Function() handleDone) => _source.onDone(handleDone);
@override
void onError(Function handleError) => _source.onError(handleError);
@override
void pause([Future<dynamic> resumeSignal]) => _source.pause(resumeSignal);
@override
void resume() => _source.resume();
}