[memory test] reland: add benchmarks with xDS enabled (#34820)

Relands #34785, which was reverted in #34818.

The first commit is the revert. The second commit removes the gtest
dependency from the xds_server library, which should address the
testonly problem internally.
diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD
index cfc2880..a0d600a 100644
--- a/test/core/memory_usage/BUILD
+++ b/test/core/memory_usage/BUILD
@@ -77,6 +77,7 @@
         "//:gpr",
         "//:grpc",
         "//src/core:channel_args",
+        "//src/core:xds_enabled_server",
         "//test/core/end2end:ssl_test_data",
         "//test/core/util:grpc_test_util",
         "//test/core/util:grpc_test_util_base",
@@ -162,6 +163,7 @@
         "//:subprocess",
         "//test/core/util:grpc_test_util",
         "//test/core/util:grpc_test_util_base",
+        "//test/cpp/end2end/xds:xds_utils",
     ],
 )
 
@@ -182,5 +184,6 @@
         "//:subprocess",
         "//test/core/util:grpc_test_util",
         "//test/core/util:grpc_test_util_base",
+        "//test/cpp/end2end/xds:xds_utils",
     ],
 )
diff --git a/test/core/memory_usage/callback_client.cc b/test/core/memory_usage/callback_client.cc
index 6923fca..5d120c7 100644
--- a/test/core/memory_usage/callback_client.cc
+++ b/test/core/memory_usage/callback_client.cc
@@ -27,6 +27,7 @@
 
 #include "absl/flags/flag.h"
 #include "absl/flags/parse.h"
+#include "absl/strings/match.h"
 
 #include <grpc/impl/channel_arg_names.h>
 #include <grpc/support/log.h>
@@ -160,12 +161,14 @@
                                              std::chrono::milliseconds(1)));
   }
 
+  const char* prefix = "";
+  if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
   printf("---------Client channel stats--------\n");
