blob: 672448ce320ea415225ce4f1079e92a1203b42d5 [file] [log] [blame]
import 'dart:async';
import 'dart:collection';
import 'package:rxdart/src/observables/observable.dart';
import 'package:rxdart/src/observables/replay_observable.dart';
import 'package:rxdart/src/subjects/subject.dart';
/// A special StreamController that captures all of the items that have been
/// added to the controller, and emits those as the first items to any new
/// listener.
///
/// This subject allows sending data, error and done events to the listener.
/// As items are added to the subject, the ReplaySubject will store them.
/// When the stream is listened to, those recorded items will be emitted to
/// the listener. After that, any new events will be appropriately sent to the
/// listeners. It is possible to cap the number of stored events by setting
/// a maxSize value.
///
/// ReplaySubject is, by default, a broadcast (aka hot) controller, in order
/// to fulfill the Rx Subject contract. This means the Subject's `stream` can
/// be listened to multiple times.
///
/// ### Example
///
/// final subject = new ReplaySubject<int>();
///
/// subject.add(1);
/// subject.add(2);
/// subject.add(3);
///
/// subject.stream.listen(print); // prints 1, 2, 3
/// subject.stream.listen(print); // prints 1, 2, 3
/// subject.stream.listen(print); // prints 1, 2, 3
///
/// ### Example with maxSize
///
/// final subject = new ReplaySubject<int>(maxSize: 2);
///
/// subject.add(1);
/// subject.add(2);
/// subject.add(3);
///
/// subject.stream.listen(print); // prints 2, 3
/// subject.stream.listen(print); // prints 2, 3
/// subject.stream.listen(print); // prints 2, 3
class ReplaySubject<T> extends Subject<T> implements ReplayObservable<T> {
final Queue<T> _queue;
final int _maxSize;
factory ReplaySubject({
int maxSize,
void onListen(),
void onCancel(),
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = Queue<T>();
return ReplaySubject<T>._(
controller,
Observable<T>.defer(
() => Observable<T>(controller.stream)
.startWithMany(queue.toList(growable: false)),
reusable: true),
queue,
maxSize,
);
}
ReplaySubject._(
StreamController<T> controller,
Observable<T> observable,
this._queue,
this._maxSize,
) : super(controller, observable);
@override
void onAdd(T event) {
if (_queue.length == _maxSize) {
_queue.removeFirst();
}
_queue.add(event);
}
/// Synchronously get the values stored in Subject. May be empty.
@override
List<T> get values => _queue.toList(growable: false);
}