# blob: b1f43e061b0a307d40e616db195d498944e5fb25 [file] [log] [blame]
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests the gRPC Core shutdown path."""
import datetime
import threading
import time
import unittest
import grpc
# Length of the window during which the test keeps polling (and, once the
# connection has failed, closing the channel) so that a crash on the shutdown
# path has time to surface.
_TIMEOUT_FOR_SEGFAULT = datetime.timedelta(seconds=10)
class GrpcShutdownTest(unittest.TestCase):
    """Regression tests for crashes on the gRPC Core shutdown path."""

    def test_channel_close_with_connectivity_watcher(self):
        """Originated by https://github.com/grpc/grpc/issues/20299.

        The grpc_shutdown happens synchronously, but there might be Core object
        references left in Cython which might lead to ABORT or SIGSEGV.

        The test makes no assertions: it passes as long as the process
        survives closing a channel that still has a connectivity subscriber
        for the whole ``_TIMEOUT_FOR_SEGFAULT`` window.
        """
        connection_failed = threading.Event()

        def on_state_change(state):
            # Flag the first time the channel reports that it cannot connect
            # (or has already been shut down).
            if state in (grpc.ChannelConnectivity.TRANSIENT_FAILURE,
                         grpc.ChannelConnectivity.SHUTDOWN):
                connection_failed.set()

        # Connect to a void address and subscribe to state changes.
        channel = grpc.insecure_channel("0.1.1.1:12345")
        try:
            # The second argument (try_to_connect) makes the channel start
            # connecting right away instead of waiting for the first RPC.
            channel.subscribe(on_state_change, True)

            # Use a monotonic clock so wall-clock adjustments cannot shorten
            # or stretch the observation window.
            deadline = time.monotonic() + _TIMEOUT_FOR_SEGFAULT.total_seconds()
            while time.monotonic() < deadline:
                time.sleep(0.1)
                if connection_failed.is_set():
                    # close() is idempotent; keep polling afterwards so a
                    # delayed crash still happens inside the test.
                    channel.close()
        finally:
            # Release the channel even when the failure state was never
            # observed before the deadline (previously it was leaked).
            channel.close()
if __name__ == "__main__":
    # Run the tests in this module with per-test verbose output.
    unittest.main(verbosity=2)