| // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| library http2.src.flowcontrol.stream_queues; |
| |
| import 'dart:async'; |
| import 'dart:collection'; |
| |
| import '../../transport.dart'; |
| import '../async_utils/async_utils.dart'; |
| import '../byte_utils.dart'; |
| import '../error_handler.dart'; |
| |
| import 'connection_queues.dart'; |
| import 'queue_messages.dart'; |
| import 'window_handler.dart'; |
| |
| /// This class will buffer any headers/data messages in the order they were |
| /// added. |
| /// |
| /// It will ensure that we never send more data than the remote flow control |
| /// window allows. |
| class StreamMessageQueueOut extends Object |
| with TerminatableMixin, ClosableMixin { |
| /// The id of the stream this message queue belongs to. |
| final int streamId; |
| |
| /// The stream-level flow control handler. |
| final OutgoingStreamWindowHandler streamWindow; |
| |
| /// The underlying connection-level message queue. |
| final ConnectionMessageQueueOut connectionMessageQueue; |
| |
| /// A indicator for whether this queue is currently buffering. |
| final BufferIndicator bufferIndicator = new BufferIndicator(); |
| |
| /// Buffered [Message]s which will be written to the underlying connection |
| /// if the flow control window allows so. |
| final Queue<Message> _messages = new Queue<Message>(); |
| |
| /// Debugging data on how much data should be written to the underlying |
| /// connection message queue. |
| int toBeWrittenBytes = 0; |
| |
| /// Debugging data on how much data was written to the underlying connection |
| /// message queue. |
| int writtenBytes = 0; |
| |
| StreamMessageQueueOut( |
| this.streamId, this.streamWindow, this.connectionMessageQueue) { |
| streamWindow.positiveWindow.bufferEmptyEvents.listen((_) { |
| if (!wasTerminated) { |
| _trySendData(); |
| } |
| }); |
| if (streamWindow.positiveWindow.wouldBuffer) { |
| bufferIndicator.markBuffered(); |
| } else { |
| bufferIndicator.markUnBuffered(); |
| } |
| } |
| |
| /// Debugging data about how many messages are pending to be written to the |
| /// connection message queue. |
| int get pendingMessages => _messages.length; |
| |
| /// Enqueues a new [Message] which is to be delivered to the connection |
| /// message queue. |
| void enqueueMessage(Message message) { |
| if (message is! ResetStreamMessage) ensureNotClosingSync(() {}); |
| if (!wasTerminated) { |
| if (message.endStream) startClosing(); |
| |
| if (message is DataMessage) { |
| toBeWrittenBytes += message.bytes.length; |
| } |
| |
| _messages.addLast(message); |
| _trySendData(); |
| |
| if (_messages.length > 0) { |
| bufferIndicator.markBuffered(); |
| } |
| } |
| } |
| |
| void onTerminated(error) { |
| _messages.clear(); |
| closeWithError(error); |
| } |
| |
| void onCheckForClose() { |
| if (isClosing && _messages.isEmpty) closeWithValue(); |
| } |
| |
| void _trySendData() { |
| int queueLenBefore = _messages.length; |
| |
| while (_messages.length > 0) { |
| Message message = _messages.first; |
| |
| if (message is HeadersMessage) { |
| _messages.removeFirst(); |
| connectionMessageQueue.enqueueMessage(message); |
| } else if (message is DataMessage) { |
| int bytesAvailable = streamWindow.peerWindowSize; |
| if (bytesAvailable > 0 || message.bytes.length == 0) { |
| _messages.removeFirst(); |
| |
| // Do we need to fragment? |
| DataMessage messageToSend = message; |
| List<int> messageBytes = message.bytes; |
| // TODO: Do not fragment if the number of bytes we can send is too low |
| if (messageBytes.length > bytesAvailable) { |
| var partA = viewOrSublist(messageBytes, 0, bytesAvailable); |
| var partB = viewOrSublist(messageBytes, bytesAvailable, |
| messageBytes.length - bytesAvailable); |
| var messageA = new DataMessage(message.streamId, partA, false); |
| var messageB = |
| new DataMessage(message.streamId, partB, message.endStream); |
| |
| // Put the second fragment back into the front of the queue. |
| _messages.addFirst(messageB); |
| |
| // Send the first fragment. |
| messageToSend = messageA; |
| } |
| |
| writtenBytes += messageToSend.bytes.length; |
| streamWindow.decreaseWindow(messageToSend.bytes.length); |
| connectionMessageQueue.enqueueMessage(messageToSend); |
| } else { |
| break; |
| } |
| } else if (message is ResetStreamMessage) { |
| _messages.removeFirst(); |
| connectionMessageQueue.enqueueMessage(message); |
| } else { |
| throw new StateError('Unknown messages type: ${message.runtimeType}'); |
| } |
| } |
| if (queueLenBefore > 0 && _messages.isEmpty) { |
| bufferIndicator.markUnBuffered(); |
| } |
| |
| onCheckForClose(); |
| } |
| } |
| |
| /// Keeps a list of [Message] which should be delivered to the |
| /// [TransportStream]. |
| /// |
| /// It will keep messages up to the stream flow control window size if the |
| /// [messages] listener is paused. |
| class StreamMessageQueueIn extends Object |
| with TerminatableMixin, ClosableMixin, CancellableMixin { |
| /// The stream-level window our peer is using when sending us messages. |
| final IncomingWindowHandler windowHandler; |
| |
| /// A indicator whether this [StreamMessageQueueIn] is currently buffering. |
| final BufferIndicator bufferIndicator = new BufferIndicator(); |
| |
| /// The pending [Message]s which are to be delivered via the [messages] |
| /// stream. |
| final Queue<Message> _pendingMessages = new Queue<Message>(); |
| |
| /// The [StreamController] used for producing the [messages] stream. |
| StreamController<StreamMessage> _incomingMessagesC; |
| |
| /// The [StreamController] used for producing the [serverPushes] stream. |
| StreamController<TransportStreamPush> _serverPushStreamsC; |
| |
| StreamMessageQueueIn(this.windowHandler) { |
| // We start by marking it as buffered, since no one is listening yet and |
| // incoming messages will get buffered. |
| bufferIndicator.markBuffered(); |
| |
| _incomingMessagesC = new StreamController( |
| onListen: () { |
| if (!wasClosed && !wasTerminated) { |
| _tryDispatch(); |
| _tryUpdateBufferIndicator(); |
| } |
| }, |
| onPause: () { |
| _tryUpdateBufferIndicator(); |
| // TODO: Would we ever want to decrease the window size in this |
| // situation? |
| }, |
| onResume: () { |
| if (!wasClosed && !wasTerminated) { |
| _tryDispatch(); |
| _tryUpdateBufferIndicator(); |
| } |
| }, |
| onCancel: cancel); |
| |
| _serverPushStreamsC = new StreamController(onListen: () { |
| if (!wasClosed && !wasTerminated) { |
| _tryDispatch(); |
| _tryUpdateBufferIndicator(); |
| } |
| }); |
| } |
| |
| /// Debugging data: the number of pending messages in this queue. |
| int get pendingMessages => _pendingMessages.length; |
| |
| /// The stream of [StreamMessage]s which come from the remote peer. |
| Stream<StreamMessage> get messages => _incomingMessagesC.stream; |
| |
| /// The stream of [TransportStreamPush]es which come from the remote peer. |
| Stream<TransportStreamPush> get serverPushes => _serverPushStreamsC.stream; |
| |
| /// A lower layer enqueues a new [Message] which should be delivered to the |
| /// app. |
| void enqueueMessage(Message message) { |
| ensureNotClosingSync(() { |
| if (!wasTerminated) { |
| if (message is PushPromiseMessage) { |
| // NOTE: If server pushes were enabled, the client is responsible for |
| // either rejecting or handling them. |
| assert(!_serverPushStreamsC.isClosed); |
| var transportStreamPush = |
| new TransportStreamPush(message.headers, message.pushedStream); |
| _serverPushStreamsC.add(transportStreamPush); |
| return; |
| } |
| |
| if (message is DataMessage) { |
| windowHandler.gotData(message.bytes.length); |
| } |
| _pendingMessages.add(message); |
| if (message.endStream) startClosing(); |
| |
| _tryDispatch(); |
| _tryUpdateBufferIndicator(); |
| } |
| }); |
| } |
| |
| void onTerminated(exception) { |
| _pendingMessages.clear(); |
| if (!wasClosed) { |
| if (exception != null) { |
| _incomingMessagesC.addError(exception); |
| } |
| _incomingMessagesC.close(); |
| _serverPushStreamsC.close(); |
| closeWithError(exception); |
| } |
| } |
| |
| void onCloseCheck() { |
| if (isClosing && !wasClosed && _pendingMessages.isEmpty) { |
| _incomingMessagesC.close(); |
| _serverPushStreamsC.close(); |
| closeWithValue(); |
| } |
| } |
| |
| void forceDispatchIncomingMessages() { |
| while (_pendingMessages.isNotEmpty) { |
| final message = _pendingMessages.removeFirst(); |
| assert(!_incomingMessagesC.isClosed); |
| if (message is HeadersMessage) { |
| _incomingMessagesC.add(new HeadersStreamMessage(message.headers, |
| endStream: message.endStream)); |
| } else if (message is DataMessage) { |
| if (message.bytes.length > 0) { |
| _incomingMessagesC.add(new DataStreamMessage(message.bytes, |
| endStream: message.endStream)); |
| } |
| } else { |
| // This can never happen. |
| assert(false); |
| } |
| if (message.endStream) { |
| onCloseCheck(); |
| } |
| } |
| } |
| |
| void _tryDispatch() { |
| while (!wasTerminated && _pendingMessages.isNotEmpty) { |
| bool handled = wasCancelled; |
| |
| var message = _pendingMessages.first; |
| if (wasCancelled) { |
| _pendingMessages.removeFirst(); |
| } else if (message is HeadersMessage || message is DataMessage) { |
| assert(!_incomingMessagesC.isClosed); |
| if (_incomingMessagesC.hasListener && !_incomingMessagesC.isPaused) { |
| _pendingMessages.removeFirst(); |
| if (message is HeadersMessage) { |
| // NOTE: Header messages do not affect flow control - only |
| // data messages do. |
| _incomingMessagesC.add(new HeadersStreamMessage(message.headers, |
| endStream: message.endStream)); |
| } else if (message is DataMessage) { |
| if (message.bytes.length > 0) { |
| _incomingMessagesC.add(new DataStreamMessage(message.bytes, |
| endStream: message.endStream)); |
| windowHandler.dataProcessed(message.bytes.length); |
| } |
| } else { |
| // This can never happen. |
| assert(false); |
| } |
| handled = true; |
| } |
| } |
| if (handled) { |
| if (message.endStream) { |
| onCloseCheck(); |
| } |
| } else { |
| break; |
| } |
| } |
| } |
| |
| void _tryUpdateBufferIndicator() { |
| if (_incomingMessagesC.isPaused || _pendingMessages.length > 0) { |
| bufferIndicator.markBuffered(); |
| } else if (bufferIndicator.wouldBuffer && !_incomingMessagesC.isPaused) { |
| bufferIndicator.markUnBuffered(); |
| } |
| } |
| } |