Merge pull request #296 from thaJeztah/19.03_backport_exec_hang

[19.03 backport] Handle blocked I/O of exec'd processes
diff --git a/container/container.go b/container/container.go
index 6a5907c..71a8a1b 100644
--- a/container/container.go
+++ b/container/container.go
@@ -730,7 +730,7 @@
 }
 
 func (i *rio) Wait() {
-	i.sc.Wait()
+	i.sc.Wait(context.Background())
 
 	i.IO.Wait()
 }
diff --git a/container/stream/streams.go b/container/stream/streams.go
index d81867c..585f9e8 100644
--- a/container/stream/streams.go
+++ b/container/stream/streams.go
@@ -1,6 +1,7 @@
 package stream // import "github.com/docker/docker/container/stream"
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -24,11 +25,12 @@
 // copied and delivered to all StdoutPipe and StderrPipe consumers, using
 // a kind of "broadcaster".
 type Config struct {
-	sync.WaitGroup
+	wg        sync.WaitGroup
 	stdout    *broadcaster.Unbuffered
 	stderr    *broadcaster.Unbuffered
 	stdin     io.ReadCloser
 	stdinPipe io.WriteCloser
+	dio       *cio.DirectIO
 }
 
 // NewConfig creates a stream config and initializes
@@ -115,14 +117,15 @@
 
 // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
 func (c *Config) CopyToPipe(iop *cio.DirectIO) {
+	c.dio = iop
 	copyFunc := func(w io.Writer, r io.ReadCloser) {
-		c.Add(1)
+		c.wg.Add(1)
 		go func() {
 			if _, err := pools.Copy(w, r); err != nil {
 				logrus.Errorf("stream copy error: %v", err)
 			}
 			r.Close()
-			c.Done()
+			c.wg.Done()
 		}()
 	}
 
@@ -144,3 +147,23 @@
 		}
 	}
 }
+
+// Wait blocks until every copy goroutine registered on c.wg by CopyToPipe has
+// finished, or until ctx is done. If ctx ends first and c.dio is non-nil, Wait
+// calls Cancel, Wait and Close on that DirectIO (in that order) and returns.
+func (c *Config) Wait(ctx context.Context) {
+	done := make(chan struct{}, 1) // only ever closed, never sent on
+	go func() {
+		c.wg.Wait() // keeps waiting on c.wg even if the select below leaves via ctx.Done
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-ctx.Done(): // ctx ended before the copies did; c.dio is assigned in CopyToPipe
+		if c.dio != nil {
+			c.dio.Cancel()
+			c.dio.Wait()
+			c.dio.Close()
+		}
+	}
+}
diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go
index cf2a955..f1b5259 100644
--- a/daemon/exec/exec.go
+++ b/daemon/exec/exec.go
@@ -1,6 +1,7 @@
 package exec // import "github.com/docker/docker/daemon/exec"
 
 import (
+	"context"
 	"runtime"
 	"sync"
 
@@ -58,7 +59,7 @@
 }
 
 func (i *rio) Wait() {
-	i.sc.Wait()
+	i.sc.Wait(context.Background())
 
 	i.IO.Wait()
 }
diff --git a/daemon/monitor.go b/daemon/monitor.go
index 5ef97c7..2f47497 100644
--- a/daemon/monitor.go
+++ b/daemon/monitor.go
@@ -55,8 +55,9 @@
 			if err != nil {
 				logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
 			}
-
-			c.StreamConfig.Wait()
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			c.StreamConfig.Wait(ctx)
+			cancel()
 			c.Reset(false)
 
 			exitStatus := container.ExitStatus{
@@ -124,7 +125,11 @@
 			defer execConfig.Unlock()
 			execConfig.ExitCode = &ec
 			execConfig.Running = false
-			execConfig.StreamConfig.Wait()
+
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			execConfig.StreamConfig.Wait(ctx)
+			cancel()
+
 			if err := execConfig.CloseStreams(); err != nil {
 				logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 			}
diff --git a/integration-cli/docker_cli_exec_test.go b/integration-cli/docker_cli_exec_test.go
index 5f60a76..f6d4e3c 100644
--- a/integration-cli/docker_cli_exec_test.go
+++ b/integration-cli/docker_cli_exec_test.go
@@ -11,7 +11,6 @@
 	"reflect"
 	"runtime"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -19,7 +18,6 @@
 	"github.com/docker/docker/client"
 	"github.com/docker/docker/integration-cli/cli"
 	"github.com/docker/docker/integration-cli/cli/build"
-	"github.com/docker/docker/pkg/parsers/kernel"
 	"github.com/go-check/check"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
@@ -534,100 +532,3 @@
 	assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
 	assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
 }