-  printf("client channel memory usage: %f bytes per channel\n",
+  printf("%sclient channel memory usage: %f bytes per channel\n", prefix,
          static_cast<double>(peak_client_memory - before_client_memory) / size *
              1024);
   printf("---------Server channel stats--------\n");
-  printf("server channel memory usage: %f bytes per channel\n",
+  printf("%sserver channel memory usage: %f bytes per channel\n", prefix,
          static_cast<double>(peak_server_memory - before_server_memory) / size *
              1024);
   gpr_log(GPR_INFO, "Client Done");
diff --git a/test/core/memory_usage/callback_server.cc b/test/core/memory_usage/callback_server.cc
index 2b7a3fc..7737592 100644
--- a/test/core/memory_usage/callback_server.cc
+++ b/test/core/memory_usage/callback_server.cc
@@ -30,12 +30,17 @@
 #include <grpcpp/security/server_credentials.h>
 #include <grpcpp/support/server_callback.h>
 #include <grpcpp/support/status.h>
+#include <grpcpp/xds_server_builder.h>
 
 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
 #include "src/proto/grpc/testing/messages.pb.h"
 #include "test/core/memory_usage/memstats.h"
 #include "test/core/util/test_config.h"
 
+ABSL_FLAG(std::string, bind, "", "Bind host:port");
+ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
 class ServerCallbackImpl final
     : public grpc::testing::BenchmarkService::CallbackService {
  public:
@@ -69,9 +74,6 @@
 // TODO(chennancy) Add graceful shutdown
 static void sigint_handler(int /*x*/) { _exit(0); }
 
-ABSL_FLAG(std::string, bind, "", "Bind host:port");
-ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
-
 int main(int argc, char** argv) {
   absl::ParseCommandLine(argc, argv);
   char* fake_argv[1];
@@ -90,7 +92,11 @@
   // Get initial process memory usage before creating server
   long before_server_create = GetMemUsage();
   ServerCallbackImpl callback_server(before_server_create);
-  grpc::ServerBuilder builder;
+
+  grpc::XdsServerBuilder xds_builder;
+  grpc::ServerBuilder normal_builder;
+  grpc::ServerBuilder* builder =
+      absl::GetFlag(FLAGS_use_xds) ? &xds_builder : &normal_builder;
 
   // Set the authentication mechanism.
   std::shared_ptr<grpc::ServerCredentials> creds =
@@ -99,11 +105,11 @@
     gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
     // TODO (chennancy) Add in secure credentials
   }
-  builder.AddListeningPort(server_address, creds);
-  builder.RegisterService(&callback_server);
+  builder->AddListeningPort(server_address, creds);
+  builder->RegisterService(&callback_server);
 
   // Set up the server to start accepting requests.
-  std::shared_ptr<grpc::Server> server(builder.BuildAndStart());
+  std::shared_ptr<grpc::Server> server(builder->BuildAndStart());
   gpr_log(GPR_INFO, "Server listening on %s", server_address.c_str());
 
   // Keep the program running until the server shuts down.
diff --git a/test/core/memory_usage/client.cc b/test/core/memory_usage/client.cc
index ae76e13..3b20c71 100644
--- a/test/core/memory_usage/client.cc
+++ b/test/core/memory_usage/client.cc
@@ -27,6 +27,7 @@
 
 #include "absl/flags/flag.h"
 #include "absl/flags/parse.h"
+#include "absl/strings/match.h"
 
 #include <grpc/byte_buffer.h>
 #include <grpc/byte_buffer_reader.h>
@@ -41,6 +42,7 @@
 
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/slice/slice_internal.h"
 #include "test/core/memory_usage/memstats.h"
 #include "test/core/util/test_config.h"
 
@@ -158,6 +160,11 @@
                                                    (void*)nullptr, nullptr));
   grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
 
+  gpr_log(GPR_INFO, "Call %d status %d (%s)", call_idx, calls[call_idx].status,
+          std::string(grpc_core::StringViewFromSlice(calls[call_idx].details))
+              .c_str());
+
+  GPR_ASSERT(response_payload_recv != nullptr);
   grpc_byte_buffer_reader reader;
   grpc_byte_buffer_reader_init(&reader, response_payload_recv);
   grpc_slice response = grpc_byte_buffer_reader_readall(&reader);
@@ -282,14 +289,16 @@
   grpc_completion_queue_destroy(cq);
   grpc_shutdown_blocking();
 
+  const char* prefix = "";
+  if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
   printf("---------client stats--------\n");
-  printf("client call memory usage: %f bytes per call\n",
+  printf("%sclient call memory usage: %f bytes per call\n", prefix,
          static_cast<double>(client_calls_inflight.rss -
                              client_benchmark_calls_start.rss) /
              benchmark_iterations * 1024);
 
   printf("---------server stats--------\n");
-  printf("server call memory usage: %f bytes per call\n",
+  printf("%sserver call memory usage: %f bytes per call\n", prefix,
          static_cast<double>(server_calls_inflight.rss -
                              server_benchmark_calls_start.rss) /
              benchmark_iterations * 1024);
diff --git a/test/core/memory_usage/memory_usage_test.cc b/test/core/memory_usage/memory_usage_test.cc
index 0c2c9ee..7b0ab54 100644
--- a/test/core/memory_usage/memory_usage_test.cc
+++ b/test/core/memory_usage/memory_usage_test.cc
@@ -16,12 +16,15 @@
 //
 //
 
+#include <stdint.h>
 #include <stdio.h>
 #include <string.h>
 
 #include <algorithm>
 #include <iterator>
+#include <limits>
 #include <map>
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -32,15 +35,26 @@
 #include "absl/strings/str_cat.h"
 #include "absl/strings/str_split.h"
 #include "absl/strings/string_view.h"
+#include "google/protobuf/wrappers.pb.h"
 
+#include <grpc/grpc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
+#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
 
 #include "src/core/lib/config/config_vars.h"
 #include "src/core/lib/gpr/subprocess.h"
-#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/gprpp/env.h"
+#include "src/proto/grpc/testing/xds/v3/cluster.pb.h"
 #include "test/core/util/port.h"
+#include "test/core/util/resolve_localhost_ip46.h"
 #include "test/core/util/test_config.h"
+#include "test/cpp/end2end/xds/xds_server.h"
+#include "test/cpp/end2end/xds/xds_utils.h"
+
+using grpc::testing::XdsResourceUtils;
 
 ABSL_FLAG(std::string, benchmark_names, "call,channel",
           "Which benchmark to run");  // Default all benchmarks in order to
@@ -51,6 +65,9 @@
           "secure (Use SSL credentials on server)");
 ABSL_FLAG(bool, memory_profiling, false,
           "Run memory profiling");  // TODO (chennancy) Connect this flag
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
+// TODO(roth, ctiller): Add support for multiple addresses per channel.
 
 class Subprocess {
  public:
@@ -74,28 +91,38 @@
 };
 
 // per-call memory usage benchmark
-int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags,
+int RunCallBenchmark(int port, char* root,
+                     std::vector<std::string> server_scenario_flags,
                      std::vector<std::string> client_scenario_flags) {
   int status;
-  int port = grpc_pick_unused_port_or_die();
 
   // start the server
+  gpr_log(GPR_INFO, "starting server");
   std::vector<std::string> server_flags = {
       absl::StrCat(root, "/memory_usage_server",
                    gpr_subprocess_binary_extension()),
       "--grpc_experiments",
       std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind",
-      grpc_core::JoinHostPort("::", port)};
+      grpc_core::LocalIpAndPort(port)};
+  if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
   // Add scenario-specific server flags to the end of the server_flags
   absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
   Subprocess svr(server_flags);
+  gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
+
+  // Wait one second before starting client to give the server a chance
+  // to start up.
+  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
 
   // start the client
+  gpr_log(GPR_INFO, "starting client");
   std::vector<std::string> client_flags = {
       absl::StrCat(root, "/memory_usage_client",
                    gpr_subprocess_binary_extension()),
       "--target",
-      grpc_core::JoinHostPort("localhost", port),
+      absl::GetFlag(FLAGS_use_xds)
+          ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
+          : grpc_core::LocalIpAndPort(port),
       "--grpc_experiments",
       std::string(grpc_core::ConfigVars::Get().Experiments()),
       absl::StrCat("--warmup=", 10000),
@@ -103,6 +130,7 @@
   // Add scenario-specific client flags to the end of the client_flags
   absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
   Subprocess cli(client_flags);
+  gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
   // wait for completion
   if ((status = cli.Join()) != 0) {
     printf("client failed with: %d", status);
@@ -114,32 +142,38 @@
 }
 
 // Per-channel benchmark
-int RunChannelBenchmark(char* root) {
+int RunChannelBenchmark(int port, char* root) {
   // TODO(chennancy) Add the scenario specific flags
   int status;
-  int port = grpc_pick_unused_port_or_die();
 
   // start the server
+  gpr_log(GPR_INFO, "starting server");
   std::vector<std::string> server_flags = {
       absl::StrCat(root, "/memory_usage_callback_server",
                    gpr_subprocess_binary_extension()),
-      "--bind", grpc_core::JoinHostPort("::", port)};
+      "--bind", grpc_core::LocalIpAndPort(port)};
+  if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
   Subprocess svr(server_flags);
+  gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
 
   // Wait one second before starting client to avoid possible race condition
   // of client sending an RPC before the server is set up
   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
 
   // start the client
+  gpr_log(GPR_INFO, "starting client");
   std::vector<std::string> client_flags = {
       absl::StrCat(root, "/memory_usage_callback_client",
                    gpr_subprocess_binary_extension()),
       "--target",
-      grpc_core::JoinHostPort("localhost", port),
+      absl::GetFlag(FLAGS_use_xds)
+          ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
+          : grpc_core::LocalIpAndPort(port),
       "--nosecure",
       absl::StrCat("--server_pid=", svr.GetPID()),
       absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
   Subprocess cli(client_flags);
+  gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
   // wait for completion
   if ((status = cli.Join()) != 0) {
     printf("client failed with: %d", status);
@@ -149,17 +183,78 @@
   return svr.Join() == 0 ? 0 : 2;
 }
 
+struct XdsServer {
+  std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service;
+  std::unique_ptr<grpc::Server> server;
+};
+
+XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
+  XdsServer xds_server;
+  int xds_server_port = grpc_pick_unused_port_or_die();
+  gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port);
+  // Generate xDS bootstrap and set the env var.
+  std::string bootstrap =
+      grpc::testing::XdsBootstrapBuilder()
+          .SetDefaultServer(absl::StrCat("localhost:", xds_server_port))
+          .SetXdsChannelCredentials("insecure")
+          .Build();
+  grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap);
+  gpr_log(GPR_INFO, "xDS bootstrap: %s", bootstrap.c_str());
+  // Create ADS service.
+  xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
+  xds_server.ads_service->Start();
+  // Populate xDS resources.
+  XdsResourceUtils::SetListenerAndRouteConfiguration(
+      xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
+      XdsResourceUtils::DefaultRouteConfig());
+  auto cluster = XdsResourceUtils::DefaultCluster();
+  cluster.mutable_circuit_breakers()
+      ->add_thresholds()
+      ->mutable_max_requests()
+      ->set_value(std::numeric_limits<uint32_t>::max());
+  xds_server.ads_service->SetCdsResource(cluster);
+  xds_server.ads_service->SetEdsResource(
+      XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
+          {XdsResourceUtils::EdsResourceArgs::Locality(
+              "here",
+              {XdsResourceUtils::EdsResourceArgs::Endpoint(server_port)})})));
+  XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
+      xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
+      server_port, XdsResourceUtils::DefaultServerRouteConfig());
+  // Create and start server.
+  gpr_log(GPR_INFO, "starting xDS server...");
+  grpc::ServerBuilder builder;
+  builder.RegisterService(xds_server.ads_service.get());
+  builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port),
+                           grpc::InsecureServerCredentials());
+  xds_server.server = builder.BuildAndStart();
+  gpr_log(GPR_INFO, "xDS server started");
+  return xds_server;
+}
+
 int RunBenchmark(char* root, absl::string_view benchmark,
                  std::vector<std::string> server_scenario_flags,
                  std::vector<std::string> client_scenario_flags) {
+  gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str());
+  int server_port = grpc_pick_unused_port_or_die();
+  gpr_log(GPR_INFO, "server port: %d", server_port);
+  XdsServer xds_server;
+  if (absl::GetFlag(FLAGS_use_xds)) {
+    xds_server = StartXdsServerAndConfigureBootstrap(server_port);
+  }
+  int retval;
   if (benchmark == "call") {
-    return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags);
+    retval = RunCallBenchmark(server_port, root, server_scenario_flags,
+                              client_scenario_flags);
   } else if (benchmark == "channel") {
-    return RunChannelBenchmark(root);
+    retval = RunChannelBenchmark(server_port, root);
   } else {
     gpr_log(GPR_INFO, "Not a valid benchmark name");
-    return 4;
+    retval = 4;
   }
