Make sure plugin container is removed on failure
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit f81172b9031160218e51fb2a7dbeee19962a60a9)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go
index 90f1388..050a80d 100644
--- a/libcontainerd/client_daemon.go
+++ b/libcontainerd/client_daemon.go
@@ -225,7 +225,7 @@
// TODO(mlaventure): when containerd support lcow, revisit runtime value
containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
if err != nil {
- return err
+ return wrapError(err)
}
c.Lock()
@@ -306,7 +306,7 @@
rio.Cancel()
rio.Close()
}
- return -1, err
+ return -1, wrapError(err)
}
ctr.setTask(t)
@@ -320,7 +320,7 @@
Error("failed to delete task after fail start")
}
ctr.setTask(nil)
- return -1, err
+ return -1, wrapError(err)
}
return int(t.Pid()), nil
@@ -364,7 +364,7 @@
})
if err != nil {
close(stdinCloseSync)
- return -1, err
+ return -1, wrapError(err)
}
ctr.addProcess(processID, p)
@@ -375,7 +375,7 @@
if err = p.Start(ctx); err != nil {
p.Delete(context.Background())
ctr.deleteProcess(processID)
- return -1, err
+ return -1, wrapError(err)
}
return int(p.Pid()), nil
@@ -413,7 +413,7 @@
return err
}
- return p.(containerd.Task).Pause(ctx)
+ return wrapError(p.(containerd.Task).Pause(ctx))
}
func (c *client) Resume(ctx context.Context, containerID string) error {
@@ -513,7 +513,7 @@
}
if err := ctr.ctr.Delete(ctx); err != nil {
- return err
+ return wrapError(err)
}
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
@@ -543,7 +543,7 @@
s, err := t.Status(ctx)
if err != nil {
- return StatusUnknown, err
+ return StatusUnknown, wrapError(err)
}
return Status(s.Status), nil
@@ -557,7 +557,7 @@
img, err := p.(containerd.Task).Checkpoint(ctx)
if err != nil {
- return err
+ return wrapError(err)
}
// Whatever happens, delete the checkpoint from containerd
defer func() {
diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go
index 5531306..e490ef0 100644
--- a/plugin/executor/containerd/containerd.go
+++ b/plugin/executor/containerd/containerd.go
@@ -5,6 +5,7 @@
"io"
"path/filepath"
"sync"
+ "time"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/linux/runctypes"
@@ -15,21 +16,34 @@
"github.com/sirupsen/logrus"
)
-// PluginNamespace is the name used for the plugins namespace
-var PluginNamespace = "plugins.moby"
+// pluginNamespace is the name used for the plugins namespace
+const pluginNamespace = "plugins.moby"
// ExitHandler represents an object that is called when the exit event is received from containerd
type ExitHandler interface {
HandleExitEvent(id string) error
}
+// Client is used by the exector to perform operations.
+// TODO(@cpuguy83): This should really just be based off the containerd client interface.
+// However right now this whole package is tied to github.com/docker/docker/libcontainerd
+type Client interface {
+ Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
+ Restore(ctx context.Context, containerID string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error)
+ Status(ctx context.Context, containerID string) (libcontainerd.Status, error)
+ Delete(ctx context.Context, containerID string) error
+ DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
+ Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error)
+ SignalProcess(ctx context.Context, containerID, processID string, signal int) error
+}
+
// New creates a new containerd plugin executor
func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
e := &Executor{
rootDir: rootDir,
exitHandler: exitHandler,
}
- client, err := remote.NewClient(PluginNamespace, e)
+ client, err := remote.NewClient(pluginNamespace, e)
if err != nil {
return nil, errors.Wrap(err, "error creating containerd exec client")
}
@@ -40,7 +54,7 @@
// Executor is the containerd client implementation of a plugin executor
type Executor struct {
rootDir string
- client libcontainerd.Client
+ client Client
exitHandler ExitHandler
}
@@ -52,10 +66,34 @@
ctx := context.Background()
err := e.client.Create(ctx, id, &spec, &opts)
if err != nil {
- return err
+ status, err2 := e.client.Status(ctx, id)
+ if err2 != nil {
+ if !errdefs.IsNotFound(err2) {
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
+ }
+ } else {
+ if status != libcontainerd.StatusRunning && status != libcontainerd.StatusUnknown {
+ if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+ logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
+ }
+ err = e.client.Create(ctx, id, &spec, &opts)
+ }
+ }
+
+ if err != nil {
+ return errors.Wrap(err, "error creating containerd container")
+ }
}
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
+ if err != nil {
+ if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
+ }
+ if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
+ }
+ }
return err
}
@@ -69,13 +107,11 @@
_, _, err = e.client.DeleteTask(context.Background(), id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
- return err
}
err = e.client.Delete(context.Background(), id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
- return err
}
}
return nil
diff --git a/plugin/executor/containerd/containerd_test.go b/plugin/executor/containerd/containerd_test.go
new file mode 100644
index 0000000..cd1a51b
--- /dev/null
+++ b/plugin/executor/containerd/containerd_test.go
@@ -0,0 +1,148 @@
+package containerd
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/docker/docker/libcontainerd"
+ "github.com/gotestyourself/gotestyourself/assert"
+ specs "github.com/opencontainers/runtime-spec/specs-go"
+ "github.com/pkg/errors"
+)
+
+func TestLifeCycle(t *testing.T) {
+ t.Parallel()
+
+ mock := newMockClient()
+ exec, cleanup := setupTest(t, mock, mock)
+ defer cleanup()
+
+ id := "test-create"
+ mock.simulateStartError(true, id)
+ err := exec.Create(id, specs.Spec{}, nil, nil)
+ assert.Assert(t, err != nil)
+ mock.simulateStartError(false, id)
+
+ err = exec.Create(id, specs.Spec{}, nil, nil)
+ assert.Assert(t, err)
+ running, _ := exec.IsRunning(id)
+ assert.Assert(t, running)
+
+ // create with the same ID
+ err = exec.Create(id, specs.Spec{}, nil, nil)
+ assert.Assert(t, err != nil)
+
+ mock.HandleExitEvent(id) // simulate a plugin that exits
+
+ err = exec.Create(id, specs.Spec{}, nil, nil)
+ assert.Assert(t, err)
+}
+
+func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) {
+ rootDir, err := ioutil.TempDir("", "test-daemon")
+ assert.Assert(t, err)
+ assert.Assert(t, client != nil)
+ assert.Assert(t, eh != nil)
+
+ return &Executor{
+ rootDir: rootDir,
+ client: client,
+ exitHandler: eh,
+ }, func() {
+ assert.Assert(t, os.RemoveAll(rootDir))
+ }
+}
+
+type mockClient struct {
+ mu sync.Mutex
+ containers map[string]bool
+ errorOnStart map[string]bool
+}
+
+func newMockClient() *mockClient {
+ return &mockClient{
+ containers: make(map[string]bool),
+ errorOnStart: make(map[string]bool),
+ }
+}
+
+func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.containers[id]; ok {
+ return errors.New("exists")
+ }
+
+ c.containers[id] = false
+ return nil
+}
+
+func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error) {
+ return false, 0, nil
+}
+
+func (c *mockClient) Status(ctx context.Context, id string) (libcontainerd.Status, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ running, ok := c.containers[id]
+ if !ok {
+ return libcontainerd.StatusUnknown, errors.New("not found")
+ }
+ if running {
+ return libcontainerd.StatusRunning, nil
+ }
+ return libcontainerd.StatusStopped, nil
+}
+
+func (c *mockClient) Delete(ctx context.Context, id string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ delete(c.containers, id)
+ return nil
+}
+
+func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) {
+ return 0, time.Time{}, nil
+}
+
+func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.containers[id]; !ok {
+ return 0, errors.New("not found")
+ }
+
+ if c.errorOnStart[id] {
+ return 0, errors.New("some startup error")
+ }
+ c.containers[id] = true
+ return 1, nil
+}
+
+func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
+ return nil
+}
+
+func (c *mockClient) simulateStartError(sim bool, id string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if sim {
+ c.errorOnStart[id] = sim
+ return
+ }
+ delete(c.errorOnStart, id)
+}
+
+func (c *mockClient) HandleExitEvent(id string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ delete(c.containers, id)
+ return nil
+}