// blob: 80988be318b6d08b2983df9ce28977ad3602cc7e [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#import "ProtoRPC.h"
#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS
#import <Protobuf/GPBProtocolBuffers.h>
#else
#import <GPBProtocolBuffers.h>
#endif
#import <GRPCClient/GRPCCall.h>
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter+Transformations.h>
/**
 * Generate an NSError object that represents a failure in parsing a proto class.
 *
 * The userInfo always carries a localized description and recovery suggestion. The three
 * diagnostic entries below are added only when the corresponding argument is non-nil, so a
 * missing value can never make error construction itself raise.
 *
 * @param proto The value that could not be parsed; stored under @"Received value".
 * @param expectedClass The class the value was expected to parse into; stored under
 *     @"Expected class".
 * @param parsingError The error reported by the parser; stored under NSUnderlyingErrorKey.
 * @return An NSError with domain @"io.grpc" and code 13.
 */
static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsingError) {
  NSMutableDictionary *info = [NSMutableDictionary dictionary];
  info[NSLocalizedDescriptionKey] = @"Unable to parse response from the server";
  info[NSLocalizedRecoverySuggestionErrorKey] =
      @"If this RPC is idempotent, retry "
      @"with exponential backoff. Otherwise, query the server status before "
      @"retrying.";
  // A dictionary literal raises NSInvalidArgumentException when any value is nil, which would
  // replace the parse failure being reported with a crash. Insert optional values one by one.
  if (parsingError != nil) {
    info[NSUnderlyingErrorKey] = parsingError;
  }
  if (expectedClass != Nil) {
    info[@"Expected class"] = expectedClass;
  }
  if (proto != nil) {
    info[@"Received value"] = proto;
  }
  // TODO(jcanizales): Use kGRPCErrorDomain and GRPCErrorCodeInternal when they're public.
  return [NSError errorWithDomain:@"io.grpc" code:13 userInfo:[info copy]];
}
/**
 * A call that sends exactly one request message. All work is delegated to an internal
 * GRPCStreamingProtoCall, which -start drives through a fixed sequence of steps.
 */
@implementation GRPCUnaryProtoCall {
  // Streaming call that every operation is forwarded to.
  GRPCStreamingProtoCall *_call;
  // Copy of the request message, taken when the call is created.
  GPBMessage *_message;
}

/**
 * Creates a unary call for the given request message.
 *
 * Asserts that message and responseClass are non-nil; when assertions are compiled out,
 * returns nil instead if either one is missing.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                               message:(GPBMessage *)message
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  NSAssert(message != nil, @"message cannot be empty.");
  NSAssert(responseClass != nil, @"responseClass cannot be empty.");
  const BOOL hasRequiredArguments = (message != nil) && (responseClass != nil);
  if (!hasRequiredArguments) {
    return nil;
  }

  self = [super init];
  if (self == nil) {
    return nil;
  }

  _call = [[GRPCStreamingProtoCall alloc] initWithRequestOptions:requestOptions
                                                 responseHandler:handler
                                                     callOptions:callOptions
                                                   responseClass:responseClass];
  _message = [message copy];
  return self;
}

/**
 * Runs the whole exchange on the wrapped streaming call, in this order: start it, request one
 * response message, write the stored request message, then finish the call.
 */
- (void)start {
  GRPCStreamingProtoCall *streamingCall = _call;
  [streamingCall start];
  [streamingCall receiveNextMessage];
  [streamingCall writeMessage:_message];
  [streamingCall finish];
}

/** Forwards the cancellation to the wrapped streaming call. */
- (void)cancel {
  [_call cancel];
}