+  if (xds_server.server != nullptr) xds_server.server->Shutdown();
+  gpr_log(GPR_INFO, "done running benchmark");
+  return retval;
 }
 
 int main(int argc, char** argv) {
@@ -199,10 +294,12 @@
   // Run all benchmarks listed (Multiple benchmarks usually only for default
   // scenario)
   auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ',');
+  grpc_init();
   for (const auto& benchmark : benchmarks) {
     int r = RunBenchmark(root, benchmark, it_scenario->second.server,
                          it_scenario->second.client);
     if (r != 0) return r;
   }
+  grpc_shutdown();
   return 0;
 }
diff --git a/test/core/memory_usage/server.cc b/test/core/memory_usage/server.cc
index c4eaf77..7426e8b 100644
--- a/test/core/memory_usage/server.cc
+++ b/test/core/memory_usage/server.cc
@@ -21,6 +21,11 @@
 #include <stdlib.h>
 #include <time.h>
 
+#ifndef _WIN32
+// This is for _exit() below, which is temporary.
+#include <unistd.h>
+#endif
+
 #include <algorithm>
 #include <string>
 #include <vector>
@@ -28,6 +33,7 @@
 #include "absl/base/attributes.h"
 #include "absl/flags/flag.h"
 #include "absl/flags/parse.h"
