blob: 3f6d44a7e1a694e587b41f8165aa9bb6fdc94ac6 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library http2.src.async_utils.async_utils;
import 'dart:async';
import 'dart:io';
/// An interface for `StreamSink`-like classes to indicate whether adding data
/// would be buffered and when the buffer is empty again.
class BufferIndicator {
  /// Whether a write would currently be buffered. Starts out as `true`.
  bool _wouldBuffer = true;

  /// Synchronous broadcast controller that backs [bufferEmptyEvents].
  final StreamController _controller =
      new StreamController.broadcast(sync: true);

  /// Whether calling [BufferedBytesWriter.add] right now would buffer the
  /// data.
  ///
  /// Higher layers can consult this flag to implement their own buffering
  /// and, possibly, prioritization.
  bool get wouldBuffer => _wouldBuffer;

  /// A broadcast stream which emits an event every time the indicator
  /// switches from the buffering state to the non-buffering state, i.e. when
  /// [BufferedBytesWriter.add] would no longer buffer the data.
  Stream get bufferEmptyEvents => _controller.stream;

  /// Signals that buffering starts to happen.
  void markBuffered() {
    _wouldBuffer = true;
  }

  /// Signals that no buffering is happening at the moment.
  ///
  /// An event is emitted on [bufferEmptyEvents] only when the indicator was
  /// in the buffering state before this call; otherwise this is a no-op.
  void markUnBuffered() {
    if (!_wouldBuffer) return;

    _wouldBuffer = false;
    _controller.add(null);
  }
}
/// Contains a [StreamSink] and a [BufferIndicator] to indicate whether writes
/// to the sink would cause buffering.
///
/// It uses the pause signal coming from `sink.addStream()` as an indicator
/// that the underlying sink cannot handle more data and would buffer.
class BufferedSink {
  /// Indicates whether the underlying sink is buffering at the moment.
  final BufferIndicator bufferIndicator = new BufferIndicator();

  /// An intermediate [StreamController] whose listen/pause/resume callbacks
  /// are forwarded to [bufferIndicator].
  StreamController<List<int>> _controller;

  /// Completes once the pipe into the data sink has finished and the data
  /// sink's `done` future has completed.
  Future _doneFuture;

  /// Creates a sink which forwards everything added to [sink] into
  /// [dataSink], tracking the subscription state in [bufferIndicator].
  BufferedSink(StreamSink<List<int>> dataSink) {
    // Until the data sink starts listening, writes are treated as buffered.
    bufferIndicator.markBuffered();

    final controller = new StreamController<List<int>>(
        onListen: bufferIndicator.markUnBuffered,
        onPause: bufferIndicator.markBuffered,
        onResume: bufferIndicator.markUnBuffered,
        onCancel: () {
          // TODO: We may want to propagate cancel events as errors.
          // Currently `_doneFuture` will just complete normally if the sink
          // cancelled.
        },
        sync: true);
    _controller = controller;

    final piped = controller.stream.pipe(dataSink);
    _doneFuture = Future.wait([piped, dataSink.done]);
  }

  /// The sink to write to; data added here is piped into the data sink that
  /// was passed to the constructor.
  StreamSink<List<int>> get sink => _controller;

  /// The future which will complete once this sink has been closed.
  Future get doneFuture => _doneFuture;
}
/// A small wrapper around [BufferedSink] which writes data in batches.
class BufferedBytesWriter {
  /// The underlying [BufferedSink] that all data is eventually written to.
  final BufferedSink _bufferedSink;

  /// Accumulates the chunks passed to [addBufferedData] until they are
  /// flushed as a single batch.
  final BytesBuilder _builder = new BytesBuilder(copy: false);

  /// Creates a writer which sends its data to [outgoing] through a
  /// [BufferedSink].
  BufferedBytesWriter(StreamSink<List<int>> outgoing)
      : _bufferedSink = new BufferedSink(outgoing);

  /// An indicator whether the underlying sink is buffering at the moment.
  BufferIndicator get bufferIndicator => _bufferedSink.bufferIndicator;

  /// The future which will complete once this sink has been closed.
  Future get doneFuture => _bufferedSink.doneFuture;

  /// Adds [data] directly to the underlying sink, bypassing the batch buffer.
  ///
  /// Throws a [StateError] if data queued via [addBufferedData] has not yet
  /// been flushed with [flushBufferedData].
  void add(List<int> data) {
    if (_builder.isNotEmpty) {
      throw new StateError(
          'Cannot trigger an asynchronous write while there is buffered data.');
    }
    _bufferedSink.sink.add(data);
  }

  /// Queues up [bytes] to be written with the next [flushBufferedData].
  void addBufferedData(List<int> bytes) {
    _builder.add(bytes);
  }

  /// Flushes all data which was enqueued by [addBufferedData] as one chunk.
  ///
  /// Does nothing when no data is queued.
  void flushBufferedData() {
    if (_builder.isEmpty) return;

    final batch = _builder.takeBytes();
    _bufferedSink.sink.add(batch);
  }

  /// Flushes any queued data and closes this sink.
  ///
  /// The returned future completes only after [doneFuture] has completed as
  /// well.
  Future close() {
    flushBufferedData();
    final closed = _bufferedSink.sink.close();
    return closed.whenComplete(() => doneFuture);
  }
}