blob: 6b7cf4eeff5e39c6cdb88c314f68763d2983ce2f [file] [log] [blame]
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'package:http2/transport.dart';
import '../shared/codec.dart';
import '../shared/codec_registry.dart';
import '../shared/io_bits/io_bits.dart' show X509Certificate;
import '../shared/message.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'interceptor.dart';
import 'service.dart';
/// Handles an incoming gRPC call.
class ServerHandler_ extends ServiceCall {
final ServerTransportStream _stream;
final Service? Function(String service) _serviceLookup;
final List<Interceptor> _interceptors;
final CodecRegistry? _codecRegistry;
// ignore: cancel_subscriptions
StreamSubscription<GrpcMessage>? _incomingSubscription;
late Service _service;
late ServiceMethod _descriptor;
Map<String, String>? _clientMetadata;
Codec? _callEncodingCodec;
StreamController? _requests;
bool _hasReceivedRequest = false;
late Stream _responses;
StreamSubscription? _responseSubscription;
bool _headersSent = false;
Map<String, String>? _customHeaders = {};
Map<String, String>? _customTrailers = {};
DateTime? _deadline;
bool _isCanceled = false;
bool _isTimedOut = false;
Timer? _timeoutTimer;
final X509Certificate? _clientCertificate;
ServerHandler_(this._serviceLookup, this._stream, this._interceptors,
this._codecRegistry,
[this._clientCertificate]);
@override
DateTime? get deadline => _deadline;
@override
bool get isCanceled => _isCanceled;
@override
bool get isTimedOut => _isTimedOut;
@override
Map<String, String>? get clientMetadata => _clientMetadata;
@override
Map<String, String>? get headers => _customHeaders;
@override
Map<String, String>? get trailers => _customTrailers;
@override
X509Certificate? get clientCertificate => _clientCertificate;
void handle() {
_stream.onTerminated = (_) => cancel();
_incomingSubscription = _stream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor(codecRegistry: _codecRegistry))
.listen(_onDataIdle,
onError: _onError, onDone: _onDoneError, cancelOnError: true);
_stream.outgoingMessages.done.then((_) {
cancel();
});
}
/// Cancel response subscription, if active. If the stream exits with an
/// error, just ignore it. The client is long gone, so it doesn't care.
/// We need the catchError() handler here, since otherwise the error would
/// be an unhandled exception.
void _cancelResponseSubscription() {
_responseSubscription?.cancel().catchError((_) {});
}
// -- Idle state, incoming data --
void _onDataIdle(GrpcMessage headerMessage) async {
if (headerMessage is! GrpcMetadata) {
_sendError(GrpcError.unimplemented('Expected header frame'));
_sinkIncoming();
return;
}
_incomingSubscription!.pause();
_clientMetadata = headerMessage.metadata;
final path = _clientMetadata![':path']!;
final pathSegments = path.split('/');
if (pathSegments.length < 3) {
_sendError(GrpcError.unimplemented('Invalid path'));
_sinkIncoming();
return;
}
final serviceName = pathSegments[1];
final methodName = pathSegments[2];
if (_codecRegistry != null) {
final acceptedEncodings =
clientMetadata!['grpc-accept-encoding']?.split(',') ?? [];
_callEncodingCodec = acceptedEncodings
.map(_codecRegistry!.lookup)
.firstWhere((c) => c != null, orElse: () => null);
}
final service = _serviceLookup(serviceName);
final descriptor = service?.$lookupMethod(methodName);
if (descriptor == null) {
_sendError(GrpcError.unimplemented('Path $path not found'));
_sinkIncoming();
return;
}
_service = service!;
_descriptor = descriptor;
final error = await _applyInterceptors();
if (error != null) {
_sendError(error);
_sinkIncoming();
return;
}
_startStreamingRequest();
}
GrpcError? _onMetadata() {
try {
_service.$onMetadata(this);
} on GrpcError catch (error) {
return error;
} catch (error) {
final grpcError = GrpcError.internal(error.toString());
return grpcError;
}
return null;
}
Future<GrpcError?> _applyInterceptors() async {
try {
for (final interceptor in _interceptors) {
final error = await interceptor(this, _descriptor);
if (error != null) {
return error;
}
}
} catch (error) {
final grpcError = GrpcError.internal(error.toString());
return grpcError;
}
return null;
}
void _startStreamingRequest() {
final requests = _descriptor.createRequestStream(_incomingSubscription!);
_requests = requests;
_incomingSubscription!.onData(_onDataActive);
final error = _onMetadata();
if (error != null) {
if (!requests.isClosed) {
requests
..addError(error)
..close();
}
_sendError(error);
_onDone();
_stream.terminate();
return;
}
_responses = _descriptor.handle(this, requests.stream);
_responseSubscription = _responses.listen(_onResponse,
onError: _onResponseError,
onDone: _onResponseDone,
cancelOnError: true);
_incomingSubscription!.onData(_onDataActive);
_incomingSubscription!.onDone(_onDoneExpected);
final timeout = fromTimeoutString(_clientMetadata!['grpc-timeout']);
if (timeout != null) {
_deadline = DateTime.now().add(timeout);
_timeoutTimer = Timer(timeout, _onTimedOut);
}
}
void _onTimedOut() {
if (_isCanceled) return;
_isTimedOut = true;
_isCanceled = true;
final error = GrpcError.deadlineExceeded('Deadline exceeded');
_sendError(error);
if (!_requests!.isClosed) {
_requests!
..addError(error)
..close();
}
}
// -- Active state, incoming data --
void _onDataActive(GrpcMessage message) {
if (message is! GrpcData) {
final error = GrpcError.unimplemented('Expected request');
_sendError(error);
_requests!
..addError(error)
..close();
return;
}
if (_hasReceivedRequest && !_descriptor.streamingRequest) {
final error = GrpcError.unimplemented('Too many requests');
_sendError(error);
_requests!
..addError(error)
..close();
return;
}
final data = message;
var request;
try {
request = _descriptor.deserialize(data.data);
} catch (error) {
final grpcError =
GrpcError.internal('Error deserializing request: $error');
_sendError(grpcError);
_requests!
..addError(grpcError)
..close();
return;
}
_requests!.add(request);
_hasReceivedRequest = true;
}
// -- Active state, outgoing response data --
void _onResponse(response) {
try {
final bytes = _descriptor.serialize(response);
if (!_headersSent) {
sendHeaders();
}
_stream.sendData(frame(bytes, _callEncodingCodec));
} catch (error) {
final grpcError = GrpcError.internal('Error sending response: $error');
if (!_requests!.isClosed) {
// If we can, alert the handler that things are going wrong.
_requests!
..addError(grpcError)
..close();
}
_sendError(grpcError);
_cancelResponseSubscription();
}
}
void _onResponseDone() {
sendTrailers();
}
void _onResponseError(error) {
if (error is GrpcError) {
_sendError(error);
} else {
_sendError(GrpcError.unknown(error.toString()));
}
}
@override
void sendHeaders() {
if (_headersSent) throw GrpcError.internal('Headers already sent');
_customHeaders!
..remove(':status')
..remove('content-type');
// TODO(jakobr): Should come from package:http2?
final outgoingHeadersMap = <String, String>{
':status': '200',
'content-type': 'application/grpc',
if (_callEncodingCodec != null)
'grpc-encoding': _callEncodingCodec!.encodingName,
};
outgoingHeadersMap.addAll(_customHeaders!);
_customHeaders = null;
final outgoingHeaders = <Header>[];
outgoingHeadersMap.forEach((key, value) =>
outgoingHeaders.add(Header(ascii.encode(key), utf8.encode(value))));
_stream.sendHeaders(outgoingHeaders);
_headersSent = true;
}
@override
void sendTrailers(
{int? status = 0, String? message, Map<String, String>? errorTrailers}) {
_timeoutTimer?.cancel();
final outgoingTrailersMap = <String, String>{};
if (!_headersSent) {
// TODO(jakobr): Should come from package:http2?
outgoingTrailersMap[':status'] = '200';
outgoingTrailersMap['content-type'] = 'application/grpc';
_customHeaders!
..remove(':status')
..remove('content-type');
outgoingTrailersMap.addAll(_customHeaders!);
_customHeaders = null;
_headersSent = true;
}
_customTrailers!
..remove(':status')
..remove('content-type');
outgoingTrailersMap.addAll(_customTrailers!);
_customTrailers = null;
outgoingTrailersMap['grpc-status'] = status.toString();
if (message != null) {
outgoingTrailersMap['grpc-message'] =
Uri.encodeFull(message).replaceAll('%20', ' ');
}
if (errorTrailers != null) {
outgoingTrailersMap.addAll(errorTrailers);
}
final outgoingTrailers = <Header>[];
outgoingTrailersMap.forEach((key, value) =>
outgoingTrailers.add(Header(ascii.encode(key), utf8.encode(value))));
_stream.sendHeaders(outgoingTrailers, endStream: true);
// We're done!
_cancelResponseSubscription();
_sinkIncoming();
}
// -- All states, incoming error / stream closed --
void _onError(error) {
// Exception from the incoming stream. Most likely a cancel request from the
// client, so we treat it as such.
_timeoutTimer?.cancel();
_isCanceled = true;
if (_requests != null && !_requests!.isClosed) {
_requests!.addError(GrpcError.cancelled('Cancelled'));
}
_cancelResponseSubscription();
_incomingSubscription!.cancel();
_stream.terminate();
}
void _onDoneError() {
_sendError(GrpcError.unavailable('Request stream closed unexpectedly'));
_onDone();
}
void _onDoneExpected() {
if (!(_hasReceivedRequest || _descriptor.streamingRequest)) {
final error = GrpcError.unimplemented('No request received');
_sendError(error);
_requests!.addError(error);
}
_onDone();
}
void _onDone() {
_requests?.close();
_incomingSubscription!.cancel();
}
/// Sink incoming requests. This is used when an error has already been
/// reported, but we still need to consume the request stream from the client.
void _sinkIncoming() {
_incomingSubscription!
..onData((_) {})
..onDone(_onDone);
}
void _sendError(GrpcError error) {
sendTrailers(
status: error.code,
message: error.message,
errorTrailers: error.trailers,
);
}
void cancel() {
_isCanceled = true;
_timeoutTimer?.cancel();
_cancelResponseSubscription();
}
}
class ServerHandler extends ServerHandler_ {
ServerHandler(Service Function(String service) serviceLookup, stream,
[List<Interceptor> interceptors = const <Interceptor>[],
CodecRegistry? codecRegistry,
X509Certificate? clientCertificate])
: super(serviceLookup, stream, interceptors, codecRegistry,
clientCertificate);
}