+#include "absl/status/status.h"
 
 #include <grpc/byte_buffer.h>
 #include <grpc/grpc.h>
@@ -35,23 +41,23 @@
 #include <grpc/impl/channel_arg_names.h>
 #include <grpc/slice.h>
 #include <grpc/status.h>
-
-#include "test/core/memory_usage/memstats.h"
-#ifndef _WIN32
-// This is for _exit() below, which is temporary.
-#include <unistd.h>
-#endif
-
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 
+#include "src/core/ext/xds/xds_enabled_server.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/host_port.h"
 #include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/memory_usage/memstats.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
 
+ABSL_FLAG(std::string, bind, "", "Bind host:port");
+ABSL_FLAG(bool, secure, false, "Use security");
+ABSL_FLAG(bool, minstack, false, "Use minimal stack");
+ABSL_FLAG(bool, use_xds, false, "Use xDS");
+
 static grpc_completion_queue* cq;
 static grpc_server* server;
 static grpc_op metadata_ops[2];
@@ -154,9 +160,13 @@
 // When that is resolved, please remove the #include <unistd.h> above.
 static void sigint_handler(int /*x*/) { _exit(0); }
 
-ABSL_FLAG(std::string, bind, "", "Bind host:port");
-ABSL_FLAG(bool, secure, false, "Use security");
-ABSL_FLAG(bool, minstack, false, "Use minimal stack");
+static void OnServingStatusUpdate(void* /*user_data*/, const char* uri,
+                                  grpc_serving_status_update update) {
+  absl::Status status(static_cast<absl::StatusCode>(update.code),
+                      update.error_message);
+  gpr_log(GPR_INFO, "xDS serving status notification: uri=\"%s\", status=%s",
+          uri, status.ToString().c_str());
+}
 
 int main(int argc, char** argv) {
   absl::ParseCommandLine(argc, argv);
@@ -188,7 +198,26 @@
     args_vec.push_back(grpc_channel_arg_integer_create(
         const_cast<char*>(GRPC_ARG_MINIMAL_STACK), 1));
   }
