Make sure plugin container is removed on failure

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit f81172b9031160218e51fb2a7dbeee19962a60a9)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go
index 90f1388..050a80d 100644
--- a/libcontainerd/client_daemon.go
+++ b/libcontainerd/client_daemon.go
@@ -225,7 +225,7 @@
 		// TODO(mlaventure): when containerd support lcow, revisit runtime value
 		containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
 	if err != nil {
-		return err
+		return wrapError(err)
 	}
 
 	c.Lock()
@@ -306,7 +306,7 @@
 			rio.Cancel()
 			rio.Close()
 		}
-		return -1, err
+		return -1, wrapError(err)
 	}
 
 	ctr.setTask(t)
@@ -320,7 +320,7 @@
 				Error("failed to delete task after fail start")
 		}
 		ctr.setTask(nil)
-		return -1, err
+		return -1, wrapError(err)
 	}
 
 	return int(t.Pid()), nil
@@ -364,7 +364,7 @@
 	})
 	if err != nil {
 		close(stdinCloseSync)
-		return -1, err
+		return -1, wrapError(err)
 	}
 
 	ctr.addProcess(processID, p)
@@ -375,7 +375,7 @@
 	if err = p.Start(ctx); err != nil {
 		p.Delete(context.Background())
 		ctr.deleteProcess(processID)
-		return -1, err
+		return -1, wrapError(err)
 	}
 
 	return int(p.Pid()), nil
@@ -413,7 +413,7 @@
 		return err
 	}
 
-	return p.(containerd.Task).Pause(ctx)
+	return wrapError(p.(containerd.Task).Pause(ctx))
 }
 
 func (c *client) Resume(ctx context.Context, containerID string) error {
@@ -513,7 +513,7 @@
 	}
 
 	if err := ctr.ctr.Delete(ctx); err != nil {
-		return err
+		return wrapError(err)
 	}
 
 	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
@@ -543,7 +543,7 @@
 
 	s, err := t.Status(ctx)
 	if err != nil {
-		return StatusUnknown, err
+		return StatusUnknown, wrapError(err)
 	}
 
 	return Status(s.Status), nil
@@ -557,7 +557,7 @@
 
 	img, err := p.(containerd.Task).Checkpoint(ctx)
 	if err != nil {
-		return err
+		return wrapError(err)
 	}
 	// Whatever happens, delete the checkpoint from containerd
 	defer func() {
diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go
index 5531306..e490ef0 100644
--- a/plugin/executor/containerd/containerd.go
+++ b/plugin/executor/containerd/containerd.go
@@ -5,6 +5,7 @@
 	"io"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"github.com/containerd/containerd/cio"
 	"github.com/containerd/containerd/linux/runctypes"
@@ -15,21 +16,34 @@
 	"github.com/sirupsen/logrus"
 )
 
-// PluginNamespace is the name used for the plugins namespace
-var PluginNamespace = "plugins.moby"
+// pluginNamespace is the name used for the plugins namespace
+const pluginNamespace = "plugins.moby"
 
 // ExitHandler represents an object that is called when the exit event is received from containerd
 type ExitHandler interface {
 	HandleExitEvent(id string) error
 }
 
+// Client is used by the exector to perform operations.
+// TODO(@cpuguy83): This should really just be based off the containerd client interface.
+// However right now this whole package is tied to github.com/docker/docker/libcontainerd
+type Client interface {
+	Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
+	Restore(ctx context.Context, containerID string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error)
+	Status(ctx context.Context, containerID string) (libcontainerd.Status, error)
+	Delete(ctx context.Context, containerID string) error
+	DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
+	Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error)
+	SignalProcess(ctx context.Context, containerID, processID string, signal int) error
+}
+
 // New creates a new containerd plugin executor
 func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
 	e := &Executor{
 		rootDir:     rootDir,
 		exitHandler: exitHandler,
 	}
-	client, err := remote.NewClient(PluginNamespace, e)
+	client, err := remote.NewClient(pluginNamespace, e)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating containerd exec client")
 	}
@@ -40,7 +54,7 @@
 // Executor is the containerd client implementation of a plugin executor
 type Executor struct {
 	rootDir     string
-	client      libcontainerd.Client
+	client      Client
 	exitHandler ExitHandler
 }
 
