Internal configuration cleanup

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35045 from eugeneo:tasks/remove-interop-metadata f8002eca8cc663a132981ae69b414a815cd337e4
FUTURE_COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35045 from eugeneo:tasks/remove-interop-metadata f8002eca8cc663a132981ae69b414a815cd337e4
PiperOrigin-RevId: 585570956
diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
index 845e48c..c5b2277 100644
--- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
+++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
@@ -75,6 +75,7 @@
   return *actual_value;
 }
 
+#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
 // The default values for TCP_USER_TIMEOUT are currently configured to be in
 // line with the default values of KEEPALIVE_TIMEOUT as proposed in
 // https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md */
@@ -83,8 +84,6 @@
 bool kDefaultClientUserTimeoutEnabled = false;
 bool kDefaultServerUserTimeoutEnabled = true;
 
-#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
-
 absl::Status ErrorForFd(
     int fd, const experimental::EventEngine::ResolvedAddress& addr) {
   if (fd >= 0) return absl::OkStatus();
diff --git a/src/python/grpcio/_spawn_patch.py b/src/python/grpcio/_spawn_patch.py
index cb1f909..71cf771 100644
--- a/src/python/grpcio/_spawn_patch.py
+++ b/src/python/grpcio/_spawn_patch.py
@@ -61,4 +61,4 @@
 def monkeypatch_spawn():
     """Monkeypatching is dumb, but it's either that or we become maintainers of
     something much, much bigger."""
-    ccompiler.CCompiler.spawn = _commandfile_spawn
+    ccompiler.CCompiler.spawn = _commandfile_spawn
\ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
index 508196e..a21c1ec 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
@@ -470,6 +470,7 @@
     """
     cdef object async_response_generator
     cdef object response_message
+    install_context_from_request_call_event_aio(rpc_state)
 
     if inspect.iscoroutinefunction(stream_handler):
         # Case 1: Coroutine async handler - using reader-writer API
@@ -523,6 +524,7 @@
     rpc_state.metadata_sent = True
     rpc_state.status_sent = True
     await execute_batch(rpc_state, finish_ops, loop)
+    uninstall_context()
 
 
 async def _handle_unary_unary_rpc(object method_handler,
diff --git a/test/cpp/interop/xds_stats_watcher.cc b/test/cpp/interop/xds_stats_watcher.cc
index d6cb303..1f28525 100644
--- a/test/cpp/interop/xds_stats_watcher.cc
+++ b/test/cpp/interop/xds_stats_watcher.cc
@@ -51,6 +51,19 @@
   return result;
 }
 
+bool HasNonEmptyMetadata(
+    const std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>&
+        metadata_by_peer) {
+  for (const auto& entry : metadata_by_peer) {
+    for (const auto& rpc_metadata : entry.second.rpc_metadata()) {
+      if (rpc_metadata.metadata_size() > 0) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
 }  // namespace
 
 XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
@@ -113,8 +126,12 @@
                [this] { return rpcs_needed_ == 0; });
   response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
                                           rpcs_by_peer_.end());
-  response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
-                                               metadata_by_peer_.end());
+  // Return metadata if at least one RPC had relevant metadata. Note that empty
+  // entries would be returned for RCPs with no relevant metadata in this case.
+  if (HasNonEmptyMetadata(metadata_by_peer_)) {
+    response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
+                                                 metadata_by_peer_.end());
+  }
   auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
   for (const auto& rpc_by_type : rpcs_by_type_) {
     std::string method_name;
diff --git a/test/cpp/interop/xds_stats_watcher_test.cc b/test/cpp/interop/xds_stats_watcher_test.cc
index 0909b49..3d2f951 100644
--- a/test/cpp/interop/xds_stats_watcher_test.cc
+++ b/test/cpp/interop/xds_stats_watcher_test.cc
@@ -152,7 +152,7 @@
             watcher.WaitForRpcStatsResponse(0).DebugString());
 }
 
-TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) {
+TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseExcludesMetadata) {
   XdsStatsWatcher watcher(0, 3, {});
   // RPC had metadata - but watcher should ignore it
   watcher.RpcCompleted(BuildCallResult(0), "peer1",
@@ -163,11 +163,6 @@
                        {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
   LoadBalancerStatsResponse expected;
   expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
-  // There will still be an empty metadata collection for each RPC
-  expected.mutable_metadatas_by_peer()->insert({
-      {"peer1", BuildMetadatas({{}, {}})},
-      {"peer2", BuildMetadatas({{}})},
-  });
   (*expected.mutable_rpcs_by_method())["UnaryCall"]
       .mutable_rpcs_by_peer()
       ->insert({{"peer1", 2}, {"peer2", 1}});