+  // TODO(roth): The xDS code here duplicates the functionality in
+  // XdsServerBuilder, which is undesirable.  We should ideally convert
+  // this to use the C++ API instead of the C-core API, so that we can
+  // avoid this duplication.
+  if (absl::GetFlag(FLAGS_use_xds)) {
+    args_vec.push_back(grpc_channel_arg_integer_create(
+        const_cast<char*>(GRPC_ARG_XDS_ENABLED_SERVER), 1));
+  }
+
   grpc_channel_args args = {args_vec.size(), args_vec.data()};
+  server = grpc_server_create(&args, nullptr);
+
+  if (absl::GetFlag(FLAGS_use_xds)) {
+    grpc_server_config_fetcher* config_fetcher =
+        grpc_server_config_fetcher_xds_create({OnServingStatusUpdate, nullptr},
+                                              &args);
+    if (config_fetcher != nullptr) {
+      grpc_server_set_config_fetcher(server, config_fetcher);
+    }
+  }
 
   MemStats before_server_create = MemStats::Snapshot();
   if (absl::GetFlag(FLAGS_secure)) {
@@ -196,11 +225,9 @@
                                                     test_server1_cert};
     grpc_server_credentials* ssl_creds = grpc_ssl_server_credentials_create(
         nullptr, &pem_key_cert_pair, 1, 0, nullptr);
