blob: a7d9dc4bee80f392197efeba224ac4998bf8fecf [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library http2.client;
import 'dart:async';
import 'dart:convert' show ascii;
import 'dart:io';
import '../../transport.dart';
class Request {
final String method;
final Uri uri;
Request(this.method, this.uri);
}
class Response {
final Map<String, List<String>> headers;
final Stream<List<int>> stream;
final Stream<ServerPush> serverPushes;
Response(this.headers, this.stream, this.serverPushes);
}
class ServerPush {
final Map<String, List<String>> requestHeaders;
final Future<Response> response;
ServerPush(this.requestHeaders, this.response);
}
class ClientConnection {
final ClientTransportConnection connection;
/// Assumes the protocol on [socket] was negogiated to be http/2.
///
/// If [settings] are omitted, the default [ClientSettings] will be used.
ClientConnection(Socket socket, {ClientSettings settings})
: connection =
new ClientTransportConnection.viaSocket(socket, settings: settings);
Future<Response> makeRequest(Request request) {
var path = request.uri.path;
if (path.isEmpty) path = '/';
var headers = [
new Header.ascii(':method', request.method),
new Header.ascii(':path', path),
new Header.ascii(':scheme', request.uri.scheme),
new Header.ascii(':authority', '${request.uri.host}'),
];
return _handleStream(connection.makeRequest(headers, endStream: true));
}
Future close() {
return connection.finish();
}
Future<Response> _handleStream(ClientTransportStream stream) {
var completer = new Completer<Response>();
bool isFirst = true;
var controller = new StreamController<List<int>>();
var serverPushController = new StreamController<ServerPush>(sync: true);
stream.incomingMessages.listen((StreamMessage msg) {
if (isFirst) {
isFirst = false;
var headerMap = _convertHeaders((msg as HeadersStreamMessage).headers);
completer.complete(new Response(
headerMap, controller.stream, serverPushController.stream));
} else {
controller.add((msg as DataStreamMessage).bytes);
}
}, onDone: controller.close);
_handlePeerPushes(stream.peerPushes).pipe(serverPushController);
return completer.future;
}
Stream<ServerPush> _handlePeerPushes(
Stream<TransportStreamPush> serverPushes) {
var pushesController = new StreamController<ServerPush>();
serverPushes.listen((TransportStreamPush push) {
var responseCompleter = new Completer<Response>();
var serverPush = new ServerPush(
_convertHeaders(push.requestHeaders), responseCompleter.future);
pushesController.add(serverPush);
bool isFirst = true;
var dataController = new StreamController<List<int>>();
push.stream.incomingMessages.listen((StreamMessage msg) {
if (isFirst) {
isFirst = false;
var headerMap =
_convertHeaders((msg as HeadersStreamMessage).headers);
var response = new Response(
headerMap, dataController.stream, new Stream.fromIterable([]));
responseCompleter.complete(response);
} else {
dataController.add((msg as DataStreamMessage).bytes);
}
}, onDone: dataController.close);
}, onDone: pushesController.close);
return pushesController.stream;
}
Map<String, List<String>> _convertHeaders(List<Header> headers) {
var headerMap = <String, List<String>>{};
for (var header in headers) {
headerMap
.putIfAbsent(ascii.decode(header.name), () => [])
.add(ascii.decode(header.value));
}
return headerMap;
}
}
/// Tries to connect to [uri] via a secure socket connection and establishes a
/// http/2 connection.
///
/// If [allowServerPushes] is `true`, server pushes need to be handled by the
/// client. The maximum number of concurrent server pushes can be configured via
/// [maxConcurrentPushes] (default is `null` meaning no limit).
Future<ClientConnection> connect(Uri uri,
{bool allowServerPushes: false, int maxConcurrentPushes}) async {
const List<String> Http2AlpnProtocols = const <String>[
'h2-14',
'h2-15',
'h2-16',
'h2-17',
'h2'
];
bool useSSL = uri.scheme == 'https';
var settings = new ClientSettings(
concurrentStreamLimit: maxConcurrentPushes,
allowServerPushes: allowServerPushes);
if (useSSL) {
SecureSocket socket = await SecureSocket.connect(uri.host, uri.port,
supportedProtocols: Http2AlpnProtocols);
if (!Http2AlpnProtocols.contains(socket.selectedProtocol)) {
throw new Exception('Server does not support HTTP/2.');
}
return new ClientConnection(socket, settings: settings);
} else {
Socket socket = await Socket.connect(uri.host, uri.port);
return new ClientConnection(socket, settings: settings);
}
}