blob: a0f076e894a6b9b42cbd790ef26842bb7df9a3f5 [file] [log] [blame]
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python example of utilizing wait-for-ready flag."""
from __future__ import print_function
import logging
from concurrent import futures
from contextlib import contextmanager
import socket
import threading
import grpc
from examples import helloworld_pb2
from examples import helloworld_pb2_grpc
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
@contextmanager
def get_free_loopback_tcp_port():
if socket.has_ipv6:
tcp_socket = socket.socket(socket.AF_INET6)
else:
tcp_socket = socket.socket(socket.AF_INET)
tcp_socket.bind(('', 0))
address_tuple = tcp_socket.getsockname()
yield "localhost:%s" % (address_tuple[1])
tcp_socket.close()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, unused_context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def create_server(server_address):
server = grpc.server(futures.ThreadPoolExecutor())
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
bound_port = server.add_insecure_port(server_address)
assert bound_port == int(server_address.split(':')[-1])
return server
def process(stub, wait_for_ready=None):
try:
response = stub.SayHello(
helloworld_pb2.HelloRequest(name='you'),
wait_for_ready=wait_for_ready)
message = response.message
except grpc.RpcError as rpc_error:
assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
assert not wait_for_ready
message = rpc_error
else:
assert wait_for_ready
_LOGGER.info("Wait-for-ready %s, client received: %s", "enabled"
if wait_for_ready else "disabled", message)
def main():
# Pick a random free port
with get_free_loopback_tcp_port() as server_address:
# Register connectivity event to notify main thread
transient_failure_event = threading.Event()
def wait_for_transient_failure(channel_connectivity):
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
transient_failure_event.set()
# Create gRPC channel
channel = grpc.insecure_channel(server_address)
channel.subscribe(wait_for_transient_failure)
stub = helloworld_pb2_grpc.GreeterStub(channel)
# Fire an RPC without wait_for_ready
thread_disabled_wait_for_ready = threading.Thread(
target=process, args=(stub, False))
thread_disabled_wait_for_ready.start()
# Fire an RPC with wait_for_ready
thread_enabled_wait_for_ready = threading.Thread(
target=process, args=(stub, True))
thread_enabled_wait_for_ready.start()
# Wait for the channel entering TRANSIENT FAILURE state.
transient_failure_event.wait()
server = create_server(server_address)
server.start()
# Expected to fail with StatusCode.UNAVAILABLE.
thread_disabled_wait_for_ready.join()
# Expected to success.
thread_enabled_wait_for_ready.join()
server.stop(None)
channel.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()