-    server = grpc_server_create(&args, nullptr);
     GPR_ASSERT(grpc_server_add_http2_port(server, addr.c_str(), ssl_creds));
     grpc_server_credentials_release(ssl_creds);
   } else {
-    server = grpc_server_create(&args, nullptr);
     GPR_ASSERT(grpc_server_add_http2_port(
         server, addr.c_str(), grpc_insecure_server_credentials_create()));
   }
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 06bdf74..08634b4 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -51,7 +51,6 @@
 
 grpc_cc_library(
     name = "counted_service",
-    testonly = True,
     hdrs = ["counted_service.h"],
     deps = [
         "//:grpc",
diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD
index 8cc655b..5db35f4 100644
--- a/test/cpp/end2end/xds/BUILD
+++ b/test/cpp/end2end/xds/BUILD
@@ -23,12 +23,8 @@
 
 grpc_cc_library(
     name = "xds_server",
-    testonly = True,
     srcs = ["xds_server.cc"],
     hdrs = ["xds_server.h"],
-    external_deps = [
-        "gtest",
-    ],
     deps = [
         "//:gpr",
         "//:grpc",
@@ -47,7 +43,6 @@
 
 grpc_cc_library(
     name = "xds_utils",
-    testonly = True,
     srcs = ["xds_utils.cc"],
     hdrs = ["xds_utils.h"],
     deps = [
diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc
index a6a9896..3efcc99 100644
--- a/test/cpp/end2end/xds/xds_core_end2end_test.cc
+++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc
@@ -76,11 +76,11 @@
   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
   // Stop balancer.
   balancer_->Shutdown();
-  // Tell balancer to require minimum version 1 for all resource types.
-  balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
-  balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
-  balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
-  balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
+  // Expect minimum version 1 for all resource types.
+  balancer_->ads_service()->SetCheckVersionCallback(
+      [&](absl::string_view resource_type, int version) {
+        EXPECT_GE(version, 1) << "resource_type: " << resource_type;
+      });
   // Update backend, just so we can be sure that the client has
   // reconnected to the balancer.
   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
@@ -530,15 +530,15 @@
   CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
   // Stop balancer.
   balancer_->Shutdown();
-  // Tell balancer to require minimum version 1 for all resource types
-  // and to not reply to the requests.
-  balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
+  // Expect minimum version 1 for all resource types.
+  balancer_->ads_service()->SetCheckVersionCallback(
+      [&](absl::string_view resource_type, int version) {
+        EXPECT_GE(version, 1) << "resource_type: " << resource_type;
+      });
+  // Tell balancer not to reply to the requests.
   balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl);
-  balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
   balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl);
-  balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
   balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl);
-  balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
   balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
   // Restart balancer.
   balancer_->Start();
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
index b2c893b..ca04697 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
@@ -52,6 +52,8 @@
 namespace testing {
 
 using ::envoy::config::core::v3::HealthStatus;
+using ::envoy::service::discovery::v3::DiscoveryRequest;
+using ::envoy::service::load_stats::v3::LoadStatsRequest;
 
 using ::grpc::experimental::ExternalCertificateVerifier;
 using ::grpc::experimental::IdentityKeyCertPair;
@@ -275,10 +277,30 @@
 XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
     XdsEnd2endTest* test_obj)
     : ServerThread(test_obj, /*use_xds_enabled_server=*/false),
-      ads_service_(new AdsServiceImpl()),
-      lrs_service_(
-          new LrsServiceImpl((GetParam().enable_load_reporting() ? 20 : 0),
-                             {kDefaultClusterName})) {}
+      ads_service_(new AdsServiceImpl(
+          // First request must have node set with the right client features.
+          [&](const DiscoveryRequest& request) {
+            EXPECT_TRUE(request.has_node());
+            EXPECT_THAT(request.node().client_features(),
+                        ::testing::UnorderedElementsAre(
+                            "envoy.lb.does_not_support_overprovisioning",
+                            "xds.config.resource-in-sotw"));
+          },
+          // NACKs must use the right status code.
+          [&](absl::StatusCode code) {
+            EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
+          })),
+      lrs_service_(new LrsServiceImpl(
+          (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
+          // Fail if load reporting is used when not enabled.
+          [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
+          // Make sure we send the client feature saying that we support
+          // send_all_clusters.
+          [&](const LoadStatsRequest& request) {
+            EXPECT_THAT(
+                request.node().client_features(),
+                ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
+          })) {}
 
 void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
     ServerBuilder* builder) {
diff --git a/test/cpp/end2end/xds/xds_server.cc b/test/cpp/end2end/xds/xds_server.cc
index 65463a1..b4a657c 100644
--- a/test/cpp/end2end/xds/xds_server.cc
+++ b/test/cpp/end2end/xds/xds_server.cc
@@ -22,9 +22,6 @@
 #include <thread>
 #include <vector>
 
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
 #include "absl/types/optional.h"
 
 #include <grpc/support/log.h>
diff --git a/test/cpp/end2end/xds/xds_server.h b/test/cpp/end2end/xds/xds_server.h
index 29fd1c1..b4fa271 100644
--- a/test/cpp/end2end/xds/xds_server.h
+++ b/test/cpp/end2end/xds/xds_server.h
@@ -23,9 +23,6 @@
 #include <thread>
 #include <vector>
 
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
 #include "absl/types/optional.h"
 
 #include <grpc/support/log.h>
@@ -62,6 +59,9 @@
           ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>,
       public std::enable_shared_from_this<AdsServiceImpl> {
  public:
+  using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
+  using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
+
   // State for a given xDS resource type.
   struct ResponseState {
     enum State {
@@ -72,7 +72,12 @@
     std::string error_message;
   };
 
-  AdsServiceImpl() {}
+  explicit AdsServiceImpl(
+      std::function<void(const DiscoveryRequest& request)> check_first_request =
+          nullptr,
+      std::function<void(absl::StatusCode)> check_nack_status_code = nullptr)
+      : check_first_request_(std::move(check_first_request)),
+        check_nack_status_code_(std::move(check_nack_status_code)) {}
 
   void set_wrap_resources(bool wrap_resources) {
     grpc_core::MutexLock lock(&ads_mu_);
@@ -119,12 +124,12 @@
     resource_types_to_ignore_.emplace(type_url);
   }
 
-  // Sets the minimum version that the server will accept for a given
-  // resource type.  Will cause a gmock expectation failure if we see a
-  // lower version.
-  void SetResourceMinVersion(const std::string& type_url, int version) {
+  // Sets a callback to be invoked on request messages with respoonse_nonce
+  // set.  The callback is passed the resource type and version.
+  void SetCheckVersionCallback(
+      std::function<void(absl::string_view, int)> check_version_callack) {
     grpc_core::MutexLock lock(&ads_mu_);
-    resource_type_min_versions_[type_url] = version;
+    check_version_callack_ = std::move(check_version_callack);
   }
 
   // Get the list of response state for each resource type.
@@ -214,8 +219,6 @@
 
   using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;
 
-  using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
-  using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
   using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
 
   Status StreamAggregatedResources(ServerContext* context,
@@ -355,9 +358,10 @@
         GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
                                     &client_resource_type_version));
       }
-      EXPECT_GE(client_resource_type_version,
-                resource_type_min_versions_[request.type_url()])
-          << "resource_type: " << request.type_url();
+      if (check_version_callack_ != nullptr) {
+        check_version_callack_(request.type_url(),
+                               client_resource_type_version);
+      }
     } else {
       int client_nonce;
       GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
@@ -370,7 +374,10 @@
                 request.version_info().c_str());
       } else {
         response_state.state = ResponseState::NACKED;
-        EXPECT_EQ(request.error_detail().code(), GRPC_STATUS_INVALID_ARGUMENT);
+        if (check_nack_status_code_ != nullptr) {
+          check_nack_status_code_(
+              static_cast<absl::StatusCode>(request.error_detail().code()));
+        }
         response_state.error_message = request.error_detail().message();
         gpr_log(GPR_INFO,
                 "ADS[%p]: client NACKed resource_type=%s version=%s: %s", this,
@@ -476,11 +483,9 @@
     bool seen_first_request = false;
     while (stream->Read(&request)) {
       if (!seen_first_request) {
-        EXPECT_TRUE(request.has_node());
-        EXPECT_THAT(request.node().client_features(),
-                    ::testing::UnorderedElementsAre(
-                        "envoy.lb.does_not_support_overprovisioning",
-                        "xds.config.resource-in-sotw"));
+        if (check_first_request_ != nullptr) {
+          check_first_request_(request);
+        }
         seen_first_request = true;
       }
       {
@@ -557,6 +562,9 @@
     clients_.erase(client);
   }
 
+  std::function<void(const DiscoveryRequest& request)> check_first_request_;
+  std::function<void(absl::StatusCode)> check_nack_status_code_;
+
   grpc_core::CondVar ads_cond_;
   grpc_core::Mutex ads_mu_;
   bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
@@ -564,7 +572,7 @@
       resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
   std::set<std::string /*resource_type*/> resource_types_to_ignore_
       ABSL_GUARDED_BY(ads_mu_);
-  std::map<std::string /*resource_type*/, int> resource_type_min_versions_
+  std::function<void(absl::string_view, int)> check_version_callack_
       ABSL_GUARDED_BY(ads_mu_);
   // An instance data member containing the current state of all resources.
   // Note that an entry will exist whenever either of the following is true:
@@ -585,6 +593,9 @@
           ::envoy::service::load_stats::v3::LoadReportingService::Service>,
       public std::enable_shared_from_this<LrsServiceImpl> {
  public:
+  using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
+  using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
+
   // Stats reported by client.
   class ClientStats {
    public:
@@ -686,10 +697,15 @@
   };
 
   LrsServiceImpl(int client_load_reporting_interval_seconds,
-                 std::set<std::string> cluster_names)
+                 std::set<std::string> cluster_names,
+                 std::function<void()> stream_started_callback = nullptr,
+                 std::function<void(const LoadStatsRequest& request)>
+                     check_first_request = nullptr)
       : client_load_reporting_interval_seconds_(
             client_load_reporting_interval_seconds),
-        cluster_names_(std::move(cluster_names)) {}
+        cluster_names_(std::move(cluster_names)),
+        stream_started_callback_(std::move(stream_started_callback)),
+        check_first_request_(std::move(check_first_request)) {}
 
   // Must be called before the LRS call is started.
   void set_send_all_clusters(bool send_all_clusters) {
@@ -710,13 +726,11 @@
       absl::Duration timeout = absl::InfiniteDuration());
 
  private:
-  using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
-  using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
   using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
 
   Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
     gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
-    EXPECT_GT(client_load_reporting_interval_seconds_, 0);
+    if (stream_started_callback_ != nullptr) stream_started_callback_();
     // Take a reference of the LrsServiceImpl object, reference will go
     // out of scope after this method exits.
     std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this();
@@ -724,9 +738,7 @@
     LoadStatsRequest request;
     if (stream->Read(&request)) {
       IncreaseRequestCount();
-      // Verify client features.
-      EXPECT_THAT(request.node().client_features(),
-                  ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
+      if (check_first_request_ != nullptr) check_first_request_(request);
       // Send initial response.
       LoadStatsResponse response;
       if (send_all_clusters_) {
@@ -769,6 +781,8 @@
   const int client_load_reporting_interval_seconds_;
   bool send_all_clusters_ = false;
   std::set<std::string> cluster_names_;
+  std::function<void()> stream_started_callback_;
+  std::function<void(const LoadStatsRequest& request)> check_first_request_;
 
   grpc_core::CondVar lrs_cv_;
   grpc_core::Mutex lrs_mu_;
diff --git a/test/cpp/end2end/xds/xds_utils.cc b/test/cpp/end2end/xds/xds_utils.cc
index 8ad0cea..07f37ee 100644
--- a/test/cpp/end2end/xds/xds_utils.cc
+++ b/test/cpp/end2end/xds/xds_utils.cc
@@ -23,9 +23,6 @@
 #include <thread>
 #include <vector>
 
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
 #include "absl/memory/memory.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/str_format.h"
@@ -89,7 +86,7 @@
       "          \"server_uri\": \"<SERVER_URI>\",\n"
       "          \"channel_creds\": [\n"
       "            {\n"
-      "              \"type\": \"fake\"\n"
+      "              \"type\": \"<SERVER_CREDS_TYPE>\"\n"
       "            }\n"
       "          ],\n"
       "          \"server_features\": [<SERVER_FEATURES>]\n"
@@ -102,6 +99,7 @@
   return absl::StrReplaceAll(
       kXdsServerTemplate,
       {{"<SERVER_URI>", server_uri},
+       {"<SERVER_CREDS_TYPE>", xds_channel_creds_type_},
        {"<SERVER_FEATURES>", absl::StrJoin(server_features, ", ")}});
 }
 
diff --git a/test/cpp/end2end/xds/xds_utils.h b/test/cpp/end2end/xds/xds_utils.h
index 5797d9c..6acfbc6 100644
--- a/test/cpp/end2end/xds/xds_utils.h
+++ b/test/cpp/end2end/xds/xds_utils.h
@@ -46,6 +46,10 @@
     if (!ignore_if_set || top_server_.empty()) top_server_ = server;
     return *this;
   }
+  XdsBootstrapBuilder& SetXdsChannelCredentials(const std::string& type) {
+    xds_channel_creds_type_ = type;
+    return *this;
+  }
   XdsBootstrapBuilder& SetClientDefaultListenerResourceNameTemplate(
       const std::string& client_default_listener_resource_name_template) {
     client_default_listener_resource_name_template_ =
@@ -90,6 +94,7 @@
 
   bool ignore_resource_deletion_ = false;
   std::string top_server_;
+  std::string xds_channel_creds_type_ = "fake";
   std::string client_default_listener_resource_name_template_;
   std::map<std::string /*key*/, PluginInfo> plugins_;
   std::map<std::string /*authority_name*/, AuthorityInfo> authorities_;
diff --git a/tools/profiling/memory/memory_diff.py b/tools/profiling/memory/memory_diff.py
index eb2af95..f235b42 100755
--- a/tools/profiling/memory/memory_diff.py
+++ b/tools/profiling/memory/memory_diff.py
@@ -63,6 +63,22 @@
         rb"server channel memory usage: ([0-9\.]+) bytes per channel",
         float,
     ),
+    "call/xds_client": (
+        rb"xds client call memory usage: ([0-9\.]+) bytes per call",
+        float,
+    ),
+    "call/xds_server": (
+        rb"xds server call memory usage: ([0-9\.]+) bytes per call",
+        float,
+    ),
+    "channel/xds_client": (
+        rb"xds client channel memory usage: ([0-9\.]+) bytes per channel",
+        float,
+    ),
+    "channel/xds_server": (
+        rb"xds server channel memory usage: ([0-9\.]+) bytes per channel",
+        float,
+    ),
 }
 
 _SCENARIOS = {
@@ -89,26 +105,28 @@
     )
     ret = {}
     for name, benchmark_args in _BENCHMARKS.items():
-        for scenario, extra_args in _SCENARIOS.items():
-            # TODO(chenancy) Remove when minstack is implemented for channel
-            if name == "channel" and scenario == "minstack":
-                continue
-            try:
-                output = subprocess.check_output(
-                    [
-                        "bazel-bin/test/core/memory_usage/memory_usage_test",
-                    ]
+        for use_xds in (False, True):
+            for scenario, extra_args in _SCENARIOS.items():
+                # TODO(chenancy) Remove when minstack is implemented for channel
+                if name == "channel" and scenario == "minstack":
+                    continue
+                argv = (
+                    ["bazel-bin/test/core/memory_usage/memory_usage_test"]
                     + benchmark_args
                     + extra_args
                 )
-            except subprocess.CalledProcessError as e:
-                print("Error running benchmark:", e)
-                continue
-            for line in output.splitlines():
-                for key, (pattern, conversion) in _INTERESTING.items():
-                    m = re.match(pattern, line)
-                    if m:
-                        ret[scenario + ": " + key] = conversion(m.group(1))
+                if use_xds:
+                    argv.append("--use_xds")
+                try:
+                    output = subprocess.check_output(argv)
+                except subprocess.CalledProcessError as e:
+                    print("Error running benchmark:", e)
+                    continue
+                for line in output.splitlines():
+                    for key, (pattern, conversion) in _INTERESTING.items():
+                        m = re.match(pattern, line)
+                        if m:
+                            ret[scenario + ": " + key] = conversion(m.group(1))
     return ret