blob: 3497cab3ea5bb587f3732ce9a006103d35621e2c [file] [log] [blame]
import 'dart:async';
import 'package:rxdart/src/samplers/buffer_strategy.dart';
/// Creates an Observable where each item is a [List] containing the items
/// from the source sequence, batched by the [sampler].
///
/// ### Example with [onCount]
///
/// Observable.range(1, 4)
/// .buffer(onCount(2))
/// .listen(print); // prints [1, 2], [3, 4]
///
/// ### Example with [onFuture]
///
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
/// .buffer(onFuture(() => new Future.delayed(const Duration(milliseconds: 220))))
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ...
///
/// ### Example with [onTest]
///
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
/// .buffer(onTest((i) => i % 2 == 0))
/// .listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...
///
/// ### Example with [onTime]
///
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
/// .buffer(onTime(const Duration(milliseconds: 220)))
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ...
///
/// ### Example with [onStream]
///
/// new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
/// .buffer(onStream(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i)))
/// .listen(print); // prints [0, 1] [2, 3] [4, 5] ...
///
/// You can create your own sampler by extending [StreamView]
/// should the above samplers be insufficient for your use case.
class BufferStreamTransformer<T> extends StreamTransformerBase<T, List<T>> {
final SamplerBuilder<T, List<T>> sampler;
final bool exhaustBufferOnDone;
BufferStreamTransformer(this.sampler, {this.exhaustBufferOnDone = true});
@override
Stream<List<T>> bind(Stream<T> stream) =>
_buildTransformer<T>(sampler, exhaustBufferOnDone).bind(stream);
static StreamTransformer<T, List<T>> _buildTransformer<T>(
SamplerBuilder<T, List<T>> scheduler, bool exhaustBufferOnDone) {
assertSampler(scheduler);
return StreamTransformer<T, List<T>>((Stream<T> input, bool cancelOnError) {
StreamController<List<T>> controller;
StreamSubscription<List<T>> subscription;
var buffer = <T>[];
void onDone() {
if (controller.isClosed) return;
if (exhaustBufferOnDone && buffer.isNotEmpty)
controller.add(List<T>.from(buffer));
controller.close();
}
controller = StreamController<List<T>>(
sync: true,
onListen: () {
try {
subscription = scheduler(input, (
T data,
EventSink<List<T>> sink, [
int startBufferEvery,
]) {
buffer.add(data);
sink.add(buffer);
}, (_, EventSink<List<T>> sink, [int startBufferEvery = 0]) {
startBufferEvery ?? 0;
sink.add(List<T>.unmodifiable(buffer));
buffer =
startBufferEvery > 0 && startBufferEvery < buffer.length
? buffer.sublist(startBufferEvery)
: <T>[];
}).listen(controller.add,
onError: controller.addError,
onDone: onDone,
cancelOnError: cancelOnError);
} catch (e, s) {
controller.addError(e, s);
}
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
return controller.stream.listen(null);
});
}
static void assertSampler<T>(SamplerBuilder<T, List<T>> scheduler) {
if (scheduler == null) {
throw ArgumentError('scheduler cannot be null');
}
}
}