|  | #!/usr/bin/env ruby | 
|  |  | 
|  | # Copyright 2016 gRPC authors. | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | # Worker and worker service implementation | 
|  |  | 
|  | this_dir = File.expand_path(File.dirname(__FILE__)) | 
|  | lib_dir = File.join(File.dirname(this_dir), 'lib') | 
|  | $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) | 
|  | $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) | 
|  |  | 
|  | require 'grpc' | 
|  | require 'histogram' | 
|  | require 'src/proto/grpc/testing/benchmark_service_services_pb' | 
|  |  | 
|  | class Poisson | 
|  | def interarrival | 
|  | @lambda_recip * (-Math.log(1.0-rand)) | 
|  | end | 
|  | def advance | 
|  | t = @next_time | 
|  | @next_time += interarrival | 
|  | t | 
|  | end | 
|  | def initialize(lambda) | 
|  | @lambda_recip = 1.0/lambda | 
|  | @next_time = Time.now + interarrival | 
|  | end | 
|  | end | 
|  |  | 
|  | class BenchmarkClient | 
|  | def initialize(config) | 
|  | opts = {} | 
|  | if config.security_params | 
|  | if config.security_params.use_test_ca | 
|  | certs = load_test_certs | 
|  | cred = GRPC::Core::ChannelCredentials.new(certs[0]) | 
|  | else | 
|  | cred = GRPC::Core::ChannelCredentials.new() | 
|  | end | 
|  | if config.security_params.server_host_override | 
|  | channel_args = {} | 
|  | channel_args[GRPC::Core::Channel::SSL_TARGET] = | 
|  | config.security_params.server_host_override | 
|  | opts[:channel_args] = channel_args | 
|  | end | 
|  | else | 
|  | cred = :this_channel_is_insecure | 
|  | end | 
|  | @histres = config.histogram_params.resolution | 
|  | @histmax = config.histogram_params.max_possible | 
|  | @start_time = Time.now | 
|  | @histogram = Histogram.new(@histres, @histmax) | 
|  | @done = false | 
|  |  | 
|  | gtsr = Grpc::Testing::SimpleRequest | 
|  | gtpt = Grpc::Testing::PayloadType | 
|  | gtp = Grpc::Testing::Payload | 
|  | simple_params = config.payload_config.simple_params | 
|  | req = gtsr.new(response_type: gtpt::COMPRESSABLE, | 
|  | response_size: simple_params.resp_size, | 
|  | payload: gtp.new(type: gtpt::COMPRESSABLE, | 
|  | body: nulls(simple_params.req_size))) | 
|  |  | 
|  | @child_threads = [] | 
|  |  | 
|  | (0..config.client_channels-1).each do |chan| | 
|  | gtbss = Grpc::Testing::BenchmarkService::Stub | 
|  | st = config.server_targets | 
|  | stub = gtbss.new(st[chan % st.length], cred, **opts) | 
|  | (0..config.outstanding_rpcs_per_channel-1).each do |r| | 
|  | @child_threads << Thread.new { | 
|  | case config.load_params.load.to_s | 
|  | when 'closed_loop' | 
|  | waiter = nil | 
|  | when 'poisson' | 
|  | waiter = Poisson.new(config.load_params.poisson.offered_load / | 
|  | (config.client_channels * | 
|  | config.outstanding_rpcs_per_channel)) | 
|  | end | 
|  | case config.rpc_type | 
|  | when :UNARY | 
|  | unary_ping_ponger(req,stub,config,waiter) | 
|  | when :STREAMING | 
|  | streaming_ping_ponger(req,stub,config,waiter) | 
|  | end | 
|  | } | 
|  | end | 
|  | end | 
|  | end | 
|  | def wait_to_issue(waiter) | 
|  | if waiter | 
|  | delay = waiter.advance-Time.now | 
|  | sleep delay if delay > 0 | 
|  | end | 
|  | end | 
|  | def unary_ping_ponger(req, stub, config,waiter) | 
|  | while !@done | 
|  | wait_to_issue(waiter) | 
|  | start = Time.now | 
|  | resp = stub.unary_call(req) | 
|  | @histogram.add((Time.now-start)*1e9) | 
|  | end | 
|  | end | 
|  | def streaming_ping_ponger(req, stub, config, waiter) | 
|  | q = EnumeratorQueue.new(self) | 
|  | resp = stub.streaming_call(q.each_item) | 
|  | start = Time.now | 
|  | q.push(req) | 
|  | pushed_sentinal = false | 
|  | resp.each do |r| | 
|  | @histogram.add((Time.now-start)*1e9) | 
|  | if !@done | 
|  | wait_to_issue(waiter) | 
|  | start = Time.now | 
|  | q.push(req) | 
|  | else | 
|  | q.push(self) unless pushed_sentinal | 
|  | # Continue polling on the responses to consume and release resources | 
|  | pushed_sentinal = true | 
|  | end | 
|  | end | 
|  | end | 
|  | def mark(reset) | 
|  | lat = Grpc::Testing::HistogramData.new( | 
|  | bucket: @histogram.contents, | 
|  | min_seen: @histogram.minimum, | 
|  | max_seen: @histogram.maximum, | 
|  | sum: @histogram.sum, | 
|  | sum_of_squares: @histogram.sum_of_squares, | 
|  | count: @histogram.count | 
|  | ) | 
|  | elapsed = Time.now-@start_time | 
|  | if reset | 
|  | @start_time = Time.now | 
|  | @histogram = Histogram.new(@histres, @histmax) | 
|  | end | 
|  | Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed) | 
|  | end | 
|  | def shutdown | 
|  | @done = true | 
|  | @child_threads.each do |thread| | 
|  | thread.join | 
|  | end | 
|  | end | 
|  | end |