-
-func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
-	testRequires(c, DaemonIsWindows)
-
-	if runtime.GOOS == "windows" {
-		v, err := kernel.GetKernelVersion()
-		assert.NilError(c, err)
-		build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
-		if build >= 17743 {
-			c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")
-
-			// This is being tracked internally. @jhowardmsft. Summary of failure
-			// from an email in early July 2018 below:
-			//
-			// Platform regression. In cmd.exe by the look of it. I can repro
-			// it outside of CI.  It fails the same on 17681, 17676 and even as
-			// far back as 17663, over a month old. From investigating, I can see
-			// what's happening in the container, but not the reason. The test
-			// starts a long-running container based on the Windows busybox image.
-			// It then adds another process (docker exec) to that container to
-			// sleep. It loops waiting for two instances of busybox.exe running,
-			// and cmd.exe to quit. What's actually happening is that the second
-			// exec hangs indefinitely, and from docker top, I can see
-			// "OpenWith.exe" running.
-
-			//Manual repro would be
-			//# Start the first long-running container
-			//docker run --rm -d --name test busybox sleep 300
-
-			//# In another window, docker top test. There should be a single instance of busybox.exe running
-			//# In a third window, docker exec test cmd /c start sleep 10  NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
-			//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
-		}
-	}
-
-	runSleepingContainer(c, "-d", "--name", "test")
-	exec := make(chan bool)
-	go func() {
-		dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
-		exec <- true
-	}()
-
-	count := 0
-	for {
-		top := make(chan string)
-		var out string
-		go func() {
-			out, _ := dockerCmd(c, "top", "test")
-			top <- out
-		}()
-
-		select {
-		case <-time.After(time.Second * 5):
-			c.Fatal("timed out waiting for top while exec is exiting")
-		case out = <-top:
-			break
-		}
-
-		if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
-			// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
-			break
-		}
-		count++
-		if count >= 30 {
-			c.Fatal("too many retries")
-		}
-		time.Sleep(1 * time.Second)
-	}
-
-	inspect := make(chan bool)
-	go func() {
-		dockerCmd(c, "inspect", "test")
-		inspect <- true
-	}()
-
-	select {
-	case <-time.After(time.Second * 5):
-		c.Fatal("timed out waiting for inspect while exec is exiting")
-	case <-inspect:
-		break
-	}
-
-	// Ensure the background sleep is still running
-	out, _ := dockerCmd(c, "top", "test")
-	assert.Equal(c, strings.Count(out, "busybox.exe"), 2)
-
-	// The exec should exit when the background sleep exits
-	select {
-	case <-time.After(time.Second * 15):
-		c.Fatal("timed out waiting for async exec to exit")
-	case <-exec:
-		// Ensure the background sleep has actually exited
-		out, _ := dockerCmd(c, "top", "test")
-		assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
-		break
-	}
-}
diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go
index 07389e3..ef94198 100644
--- a/libcontainerd/remote/client.go
+++ b/libcontainerd/remote/client.go
@@ -652,13 +652,6 @@
 					}).Error("exit event")
 				return
 			}
-			_, err = p.Delete(context.Background())
-			if err != nil {
-				c.logger.WithError(err).WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"process":   ei.ProcessID,
-				}).Warn("failed to delete process")
-			}
 
 			ctr, err := c.getContainer(ctx, ei.ContainerID)
 			if err != nil {
@@ -672,11 +665,18 @@
 					c.logger.WithFields(logrus.Fields{
 						"container": ei.ContainerID,
 						"error":     err,
-					}).Error("failed to find container")
+					}).Error("failed to get container labels")
 					return
 				}
 				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
 			}
+			_, err = p.Delete(context.Background())
+			if err != nil {
+				c.logger.WithError(err).WithFields(logrus.Fields{
+					"container": ei.ContainerID,
+					"process":   ei.ProcessID,
+				}).Warn("failed to delete process")
+			}
 		}
 	})
 }