| // Copyright 2017 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #include <vector> |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| |
| #include "src/core/ext/filters/client_channel/backup_poller.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" |
| #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" |
| #include "src/core/lib/address_utils/sockaddr_utils.h" |
| #include "src/core/lib/gpr/env.h" |
| #include "src/core/lib/resolver/server_address.h" |
| #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" |
| #include "test/cpp/end2end/connection_delay_injector.h" |
| #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" |
| |
| namespace grpc { |
| namespace testing { |
| namespace { |
| |
| using ::envoy::config::cluster::v3::CustomClusterType; |
| using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig; |
| |
| class ClusterTypeTest : public XdsEnd2endTest { |
| protected: |
| void SetUp() override { |
| logical_dns_cluster_resolver_response_generator_ = |
| grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>(); |
| InitClient(); |
| ChannelArguments args; |
| args.SetPointerWithVtable( |
| GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR, |
| logical_dns_cluster_resolver_response_generator_.get(), |
| &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable); |
| ResetStub(/*failover_timeout_ms=*/0, &args); |
| } |
| |
| grpc_core::ServerAddressList CreateAddressListFromPortList( |
| const std::vector<int>& ports) { |
| grpc_core::ServerAddressList addresses; |
| for (int port : ports) { |
| absl::StatusOr<grpc_core::URI> lb_uri = grpc_core::URI::Parse( |
| absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port)); |
| GPR_ASSERT(lb_uri.ok()); |
| grpc_resolved_address address; |
| GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); |
| addresses.emplace_back(address.addr, address.len, nullptr); |
| } |
| return addresses; |
| } |
| |
| grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> |
| logical_dns_cluster_resolver_response_generator_; |
| }; |
| |
| // |
| // LOGICAL_DNS cluster tests |
| // |
| |
// The LOGICAL_DNS tests use the ClusterTypeTest fixture as-is; the alias gives
// them their own test suite name.
using LogicalDNSClusterTest = ClusterTypeTest;

// Instantiate the suite once, with a default-constructed XdsTestType.
INSTANTIATE_TEST_SUITE_P(XdsTest, LogicalDNSClusterTest,
                         ::testing::Values(XdsTestType()), &XdsTestType::Name);
| |
| TEST_P(LogicalDNSClusterTest, Basic) { |
| CreateAndStartBackends(1); |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* address = cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address(); |
| address->set_address(kServerName); |
| address->set_port_value(443); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Set Logical DNS result |
| { |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Resolver::Result result; |
| result.addresses = CreateAddressListFromPortList(GetBackendPorts()); |
| logical_dns_cluster_resolver_response_generator_->SetResponse( |
| std::move(result)); |
| } |
| // RPCs should succeed. |
| CheckRpcSendOk(DEBUG_LOCATION); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, MissingLoadAssignment) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr( |
| "load_assignment not present for LOGICAL_DNS cluster")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, MissingLocalities) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT( |
| response_state->error_message, |
| ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " |
| "exactly one locality, found 0")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, MultipleLocalities) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* load_assignment = cluster.mutable_load_assignment(); |
| load_assignment->add_endpoints(); |
| load_assignment->add_endpoints(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT( |
| response_state->error_message, |
| ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " |
| "exactly one locality, found 2")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, MissingEndpoints) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment()->add_endpoints(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr( |
| "locality for LOGICAL_DNS cluster must have exactly one " |
| "endpoint, found 0")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, MultipleEndpoints) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* locality = cluster.mutable_load_assignment()->add_endpoints(); |
| locality->add_lb_endpoints(); |
| locality->add_lb_endpoints(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr( |
| "locality for LOGICAL_DNS cluster must have exactly one " |
| "endpoint, found 2")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, EmptyEndpoint) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment()->add_endpoints()->add_lb_endpoints(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("LbEndpoint endpoint field not set")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, EndpointMissingAddress) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("Endpoint address field not set")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, AddressMissingSocketAddress) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("Address socket_address field not set")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, SocketAddressHasResolverName) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address() |
| ->set_resolver_name("foo"); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("LOGICAL_DNS clusters must NOT have a " |
| "custom resolver name set")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, SocketAddressMissingAddress) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address(); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("SocketAddress address field not set")); |
| } |
| |
| TEST_P(LogicalDNSClusterTest, SocketAddressMissingPort) { |
| // Create Logical DNS Cluster |
| auto cluster = default_cluster_; |
| cluster.set_type(Cluster::LOGICAL_DNS); |
| cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address() |
| ->set_address(kServerName); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| const auto response_state = WaitForCdsNack(DEBUG_LOCATION); |
| ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; |
| EXPECT_THAT(response_state->error_message, |
| ::testing::HasSubstr("SocketAddress port_value field not set")); |
| } |
| |
| // |
| // aggregate cluster tests |
| // |
| |
| // TODO(roth): Add tests showing that load reporting is enabled on a |
| // per-underlying-cluster basis. |
| |
// The aggregate cluster tests use the ClusterTypeTest fixture as-is; the alias
// gives them their own test suite name.
using AggregateClusterTest = ClusterTypeTest;

