blob: 26d869cd0e8ad65d1b6c7018ceadb92a8540723e [file] [log] [blame]
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import '../shared/status.dart';
import 'call.dart';
/// Definition of a gRPC service method.
class ServiceMethod<Q, R> {
final String name;
final bool streamingRequest;
final bool streamingResponse;
final Q Function(List<int> request) requestDeserializer;
final List<int> Function(R response) responseSerializer;
final Function handler;
ServiceMethod(
this.name,
this.handler,
this.streamingRequest,
this.streamingResponse,
this.requestDeserializer,
this.responseSerializer);
StreamController<Q> createRequestStream(StreamSubscription incoming) =>
new StreamController<Q>(
onListen: incoming.resume,
onPause: incoming.pause,
onResume: incoming.resume);
Q deserialize(List<int> data) => requestDeserializer(data);
List<int> serialize(dynamic response) => responseSerializer(response as R);
Stream<R> handle(ServiceCall call, Stream<Q> requests) {
if (streamingResponse) {
if (streamingRequest) {
return handler(call, requests);
} else {
return handler(call, _toSingleFuture(requests));
}
} else {
Future<R> response;
if (streamingRequest) {
response = handler(call, requests);
} else {
response = handler(call, _toSingleFuture(requests));
}
return response.asStream();
}
}
Future<Q> _toSingleFuture(Stream<Q> stream) {
Q _ensureOnlyOneRequest(Q previous, Q element) {
if (previous != null) {
throw new GrpcError.unimplemented('More than one request received');
}
return element;
}
Q _ensureOneRequest(Q value) {
if (value == null)
throw new GrpcError.unimplemented('No requests received');
return value;
}
final future =
stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest);
// Make sure errors on the future aren't unhandled, but return the original
// future so the request handler can also get the error.
future.catchError((_) {});
return future;
}
}
/// Definition of a gRPC service.
abstract class Service {
final Map<String, ServiceMethod> _$methods = {};
String get $name;
void $addMethod(ServiceMethod method) {
_$methods[method.name] = method;
}
/// Client metadata handler.
///
/// Services can override this method to provide common handling of incoming
/// metadata from the client.
void $onMetadata(ServiceCall context) {}
ServiceMethod $lookupMethod(String name) => _$methods[name];
}