@end
// Class extension: GRPCStreamingProtoCall adopts GRPCResponseHandler privately, so it can pass
// itself as the response handler of the GRPCCall2 it creates in its initializer.
@interface GRPCStreamingProtoCall ()<GRPCResponseHandler>
@end
// Streaming proto call. The methods below read and write _call and _handler inside
// @synchronized(self), and deliver handler callbacks asynchronously on _dispatchQueue.
@implementation GRPCStreamingProtoCall {
// Copy of the request options passed to the initializer.
GRPCRequestOptions *_requestOptions;
// Receiver of the proto-level callbacks. Set to nil when a close callback is delivered, or on
// cancel when the handler does not implement the close callback.
id<GRPCProtoResponseHandler> _handler;
// Copy of the call options passed to the initializer.
GRPCCallOptions *_callOptions;
// Class that receives parseFromData:error: to parse each raw response message.
Class _responseClass;
// Underlying call created in the initializer. Set to nil by cancel, finish, the close callback,
// and after a response fails to parse.
GRPCCall2 *_call;
// Serial queue created in the initializer; its target queue is the handler's dispatchQueue.
dispatch_queue_t _dispatchQueue;
}
/**
 * Creates a streaming proto call.
 *
 * Asserts that requestOptions has a non-empty host and path and a safety value no greater than
 * GRPCCallSafetyCacheableRequest, and that handler is non-nil. When assertions are compiled out,
 * returns nil instead if any of those conditions fails.
 *
 * @param requestOptions Copied; also handed to the underlying GRPCCall2.
 * @param handler Receiver of the proto-level callbacks. Its dispatchQueue becomes the target of
 *     the serial queue on which callbacks are delivered.
 * @param callOptions Copied; also handed to the underlying GRPCCall2.
 * @param responseClass Class used to parse raw response data via parseFromData:error:.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  // The first assertion validates requestOptions, so its message names requestOptions.
  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0 &&
               requestOptions.safety <= GRPCCallSafetyCacheableRequest,
           @"Invalid requestOptions.");
  NSAssert(handler != nil, @"handler cannot be empty.");
  if (requestOptions.host.length == 0 || requestOptions.path.length == 0 ||
      requestOptions.safety > GRPCCallSafetyCacheableRequest) {
    return nil;
  }
  if (handler == nil) {
    return nil;
  }

  if ((self = [super init])) {
    _requestOptions = [requestOptions copy];
    _handler = handler;
    _callOptions = [callOptions copy];
    _responseClass = responseClass;

// Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above.
// @available is only understood by Xcode 9+ (iOS 11 / macOS 10.13 SDKs), so the QoS branch must
// be compiled only when the SDK is at least that new. A macro that is undefined for the current
// platform evaluates to 0 here, which is why each side is tested with >=.
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
    if (@available(iOS 8.0, macOS 10.10, *)) {
      _dispatchQueue = dispatch_queue_create(
          NULL,
          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
    } else {
#else
    {
#endif
      _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    }
    dispatch_set_target_queue(_dispatchQueue, handler.dispatchQueue);

    // This object is the raw-level response handler of the underlying call.
    _call = [[GRPCCall2 alloc] initWithRequestOptions:_requestOptions
                                      responseHandler:self
                                          callOptions:_callOptions];
  }
  return self;
}
/** Starts the underlying call, if this object still holds one. */
- (void)start {
  GRPCCall2 *underlyingCall = nil;
  @synchronized(self) {
    underlyingCall = _call;
  }
  // The message is sent outside the synchronized block; messaging nil is a no-op.
  [underlyingCall start];
}
/**
 * Cancels the call.
 *
 * Drops the reference to the underlying call and cancels it. If the handler implements
 * didCloseWithTrailingMetadata:error:, a block is queued on the dispatch queue that clears the
 * handler and reports a GRPCErrorCodeCancelled error to it; otherwise the handler is cleared
 * immediately.
 */
- (void)cancel {
  GRPCCall2 *callToCancel = nil;
  @synchronized(self) {
    callToCancel = _call;
    _call = nil;

    const BOOL handlerWantsClose =
        [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)];
    if (!handlerWantsClose) {
      _handler = nil;
    } else {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> handlerToNotify = nil;
        @synchronized(self) {
          handlerToNotify = self->_handler;
          self->_handler = nil;
        }
        NSDictionary *userInfo = @{NSLocalizedDescriptionKey : @"Canceled by app"};
        NSError *cancellation = [NSError errorWithDomain:kGRPCErrorDomain
                                                    code:GRPCErrorCodeCancelled
                                                userInfo:userInfo];
        [handlerToNotify didCloseWithTrailingMetadata:nil error:cancellation];
      });
    }
  }
  // Cancel the underlying call outside the synchronized block.
  [callToCancel cancel];
}
/**
 * Serializes a proto message and writes it to the underlying call.
 *
 * Asserts that message is a GPBMessage; when assertions are compiled out, a non-proto value is
 * logged and ignored.
 */