// Instantiate the suite once, with a default-constructed XdsTestType.
INSTANTIATE_TEST_SUITE_P(XdsTest, AggregateClusterTest,
                         ::testing::Values(XdsTestType()), &XdsTestType::Name);
| |
| TEST_P(AggregateClusterTest, Basic) { |
| CreateAndStartBackends(2); |
| const char* kNewCluster1Name = "new_cluster_1"; |
| const char* kNewEdsService1Name = "new_eds_service_name_1"; |
| const char* kNewCluster2Name = "new_cluster_2"; |
| const char* kNewEdsService2Name = "new_eds_service_name_2"; |
| // Populate new EDS resources. |
| EdsResourceArgs args1({ |
| {"locality0", CreateEndpointsForBackends(0, 1)}, |
| }); |
| EdsResourceArgs args2({ |
| {"locality0", CreateEndpointsForBackends(1, 2)}, |
| }); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsService1Name)); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args2, kNewEdsService2Name)); |
| // Populate new CDS resources. |
| Cluster new_cluster1 = default_cluster_; |
| new_cluster1.set_name(kNewCluster1Name); |
| new_cluster1.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsService1Name); |
| balancer_->ads_service()->SetCdsResource(new_cluster1); |
| Cluster new_cluster2 = default_cluster_; |
| new_cluster2.set_name(kNewCluster2Name); |
| new_cluster2.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsService2Name); |
| balancer_->ads_service()->SetCdsResource(new_cluster2); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kNewCluster1Name); |
| cluster_config.add_clusters(kNewCluster2Name); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Wait for traffic to go to backend 0. |
| WaitForBackend(DEBUG_LOCATION, 0); |
| // Shutdown backend 0 and wait for all traffic to go to backend 1. |
| backends_[0]->StopListeningAndSendGoaways(); |
| WaitForBackend(DEBUG_LOCATION, 1); |
| auto response_state = balancer_->ads_service()->cds_response_state(); |
| ASSERT_TRUE(response_state.has_value()); |
| EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); |
| // Bring backend 0 back and ensure all traffic goes back to it. |
| ShutdownBackend(0); |
| StartBackend(0); |
| WaitForBackend(DEBUG_LOCATION, 0); |
| } |
| |
| TEST_P(AggregateClusterTest, DiamondDependency) { |
| const char* kNewClusterName1 = "new_cluster_1"; |
| const char* kNewEdsServiceName1 = "new_eds_service_name_1"; |
| const char* kNewClusterName2 = "new_cluster_2"; |
| const char* kNewEdsServiceName2 = "new_eds_service_name_2"; |
| const char* kNewAggregateClusterName = "new_aggregate_cluster"; |
| // Populate new EDS resources. |
| CreateAndStartBackends(2); |
| EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsServiceName1)); |
| EdsResourceArgs args2({{"locality0", CreateEndpointsForBackends(1, 2)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args2, kNewEdsServiceName2)); |
| // Populate new CDS resources. |
| Cluster new_cluster1 = default_cluster_; |
| new_cluster1.set_name(kNewClusterName1); |
| new_cluster1.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsServiceName1); |
| balancer_->ads_service()->SetCdsResource(new_cluster1); |
| Cluster new_cluster2 = default_cluster_; |
| new_cluster2.set_name(kNewClusterName2); |
| new_cluster2.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsServiceName2); |
| balancer_->ads_service()->SetCdsResource(new_cluster2); |
| // Populate top-level aggregate cluster pointing to kNewClusterName1 |
| // and kNewAggregateClusterName. |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kNewClusterName1); |
| cluster_config.add_clusters(kNewAggregateClusterName); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Populate kNewAggregateClusterName aggregate cluster pointing to |
| // kNewClusterName1 and kNewClusterName2. |
| auto aggregate_cluster2 = default_cluster_; |
| aggregate_cluster2.set_name(kNewAggregateClusterName); |
| custom_cluster = aggregate_cluster2.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| cluster_config.Clear(); |
| cluster_config.add_clusters(kNewClusterName1); |
| cluster_config.add_clusters(kNewClusterName2); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(aggregate_cluster2); |
| // Wait for traffic to go to backend 0. |
| WaitForBackend(DEBUG_LOCATION, 0); |
| // Shutdown backend 0 and wait for all traffic to go to backend 1. |
| backends_[0]->StopListeningAndSendGoaways(); |
| WaitForBackend(DEBUG_LOCATION, 1); |
| auto response_state = balancer_->ads_service()->cds_response_state(); |
| ASSERT_TRUE(response_state.has_value()); |
| EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); |
| // Bring backend 0 back and ensure all traffic go back to it. |
| ShutdownBackend(0); |
| StartBackend(0); |
| WaitForBackend(DEBUG_LOCATION, 0); |
| } |
| |
| // This test covers a bug found in the following scenario: |
| // 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1. |
| // 2. While P1 is still in CONNECTING, P0 goes back to READY, so we |
| // switch back to P0, deactivating P1. |
| // 3. P0 then goes back to TRANSIENT_FAILURE, and we reactivate P1. |
| // The bug caused us to fail to choose P1 even though it is in state |
| // CONNECTING (because the failover timer was not running), so we |
| // incorrectly failed the RPCs. |
| TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) { |
| CreateAndStartBackends(2); |
| const char* kClusterName1 = "cluster1"; |
| const char* kClusterName2 = "cluster2"; |
| const char* kEdsServiceName2 = "eds_service_name2"; |
| // Populate EDS resources. |
| EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); |
| balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); |
| args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args, kEdsServiceName2)); |
| // Populate new CDS resources. |
| Cluster cluster1 = default_cluster_; |
| cluster1.set_name(kClusterName1); |
| balancer_->ads_service()->SetCdsResource(cluster1); |
| Cluster cluster2 = default_cluster_; |
| cluster2.set_name(kClusterName2); |
| cluster2.mutable_eds_cluster_config()->set_service_name(kEdsServiceName2); |
| balancer_->ads_service()->SetCdsResource(cluster2); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kClusterName1); |
| cluster_config.add_clusters(kClusterName2); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Start connection injector. |
| ConnectionHoldInjector injector; |
| injector.Start(); |
| auto hold0 = injector.AddHold(backends_[0]->port()); |
| auto hold1 = injector.AddHold(backends_[1]->port()); |
| // Start long-running RPC in the background. |
| // This will trigger the channel to start connecting. |
| // Increase timeout to account for subchannel connection delays. |
| LongRunningRpc rpc; |
| rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(2000)); |
| // Tell channel to start connecting. |
| channel_->GetState(/*try_to_connect=*/true); |
| // Wait for backend 0 connection attempt to start, then fail it. |
| hold0->Wait(); |
| hold0->Fail( |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING("injected connection failure")); |
| // The channel should trigger a connection attempt for backend 1 now, |
| // but we've added a hold for that, so it will not complete yet. |
| // Meanwhile, the channel will also start a second attempt for backend |
| // 0, which we have NOT held, so it will complete normally, and the |
| // RPC will finish on backend 0. |
| gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH === "); |
| Status status = rpc.GetStatus(); |
| gpr_log(GPR_INFO, "=== RPC FINISHED === "); |
| EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
| << " message=" << status.error_message(); |
| EXPECT_EQ(1UL, backends_[0]->backend_service()->request_count()); |
| // Wait for backend 1 connection attempt to start. |
| hold1->Wait(); |
| // Send GOAWAY from the P0 backend. |
| // We don't actually shut it down here to avoid flakiness caused by |
| // failing an RPC after the client has already sent it but before the |
| // server finished processing it. |
| backends_[0]->StopListeningAndSendGoaways(); |
| // Allow the connection attempt to the P1 backend to resume. |
| hold1->Resume(); |
| // Wait for P1 backend to start getting traffic. |
| WaitForBackend(DEBUG_LOCATION, 1); |
| } |
| |
| TEST_P(AggregateClusterTest, EdsToLogicalDns) { |
| CreateAndStartBackends(2); |
| const char* kNewCluster1Name = "new_cluster_1"; |
| const char* kNewEdsService1Name = "new_eds_service_name_1"; |
| const char* kLogicalDNSClusterName = "logical_dns_cluster"; |
| // Populate new EDS resources. |
| EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsService1Name)); |
| // Populate new CDS resources. |
| Cluster new_cluster1 = default_cluster_; |
| new_cluster1.set_name(kNewCluster1Name); |
| new_cluster1.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsService1Name); |
| balancer_->ads_service()->SetCdsResource(new_cluster1); |
| // Create Logical DNS Cluster |
| auto logical_dns_cluster = default_cluster_; |
| logical_dns_cluster.set_name(kLogicalDNSClusterName); |
| logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* address = logical_dns_cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address(); |
| address->set_address(kServerName); |
| address->set_port_value(443); |
| balancer_->ads_service()->SetCdsResource(logical_dns_cluster); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kNewCluster1Name); |
| cluster_config.add_clusters(kLogicalDNSClusterName); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Set Logical DNS result |
| { |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Resolver::Result result; |
| result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); |
| logical_dns_cluster_resolver_response_generator_->SetResponse( |
| std::move(result)); |
| } |
| // Wait for traffic to go to backend 0. |
| WaitForBackend(DEBUG_LOCATION, 0); |
| // Shutdown backend 0 and wait for all traffic to go to backend 1. |
| backends_[0]->StopListeningAndSendGoaways(); |
| WaitForBackend(DEBUG_LOCATION, 1); |
| auto response_state = balancer_->ads_service()->cds_response_state(); |
| ASSERT_TRUE(response_state.has_value()); |
| EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); |
| // Bring backend 0 back and ensure all traffic go back to it. |
| ShutdownBackend(0); |
| StartBackend(0); |
| WaitForBackend(DEBUG_LOCATION, 0); |
| } |
| |
| TEST_P(AggregateClusterTest, LogicalDnsToEds) { |
| CreateAndStartBackends(2); |
| const char* kNewCluster2Name = "new_cluster_2"; |
| const char* kNewEdsService2Name = "new_eds_service_name_2"; |
| const char* kLogicalDNSClusterName = "logical_dns_cluster"; |
| // Populate new EDS resources. |
| EdsResourceArgs args2({ |
| {"locality0", CreateEndpointsForBackends(1, 2)}, |
| }); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args2, kNewEdsService2Name)); |
| // Populate new CDS resources. |
| Cluster new_cluster2 = default_cluster_; |
| new_cluster2.set_name(kNewCluster2Name); |
| new_cluster2.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsService2Name); |
| balancer_->ads_service()->SetCdsResource(new_cluster2); |
| // Create Logical DNS Cluster |
| auto logical_dns_cluster = default_cluster_; |
| logical_dns_cluster.set_name(kLogicalDNSClusterName); |
| logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* address = logical_dns_cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address(); |
| address->set_address(kServerName); |
| address->set_port_value(443); |
| balancer_->ads_service()->SetCdsResource(logical_dns_cluster); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kLogicalDNSClusterName); |
| cluster_config.add_clusters(kNewCluster2Name); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Set Logical DNS result |
| { |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Resolver::Result result; |
| result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1)); |
| logical_dns_cluster_resolver_response_generator_->SetResponse( |
| std::move(result)); |
| } |
| // Wait for traffic to go to backend 0. |
| WaitForBackend(DEBUG_LOCATION, 0); |
| // Shutdown backend 0 and wait for all traffic to go to backend 1. |
| backends_[0]->StopListeningAndSendGoaways(); |
| WaitForBackend(DEBUG_LOCATION, 1); |
| auto response_state = balancer_->ads_service()->cds_response_state(); |
| ASSERT_TRUE(response_state.has_value()); |
| EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); |
| // Bring backend 0 back and ensure all traffic go back to it. |
| ShutdownBackend(0); |
| StartBackend(0); |
| WaitForBackend(DEBUG_LOCATION, 0); |
| } |
| |
| // This test covers a bug seen in the wild where the |
| // xds_cluster_resolver policy's code to reuse child policy names did |
| // not correctly handle the case where the LOGICAL_DNS priority failed, |
| // thus returning a priority with no localities. This caused the child |
| // name to be reused incorrectly, which triggered an assertion failure |
| // in the xds_cluster_impl policy caused by changing its cluster name. |
| TEST_P(AggregateClusterTest, ReconfigEdsWhileLogicalDnsChildFails) { |
| CreateAndStartBackends(2); |
| const char* kNewCluster1Name = "new_cluster_1"; |
| const char* kNewEdsService1Name = "new_eds_service_name_1"; |
| const char* kLogicalDNSClusterName = "logical_dns_cluster"; |
| // Populate EDS resource with all unreachable endpoints. |
| // - Priority 0: locality0 |
| // - Priority 1: locality1, locality2 |
| EdsResourceArgs args1({ |
| {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0}, |
| {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, |
| {"locality2", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, |
| }); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsService1Name)); |
| // Populate new CDS resources. |
| Cluster new_cluster1 = default_cluster_; |
| new_cluster1.set_name(kNewCluster1Name); |
| new_cluster1.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsService1Name); |
| balancer_->ads_service()->SetCdsResource(new_cluster1); |
| // Create Logical DNS Cluster |
| auto logical_dns_cluster = default_cluster_; |
| logical_dns_cluster.set_name(kLogicalDNSClusterName); |
| logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); |
| auto* address = logical_dns_cluster.mutable_load_assignment() |
| ->add_endpoints() |
| ->add_lb_endpoints() |
| ->mutable_endpoint() |
| ->mutable_address() |
| ->mutable_socket_address(); |
| address->set_address(kServerName); |
| address->set_port_value(443); |
| balancer_->ads_service()->SetCdsResource(logical_dns_cluster); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kNewCluster1Name); |
| cluster_config.add_clusters(kLogicalDNSClusterName); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Set Logical DNS result |
| { |
| grpc_core::ExecCtx exec_ctx; |
| grpc_core::Resolver::Result result; |
| result.addresses = absl::UnavailableError("injected error"); |
| logical_dns_cluster_resolver_response_generator_->SetResponse( |
| std::move(result)); |
| } |
| // When an RPC fails, we know the channel has seen the update. |
| constexpr char kErrorMessage[] = |
| // TODO(roth): Figure out how to get some sort of resolution note |
| // included here as part of https://github.com/grpc/grpc/issues/22883. |
| "empty address list: "; |
| CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage); |
| // Send an EDS update that moves locality1 to priority 0. |
| args1 = EdsResourceArgs({ |
| {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, |
| 0}, |
| {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, |
| 1}, |
| }); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsService1Name)); |
| WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) { |
| if (!result.status.ok()) { |
| EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE); |
| EXPECT_EQ(result.status.error_message(), kErrorMessage); |
| } |
| }); |
| } |
| |
| TEST_P(AggregateClusterTest, MultipleClustersWithSameLocalities) { |
| CreateAndStartBackends(2); |
| const char* kNewClusterName1 = "new_cluster_1"; |
| const char* kNewEdsServiceName1 = "new_eds_service_name_1"; |
| const char* kNewClusterName2 = "new_cluster_2"; |
| const char* kNewEdsServiceName2 = "new_eds_service_name_2"; |
| // Populate EDS resource for cluster 1 with unreachable endpoint. |
| EdsResourceArgs args1({{"locality0", {MakeNonExistantEndpoint()}}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsServiceName1)); |
| // Populate CDS resource for cluster 1. |
| Cluster new_cluster1 = default_cluster_; |
| new_cluster1.set_name(kNewClusterName1); |
| new_cluster1.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsServiceName1); |
| balancer_->ads_service()->SetCdsResource(new_cluster1); |
| // Populate EDS resource for cluster 2. |
| args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(0, 1)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsServiceName2)); |
| // Populate CDS resource for cluster 2. |
| Cluster new_cluster2 = default_cluster_; |
| new_cluster2.set_name(kNewClusterName2); |
| new_cluster2.mutable_eds_cluster_config()->set_service_name( |
| kNewEdsServiceName2); |
| balancer_->ads_service()->SetCdsResource(new_cluster2); |
| // Create Aggregate Cluster |
| auto cluster = default_cluster_; |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(kNewClusterName1); |
| cluster_config.add_clusters(kNewClusterName2); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| // Wait for channel to get the resources and get connected. |
| WaitForBackend(DEBUG_LOCATION, 0); |
| // Send an EDS update for cluster 1 that reuses the locality name from |
| // cluster 1 and points traffic to backend 1. |
| args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); |
| balancer_->ads_service()->SetEdsResource( |
| BuildEdsResource(args1, kNewEdsServiceName1)); |
| WaitForBackend(DEBUG_LOCATION, 1); |
| } |
| |
| TEST_P(AggregateClusterTest, RecursionDepthJustBelowMax) { |
| // Populate EDS resource. |
| CreateAndStartBackends(1); |
| EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); |
| balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); |
| // Populate new CDS resource. |
| Cluster new_cluster = default_cluster_; |
| new_cluster.set_name(absl::StrCat(kDefaultClusterName, 15)); |
| balancer_->ads_service()->SetCdsResource(new_cluster); |
| // Populate aggregate cluster chain. |
| for (int i = 14; i >= 0; --i) { |
| auto cluster = default_cluster_; |
| if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| } |
| // RPCs should fail with the right status. |
| CheckRpcSendOk(DEBUG_LOCATION); |
| } |
| |
| TEST_P(AggregateClusterTest, RecursionMaxDepth) { |
| // Populate EDS resource. |
| CreateAndStartBackends(1); |
| EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); |
| balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); |
| // Populate new CDS resource. |
| Cluster new_cluster = default_cluster_; |
| new_cluster.set_name(absl::StrCat(kDefaultClusterName, 16)); |
| balancer_->ads_service()->SetCdsResource(new_cluster); |
| // Populate aggregate cluster chain. |
| for (int i = 15; i >= 0; --i) { |
| auto cluster = default_cluster_; |
| if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); |
| CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); |
| custom_cluster->set_name("envoy.clusters.aggregate"); |
| ClusterConfig cluster_config; |
| cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); |
| custom_cluster->mutable_typed_config()->PackFrom(cluster_config); |
| balancer_->ads_service()->SetCdsResource(cluster); |
| } |
| // RPCs should fail with the right status. |
| const Status status = SendRpc(); |
| EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); |
| EXPECT_THAT( |
| status.error_message(), |
| ::testing::HasSubstr("aggregate cluster graph exceeds max depth")); |
| } |
| |
| } // namespace |
| } // namespace testing |
| } // namespace grpc |
| |
| int main(int argc, char** argv) { |
| grpc::testing::TestEnvironment env(&argc, argv); |
| ::testing::InitGoogleTest(&argc, argv); |
| // Make the backup poller poll very frequently in order to pick up |
| // updates from all the subchannels's FDs. |
| GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
| #if TARGET_OS_IPHONE |
| // Workaround Apple CFStream bug |
| gpr_setenv("grpc_cfstream", "0"); |
| #endif |
| grpc_init(); |
| grpc::testing::ConnectionAttemptInjector::Init(); |
| const auto result = RUN_ALL_TESTS(); |
| grpc_shutdown(); |
| return result; |
| } |