blob: 4d28db23dfecd212aaf62a9c2be1a8cf61f2421a [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// TODO: Take priorities into account.
// TODO: Properly fragment large data frames, so they are not taking up too much
// bandwidth.
library http2.src.flowcontrol.connection_flow_controller;
import 'dart:async';
import 'dart:collection';
import '../../transport.dart';
import '../byte_utils.dart';
import '../error_handler.dart';
import '../frames/frames.dart';
import 'queue_messages.dart';
import 'stream_queues.dart';
import 'window_handler.dart';
/// The last place before messages coming from the application get encoded and
/// send as [Frame]s.
///
/// It will convert [Message]s from higher layers and send them via [Frame]s.
///
/// - It will queue messages until the connection-level flow control window
/// allows sending the message and the underlying [StreamSink] is not
/// buffering.
/// - It will use a [FrameWriter] to write a new frame to the connection.
// TODO: Make [StreamsHandler] call [connectionOut.startClosing()] once
// * all streams have been closed
// * the connection state is finishing
class ConnectionMessageQueueOut extends Object
with TerminatableMixin, ClosableMixin {
/// The handler which will be used for increasing the connection-level flow
/// control window.
final OutgoingConnectionWindowHandler _connectionWindow;
/// The buffered [Message]s which are to be delivered to the remote peer.
final Queue<Message> _messages = new Queue<Message>();
/// The [FrameWriter] used for writing Headers/Data/PushPromise frames.
final FrameWriter _frameWriter;
ConnectionMessageQueueOut(this._connectionWindow, this._frameWriter) {
_frameWriter.bufferIndicator.bufferEmptyEvents.listen((_) {
_trySendMessages();
});
_connectionWindow.positiveWindow.bufferEmptyEvents.listen((_) {
_trySendMessages();
});
}
/// The number of pending messages which haven't been written to the wire.
int get pendingMessages => _messages.length;
/// Enqueues a new [Message] which should be delivered to the remote peer.
void enqueueMessage(Message message) {
ensureNotClosingSync(() {
if (!wasTerminated) {
_messages.addLast(message);
_trySendMessages();
}
});
}
void onTerminated(error) {
_messages.clear();
closeWithError(error);
}
void onCheckForClose() {
if (isClosing && _messages.length == 0) {
closeWithValue();
}
}
void _trySendMessages() {
if (!wasTerminated) {
// We can make progress if
// * there is at least one message to send
// * the underlying frame writer / sink / socket doesn't block
// * either one
// * the next message is a non-flow control message (e.g. headers)
// * the connection window is positive
if (_messages.length > 0 &&
!_frameWriter.bufferIndicator.wouldBuffer &&
(!_connectionWindow.positiveWindow.wouldBuffer ||
_messages.first is! DataMessage)) {
_trySendMessage();
// If we have more messages and we can send them, we'll run them
// using `Timer.run()` to let other things get in-between.
if (_messages.length > 0 &&
!_frameWriter.bufferIndicator.wouldBuffer &&
(!_connectionWindow.positiveWindow.wouldBuffer ||
_messages.first is! DataMessage)) {
// TODO: If all the frame writer methods would return the
// number of bytes written, we could just say, we loop here until 10kb
// and after words, we'll make `Timer.run()`.
Timer.run(_trySendMessages);
} else {
onCheckForClose();
}
}
}
}
void _trySendMessage() {
Message message = _messages.first;
if (message is HeadersMessage) {
_messages.removeFirst();
_frameWriter.writeHeadersFrame(message.streamId, message.headers,
endStream: message.endStream);
} else if (message is PushPromiseMessage) {
_messages.removeFirst();
_frameWriter.writePushPromiseFrame(
message.streamId, message.promisedStreamId, message.headers);
} else if (message is DataMessage) {
_messages.removeFirst();
if (_connectionWindow.peerWindowSize >= message.bytes.length) {
_connectionWindow.decreaseWindow(message.bytes.length);
_frameWriter.writeDataFrame(message.streamId, message.bytes,
endStream: message.endStream);
} else {
// NOTE: We need to fragment the DataMessage.
// TODO: Do not fragment if the number of bytes we can send is too low
int len = _connectionWindow.peerWindowSize;
var head = viewOrSublist(message.bytes, 0, len);
var tail =
viewOrSublist(message.bytes, len, message.bytes.length - len);
_connectionWindow.decreaseWindow(head.length);
_frameWriter.writeDataFrame(message.streamId, head, endStream: false);
var tailMessage =
new DataMessage(message.streamId, tail, message.endStream);
_messages.addFirst(tailMessage);
}
} else if (message is ResetStreamMessage) {
_messages.removeFirst();
_frameWriter.writeRstStreamFrame(message.streamId, message.errorCode);
} else if (message is GoawayMessage) {
_messages.removeFirst();
_frameWriter.writeGoawayFrame(
message.lastStreamId, message.errorCode, message.debugData);
} else {
throw new StateError(
'Unexpected message in queue: ${message.runtimeType}');
}
}
}
/// The first place an incoming stream message gets delivered to.
///
/// The [ConnectionMessageQueueIn] will be given [Frame]s which were sent to
/// any stream on this connection.
///
/// - It will extract the necessary data from the [Frame] and store it in a new
/// [Message] object.
/// - It will multiplex the created [Message]es to a stream-specific
/// [StreamMessageQueueIn].
/// - If the [StreamMessageQueueIn] cannot accept more data, the data will be
/// buffered until it can.
/// - [DataMessage]s which have been successfully delivered to a stream-specific
/// [StreamMessageQueueIn] will increase the flow control window for the
/// connection.
///
/// Incoming [DataFrame]s will decrease the flow control window the peer has
/// available.
// TODO: Make [StreamsHandler] call [connectionOut.startClosing()] once
// * all streams have been closed
// * the connection state is finishing
class ConnectionMessageQueueIn extends Object
with TerminatableMixin, ClosableMixin {
/// The handler which will be used for increasing the connection-level flow
/// control window.
final IncomingWindowHandler _windowUpdateHandler;
/// Catches any protocol errors and acts upon them.
final Function _catchProtocolErrors;
/// A mapping from stream-id to the corresponding stream-specific
/// [StreamMessageQueueIn].
final Map<int, StreamMessageQueueIn> _stream2messageQueue = {};
/// A buffer for [Message]s which cannot be received by their
/// [StreamMessageQueueIn].
final Map<int, Queue<Message>> _stream2pendingMessages = {};
/// The number of pending messages which haven't been delivered
/// to the stream-specific queue. (for debugging purposes)
int _count = 0;
ConnectionMessageQueueIn(
this._windowUpdateHandler, this._catchProtocolErrors);
void onTerminated(error) {
// NOTE: The higher level will be shutdown first, so all streams
// should have been removed at this point.
assert(_stream2messageQueue.isEmpty);
assert(_stream2pendingMessages.isEmpty);
closeWithError(error);
}
void onCheckForClose() {
if (isClosing) {
assert(_stream2messageQueue.isEmpty == _stream2pendingMessages.isEmpty);
if (_stream2messageQueue.isEmpty) {
closeWithValue();
}
}
}
/// The number of pending messages which haven't been delivered
/// to the stream-specific queue. (for debugging purposes)
int get pendingMessages => _count;
/// Registers a stream specific [StreamMessageQueueIn] for a new stream id.
void insertNewStreamMessageQueue(int streamId, StreamMessageQueueIn mq) {
if (_stream2messageQueue.containsKey(streamId)) {
throw new ArgumentError(
'Cannot register a SteramMessageQueueIn for the same streamId '
'multiple times');
}
var pendingMessages = new Queue<Message>();
_stream2pendingMessages[streamId] = pendingMessages;
_stream2messageQueue[streamId] = mq;
mq.bufferIndicator.bufferEmptyEvents.listen((_) {
_catchProtocolErrors(() {
_tryDispatch(streamId, mq, pendingMessages);
});
});
}
/// Removes a stream id and its message queue from this connection-level
/// message queue.
void removeStreamMessageQueue(int streamId) {
_stream2pendingMessages.remove(streamId);
_stream2messageQueue.remove(streamId);
}
/// Processes an incoming [DataFrame] which is addressed to a specific stream.
void processDataFrame(DataFrame frame) {
var streamId = frame.header.streamId;
var message =
new DataMessage(streamId, frame.bytes, frame.hasEndStreamFlag);
_windowUpdateHandler.gotData(message.bytes.length);
_addMessage(streamId, message);
}
/// If a [DataFrame] will be ignored, this method will take the minimal
/// action necessary.
void processIgnoredDataFrame(DataFrame frame) {
_windowUpdateHandler.gotData(frame.bytes.length);
}
/// Processes an incoming [HeadersFrame] which is addressed to a specific
/// stream.
void processHeadersFrame(HeadersFrame frame) {
var streamId = frame.header.streamId;
var message = new HeadersMessage(
streamId, frame.decodedHeaders, frame.hasEndStreamFlag);
// NOTE: Header frames do not affect flow control - only data frames do.
_addMessage(streamId, message);
}
/// Processes an incoming [PushPromiseFrame] which is addressed to a specific
/// stream.
void processPushPromiseFrame(
PushPromiseFrame frame, ClientTransportStream pushedStream) {
var streamId = frame.header.streamId;
var message = new PushPromiseMessage(streamId, frame.decodedHeaders,
frame.promisedStreamId, pushedStream, false);
// NOTE:
// * Header frames do not affect flow control - only data frames do.
// * At this point we might reorder a push message earlier than
// data/headers messages.
_addPushMessage(streamId, message);
}
void _addMessage(int streamId, Message message) {
_count++;
// TODO: Do we need to do a runtime check here and
// raise a protocol error if we cannot find the registered stream?
var streamMQ = _stream2messageQueue[streamId];
var pendingMessages = _stream2pendingMessages[streamId];
pendingMessages.addLast(message);
_tryDispatch(streamId, streamMQ, pendingMessages);
}
void _addPushMessage(int streamId, PushPromiseMessage message) {
_count++;
// TODO: Do we need to do a runtime check here and
// raise a protocol error if we cannot find the registered stream?
var streamMQ = _stream2messageQueue[streamId];
streamMQ.enqueueMessage(message);
}
void _tryDispatch(
int streamId, StreamMessageQueueIn mq, Queue<Message> pendingMessages) {
int bytesDeliveredToStream = 0;
while (!mq.bufferIndicator.wouldBuffer && pendingMessages.length > 0) {
_count--;
var message = pendingMessages.removeFirst();
if (message is DataMessage) {
bytesDeliveredToStream += message.bytes.length;
}
mq.enqueueMessage(message);
if (message.endStream) {
assert(pendingMessages.isEmpty);
_stream2messageQueue.remove(streamId);
_stream2pendingMessages.remove(streamId);
}
}
if (bytesDeliveredToStream > 0) {
_windowUpdateHandler.dataProcessed(bytesDeliveredToStream);
}
onCheckForClose();
}
void forceDispatchIncomingMessages() {
final toBeRemoved = new Set<int>();
_stream2pendingMessages.forEach((int streamId, Queue<Message> messages) {
final mq = _stream2messageQueue[streamId];
while (messages.isNotEmpty) {
_count--;
final message = messages.removeFirst();
mq.enqueueMessage(message);
if (message.endStream) {
toBeRemoved.add(streamId);
break;
}
}
});
for (final streamId in toBeRemoved) {
_stream2messageQueue.remove(streamId);
_stream2pendingMessages.remove(streamId);
}
}
}