Merge pull request #11049 from murgatroid99/node_server_socket_leak

Improve how UV TCP servers and endpoints are shut down
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index e924694..d446e53 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -56,6 +56,8 @@
   int port;
   /* linked list */
   struct grpc_tcp_listener *next;
+
+  bool closed;
 };
 
 struct grpc_tcp_server {
@@ -77,6 +79,8 @@
   /* shutdown callback */
   grpc_closure *shutdown_complete;
 
+  bool shutdown;
+
   grpc_resource_quota *resource_quota;
 };
 
@@ -109,6 +113,7 @@
   s->shutdown_starting.head = NULL;
   s->shutdown_starting.tail = NULL;
   s->shutdown_complete = shutdown_complete;
+  s->shutdown = false;
   *server = s;
   return GRPC_ERROR_NONE;
 }
@@ -125,6 +130,7 @@
 }
 
 static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+  GPR_ASSERT(s->shutdown);
   if (s->shutdown_complete != NULL) {
     grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
   }
@@ -144,21 +150,31 @@
   grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   sp->server->open_ports--;
-  if (sp->server->open_ports == 0) {
+  if (sp->server->open_ports == 0 && sp->server->shutdown) {
     finish_shutdown(&exec_ctx, sp->server);
   }
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
+static void close_listener(grpc_tcp_listener *sp) {
+  if (!sp->closed) {
+    sp->closed = true;
+    uv_close((uv_handle_t *)sp->handle, handle_close_callback);
+  }
+}
+
 static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
   int immediately_done = 0;
   grpc_tcp_listener *sp;
 
+  GPR_ASSERT(!s->shutdown);
+  s->shutdown = true;
+
   if (s->open_ports == 0) {
     immediately_done = 1;
   }
   for (sp = s->head; sp; sp = sp->next) {
-    uv_close((uv_handle_t *)sp->handle, handle_close_callback);
+    close_listener(sp);
   }
 
   if (immediately_done) {
@@ -196,9 +212,14 @@
   int err;
 
   if (status < 0) {
-    gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
-            uv_strerror(status));
-    return;
+    switch (status) {
+      case UV_EINTR:
+      case UV_EAGAIN:
+        return;
+      default:
+        close_listener(sp);
+        return;
+    }
   }
 
   client = gpr_malloc(sizeof(uv_tcp_t));
@@ -287,6 +308,7 @@
   sp->handle = handle;
   sp->port = port;
   sp->port_index = port_index;
+  sp->closed = false;
   handle->data = sp;
   s->open_ports++;
   GPR_ASSERT(sp->handle);
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 8e8db9f..b9ca1a3 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -88,12 +88,12 @@
 #ifdef GRPC_TCP_REFCOUNT_DEBUG
 #define TCP_UNREF(exec_ctx, tcp, reason) \
   tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) \
-  tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
 static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
                       const char *reason, const char *file, int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
-          reason, tcp->refcount.count, tcp->refcount.count - 1);
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+          "TCP unref %p : %s %" PRIiPTR " -> %" PRIiPTR, tcp, reason,
+          tcp->refcount.count, tcp->refcount.count - 1);
   if (gpr_unref(&tcp->refcount)) {
     tcp_free(exec_ctx, tcp);
   }
@@ -101,8 +101,9 @@
 
 static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
                     int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
-          reason, tcp->refcount.count, tcp->refcount.count + 1);
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+          "TCP   ref %p : %s %" PRIiPTR " -> %" PRIiPTR, tcp, reason,
+          tcp->refcount.count, tcp->refcount.count + 1);
   gpr_ref(&tcp->refcount);
 }
 #else
@@ -311,6 +312,7 @@
     tcp->shutting_down = true;
     uv_shutdown_t *req = &tcp->shutdown_req;
     uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
+    grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
   }
   GRPC_ERROR_UNREF(why);
 }