| # Copyright 2015 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| require_relative '../grpc' |
| |
| # GRPC contains the General RPC module. |
| module GRPC |
| # RpcDesc is a Descriptor of an RPC method. |
| class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, |
| :unmarshal_method) |
| include Core::StatusCodes |
| |
| # Used to wrap a message class to indicate that it needs to be streamed. |
| class Stream |
| attr_accessor :type |
| |
| def initialize(type) |
| @type = type |
| end |
| end |
| |
| # @return [Proc] { |instance| marshalled(instance) } |
| def marshal_proc |
| proc { |o| o.class.method(marshal_method).call(o).to_s } |
| end |
| |
| # @param [:input, :output] target determines whether to produce the an |
| # unmarshal Proc for the rpc input parameter or |
| # its output parameter |
| # |
| # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } |
| def unmarshal_proc(target) |
| fail ArgumentError unless [:input, :output].include?(target) |
| unmarshal_class = method(target).call |
| unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream |
| proc { |o| unmarshal_class.method(unmarshal_method).call(o) } |
| end |
| |
| def handle_request_response(active_call, mth, inter_ctx) |
| req = active_call.read_unary_request |
| call = active_call.single_req_view |
| |
| inter_ctx.intercept!( |
| :request_response, |
| method: mth, |
| call: call, |
| request: req |
| ) do |
| resp = mth.call(req, call) |
| active_call.server_unary_response( |
| resp, |
| trailing_metadata: active_call.output_metadata |
| ) |
| end |
| end |
| |
| def handle_client_streamer(active_call, mth, inter_ctx) |
| call = active_call.multi_req_view |
| |
| inter_ctx.intercept!( |
| :client_streamer, |
| method: mth, |
| call: call |
| ) do |
| resp = mth.call(call) |
| active_call.server_unary_response( |
| resp, |
| trailing_metadata: active_call.output_metadata |
| ) |
| end |
| end |
| |
| def handle_server_streamer(active_call, mth, inter_ctx) |
| req = active_call.read_unary_request |
| call = active_call.single_req_view |
| |
| inter_ctx.intercept!( |
| :server_streamer, |
| method: mth, |
| call: call, |
| request: req |
| ) do |
| replies = mth.call(req, call) |
| replies.each { |r| active_call.remote_send(r) } |
| send_status(active_call, OK, 'OK', active_call.output_metadata) |
| end |
| end |
| |
| ## |
| # @param [GRPC::ActiveCall] active_call |
| # @param [Method] mth |
| # @param [Array<GRPC::InterceptionContext>] inter_ctx |
| # |
| def handle_bidi_streamer(active_call, mth, inter_ctx) |
| active_call.run_server_bidi(mth, inter_ctx) |
| send_status(active_call, OK, 'OK', active_call.output_metadata) |
| end |
| |
| ## |
| # @param [GRPC::ActiveCall] active_call The current active call object |
| # for the request |
| # @param [Method] mth The current RPC method being called |
| # @param [GRPC::InterceptionContext] inter_ctx The interception context |
| # being executed |
| # |
| def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) |
| # While a server method is running, it might be cancelled, its deadline |
| # might be reached, the handler could throw an unknown error, or a |
| # well-behaved handler could throw a StatusError. |
| if request_response? |
| handle_request_response(active_call, mth, inter_ctx) |
| elsif client_streamer? |
| handle_client_streamer(active_call, mth, inter_ctx) |
| elsif server_streamer? |
| handle_server_streamer(active_call, mth, inter_ctx) |
| else # is a bidi_stream |
| handle_bidi_streamer(active_call, mth, inter_ctx) |
| end |
| rescue BadStatus => e |
| # this is raised by handlers that want GRPC to send an application error |
| # code and detail message and some additional app-specific metadata. |
| GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") |
| send_status(active_call, e.code, e.details, e.metadata) |
| rescue Core::CallError => e |
| # This is raised by GRPC internals but should rarely, if ever happen. |
| # Log it, but don't notify the other endpoint.. |
| GRPC.logger.warn("failed call: #{active_call}\n#{e}") |
| rescue Core::OutOfTime |
| # This is raised when active_call#method.call exceeds the deadline |
| # event. Send a status of deadline exceeded |
| GRPC.logger.warn("late call: #{active_call}") |
| send_status(active_call, DEADLINE_EXCEEDED, 'late') |
| rescue StandardError, NotImplementedError => e |
| # This will usuaally be an unhandled error in the handling code. |
| # Send back a UNKNOWN status to the client |
| # |
| # Note: this intentionally does not map NotImplementedError to |
| # UNIMPLEMENTED because NotImplementedError is intended for low-level |
| # OS interaction (e.g. syscalls) not supported by the current OS. |
| GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") |
| GRPC.logger.warn(e) |
| send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}") |
| end |
| |
| def assert_arity_matches(mth) |
| # A bidi handler function can optionally be passed a second |
| # call object parameter for access to metadata, cancelling, etc. |
| if bidi_streamer? |
| if mth.arity != 2 && mth.arity != 1 |
| fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ |
| "#{mth.name}(req)") |
| end |
| elsif request_response? || server_streamer? |
| if mth.arity != 2 |
| fail arity_error(mth, 2, "should be #{mth.name}(req, call)") |
| end |
| else |
| if mth.arity != 1 |
| fail arity_error(mth, 1, "should be #{mth.name}(call)") |
| end |
| end |
| end |
| |
| def request_response? |
| !input.is_a?(Stream) && !output.is_a?(Stream) |
| end |
| |
| def client_streamer? |
| input.is_a?(Stream) && !output.is_a?(Stream) |
| end |
| |
| def server_streamer? |
| !input.is_a?(Stream) && output.is_a?(Stream) |
| end |
| |
| def bidi_streamer? |
| input.is_a?(Stream) && output.is_a?(Stream) |
| end |
| |
| def arity_error(mth, want, msg) |
| "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" |
| end |
| |
| def send_status(active_client, code, details, metadata = {}) |
| details = 'Not sure why' if details.nil? |
| GRPC.logger.debug("Sending status #{code}:#{details}") |
| active_client.send_status(code, details, code == OK, metadata: metadata) |
| rescue StandardError => e |
| GRPC.logger.warn("Could not send status #{code}:#{details}") |
| GRPC.logger.warn(e) |
| end |
| end |
| end |