| // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| library http2.src.conn; |
| |
| import 'dart:async'; |
| import 'dart:convert' show utf8; |
| |
| import '../transport.dart'; |
| import 'connection_preface.dart'; |
| import 'flowcontrol/connection_queues.dart'; |
| import 'flowcontrol/queue_messages.dart'; |
| import 'flowcontrol/window.dart'; |
| import 'flowcontrol/window_handler.dart'; |
| import 'frames/frame_defragmenter.dart'; |
| import 'frames/frames.dart'; |
| import 'hpack/hpack.dart'; |
| import 'ping/ping_handler.dart'; |
| import 'settings/settings.dart'; |
| import 'streams/stream_handler.dart'; |
| import 'sync_errors.dart'; |
| |
| class ConnectionState { |
| /// The connection has been established, we're waiting for the settings frame |
| /// of the remote end. |
| static const int Initialized = 1; |
| |
| /// The connection has been established and is fully operational. |
| static const int Operational = 2; |
| |
| /// The connection is no longer accepting new streams or creating new streams. |
| static const int Finishing = 3; |
| |
| /// The connection has been terminated and cannot be used anymore. |
| static const int Terminated = 4; |
| |
| /// Whether we actively were finishing the connection. |
| static const int FinishingActive = 1; |
| |
| /// Whether we passively were finishing the connection. |
| static const int FinishingPassive = 2; |
| |
| int state = Initialized; |
| int finishingState = 0; |
| |
| ConnectionState(); |
| |
| bool get isInitialized => state == ConnectionState.Initialized; |
| |
| bool get isOperational => state == ConnectionState.Operational; |
| |
| bool get isFinishing => state == ConnectionState.Finishing; |
| |
| bool get isTerminated => state == ConnectionState.Terminated; |
| |
| bool get activeFinishing => |
| state == Finishing && (finishingState & FinishingActive) != 0; |
| |
| bool get passiveFinishing => |
| state == Finishing && (finishingState & FinishingPassive) != 0; |
| |
| String toString() { |
| String message = ''; |
| |
| void add(bool condition, String flag) { |
| if (condition) { |
| if (message.length == 0) { |
| message = flag; |
| } else { |
| message = '$message/$flag'; |
| } |
| } |
| } |
| |
| add(isInitialized, 'Initialized'); |
| add(isOperational, 'IsOperational'); |
| add(isFinishing, 'IsFinishing'); |
| add(isTerminated, 'IsTerminated'); |
| add(activeFinishing, 'ActiveFinishing'); |
| add(passiveFinishing, 'PassiveFinishing'); |
| |
| return message; |
| } |
| } |
| |
| abstract class Connection { |
| /// The settings the other end has acknowledged to use when communicating with |
| /// us. |
| final ActiveSettings acknowledgedSettings = new ActiveSettings(); |
| |
| /// The settings we have to obey communicating with the other side. |
| final ActiveSettings peerSettings = new ActiveSettings(); |
| |
| /// Whether this connection is a client connection. |
| final bool isClientConnection; |
| |
| /// Active state handler for this connection. |
| ActiveStateHandler onActiveStateChanged; |
| |
| /// The HPack context for this connection. |
| final HPackContext _hpackContext = new HPackContext(); |
| |
| /// The flow window for this connection of the peer. |
| final Window _peerWindow = new Window(); |
| |
| /// The flow window for this connection of this end. |
| final Window _localWindow = new Window(); |
| |
| /// Used for defragmenting PushPromise/Header frames. |
| final FrameDefragmenter _defragmenter = new FrameDefragmenter(); |
| |
| /// The outgoing frames of this connection; |
| FrameWriter _frameWriter; |
| |
| /// A subscription of incoming [Frame]s. |
| StreamSubscription<Frame> _frameReaderSubscription; |
| |
| /// The incoming connection-level message queue. |
| ConnectionMessageQueueIn _incomingQueue; |
| |
| /// The outgoing connection-level message queue. |
| ConnectionMessageQueueOut _outgoingQueue; |
| |
| /// The ping handler used for making pings & handling remote pings. |
| PingHandler _pingHandler; |
| |
| /// The settings handler used for changing settings & for handling remote |
| /// setting changes. |
| SettingsHandler _settingsHandler; |
| |
| /// The set of active streams this connection has. |
| StreamHandler _streams; |
| |
| /// The connection-level flow control window handler for outgoing messages. |
| OutgoingConnectionWindowHandler _connectionWindowHandler; |
| |
| /// The state of this connection. |
| ConnectionState _state; |
| |
| Connection(Stream<List<int>> incoming, StreamSink<List<int>> outgoing, |
| Settings settings, |
| {this.isClientConnection: true}) { |
| _setupConnection(incoming, outgoing, settings); |
| } |
| |
| /// Runs all setup necessary before new streams can be created with the remote |
| /// peer. |
| void _setupConnection(Stream<List<int>> incoming, |
| StreamSink<List<int>> outgoing, Settings settingsObject) { |
| // Setup frame reading. |
| var incomingFrames = |
| new FrameReader(incoming, acknowledgedSettings).startDecoding(); |
| _frameReaderSubscription = incomingFrames.listen((Frame frame) { |
| _catchProtocolErrors(() => _handleFrameImpl(frame)); |
| }, onError: (error, stack) { |
| _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); |
| }, onDone: () { |
| // Ensure existing messages from lower levels are sent to the upper |
| // levels before we terminate everything. |
| _incomingQueue.forceDispatchIncomingMessages(); |
| _streams.forceDispatchIncomingMessages(); |
| |
| _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); |
| }); |
| |
| // Setup frame writing. |
| _frameWriter = |
| new FrameWriter(_hpackContext.encoder, outgoing, peerSettings); |
| _frameWriter.doneFuture.then((_) { |
| _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); |
| }).catchError((error, stack) { |
| _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); |
| }); |
| |
| // Setup handlers. |
| _settingsHandler = new SettingsHandler(_hpackContext.encoder, _frameWriter, |
| acknowledgedSettings, peerSettings); |
| _pingHandler = new PingHandler(_frameWriter); |
| |
| var settings = _decodeSettings(settingsObject); |
| |
| // Do the initial settings handshake (possibly with pushes disabled). |
| _settingsHandler.changeSettings(settings).catchError((error) { |
| // TODO: The [error] can contain sensitive information we now expose via |
| // a [Goaway] frame. We should somehow ensure we're only sending useful |
| // but non-sensitive information. |
| _terminate(ErrorCode.PROTOCOL_ERROR, |
| message: 'Failed to set initial settings (error: $error).'); |
| }); |
| |
| _settingsHandler.onInitialWindowSizeChange.listen((int difference) { |
| _catchProtocolErrors(() { |
| _streams.processInitialWindowSizeSettingChange(difference); |
| }); |
| }); |
| |
| // Setup the connection window handler, which keeps track of the |
| // size of the outgoing connection window. |
| _connectionWindowHandler = new OutgoingConnectionWindowHandler(_peerWindow); |
| |
| var connectionWindowUpdater = |
| new IncomingWindowHandler.connection(_frameWriter, _localWindow); |
| |
| // Setup queues for outgoing/incoming messages on the connection level. |
| _outgoingQueue = |
| new ConnectionMessageQueueOut(_connectionWindowHandler, _frameWriter); |
| _incomingQueue = new ConnectionMessageQueueIn( |
| connectionWindowUpdater, _catchProtocolErrors); |
| |
| if (isClientConnection) { |
| _streams = new StreamHandler.client( |
| _frameWriter, |
| _incomingQueue, |
| _outgoingQueue, |
| _settingsHandler.peerSettings, |
| _settingsHandler.acknowledgedSettings, |
| _activeStateHandler); |
| } else { |
| _streams = new StreamHandler.server( |
| _frameWriter, |
| _incomingQueue, |
| _outgoingQueue, |
| _settingsHandler.peerSettings, |
| _settingsHandler.acknowledgedSettings, |
| _activeStateHandler); |
| } |
| |
| // NOTE: We're not waiting until initial settings have been exchanged |
| // before we start using the connection (i.e. we don't wait for half a |
| // round-trip-time). |
| _state = new ConnectionState(); |
| } |
| |
| List<Setting> _decodeSettings(Settings settings) { |
| var settingsList = <Setting>[]; |
| |
| // By default a endpoitn can make an unlimited number of concurrent streams. |
| if (settings.concurrentStreamLimit != null) { |
| settingsList.add(new Setting(Setting.SETTINGS_MAX_CONCURRENT_STREAMS, |
| settings.concurrentStreamLimit)); |
| } |
| |
| // By default the stream level flow control window is 64 KiB. |
| if (settings.streamWindowSize != null) { |
| settingsList.add(new Setting( |
| Setting.SETTINGS_INITIAL_WINDOW_SIZE, settings.streamWindowSize)); |
| } |
| |
| if (settings is ClientSettings) { |
| // By default the server is allowed to do server pushes. |
| if (settings.allowServerPushes == null || |
| settings.allowServerPushes == false) { |
| settingsList.add(new Setting(Setting.SETTINGS_ENABLE_PUSH, 0)); |
| } |
| } else if (settings is ServerSettings) { |
| // No special server settings at the moment. |
| } else { |
| assert(false); |
| } |
| |
| return settingsList; |
| } |
| |
| /// Pings the remote peer (can e.g. be used for measuring latency). |
| Future ping() { |
| return _pingHandler.ping().catchError((e, s) { |
| return new Future.error( |
| new TransportException('The connection has been terminated.')); |
| }, test: (e) => e is TerminatedException); |
| } |
| |
| /// Finishes this connection. |
| Future finish() { |
| _finishing(active: true); |
| |
| // TODO: There is probably more we need to wait for. |
| return _streams.done.whenComplete(() { |
| var futures = [_frameWriter.close()]; |
| var f = _frameReaderSubscription.cancel(); |
| if (f != null) futures.add(f); |
| return Future.wait(futures); |
| }); |
| } |
| |
| /// Terminates this connection forcefully. |
| Future terminate() { |
| return _terminate(ErrorCode.NO_ERROR); |
| } |
| |
| void _activeStateHandler(bool isActive) { |
| if (onActiveStateChanged != null) { |
| onActiveStateChanged(isActive); |
| } |
| } |
| |
| /// Invokes the passed in closure and catches any exceptions. |
| void _catchProtocolErrors(void fn()) { |
| try { |
| fn(); |
| } on ProtocolException catch (error) { |
| _terminate(ErrorCode.PROTOCOL_ERROR, message: '$error'); |
| } on FlowControlException catch (error) { |
| _terminate(ErrorCode.FLOW_CONTROL_ERROR, message: '$error'); |
| } on FrameSizeException catch (error) { |
| _terminate(ErrorCode.FRAME_SIZE_ERROR, message: '$error'); |
| } on HPackDecodingException catch (error) { |
| _terminate(ErrorCode.PROTOCOL_ERROR, message: '$error'); |
| } on TerminatedException { |
| // We tried to perform an action even though the connection was already |
| // terminated. |
| // TODO: Can this even happen and if so, how should we propagate this |
| // error? |
| } catch (error) { |
| _terminate(ErrorCode.INTERNAL_ERROR, message: '$error'); |
| } |
| } |
| |
| void _handleFrameImpl(Frame frame) { |
| // The first frame from the other side must be a [SettingsFrame], otherwise |
| // we terminate the connection. |
| if (_state.isInitialized) { |
| if (frame is! SettingsFrame) { |
| _terminate(ErrorCode.PROTOCOL_ERROR, |
| message: 'Expected to first receive a settings frame.'); |
| return; |
| } |
| _state.state = ConnectionState.Operational; |
| } |
| |
| // Try to defragment [frame] if it is a Headers/PushPromise frame. |
| frame = _defragmenter.tryDefragmentFrame(frame); |
| if (frame == null) return; |
| |
| // Try to decode headers if it's a Headers/PushPromise frame. |
| // [This needs to be done even if the frames get ignored, since the entire |
| // connection shares one HPack compression context.] |
| if (frame is HeadersFrame) { |
| frame.decodedHeaders = |
| _hpackContext.decoder.decode(frame.headerBlockFragment); |
| } else if (frame is PushPromiseFrame) { |
| frame.decodedHeaders = |
| _hpackContext.decoder.decode(frame.headerBlockFragment); |
| } |
| |
| // Handle the frame as either a connection or a stream frame. |
| if (frame.header.streamId == 0) { |
| if (frame is SettingsFrame) { |
| _settingsHandler.handleSettingsFrame(frame); |
| } else if (frame is PingFrame) { |
| _pingHandler.processPingFrame(frame); |
| } else if (frame is WindowUpdateFrame) { |
| _connectionWindowHandler.processWindowUpdate(frame); |
| } else if (frame is GoawayFrame) { |
| _streams.processGoawayFrame(frame); |
| _finishing(active: false); |
| } else if (frame is UnknownFrame) { |
| // We can safely ignore these. |
| } else { |
| throw new ProtocolException( |
| 'Cannot handle frame type ${frame.runtimeType} with stream-id 0.'); |
| } |
| } else { |
| _streams.processStreamFrame(_state, frame); |
| } |
| } |
| |
| void _finishing({bool active: true, String message}) { |
| // If this connection is already dead, we return. |
| if (_state.isTerminated) return; |
| |
| // If this connection is already finishing, we make sure to store the |
| // passive bit, since this information is used by [StreamHandler]. |
| // |
| // Vice versa should not matter: If we started passively finishing, an |
| // active finish should be a NOP. |
| if (_state.isFinishing) { |
| if (!active) _state.finishingState |= ConnectionState.FinishingPassive; |
| return; |
| } |
| |
| assert(_state.isInitialized || _state.isOperational); |
| |
| // If we are actively finishing this connection, we'll send a |
| // GoawayFrame otherwise we'll just propagate the message. |
| if (active) { |
| _state.state = ConnectionState.Finishing; |
| _state.finishingState |= ConnectionState.FinishingActive; |
| |
| _outgoingQueue.enqueueMessage(new GoawayMessage( |
| _streams.highestPeerInitiatedStream, |
| ErrorCode.NO_ERROR, |
| message != null ? utf8.encode(message) : [])); |
| } else { |
| _state.state = ConnectionState.Finishing; |
| _state.finishingState |= ConnectionState.FinishingPassive; |
| } |
| |
| _streams.startClosing(); |
| } |
| |
| /// Terminates this connection (if it is not already terminated). |
| /// |
| /// The returned future will never complete with an error. |
| Future _terminate(int errorCode, |
| {bool causedByTransportError: false, String message}) { |
| // TODO: When do we complete here? |
| if (_state.state != ConnectionState.Terminated) { |
| _state.state = ConnectionState.Terminated; |
| |
| var cancelFuture = new Future.sync(_frameReaderSubscription.cancel); |
| if (!causedByTransportError) { |
| _outgoingQueue.enqueueMessage(new GoawayMessage( |
| _streams.highestPeerInitiatedStream, |
| errorCode, |
| message != null ? utf8.encode(message) : [])); |
| } |
| var closeFuture = _frameWriter.close().catchError((e, s) { |
| // We ignore any errors after writing to [GoawayFrame] |
| }); |
| |
| // Close all lower level handlers with an error message. |
| // (e.g. if there is a pending connection.ping(), it's returned |
| // Future will complete with this error). |
| var exception = new TransportConnectionException( |
| errorCode, 'Connection is being forcefully terminated.'); |
| |
| // Close all streams & stream queues |
| _streams.terminate(exception); |
| |
| // Close the connection queues |
| _incomingQueue.terminate(exception); |
| _outgoingQueue.terminate(exception); |
| |
| _pingHandler.terminate(exception); |
| _settingsHandler.terminate(exception); |
| |
| return Future.wait([cancelFuture, closeFuture]).catchError((_) {}); |
| } |
| return new Future.value(); |
| } |
| } |
| |
| class ClientConnection extends Connection implements ClientTransportConnection { |
| ClientConnection._(Stream<List<int>> incoming, StreamSink<List<int>> outgoing, |
| Settings settings) |
| : super(incoming, outgoing, settings, isClientConnection: true); |
| |
| factory ClientConnection(Stream<List<int>> incoming, |
| StreamSink<List<int>> outgoing, ClientSettings clientSettings) { |
| outgoing.add(CONNECTION_PREFACE); |
| return new ClientConnection._(incoming, outgoing, clientSettings); |
| } |
| |
| bool get isOpen => |
| !_state.isFinishing && !_state.isTerminated && _streams.canOpenStream; |
| |
| ClientTransportStream makeRequest(List<Header> headers, |
| {bool endStream: false}) { |
| if (_state.isFinishing) { |
| throw new StateError( |
| 'The http/2 connection is finishing and can therefore not be used to ' |
| 'make new streams.'); |
| } else if (_state.isTerminated) { |
| throw new StateError( |
| 'The http/2 connection is no longer active and can therefore not be ' |
| 'used to make new streams.'); |
| } |
| var hStream = _streams.newStream(headers, endStream: endStream); |
| if (_streams.ranOutOfStreamIds) { |
| _finishing(active: true, message: 'Ran out of stream ids'); |
| } |
| return hStream; |
| } |
| } |
| |
| class ServerConnection extends Connection implements ServerTransportConnection { |
| ServerConnection._(Stream<List<int>> incoming, StreamSink<List<int>> outgoing, |
| Settings settings) |
| : super(incoming, outgoing, settings, isClientConnection: false); |
| |
| factory ServerConnection(Stream<List<int>> incoming, |
| StreamSink<List<int>> outgoing, ServerSettings serverSettings) { |
| var frameBytes = readConnectionPreface(incoming); |
| return new ServerConnection._(frameBytes, outgoing, serverSettings); |
| } |
| |
| Stream<ServerTransportStream> get incomingStreams => |
| _streams.incomingStreams.cast<ServerTransportStream>(); |
| } |