[memory test] reland: add benchmarks with xDS enabled (#34820)
Relands #34785, which was reverted in #34818.
The first commit is the revert. The second commit removes the gtest
dependency from the xds_server library, which should address the
testonly problem internally.
diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD
index cfc2880..a0d600a 100644
--- a/test/core/memory_usage/BUILD
+++ b/test/core/memory_usage/BUILD
@@ -77,6 +77,7 @@
"//:gpr",
"//:grpc",
"//src/core:channel_args",
+ "//src/core:xds_enabled_server",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
@@ -162,6 +163,7 @@
"//:subprocess",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
+ "//test/cpp/end2end/xds:xds_utils",
],
)
@@ -182,5 +184,6 @@
"//:subprocess",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
+ "//test/cpp/end2end/xds:xds_utils",
],
)
diff --git a/test/core/memory_usage/callback_client.cc b/test/core/memory_usage/callback_client.cc
index 6923fca..5d120c7 100644
--- a/test/core/memory_usage/callback_client.cc
+++ b/test/core/memory_usage/callback_client.cc
@@ -27,6 +27,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
+#include "absl/strings/match.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
@@ -160,12 +161,14 @@
std::chrono::milliseconds(1)));
}
+ const char* prefix = "";
+ if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
printf("---------Client channel stats--------\n");
- printf("client channel memory usage: %f bytes per channel\n",
+ printf("%sclient channel memory usage: %f bytes per channel\n", prefix,
static_cast<double>(peak_client_memory - before_client_memory) / size *
1024);
printf("---------Server channel stats--------\n");
- printf("server channel memory usage: %f bytes per channel\n",
+ printf("%sserver channel memory usage: %f bytes per channel\n", prefix,
static_cast<double>(peak_server_memory - before_server_memory) / size *
1024);
gpr_log(GPR_INFO, "Client Done");
diff --git a/test/core/memory_usage/callback_server.cc b/test/core/memory_usage/callback_server.cc
index 2b7a3fc..7737592 100644
--- a/test/core/memory_usage/callback_server.cc
+++ b/test/core/memory_usage/callback_server.cc
@@ -30,12 +30,17 @@
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>
+#include <grpcpp/xds_server_builder.h>
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h"
+ABSL_FLAG(std::string, bind, "", "Bind host:port");
+ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
class ServerCallbackImpl final
: public grpc::testing::BenchmarkService::CallbackService {
public:
@@ -69,9 +74,6 @@
// TODO(chennancy) Add graceful shutdown
static void sigint_handler(int /*x*/) { _exit(0); }
-ABSL_FLAG(std::string, bind, "", "Bind host:port");
-ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
-
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
char* fake_argv[1];
@@ -90,7 +92,11 @@
// Get initial process memory usage before creating server
long before_server_create = GetMemUsage();
ServerCallbackImpl callback_server(before_server_create);
- grpc::ServerBuilder builder;
+
+ grpc::XdsServerBuilder xds_builder;
+ grpc::ServerBuilder normal_builder;
+ grpc::ServerBuilder* builder =
+ absl::GetFlag(FLAGS_use_xds) ? &xds_builder : &normal_builder;
// Set the authentication mechanism.
std::shared_ptr<grpc::ServerCredentials> creds =
@@ -99,11 +105,11 @@
gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
// TODO (chennancy) Add in secure credentials
}
- builder.AddListeningPort(server_address, creds);
- builder.RegisterService(&callback_server);
+ builder->AddListeningPort(server_address, creds);
+ builder->RegisterService(&callback_server);
// Set up the server to start accepting requests.
- std::shared_ptr<grpc::Server> server(builder.BuildAndStart());
+ std::shared_ptr<grpc::Server> server(builder->BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.c_str());
// Keep the program running until the server shuts down.
diff --git a/test/core/memory_usage/client.cc b/test/core/memory_usage/client.cc
index ae76e13..3b20c71 100644
--- a/test/core/memory_usage/client.cc
+++ b/test/core/memory_usage/client.cc
@@ -27,6 +27,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
+#include "absl/strings/match.h"
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
@@ -41,6 +42,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h"
@@ -158,6 +160,11 @@
(void*)nullptr, nullptr));
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+ gpr_log(GPR_INFO, "Call %d status %d (%s)", call_idx, calls[call_idx].status,
+ std::string(grpc_core::StringViewFromSlice(calls[call_idx].details))
+ .c_str());
+
+ GPR_ASSERT(response_payload_recv != nullptr);
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, response_payload_recv);
grpc_slice response = grpc_byte_buffer_reader_readall(&reader);
@@ -282,14 +289,16 @@
grpc_completion_queue_destroy(cq);
grpc_shutdown_blocking();
+ const char* prefix = "";
+ if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
printf("---------client stats--------\n");
- printf("client call memory usage: %f bytes per call\n",
+ printf("%sclient call memory usage: %f bytes per call\n", prefix,
static_cast<double>(client_calls_inflight.rss -
client_benchmark_calls_start.rss) /
benchmark_iterations * 1024);
printf("---------server stats--------\n");
- printf("server call memory usage: %f bytes per call\n",
+ printf("%sserver call memory usage: %f bytes per call\n", prefix,
static_cast<double>(server_calls_inflight.rss -
server_benchmark_calls_start.rss) /
benchmark_iterations * 1024);
diff --git a/test/core/memory_usage/memory_usage_test.cc b/test/core/memory_usage/memory_usage_test.cc
index 0c2c9ee..7b0ab54 100644
--- a/test/core/memory_usage/memory_usage_test.cc
+++ b/test/core/memory_usage/memory_usage_test.cc
@@ -16,12 +16,15 @@
//
//
+#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <iterator>
+#include <limits>
#include <map>
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -32,15 +35,26 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
+#include "google/protobuf/wrappers.pb.h"
+#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gpr/subprocess.h"
-#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/gprpp/env.h"
+#include "src/proto/grpc/testing/xds/v3/cluster.pb.h"
#include "test/core/util/port.h"
+#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/xds/xds_server.h"
+#include "test/cpp/end2end/xds/xds_utils.h"
+
+using grpc::testing::XdsResourceUtils;
ABSL_FLAG(std::string, benchmark_names, "call,channel",
"Which benchmark to run"); // Default all benchmarks in order to
@@ -51,6 +65,9 @@
"secure (Use SSL credentials on server)");
ABSL_FLAG(bool, memory_profiling, false,
"Run memory profiling"); // TODO (chennancy) Connect this flag
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
+// TODO(roth, ctiller): Add support for multiple addresses per channel.
class Subprocess {
public:
@@ -74,28 +91,38 @@
};
// per-call memory usage benchmark
-int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags,
+int RunCallBenchmark(int port, char* root,
+ std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
int status;
- int port = grpc_pick_unused_port_or_die();
// start the server
+ gpr_log(GPR_INFO, "starting server");
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_server",
gpr_subprocess_binary_extension()),
"--grpc_experiments",
std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind",
- grpc_core::JoinHostPort("::", port)};
+ grpc_core::LocalIpAndPort(port)};
+ if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
// Add scenario-specific server flags to the end of the server_flags
absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
Subprocess svr(server_flags);
+ gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
+
+ // Wait one second before starting client to give the server a chance
+ // to start up.
+ gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
// start the client
+ gpr_log(GPR_INFO, "starting client");
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_client",
gpr_subprocess_binary_extension()),
"--target",
- grpc_core::JoinHostPort("localhost", port),
+ absl::GetFlag(FLAGS_use_xds)
+ ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
+ : grpc_core::LocalIpAndPort(port),
"--grpc_experiments",
std::string(grpc_core::ConfigVars::Get().Experiments()),
absl::StrCat("--warmup=", 10000),
@@ -103,6 +130,7 @@
// Add scenario-specific client flags to the end of the client_flags
absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
Subprocess cli(client_flags);
+ gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
// wait for completion
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
@@ -114,32 +142,38 @@
}
// Per-channel benchmark
-int RunChannelBenchmark(char* root) {
+int RunChannelBenchmark(int port, char* root) {
// TODO(chennancy) Add the scenario specific flags
int status;
- int port = grpc_pick_unused_port_or_die();
// start the server
+ gpr_log(GPR_INFO, "starting server");
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_callback_server",
gpr_subprocess_binary_extension()),
- "--bind", grpc_core::JoinHostPort("::", port)};
+ "--bind", grpc_core::LocalIpAndPort(port)};
+ if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
Subprocess svr(server_flags);
+ gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
// Wait one second before starting client to avoid possible race condition
// of client sending an RPC before the server is set up
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
// start the client
+ gpr_log(GPR_INFO, "starting client");
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_callback_client",
gpr_subprocess_binary_extension()),
"--target",
- grpc_core::JoinHostPort("localhost", port),
+ absl::GetFlag(FLAGS_use_xds)
+ ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
+ : grpc_core::LocalIpAndPort(port),
"--nosecure",
absl::StrCat("--server_pid=", svr.GetPID()),
absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
Subprocess cli(client_flags);
+ gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
// wait for completion
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
@@ -149,17 +183,78 @@
return svr.Join() == 0 ? 0 : 2;
}
+struct XdsServer {
+ std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service;
+ std::unique_ptr<grpc::Server> server;
+};
+
+XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
+ XdsServer xds_server;
+ int xds_server_port = grpc_pick_unused_port_or_die();
+ gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port);
+ // Generate xDS bootstrap and set the env var.
+ std::string bootstrap =
+ grpc::testing::XdsBootstrapBuilder()
+ .SetDefaultServer(absl::StrCat("localhost:", xds_server_port))
+ .SetXdsChannelCredentials("insecure")
+ .Build();
+ grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap);
+ gpr_log(GPR_INFO, "xDS bootstrap: %s", bootstrap.c_str());
+ // Create ADS service.
+ xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
+ xds_server.ads_service->Start();
+ // Populate xDS resources.
+ XdsResourceUtils::SetListenerAndRouteConfiguration(
+ xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
+ XdsResourceUtils::DefaultRouteConfig());
+ auto cluster = XdsResourceUtils::DefaultCluster();
+ cluster.mutable_circuit_breakers()
+ ->add_thresholds()
+ ->mutable_max_requests()
+ ->set_value(std::numeric_limits<uint32_t>::max());
+ xds_server.ads_service->SetCdsResource(cluster);
+ xds_server.ads_service->SetEdsResource(
+ XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
+ {XdsResourceUtils::EdsResourceArgs::Locality(
+ "here",
+ {XdsResourceUtils::EdsResourceArgs::Endpoint(server_port)})})));
+ XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
+ xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
+ server_port, XdsResourceUtils::DefaultServerRouteConfig());
+ // Create and start server.
+ gpr_log(GPR_INFO, "starting xDS server...");
+ grpc::ServerBuilder builder;
+ builder.RegisterService(xds_server.ads_service.get());
+ builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port),
+ grpc::InsecureServerCredentials());
+ xds_server.server = builder.BuildAndStart();
+ gpr_log(GPR_INFO, "xDS server started");
+ return xds_server;
+}
+
int RunBenchmark(char* root, absl::string_view benchmark,
std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
+ gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str());
+ int server_port = grpc_pick_unused_port_or_die();
+ gpr_log(GPR_INFO, "server port: %d", server_port);
+ XdsServer xds_server;
+ if (absl::GetFlag(FLAGS_use_xds)) {
+ xds_server = StartXdsServerAndConfigureBootstrap(server_port);
+ }
+ int retval;
if (benchmark == "call") {
- return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags);
+ retval = RunCallBenchmark(server_port, root, server_scenario_flags,
+ client_scenario_flags);
} else if (benchmark == "channel") {
- return RunChannelBenchmark(root);
+ retval = RunChannelBenchmark(server_port, root);
} else {
gpr_log(GPR_INFO, "Not a valid benchmark name");
- return 4;
+ retval = 4;
}
+ if (xds_server.server != nullptr) xds_server.server->Shutdown();
+ gpr_log(GPR_INFO, "done running benchmark");
+ return retval;
}
int main(int argc, char** argv) {
@@ -199,10 +294,12 @@
// Run all benchmarks listed (Multiple benchmarks usually only for default
// scenario)
auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ',');
+ grpc_init();
for (const auto& benchmark : benchmarks) {
int r = RunBenchmark(root, benchmark, it_scenario->second.server,
it_scenario->second.client);
if (r != 0) return r;
}
+ grpc_shutdown();
return 0;
}
diff --git a/test/core/memory_usage/server.cc b/test/core/memory_usage/server.cc
index c4eaf77..7426e8b 100644
--- a/test/core/memory_usage/server.cc
+++ b/test/core/memory_usage/server.cc
@@ -21,6 +21,11 @@
#include <stdlib.h>
#include <time.h>
+#ifndef _WIN32
+// This is for _exit() below, which is temporary.
+#include <unistd.h>
+#endif
+
#include <algorithm>
#include <string>
#include <vector>
@@ -28,6 +33,7 @@
#include "absl/base/attributes.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
+#include "absl/status/status.h"
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
@@ -35,23 +41,23 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
-
-#include "test/core/memory_usage/memstats.h"
-#ifndef _WIN32
-// This is for _exit() below, which is temporary.
-#include <unistd.h>
-#endif
-
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include "src/core/ext/xds/xds_enabled_server.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/memory_usage/memstats.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+ABSL_FLAG(std::string, bind, "", "Bind host:port");
+ABSL_FLAG(bool, secure, false, "Use security");
+ABSL_FLAG(bool, minstack, false, "Use minimal stack");
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
static grpc_completion_queue* cq;
static grpc_server* server;
static grpc_op metadata_ops[2];
@@ -154,9 +160,13 @@
// When that is resolved, please remove the #include <unistd.h> above.
static void sigint_handler(int /*x*/) { _exit(0); }
-ABSL_FLAG(std::string, bind, "", "Bind host:port");
-ABSL_FLAG(bool, secure, false, "Use security");
-ABSL_FLAG(bool, minstack, false, "Use minimal stack");
+static void OnServingStatusUpdate(void* /*user_data*/, const char* uri,
+ grpc_serving_status_update update) {
+ absl::Status status(static_cast<absl::StatusCode>(update.code),
+ update.error_message);
+ gpr_log(GPR_INFO, "xDS serving status notification: uri=\"%s\", status=%s",
+ uri, status.ToString().c_str());
+}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
@@ -188,7 +198,26 @@
args_vec.push_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MINIMAL_STACK), 1));
}
+ // TODO(roth): The xDS code here duplicates the functionality in
+ // XdsServerBuilder, which is undesirable. We should ideally convert
+ // this to use the C++ API instead of the C-core API, so that we can
+ // avoid this duplication.
+ if (absl::GetFlag(FLAGS_use_xds)) {
+ args_vec.push_back(grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_XDS_ENABLED_SERVER), 1));
+ }
+
grpc_channel_args args = {args_vec.size(), args_vec.data()};
+ server = grpc_server_create(&args, nullptr);
+
+ if (absl::GetFlag(FLAGS_use_xds)) {
+ grpc_server_config_fetcher* config_fetcher =
+ grpc_server_config_fetcher_xds_create({OnServingStatusUpdate, nullptr},
+ &args);
+ if (config_fetcher != nullptr) {
+ grpc_server_set_config_fetcher(server, config_fetcher);
+ }
+ }
MemStats before_server_create = MemStats::Snapshot();
if (absl::GetFlag(FLAGS_secure)) {
@@ -196,11 +225,9 @@
test_server1_cert};
grpc_server_credentials* ssl_creds = grpc_ssl_server_credentials_create(
nullptr, &pem_key_cert_pair, 1, 0, nullptr);
- server = grpc_server_create(&args, nullptr);
GPR_ASSERT(grpc_server_add_http2_port(server, addr.c_str(), ssl_creds));
grpc_server_credentials_release(ssl_creds);
} else {
- server = grpc_server_create(&args, nullptr);
GPR_ASSERT(grpc_server_add_http2_port(
server, addr.c_str(), grpc_insecure_server_credentials_create()));
}
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 06bdf74..08634b4 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -51,7 +51,6 @@
grpc_cc_library(
name = "counted_service",
- testonly = True,
hdrs = ["counted_service.h"],
deps = [
"//:grpc",
diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD
index 8cc655b..5db35f4 100644
--- a/test/cpp/end2end/xds/BUILD
+++ b/test/cpp/end2end/xds/BUILD
@@ -23,12 +23,8 @@
grpc_cc_library(
name = "xds_server",
- testonly = True,
srcs = ["xds_server.cc"],
hdrs = ["xds_server.h"],
- external_deps = [
- "gtest",
- ],
deps = [
"//:gpr",
"//:grpc",
@@ -47,7 +43,6 @@
grpc_cc_library(
name = "xds_utils",
- testonly = True,
srcs = ["xds_utils.cc"],
hdrs = ["xds_utils.h"],
deps = [
diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc
index a6a9896..3efcc99 100644
--- a/test/cpp/end2end/xds/xds_core_end2end_test.cc
+++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc
@@ -76,11 +76,11 @@
WaitForAllBackends(DEBUG_LOCATION, 0, 1);
// Stop balancer.
balancer_->Shutdown();
- // Tell balancer to require minimum version 1 for all resource types.
- balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
- balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
- balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
- balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
+ // Expect minimum version 1 for all resource types.
+ balancer_->ads_service()->SetCheckVersionCallback(
+ [&](absl::string_view resource_type, int version) {
+ EXPECT_GE(version, 1) << "resource_type: " << resource_type;
+ });
// Update backend, just so we can be sure that the client has
// reconnected to the balancer.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
@@ -530,15 +530,15 @@
CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
// Stop balancer.
balancer_->Shutdown();
- // Tell balancer to require minimum version 1 for all resource types
- // and to not reply to the requests.
- balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
+ // Expect minimum version 1 for all resource types.
+ balancer_->ads_service()->SetCheckVersionCallback(
+ [&](absl::string_view resource_type, int version) {
+ EXPECT_GE(version, 1) << "resource_type: " << resource_type;
+ });
+ // Tell balancer not to reply to the requests.
balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl);
- balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl);
- balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl);
- balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
// Restart balancer.
balancer_->Start();
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
index b2c893b..ca04697 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
@@ -52,6 +52,8 @@
namespace testing {
using ::envoy::config::core::v3::HealthStatus;
+using ::envoy::service::discovery::v3::DiscoveryRequest;
+using ::envoy::service::load_stats::v3::LoadStatsRequest;
using ::grpc::experimental::ExternalCertificateVerifier;
using ::grpc::experimental::IdentityKeyCertPair;
@@ -275,10 +277,30 @@
XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
XdsEnd2endTest* test_obj)
: ServerThread(test_obj, /*use_xds_enabled_server=*/false),
- ads_service_(new AdsServiceImpl()),
- lrs_service_(
- new LrsServiceImpl((GetParam().enable_load_reporting() ? 20 : 0),
- {kDefaultClusterName})) {}
+ ads_service_(new AdsServiceImpl(
+ // First request must have node set with the right client features.
+ [&](const DiscoveryRequest& request) {
+ EXPECT_TRUE(request.has_node());
+ EXPECT_THAT(request.node().client_features(),
+ ::testing::UnorderedElementsAre(
+ "envoy.lb.does_not_support_overprovisioning",
+ "xds.config.resource-in-sotw"));
+ },
+ // NACKs must use the right status code.
+ [&](absl::StatusCode code) {
+ EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
+ })),
+ lrs_service_(new LrsServiceImpl(
+ (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
+ // Fail if load reporting is used when not enabled.
+ [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
+ // Make sure we send the client feature saying that we support
+ // send_all_clusters.
+ [&](const LoadStatsRequest& request) {
+ EXPECT_THAT(
+ request.node().client_features(),
+ ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
+ })) {}
void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
ServerBuilder* builder) {
diff --git a/test/cpp/end2end/xds/xds_server.cc b/test/cpp/end2end/xds/xds_server.cc
index 65463a1..b4a657c 100644
--- a/test/cpp/end2end/xds/xds_server.cc
+++ b/test/cpp/end2end/xds/xds_server.cc
@@ -22,9 +22,6 @@
#include <thread>
#include <vector>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
#include "absl/types/optional.h"
#include <grpc/support/log.h>
diff --git a/test/cpp/end2end/xds/xds_server.h b/test/cpp/end2end/xds/xds_server.h
index 29fd1c1..b4fa271 100644
--- a/test/cpp/end2end/xds/xds_server.h
+++ b/test/cpp/end2end/xds/xds_server.h
@@ -23,9 +23,6 @@
#include <thread>
#include <vector>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
#include "absl/types/optional.h"
#include <grpc/support/log.h>
@@ -62,6 +59,9 @@
::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>,
public std::enable_shared_from_this<AdsServiceImpl> {
public:
+ using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
+ using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
+
// State for a given xDS resource type.
struct ResponseState {
enum State {
@@ -72,7 +72,12 @@
std::string error_message;
};
- AdsServiceImpl() {}
+ explicit AdsServiceImpl(
+ std::function<void(const DiscoveryRequest& request)> check_first_request =
+ nullptr,
+ std::function<void(absl::StatusCode)> check_nack_status_code = nullptr)
+ : check_first_request_(std::move(check_first_request)),
+ check_nack_status_code_(std::move(check_nack_status_code)) {}
void set_wrap_resources(bool wrap_resources) {
grpc_core::MutexLock lock(&ads_mu_);
@@ -119,12 +124,12 @@
resource_types_to_ignore_.emplace(type_url);
}
- // Sets the minimum version that the server will accept for a given
- // resource type. Will cause a gmock expectation failure if we see a
- // lower version.
- void SetResourceMinVersion(const std::string& type_url, int version) {
+ // Sets a callback to be invoked on request messages with respoonse_nonce
+ // set. The callback is passed the resource type and version.
+ void SetCheckVersionCallback(
+ std::function<void(absl::string_view, int)> check_version_callack) {
grpc_core::MutexLock lock(&ads_mu_);
- resource_type_min_versions_[type_url] = version;
+ check_version_callack_ = std::move(check_version_callack);
}
// Get the list of response state for each resource type.
@@ -214,8 +219,6 @@
using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;
- using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
- using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
Status StreamAggregatedResources(ServerContext* context,
@@ -355,9 +358,10 @@
GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
&client_resource_type_version));
}
- EXPECT_GE(client_resource_type_version,
- resource_type_min_versions_[request.type_url()])
- << "resource_type: " << request.type_url();
+ if (check_version_callack_ != nullptr) {
+ check_version_callack_(request.type_url(),
+ client_resource_type_version);
+ }
} else {
int client_nonce;
GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
@@ -370,7 +374,10 @@
request.version_info().c_str());
} else {
response_state.state = ResponseState::NACKED;
- EXPECT_EQ(request.error_detail().code(), GRPC_STATUS_INVALID_ARGUMENT);
+ if (check_nack_status_code_ != nullptr) {
+ check_nack_status_code_(
+ static_cast<absl::StatusCode>(request.error_detail().code()));
+ }
response_state.error_message = request.error_detail().message();
gpr_log(GPR_INFO,
"ADS[%p]: client NACKed resource_type=%s version=%s: %s", this,
@@ -476,11 +483,9 @@
bool seen_first_request = false;
while (stream->Read(&request)) {
if (!seen_first_request) {
- EXPECT_TRUE(request.has_node());
- EXPECT_THAT(request.node().client_features(),
- ::testing::UnorderedElementsAre(
- "envoy.lb.does_not_support_overprovisioning",
- "xds.config.resource-in-sotw"));
+ if (check_first_request_ != nullptr) {
+ check_first_request_(request);
+ }
seen_first_request = true;
}
{
@@ -557,6 +562,9 @@
clients_.erase(client);
}
+ std::function<void(const DiscoveryRequest& request)> check_first_request_;
+ std::function<void(absl::StatusCode)> check_nack_status_code_;
+
grpc_core::CondVar ads_cond_;
grpc_core::Mutex ads_mu_;
bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
@@ -564,7 +572,7 @@
resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
std::set<std::string /*resource_type*/> resource_types_to_ignore_
ABSL_GUARDED_BY(ads_mu_);
- std::map<std::string /*resource_type*/, int> resource_type_min_versions_
+ std::function<void(absl::string_view, int)> check_version_callack_
ABSL_GUARDED_BY(ads_mu_);
// An instance data member containing the current state of all resources.
// Note that an entry will exist whenever either of the following is true:
@@ -585,6 +593,9 @@
::envoy::service::load_stats::v3::LoadReportingService::Service>,
public std::enable_shared_from_this<LrsServiceImpl> {
public:
+ using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
+ using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
+
// Stats reported by client.
class ClientStats {
public:
@@ -686,10 +697,15 @@
};
LrsServiceImpl(int client_load_reporting_interval_seconds,
- std::set<std::string> cluster_names)
+ std::set<std::string> cluster_names,
+ std::function<void()> stream_started_callback = nullptr,
+ std::function<void(const LoadStatsRequest& request)>
+ check_first_request = nullptr)
: client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds),
- cluster_names_(std::move(cluster_names)) {}
+ cluster_names_(std::move(cluster_names)),
+ stream_started_callback_(std::move(stream_started_callback)),
+ check_first_request_(std::move(check_first_request)) {}
// Must be called before the LRS call is started.
void set_send_all_clusters(bool send_all_clusters) {
@@ -710,13 +726,11 @@
absl::Duration timeout = absl::InfiniteDuration());
private:
- using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
- using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
- EXPECT_GT(client_load_reporting_interval_seconds_, 0);
+ if (stream_started_callback_ != nullptr) stream_started_callback_();
// Take a reference of the LrsServiceImpl object, reference will go
// out of scope after this method exits.
std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this();
@@ -724,9 +738,7 @@
LoadStatsRequest request;
if (stream->Read(&request)) {
IncreaseRequestCount();
- // Verify client features.
- EXPECT_THAT(request.node().client_features(),
- ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
+ if (check_first_request_ != nullptr) check_first_request_(request);
// Send initial response.
LoadStatsResponse response;
if (send_all_clusters_) {
@@ -769,6 +781,8 @@
const int client_load_reporting_interval_seconds_;
bool send_all_clusters_ = false;
std::set<std::string> cluster_names_;
+ std::function<void()> stream_started_callback_;
+ std::function<void(const LoadStatsRequest& request)> check_first_request_;
grpc_core::CondVar lrs_cv_;
grpc_core::Mutex lrs_mu_;
diff --git a/test/cpp/end2end/xds/xds_utils.cc b/test/cpp/end2end/xds/xds_utils.cc
index 8ad0cea..07f37ee 100644
--- a/test/cpp/end2end/xds/xds_utils.cc
+++ b/test/cpp/end2end/xds/xds_utils.cc
@@ -23,9 +23,6 @@
#include <thread>
#include <vector>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
@@ -89,7 +86,7 @@
" \"server_uri\": \"<SERVER_URI>\",\n"
" \"channel_creds\": [\n"
" {\n"
- " \"type\": \"fake\"\n"
+ " \"type\": \"<SERVER_CREDS_TYPE>\"\n"
" }\n"
" ],\n"
" \"server_features\": [<SERVER_FEATURES>]\n"
@@ -102,6 +99,7 @@
return absl::StrReplaceAll(
kXdsServerTemplate,
{{"<SERVER_URI>", server_uri},
+ {"<SERVER_CREDS_TYPE>", xds_channel_creds_type_},
{"<SERVER_FEATURES>", absl::StrJoin(server_features, ", ")}});
}
diff --git a/test/cpp/end2end/xds/xds_utils.h b/test/cpp/end2end/xds/xds_utils.h
index 5797d9c..6acfbc6 100644
--- a/test/cpp/end2end/xds/xds_utils.h
+++ b/test/cpp/end2end/xds/xds_utils.h
@@ -46,6 +46,10 @@
if (!ignore_if_set || top_server_.empty()) top_server_ = server;
return *this;
}
+ XdsBootstrapBuilder& SetXdsChannelCredentials(const std::string& type) {
+ xds_channel_creds_type_ = type;
+ return *this;
+ }
XdsBootstrapBuilder& SetClientDefaultListenerResourceNameTemplate(
const std::string& client_default_listener_resource_name_template) {
client_default_listener_resource_name_template_ =
@@ -90,6 +94,7 @@
bool ignore_resource_deletion_ = false;
std::string top_server_;
+ std::string xds_channel_creds_type_ = "fake";
std::string client_default_listener_resource_name_template_;
std::map<std::string /*key*/, PluginInfo> plugins_;
std::map<std::string /*authority_name*/, AuthorityInfo> authorities_;
diff --git a/tools/profiling/memory/memory_diff.py b/tools/profiling/memory/memory_diff.py
index eb2af95..f235b42 100755
--- a/tools/profiling/memory/memory_diff.py
+++ b/tools/profiling/memory/memory_diff.py
@@ -63,6 +63,22 @@
rb"server channel memory usage: ([0-9\.]+) bytes per channel",
float,
),
+ "call/xds_client": (
+ rb"xds client call memory usage: ([0-9\.]+) bytes per call",
+ float,
+ ),
+ "call/xds_server": (
+ rb"xds server call memory usage: ([0-9\.]+) bytes per call",
+ float,
+ ),
+ "channel/xds_client": (
+ rb"xds client channel memory usage: ([0-9\.]+) bytes per channel",
+ float,
+ ),
+ "channel/xds_server": (
+ rb"xds server channel memory usage: ([0-9\.]+) bytes per channel",
+ float,
+ ),
}
_SCENARIOS = {
@@ -89,26 +105,28 @@
)
ret = {}
for name, benchmark_args in _BENCHMARKS.items():
- for scenario, extra_args in _SCENARIOS.items():
- # TODO(chenancy) Remove when minstack is implemented for channel
- if name == "channel" and scenario == "minstack":
- continue
- try:
- output = subprocess.check_output(
- [
- "bazel-bin/test/core/memory_usage/memory_usage_test",
- ]
+ for use_xds in (False, True):
+ for scenario, extra_args in _SCENARIOS.items():
+ # TODO(chenancy) Remove when minstack is implemented for channel
+ if name == "channel" and scenario == "minstack":
+ continue
+ argv = (
+ ["bazel-bin/test/core/memory_usage/memory_usage_test"]
+ benchmark_args
+ extra_args
)
- except subprocess.CalledProcessError as e:
- print("Error running benchmark:", e)
- continue
- for line in output.splitlines():
- for key, (pattern, conversion) in _INTERESTING.items():
- m = re.match(pattern, line)
- if m:
- ret[scenario + ": " + key] = conversion(m.group(1))
+ if use_xds:
+ argv.append("--use_xds")
+ try:
+ output = subprocess.check_output(argv)
+ except subprocess.CalledProcessError as e:
+ print("Error running benchmark:", e)
+ continue
+ for line in output.splitlines():
+ for key, (pattern, conversion) in _INTERESTING.items():
+ m = re.match(pattern, line)
+ if m:
+ ret[scenario + ": " + key] = conversion(m.group(1))
return ret