- (void)writeMessage:(GPBMessage *)message {
  const BOOL isProtoMessage = [message isKindOfClass:[GPBMessage class]];
  NSAssert(isProtoMessage, @"Parameter message must be a GPBMessage");
  if (!isProtoMessage) {
    NSLog(@"Failed to send a message that is non-proto.");
    return;
  }

  GRPCCall2 *underlyingCall = nil;
  @synchronized(self) {
    underlyingCall = _call;
  }
  // Serialization and the write both happen outside the synchronized block.
  NSData *serialized = [message data];
  [underlyingCall writeData:serialized];
}
/**
 * Finishes the underlying call and drops this object's reference to it, so later operations
 * that go through _call become no-ops.
 */
- (void)finish {
  GRPCCall2 *callToFinish = nil;
  @synchronized(self) {
    callToFinish = _call;
    _call = nil;
  }
  [callToFinish finish];
}
/** Convenience for requesting a single message via -receiveNextMessages:. */
- (void)receiveNextMessage {
  const NSUInteger singleMessage = 1;
  [self receiveNextMessages:singleMessage];
}
/** Forwards the request for numberOfMessages more messages to the underlying call. */
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  GRPCCall2 *underlyingCall = nil;
  @synchronized(self) {
    underlyingCall = _call;
  }
  // Forwarded outside the synchronized block; a nil call makes this a no-op.
  [underlyingCall receiveNextMessages:numberOfMessages];
}
/**
 * GRPCResponseHandler callback: relays non-nil initial metadata to the handler on the dispatch
 * queue, provided the handler implements didReceiveInitialMetadata:.
 */
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  @synchronized(self) {
    const BOOL shouldForward =
        (initialMetadata != nil) &&
        [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)];
    if (shouldForward) {
      dispatch_async(_dispatchQueue, ^{
        // Re-read the handler when the block runs; it may have been cleared in the meantime.
        id<GRPCProtoResponseHandler> currentHandler = nil;
        @synchronized(self) {
          currentHandler = self->_handler;
        }
        [currentHandler didReceiveInitialMetadata:initialMetadata];
      });
    }
  }
}
/**
 * GRPCResponseHandler callback: parses a raw response with the response class and relays the
 * result to the handler.
 *
 * A nil message is ignored. A successfully parsed message is delivered through
 * didReceiveProtoMessage: when the handler implements it. When parsing fails and the handler
 * implements didCloseWithTrailingMetadata:error:, the handler is cleared and given a parse
 * error, and the underlying call is cancelled and released.
 */
- (void)didReceiveRawMessage:(NSData *)message {
  if (message == nil) {
    return;
  }

  // Parsing happens before entering the synchronized block.
  NSError *parseError = nil;
  GPBMessage *response = [_responseClass parseFromData:message error:&parseError];

  @synchronized(self) {
    if (response != nil) {
      if ([_handler respondsToSelector:@selector(didReceiveProtoMessage:)]) {
        dispatch_async(_dispatchQueue, ^{
          id<GRPCProtoResponseHandler> currentHandler = nil;
          @synchronized(self) {
            currentHandler = self->_handler;
          }
          [currentHandler didReceiveProtoMessage:response];
        });
      }
    } else if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> handlerToClose = nil;
        @synchronized(self) {
          handlerToClose = self->_handler;
          self->_handler = nil;
        }
        NSError *badProtoError = ErrorForBadProto(message, self->_responseClass, parseError);
        [handlerToClose didCloseWithTrailingMetadata:nil error:badProtoError];
      });
      [_call cancel];
      _call = nil;
    }
  }
}
/**
 * GRPCResponseHandler callback: relays the close event to the handler and releases the
 * underlying call.
 *
 * When the handler implements didCloseWithTrailingMetadata:error:, a block is queued that
 * clears the handler and passes it the trailing metadata and error unchanged.
 */
- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  @synchronized(self) {
    const BOOL handlerWantsClose =
        [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)];
    if (handlerWantsClose) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> handlerToClose = nil;
        @synchronized(self) {
          handlerToClose = self->_handler;
          self->_handler = nil;
        }
        [handlerToClose didCloseWithTrailingMetadata:trailingMetadata error:error];
      });
    }
    // The call reference is dropped whether or not the handler was notified.
    _call = nil;
  }
}
/**
 * GRPCResponseHandler callback: relays the write notification to the handler as
 * didWriteMessage on the dispatch queue, when the handler implements it.
 */