@@ -52,10 +66,34 @@
 	ctx := context.Background()
 	err := e.client.Create(ctx, id, &spec, &opts)
 	if err != nil {
-		return err
+		status, err2 := e.client.Status(ctx, id)
+		if err2 != nil {
+			if !errdefs.IsNotFound(err2) {
+				logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
+			}
+		} else {
+			if status != libcontainerd.StatusRunning && status != libcontainerd.StatusUnknown {
+				if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+					logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
+				}
+				err = e.client.Create(ctx, id, &spec, &opts)
+			}
+		}
+
+		if err != nil {
+			return errors.Wrap(err, "error creating containerd container")
+		}
 	}
 
 	_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
+	if err != nil {
+		if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+			logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
+		}
+		if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+			logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
+		}
+	}
 	return err
 }
 
@@ -69,13 +107,11 @@
 		_, _, err = e.client.DeleteTask(context.Background(), id)
 		if err != nil && !errdefs.IsNotFound(err) {
 			logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
-			return err
 		}
 
 		err = e.client.Delete(context.Background(), id)
 		if err != nil && !errdefs.IsNotFound(err) {
 			logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
-			return err
 		}
 	}
 	return nil
diff --git a/plugin/executor/containerd/containerd_test.go b/plugin/executor/containerd/containerd_test.go
new file mode 100644
index 0000000..cd1a51b
--- /dev/null
+++ b/plugin/executor/containerd/containerd_test.go
@@ -0,0 +1,148 @@
+package containerd
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/libcontainerd"
+	"github.com/gotestyourself/gotestyourself/assert"
+	specs "github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/pkg/errors"
+)
+
+func TestLifeCycle(t *testing.T) {
+	t.Parallel()
+
+	mock := newMockClient()
+	exec, cleanup := setupTest(t, mock, mock)
+	defer cleanup()
+
+	id := "test-create"
+	mock.simulateStartError(true, id)
+	err := exec.Create(id, specs.Spec{}, nil, nil)
+	assert.Assert(t, err != nil)
+	mock.simulateStartError(false, id)
+
+	err = exec.Create(id, specs.Spec{}, nil, nil)
+	assert.Assert(t, err)
+	running, _ := exec.IsRunning(id)
+	assert.Assert(t, running)
+
+	// create with the same ID
+	err = exec.Create(id, specs.Spec{}, nil, nil)
+	assert.Assert(t, err != nil)
+
+	mock.HandleExitEvent(id) // simulate a plugin that exits
+
+	err = exec.Create(id, specs.Spec{}, nil, nil)
+	assert.Assert(t, err)
+}
+
+func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) {
+	rootDir, err := ioutil.TempDir("", "test-daemon")
+	assert.Assert(t, err)
+	assert.Assert(t, client != nil)
+	assert.Assert(t, eh != nil)
+
+	return &Executor{
+			rootDir:     rootDir,
+			client:      client,
+			exitHandler: eh,
+		}, func() {
+			assert.Assert(t, os.RemoveAll(rootDir))
+		}
+}
+
+type mockClient struct {
+	mu           sync.Mutex
+	containers   map[string]bool
+	errorOnStart map[string]bool
+}
+
+func newMockClient() *mockClient {
+	return &mockClient{
+		containers:   make(map[string]bool),
+		errorOnStart: make(map[string]bool),
+	}
+}
+
+func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.containers[id]; ok {
+		return errors.New("exists")
+	}
+
+	c.containers[id] = false
+	return nil
+}
+
+func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error) {
+	return false, 0, nil
+}
+
+func (c *mockClient) Status(ctx context.Context, id string) (libcontainerd.Status, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	running, ok := c.containers[id]
+	if !ok {
+		return libcontainerd.StatusUnknown, errors.New("not found")
+	}
+	if running {
+		return libcontainerd.StatusRunning, nil
+	}
+	return libcontainerd.StatusStopped, nil
+}
+
+func (c *mockClient) Delete(ctx context.Context, id string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.containers, id)
+	return nil
+}
+
+func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) {
+	return 0, time.Time{}, nil
+}
+
+func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.containers[id]; !ok {
+		return 0, errors.New("not found")
+	}
+
+	if c.errorOnStart[id] {
+		return 0, errors.New("some startup error")
+	}
+	c.containers[id] = true
+	return 1, nil
+}
+
+func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
+	return nil
+}
+
+func (c *mockClient) simulateStartError(sim bool, id string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if sim {
+		c.errorOnStart[id] = sim
+		return
+	}
+	delete(c.errorOnStart, id)
+}
+
+func (c *mockClient) HandleExitEvent(id string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.containers, id)
+	return nil
+}