blob: b1c890fb7a8626174265ff0fb07d05e20f1ea2e3 [file] [log] [blame]
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'package:http2/transport.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
import 'common.dart';
import 'connection.dart';
import 'method.dart';
import 'options.dart';
const _reservedHeaders = const [
'content-type',
'te',
'grpc-timeout',
'grpc-accept-encoding',
'user-agent',
];
/// An active call to a gRPC endpoint.
class ClientCall<Q, R> implements Response {
final ClientMethod<Q, R> _method;
final Stream<Q> _requests;
final CallOptions options;
final _headers = new Completer<Map<String, String>>();
final _trailers = new Completer<Map<String, String>>();
bool _hasReceivedResponses = false;
Map<String, String> _headerMetadata;
TransportStream _stream;
StreamController<R> _responses;
StreamSubscription<StreamMessage> _requestSubscription;
StreamSubscription<GrpcMessage> _responseSubscription;
bool isCancelled = false;
Timer _timeoutTimer;
ClientCall(this._method, this._requests, this.options) {
_responses = new StreamController(onListen: _onResponseListen);
if (options.timeout != null) {
_timeoutTimer = new Timer(options.timeout, _onTimedOut);
}
}
String get path => _method.path;
void onConnectionError(error) {
_terminateWithError(new GrpcError.unavailable('Error connecting: $error'));
}
void _terminateWithError(GrpcError error) {
if (!_responses.isClosed) {
_responses.addError(error);
}
_safeTerminate();
}
static Map<String, String> _sanitizeMetadata(Map<String, String> metadata) {
final sanitizedMetadata = <String, String>{};
metadata.forEach((String key, String value) {
final lowerCaseKey = key.trim().toLowerCase();
if (!lowerCaseKey.startsWith(':') &&
!_reservedHeaders.contains(lowerCaseKey)) {
sanitizedMetadata[lowerCaseKey] = value.trim();
}
});
return sanitizedMetadata;
}
void onConnectionReady(ClientConnection connection) {
if (isCancelled) return;
if (options.metadataProviders.isEmpty) {
_sendRequest(connection, _sanitizeMetadata(options.metadata));
} else {
final metadata = new Map<String, String>.from(options.metadata);
String audience;
if (connection.options.credentials.isSecure) {
final port = connection.port != 443 ? ':${connection.port}' : '';
final lastSlashPos = path.lastIndexOf('/');
final audiencePath =
lastSlashPos == -1 ? path : path.substring(0, lastSlashPos);
audience = 'https://${connection.authority}$port$audiencePath';
}
Future.forEach(options.metadataProviders,
(provider) => provider(metadata, audience))
.then((_) => _sendRequest(connection, _sanitizeMetadata(metadata)))
.catchError(_onMetadataProviderError);
}
}
void _onMetadataProviderError(error) {
_terminateWithError(new GrpcError.internal('Error making call: $error'));
}
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
try {
_stream = connection.makeRequest(path, options.timeout, metadata);
} catch (e) {
_terminateWithError(new GrpcError.unavailable('Error making call: $e'));
return;
}
_requestSubscription = _requests
.map(_method.requestSerializer)
.map(GrpcHttpEncoder.frame)
.map<StreamMessage>((bytes) => new DataStreamMessage(bytes))
.handleError(_onRequestError)
.listen(_stream.outgoingMessages.add,
onError: _stream.outgoingMessages.addError,
onDone: _stream.outgoingMessages.close,
cancelOnError: true);
// The response stream might have been listened to before _stream was ready,
// so try setting up the subscription here as well.
_onResponseListen();
}
void _onTimedOut() {
_responses.addError(new GrpcError.deadlineExceeded('Deadline exceeded'));
_safeTerminate();
}
/// Subscribe to incoming response messages, once [_stream] is available, and
/// the caller has subscribed to the [_responses] stream.
void _onResponseListen() {
if (_stream != null &&
_responses.hasListener &&
_responseSubscription == null) {
_responseSubscription = _stream.incomingMessages
.transform(new GrpcHttpDecoder())
.transform(grpcDecompressor())
.listen(_onResponseData,
onError: _onResponseError,
onDone: _onResponseDone,
cancelOnError: true);
if (_responses.isPaused) {
_responseSubscription.pause();
}
_responses.onPause = _responseSubscription.pause;
_responses.onResume = _responseSubscription.resume;
_responses.onCancel = _responseSubscription.cancel;
}
}
/// Emit an error response to the user, and tear down this call.
void _responseError(GrpcError error) {
_responses.addError(error);
_timeoutTimer?.cancel();
_requestSubscription?.cancel();
_responseSubscription.cancel();
_responses.close();
_stream.terminate();
}
/// Data handler for responses coming from the server. Handles header/trailer
/// metadata, and forwards response objects to [_responses].
void _onResponseData(GrpcMessage data) {
if (data is GrpcData) {
if (!_headers.isCompleted) {
_responseError(
new GrpcError.unimplemented('Received data before headers'));
return;
}
if (_trailers.isCompleted) {
_responseError(
new GrpcError.unimplemented('Received data after trailers'));
return;
}
_responses.add(_method.responseDeserializer(data.data));
_hasReceivedResponses = true;
} else if (data is GrpcMetadata) {
if (!_headers.isCompleted) {
// TODO(jakobr): Parse, and extract common headers.
_headerMetadata = data.metadata;
_headers.complete(_headerMetadata);
return;
}
if (_trailers.isCompleted) {
_responseError(
new GrpcError.unimplemented('Received multiple trailers'));
return;
}
final metadata = data.metadata;
_trailers.complete(metadata);
// TODO(jakobr): Parse more!
if (metadata.containsKey('grpc-status')) {
final status = int.parse(metadata['grpc-status']);
final message = metadata['grpc-message'];
if (status != 0) {
_responseError(new GrpcError.custom(status, message));
}
}
} else {
_responseError(new GrpcError.unimplemented('Unexpected frame received'));
}
}
/// Handler for response errors. Forward the error to the [_responses] stream,
/// wrapped if necessary.
void _onResponseError(error) {
if (error is GrpcError) {
_responseError(error);
return;
}
_responseError(new GrpcError.unknown(error.toString()));
}
/// Handles closure of the response stream. Verifies that server has sent
/// response messages and header/trailer metadata, as necessary.
void _onResponseDone() {
if (!_headers.isCompleted) {
_responseError(new GrpcError.unavailable('Did not receive anything'));
return;
}
if (!_trailers.isCompleted) {
if (_hasReceivedResponses) {
// Trailers are required after receiving data.
_responseError(new GrpcError.unavailable('Missing trailers'));
return;
}
// Only received a header frame and no data frames, so the header
// should contain "trailers" as well (Trailers-Only).
_trailers.complete(_headerMetadata);
final status = _headerMetadata['grpc-status'];
// If status code is missing, we must treat it as '0'. As in 'success'.
final statusCode = status != null ? int.parse(status) : 0;
if (statusCode != 0) {
final message = _headerMetadata['grpc-message'];
_responseError(new GrpcError.custom(statusCode, message));
}
}
_timeoutTimer?.cancel();
_responses.close();
_responseSubscription.cancel();
}
/// Error handler for the requests stream. Something went wrong while trying
/// to send the request to the server. Abort the request, and forward the
/// error to the user code on the [_responses] stream.
void _onRequestError(error) {
if (error is! GrpcError) {
error = new GrpcError.unknown(error.toString());
}
_responses.addError(error);
_timeoutTimer?.cancel();
_responses.close();
_requestSubscription?.cancel();
_responseSubscription?.cancel();
_stream.terminate();
}
Stream<R> get response => _responses.stream;
@override
Future<Map<String, String>> get headers => _headers.future;
@override
Future<Map<String, String>> get trailers => _trailers.future;
@override
Future<void> cancel() {
if (!_responses.isClosed) {
_responses.addError(new GrpcError.cancelled('Cancelled by client.'));
}
return _terminate();
}
Future<void> _terminate() async {
isCancelled = true;
_timeoutTimer?.cancel();
// Don't await _responses.close() here. It'll only complete once the done
// event has been delivered, and it's the caller of this function that is
// reading from responses as well, so we might end up deadlocked.
_responses.close();
_stream?.terminate();
final futures = <Future>[];
if (_requestSubscription != null) {
futures.add(_requestSubscription.cancel());
}
if (_responseSubscription != null) {
futures.add(_responseSubscription.cancel());
}
await Future.wait(futures);
}
Future<void> _safeTerminate() {
return _terminate().catchError((_) {});
}
}