- (void)didWriteData {
  @synchronized(self) {
    const BOOL handlerWantsWrites = [_handler respondsToSelector:@selector(didWriteMessage)];
    if (handlerWantsWrites) {
      dispatch_async(_dispatchQueue, ^{
        // Re-read the handler when the block runs; it may have been cleared in the meantime.
        id<GRPCProtoResponseHandler> currentHandler = nil;
        @synchronized(self) {
          currentHandler = self->_handler;
        }
        [currentHandler didWriteMessage];
      });
    }
  }
}
/** Returns the serial queue created in the initializer. */
- (dispatch_queue_t)dispatchQueue {
  dispatch_queue_t callbackQueue = self->_dispatchQueue;
  return callbackQueue;
}
@end
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-implementations"
/**
 * Writer-based proto RPC. Requests are serialized to data before reaching the superclass, and
 * response data is parsed with the supplied response class before reaching the caller's
 * writeable.
 */
@implementation ProtoRPC {
#pragma clang diagnostic pop
  // Parses incoming response data. Held only until the call is started.
  id<GRXWriteable> _responseWriteable;
}

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wobjc-designated-initializers"
/** Unsupported superclass initializer: always raises NSInvalidArgumentException. */
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
              requestsWriter:(GRXWriter *)requestsWriter {
  [NSException raise:NSInvalidArgumentException
              format:@"Please use ProtoRPC's designated initializer instead."];
  return nil;
}
#pragma clang diagnostic pop

// Designated initializer
/**
 * Creates an RPC whose requests are proto messages and whose responses are parsed with
 * responseClass.
 *
 * Raises NSInvalidArgumentException when responseClass does not respond to
 * parseFromData:error:, and, later, when requestsWriter produces a value that is not a
 * GPBMessage.
 */
- (instancetype)initWithHost:(NSString *)host
                      method:(GRPCProtoMethod *)method
              requestsWriter:(GRXWriter *)requestsWriter
               responseClass:(Class)responseClass
          responsesWriteable:(id<GRXWriteable>)responsesWriteable {
  // The type system cannot constrain the class, so the capability is verified at runtime.
  const BOOL canParse = [responseClass respondsToSelector:@selector(parseFromData:error:)];
  if (!canParse) {
    [NSException raise:NSInvalidArgumentException
                format:@"A protobuf class to parse the responses must be provided."];
  }

  // Turns every outgoing proto message into its serialized form.
  GRXWriter *serializedRequests = [requestsWriter map:^id(GPBMessage *request) {
    if (![request isKindOfClass:[GPBMessage class]]) {
      [NSException raise:NSInvalidArgumentException
                  format:@"Request must be a proto message: %@", request];
    }
    return [request data];
  }];

  self = [super initWithHost:host path:method.HTTPPath requestsWriter:serializedRequests];
  if (self == nil) {
    return nil;
  }

  __weak ProtoRPC *weakSelf = self;
  // Parses every incoming data value; a value that fails to parse finishes the call with an
  // error.
  _responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
    // TODO(jcanizales): This is done in the main thread, and needs to happen in another thread.
    NSError *parseError = nil;
    id response = [responseClass parseFromData:value error:&parseError];
    if (!response) {
      [weakSelf finishWithError:ErrorForBadProto(value, responseClass, parseError)];
      return;
    }
    [responsesWriteable writeValue:response];
  }
      completionHandler:^(NSError *errorOrNil) {
        [responsesWriteable writesFinishedWithError:errorOrNil];
      }];
  return self;
}

/** Starts the call using the parsing writeable built by the designated initializer. */
- (void)start {
  id<GRXWriteable> parsingWriteable = _responseWriteable;
  [self startWithWriteable:parsingWriteable];
}

/** Starts the call through the superclass, then releases the stored parsing writeable. */
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  [super startWithWriteable:writeable];
  // Break retain cycles.
  _responseWriteable = nil;
}

@end
// Intentionally empty: GRPCProtoCall defines no methods or instance variables in this file.
// NOTE(review): presumably all of its behavior is inherited from the superclass declared in
// ProtoRPC.h — confirm against the header.
@implementation GRPCProtoCall
@end