blob: 371b3c055111b898b0a123ac29aa4725e5baed21 [file] [log] [blame]
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'options.dart';
enum ConnectionState {
/// Actively trying to connect.
connecting,
/// Connection successfully established.
ready,
/// Some transient failure occurred, waiting to re-connect.
transientFailure,
/// Not currently connected, and no pending RPCs.
idle,
/// Shutting down, no further RPCs allowed.
shutdown
}
/// A connection to a single RPC endpoint.
///
/// RPCs made on a connection are always sent to the same endpoint.
class ClientConnection {
static final _methodPost = new Header.ascii(':method', 'POST');
static final _schemeHttp = new Header.ascii(':scheme', 'http');
static final _schemeHttps = new Header.ascii(':scheme', 'https');
static final _contentTypeGrpc =
new Header.ascii('content-type', 'application/grpc');
static final _teTrailers = new Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
new Header.ascii('grpc-accept-encoding', 'identity');
final String host;
final int port;
final ChannelOptions options;
ConnectionState _state = ConnectionState.idle;
void Function(ClientConnection connection) onStateChanged;
final _pendingCalls = <ClientCall>[];
ClientTransportConnection _transport;
/// Used for idle and reconnect timeout, depending on [_state].
Timer _timer;
Duration _currentReconnectDelay;
ClientConnection(this.host, this.port, this.options);
ConnectionState get state => _state;
static List<Header> createCallHeaders(bool useTls, String authority,
String path, Duration timeout, Map<String, String> metadata,
{String userAgent}) {
final headers = [
_methodPost,
useTls ? _schemeHttps : _schemeHttp,
new Header(ascii.encode(':path'), utf8.encode(path)),
new Header(ascii.encode(':authority'), utf8.encode(authority)),
];
if (timeout != null) {
headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout)));
}
headers.addAll([
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
new Header.ascii('user-agent', userAgent ?? defaultUserAgent),
]);
metadata?.forEach((key, value) {
headers.add(new Header(ascii.encode(key), utf8.encode(value)));
});
return headers;
}
String get authority => options.credentials.authority ?? host;
@visibleForTesting
Future<ClientTransportConnection> connectTransport() async {
final securityContext = options.credentials.securityContext;
var socket = await Socket.connect(host, port);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority,
context: securityContext,
onBadCertificate: _validateBadCertificate);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
}
socket.done.then(_handleSocketClosed);
return new ClientTransportConnection.viaSocket(socket);
}
bool _validateBadCertificate(X509Certificate certificate) {
final validator = options.credentials.onBadCertificate;
if (validator == null) return false;
return validator(certificate, authority);
}
void _connect() {
if (_state != ConnectionState.idle &&
_state != ConnectionState.transientFailure) {
return;
}
_setState(ConnectionState.connecting);
connectTransport().then((transport) {
_currentReconnectDelay = null;
_transport = transport;
_transport.onActiveStateChanged = _handleActiveStateChanged;
_setState(ConnectionState.ready);
_pendingCalls.forEach(_startCall);
_pendingCalls.clear();
}).catchError(_handleConnectionFailure);
}
void dispatchCall(ClientCall call) {
switch (_state) {
case ConnectionState.ready:
_startCall(call);
break;
case ConnectionState.shutdown:
_shutdownCall(call);
break;
default:
_pendingCalls.add(call);
if (_state == ConnectionState.idle) {
_connect();
}
}
}
ClientTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata) {
final headers = createCallHeaders(
options.credentials.isSecure, authority, path, timeout, metadata,
userAgent: options.userAgent);
return _transport.makeRequest(headers);
}
void _startCall(ClientCall call) {
if (call.isCancelled) return;
call.onConnectionReady(this);
}
void _failCall(ClientCall call, dynamic error) {
if (call.isCancelled) return;
call.onConnectionError(error);
}
void _shutdownCall(ClientCall call) {
_failCall(call, 'Connection shutting down.');
}
/// Shuts down this connection.
///
/// No further calls may be made on this connection, but existing calls
/// are allowed to finish.
Future<void> shutdown() async {
if (_state == ConnectionState.shutdown) return null;
_setShutdownState();
await _transport?.finish();
}
/// Terminates this connection.
///
/// All open calls are terminated immediately, and no further calls may be
/// made on this connection.
Future<void> terminate() async {
_setShutdownState();
await _transport?.terminate();
}
void _setShutdownState() {
_setState(ConnectionState.shutdown);
_cancelTimer();
_pendingCalls.forEach(_shutdownCall);
_pendingCalls.clear();
}
void _setState(ConnectionState state) {
_state = state;
if (onStateChanged != null) {
onStateChanged(this);
}
}
void _handleIdleTimeout() {
if (_timer == null || _state != ConnectionState.ready) return;
_cancelTimer();
_transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error.
_transport = null;
_setState(ConnectionState.idle);
}
void _cancelTimer() {
_timer?.cancel();
_timer = null;
}
void _handleActiveStateChanged(bool isActive) {
if (isActive) {
_cancelTimer();
} else {
if (options.idleTimeout != null) {
_timer ??= new Timer(options.idleTimeout, _handleIdleTimeout);
}
}
}
bool _hasPendingCalls() {
// Get rid of pending calls that have timed out.
_pendingCalls.removeWhere((call) => call.isCancelled);
return _pendingCalls.isNotEmpty;
}
void _handleConnectionFailure(error) {
_transport = null;
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
return;
}
// TODO(jakobr): Log error.
_cancelTimer();
_pendingCalls.forEach((call) => _failCall(call, error));
_pendingCalls.clear();
_setState(ConnectionState.idle);
}
void _handleReconnect() {
if (_timer == null || _state != ConnectionState.transientFailure) return;
_cancelTimer();
_connect();
}
void _handleSocketClosed(_) {
_cancelTimer();
_transport = null;
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
// All good.
return;
}
// We were not planning to close the socket.
if (!_hasPendingCalls()) {
// No pending calls. Just hop to idle, and wait for a new RPC.
_setState(ConnectionState.idle);
return;
}
// We have pending RPCs. Reconnect after backoff delay.
_setState(ConnectionState.transientFailure);
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
_timer = new Timer(_currentReconnectDelay, _handleReconnect);
}
}