| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // This is a daemon for managing a pool of Docker containers on swarming bots. |
| // The daemon is event-driven, processing the stream of events from the Docker |
| // daemon. |
| // |
| // The daemon creates and starts a set number of containers from a specified |
| // image which is pulled from the Google container registry. These containers |
| // are then monitored and automatically restarted on exit (i.e. when |
// swarming_bot itself has shut down the container by invoking /sbin/shutdown).
| // |
| // When the daemon is sent the SIGTERM signal, it'll start draining the pool; |
// it'll send a SIGTERM signal to every container, which causes the swarming_bot
// to exit at the next opportunity (i.e. when not running a task). When all the
// containers have successfully exited, the daemon itself will terminate.
| // |
| // When the daemon is sent the SIGINT signal, it'll immediately terminate all |
| // containers without any grace period. |
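//
// A typical invocation (the binary name and flag values shown here are
// illustrative assumptions, not mandated by this file):
//
//	swarm_docker -config /etc/swarm_docker/config.json -timeout 2m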
| package main |
| |
| import ( |
| "context" |
| "encoding/base64" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "log" |
| "net" |
| "os" |
| "os/signal" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "syscall" |
| "time" |
| |
| "github.com/docker/docker/api/types" |
| "github.com/docker/docker/api/types/container" |
| "github.com/docker/docker/api/types/filters" |
| "github.com/docker/docker/api/types/mount" |
| "github.com/docker/docker/api/types/network" |
| docker "github.com/docker/docker/client" |
| |
| "go.fuchsia.dev/tools/retry" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| const ( |
| registryDomain = "gcr.io" |
| permissionsFolder = "/sys/fs/cgroup/devices/docker" |
| ) |
| |
| var ( |
	configPath string // config file path
| timeout time.Duration // default timeout for all operations |
| ) |
| |
| func init() { |
	flag.StringVar(&configPath, "config", "/etc/swarm_docker/config.json", "config file path")
| flag.DurationVar(&timeout, "timeout", 1*time.Minute, "default timeout") |
| } |
| |
| // Config contains the service configuration. |
| type Config struct { |
| Memory int `json:"memory"` |
| Cpus int `json:"cpus"` |
| NetworkMode string `json:"network_mode"` |
| ImageName string `json:"image_name"` |
| Project string `json:"project"` |
| Credentials string `json:"credentials"` |
| Containers []Container `json:"containers"` |
| Cmd []string `json:"cmd,omitempty"` |
| } |
| |
| // Container describes the container instance. |
| type Container struct { |
| Name string `json:"name"` |
| Devices []struct { |
| PathOnHost string `json:"path_on_host"` |
| PathInContainer string `json:"path_in_container"` |
| Permissions string `json:"permissions"` |
| } `json:"devices,omitempty"` |
| Mounts []struct { |
| Source string `json:"source"` |
| Target string `json:"target"` |
| ReadOnly bool `json:"readonly,omitempty"` |
| } `json:"mounts,omitempty"` |
	EnvVars map[string]interface{} `json:"env_vars,omitempty"`
| CapAdd []string `json:"capadd,omitempty"` |
| hostname string |
| domainname string |
| cpuset string |
| memory int |
| } |
| |
// stringsFlag implements the flag.Value interface for a repeated string flag.
| type stringsFlag []string |
| |
| func (s *stringsFlag) String() string { |
| return strings.Join(*s, ", ") |
| } |
| |
| func (s *stringsFlag) Set(value string) error { |
| *s = append(*s, value) |
| return nil |
| } |
| |
// permissionWriter writes device access permissions for a container.
type permissionWriter interface {
| writePermissions(context.Context, string) error |
| } |
| |
// dockerPermissionWriter writes device cgroup rules under the Docker devices
// cgroup hierarchy on the host.
type dockerPermissionWriter struct {
| dockerPermissionsFolder string |
| } |
| |
// writePermissions gives the container with the given ID rwm permissions to
| // all serial/fastboot/adb devices. |
| func (d *dockerPermissionWriter) writePermissions(ctx context.Context, id string) error { |
	path := fmt.Sprintf("%s/%s/devices.allow", d.dockerPermissionsFolder, id)
	log.Printf("Attempting to write serial permissions to %s", path)
	// Allow rwm access to the character-device majors used by
	// serial/fastboot/adb devices.
	for _, rule := range []string{"c 180:* rwm", "c 188:* rwm", "c 189:* rwm"} {
		if err := ioutil.WriteFile(path, []byte(rule), 0666); err != nil {
			return err
		}
	}
	return nil
| } |
| |
| // Pool is used to manage a static pool of container instances. |
| type Pool struct { |
| client docker.CommonAPIClient |
| containers map[string]Container |
| draining int32 // accessed atomically |
| started time.Time |
| lock sync.RWMutex |
| ctx context.Context |
| cancel context.CancelFunc |
| pWriter permissionWriter |
| } |
| |
| // NewPool creates a new Pool instance. |
| func NewPool(client docker.CommonAPIClient, pWriter permissionWriter) *Pool { |
| return &Pool{ |
| client: client, |
| containers: map[string]Container{}, |
| pWriter: pWriter, |
| } |
| } |
| |
| // Create instantiates all the containers but doesn't start them. |
| func (p *Pool) Create(ctx context.Context, cfg *Config, containers []*Container) <-chan error { |
| errc := make(chan error) |
| var wg sync.WaitGroup |
| p.lock.RLock() |
| for _, c := range containers { |
| wg.Add(1) |
| go func(c *Container) { |
| defer wg.Done() |
| log.Printf("creating container %s\n", c.Name) |
| if id, err := p.create(ctx, cfg, c); err != nil { |
| errc <- fmt.Errorf("failed to create container %s: %v", c.Name, err) |
| } else { |
| log.Printf("container %s created\n", id) |
| p.lock.Lock() |
| p.containers[id] = *c |
| p.lock.Unlock() |
| } |
| }(c) |
| } |
| p.lock.RUnlock() |
| go func() { |
| wg.Wait() |
| close(errc) |
| }() |
| return errc |
| } |
| |
| // Serve runs the main loop responsible for managing containers, restarting |
// them as needed and handling signals appropriately.
| func (p *Pool) Serve(ctx context.Context) <-chan error { |
| errc := make(chan error) |
| p.started = time.Now() |
| |
| var eg errgroup.Group |
| p.lock.RLock() |
| for id := range p.containers { |
| id := id |
| eg.Go(func() error { |
| log.Printf("starting container %s\n", id) |
			if err := retry.Retry(ctx, retry.WithMaxRetries(retry.NewConstantBackoff(10*time.Second), 12), func() error {
				return p.client.ContainerStart(ctx, id, types.ContainerStartOptions{})
			}, nil); err != nil {
| log.Printf("failed to start container %s: %v\n", id, err) |
| p.lock.Lock() |
| delete(p.containers, id) |
| p.lock.Unlock() |
| } else { |
| log.Printf("container %s started\n", id) |
| if err := p.pWriter.writePermissions(ctx, id); err != nil { |
| return fmt.Errorf("failed to write permissions for container %s: %v", id, err) |
| } |
| } |
| return nil |
| }) |
| } |
| p.lock.RUnlock() |
	if err := eg.Wait(); err != nil {
		// errc has no reader until Serve returns; send asynchronously so the
		// error isn't lost to a blocked send.
		go func() { errc <- err }()
	}
| |
| args := []filters.KeyValuePair{{Key: "type", Value: "container"}} |
| for id := range p.containers { |
| args = append(args, filters.KeyValuePair{Key: "container", Value: id}) |
| } |
| filters := filters.NewArgs(args...) |
| |
| go func() { |
| msgs, errs := p.client.Events(ctx, types.EventsOptions{ |
| Filters: filters, |
| Since: p.started.Format(time.RFC3339), |
| }) |
| for { |
| select { |
| case msg := <-msgs: |
| switch action := msg.Action; action { |
| case "die": |
| name := msg.Actor.Attributes["name"] |
| if atomic.LoadInt32(&p.draining) != 0 { |
| continue |
| } |
| go func() { |
| log.Printf("restart %s\n", name) |
						if err := retry.Retry(ctx, retry.WithMaxRetries(retry.NewConstantBackoff(10*time.Second), 12), func() error {
							return p.client.ContainerStart(ctx, msg.Actor.ID, types.ContainerStartOptions{})
						}, nil); err != nil {
| errc <- err |
| } else if err := p.pWriter.writePermissions(ctx, msg.Actor.ID); err != nil { |
| errc <- fmt.Errorf("failed to write permissions for container %s: %v", msg.Actor.ID, err) |
| } |
| }() |
| } |
| case <-ctx.Done(): |
| if err := ctx.Err(); err != context.Canceled { |
| errc <- err |
| } |
| return |
| case err := <-errs: |
| if err != io.EOF { |
| errc <- err |
| } |
| return |
| } |
| } |
| }() |
| |
| return errc |
| } |
| |
| // Drain sends a SIGTERM signal to all containers and waits for their exit. |
| func (p *Pool) Drain(ctx context.Context) <-chan error { |
| atomic.AddInt32(&p.draining, 1) |
| |
| var wg sync.WaitGroup |
| errc := make(chan error) |
| for id := range p.containers { |
| wg.Add(1) |
| go func(id string) { |
| defer wg.Done() |
| log.Printf("send termination to %s\n", id) |
| if err := p.client.ContainerKill(ctx, id, "TERM"); err != nil { |
| errc <- err |
| } |
| msgs, errs := p.client.ContainerWait(ctx, id, container.WaitConditionNotRunning) |
| select { |
| case body := <-msgs: |
| log.Printf("container %s exited (status %d)\n", id, body.StatusCode) |
| case err := <-errs: |
| errc <- err |
| } |
| }(id) |
| } |
| go func() { |
| wg.Wait() |
| atomic.AddInt32(&p.draining, -1) |
| close(errc) |
| }() |
| return errc |
| } |
| |
| // Remove forcibly stops and removes all containers. |
| func (p *Pool) Remove(ctx context.Context) <-chan error { |
| ctx, cancel := context.WithTimeout(ctx, timeout) |
| atomic.AddInt32(&p.draining, 1) |
| |
| var wg sync.WaitGroup |
| errc := make(chan error) |
| for id := range p.containers { |
| wg.Add(1) |
| go func(id string) { |
| defer wg.Done() |
| log.Printf("stop and remove %s\n", id) |
| if err := p.client.ContainerStop(ctx, id, nil); err != nil { |
| errc <- err |
| } |
| if err := p.client.ContainerRemove(ctx, id, types.ContainerRemoveOptions{Force: true}); err != nil { |
| errc <- err |
| } |
| }(id) |
| } |
| go func() { |
| wg.Wait() |
| atomic.AddInt32(&p.draining, -1) |
| cancel() |
| close(errc) |
| }() |
| return errc |
| } |
| |
// create creates a new swarming container with the appropriate configuration.
| func (p *Pool) create(ctx context.Context, cfg *Config, c *Container) (string, error) { |
| config := container.Config{ |
| Hostname: c.hostname, |
| Domainname: c.domainname, // should be same as host |
| Image: fmt.Sprintf("%s/%s/%s", registryDomain, cfg.Project, cfg.ImageName), |
| Cmd: cfg.Cmd, |
| } |
| for k, v := range c.EnvVars { |
| config.Env = append(config.Env, fmt.Sprintf("%s=%v", k, v)) |
| } |
| // TODO(nmulcahey): sysctl isn't configurable via the config file. |
| // Don't forget to move this to Salt. |
| sysctlMap := make(map[string]string) |
| sysctlMap["net.ipv6.conf.all.disable_ipv6"] = "0" |
| hostConfig := container.HostConfig{ |
| Resources: container.Resources{ |
| Memory: int64(c.memory) * 1024 * 1024 * 1024, |
| CpusetCpus: c.cpuset, |
| }, |
| NetworkMode: container.NetworkMode(cfg.NetworkMode), |
| Sysctls: sysctlMap, |
| } |
| for _, d := range c.Devices { |
| hostConfig.Resources.Devices = append(hostConfig.Resources.Devices, container.DeviceMapping{ |
| PathOnHost: d.PathOnHost, |
| PathInContainer: d.PathInContainer, |
| CgroupPermissions: d.Permissions, |
| }) |
| } |
| for _, m := range c.Mounts { |
| hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{ |
| Type: mount.TypeBind, |
| Source: m.Source, |
| Target: m.Target, |
| ReadOnly: m.ReadOnly, |
| }) |
| } |
| hostConfig.CapAdd = append(hostConfig.CapAdd, c.CapAdd...) |
| networkingConfig := network.NetworkingConfig{} |
| res, err := p.client.ContainerCreate(ctx, &config, &hostConfig, &networkingConfig, c.Name) |
| if err != nil { |
| return "", fmt.Errorf("failed to create new container: %v", err) |
| } |
| return res.ID, nil |
| } |
| |
| // Image represents a Docker image. |
| type Image struct { |
| client docker.ImageAPIClient |
| } |
| |
| // NewImage creates a new Image instance. |
| func NewImage(client docker.ImageAPIClient) *Image { |
| return &Image{ |
| client: client, |
| } |
| } |
| |
// Exists reports whether the image is already present on the host.
func (r *Image) Exists(ctx context.Context, project, imageName string) (bool, error) {
	reference := fmt.Sprintf("%s/%s/%s", registryDomain, project, imageName)
	imageFilters := filters.NewArgs(filters.KeyValuePair{
		Key:   "reference",
		Value: reference,
	})
	summary, err := r.client.ImageList(ctx, types.ImageListOptions{Filters: imageFilters})
	if err != nil {
		return false, fmt.Errorf("cannot list images: %v", err)
	}
	return len(summary) != 0, nil
}
| |
// Pull fetches the image from the remote container registry, using the given
// credentials for authentication; Docker only downloads layers that aren't
// already present on the host.
| func (r *Image) Pull(ctx context.Context, project, imageName, credentials string) error { |
| reference := fmt.Sprintf("%s/%s/%s", registryDomain, project, imageName) |
| |
| buf, err := json.Marshal(types.AuthConfig{ |
| Username: "_json_key", |
| Password: credentials, |
| ServerAddress: fmt.Sprintf("https://%s", registryDomain), |
| }) |
| if err != nil { |
| return fmt.Errorf("failed to marshall auth: %v", err) |
| } |
| |
| // Fetch the image from the remote container registry. |
| log.Printf("pulling the image %s\n", reference) |
| res, err := r.client.ImagePull(ctx, reference, types.ImagePullOptions{ |
| RegistryAuth: base64.URLEncoding.EncodeToString(buf), |
| All: true, |
| }) |
| if err != nil { |
| return fmt.Errorf("image pull failed: %v", err) |
| } |
| defer res.Close() |
| |
| type JSONMessage struct { |
| Status string `json:"status,omitempty"` |
		Progress string `json:"progress,omitempty"`
| Error string `json:"error,omitempty"` |
| } |
| |
| // Report progress as the image is being downloaded. |
| dec := json.NewDecoder(res) |
| for dec.More() { |
| var m JSONMessage |
| err := dec.Decode(&m) |
| if err != nil { |
| return err |
| } |
| log.Printf("%s %s\n", m.Status, m.Progress) |
| if m.Error != "" { |
| log.Printf("%s\n", m.Error) |
| } |
| } |
| return nil |
| } |
| |
// getHostDomain returns the host and domain components of the machine's FQDN;
// for example, "bot1.example.com" yields ("bot1", "example.com").
| func getHostDomain() (string, string, error) { |
| hostname, err := os.Hostname() |
| if err != nil { |
| return "", "", err |
| } |
| strs := strings.SplitN(hostname, ".", 2) |
| if len(strs) == 2 { |
| return strs[0], strs[1], nil |
| } |
| return strs[0], "", nil |
| } |
| |
| // loadConfig reads the service configuration from a file. |
| func loadConfig(ctx context.Context, path string) (*Config, error) { |
| file, err := os.Open(path) |
	if err != nil {
		return nil, err
	}
	defer file.Close()
| |
| var config Config |
| if err := json.NewDecoder(file).Decode(&config); err != nil { |
| return nil, err |
| } |
| |
| return &config, err |
| } |
| |
// sdNotify sends a state message (e.g. "READY=1" or "STOPPING=1"; see
// sd_notify(3)) to the systemd init daemon over the datagram socket named by
// the $NOTIFY_SOCKET environment variable. It reports whether a message was
// actually sent.
| func sdNotify(unsetEnvironment bool, state string) (sent bool, err error) { |
| addr := &net.UnixAddr{ |
| Name: os.Getenv("NOTIFY_SOCKET"), |
| Net: "unixgram", |
| } |
| |
| // NOTIFY_SOCKET not set |
| if addr.Name == "" { |
| return false, nil |
| } |
| |
| if unsetEnvironment { |
| err = os.Unsetenv("NOTIFY_SOCKET") |
| } |
| if err != nil { |
| return false, err |
| } |
| |
| conn, err := net.DialUnix(addr.Net, nil, addr) |
| if err != nil { |
| return false, fmt.Errorf("error connecting to NOTIFY_SOCKET: %v", err) |
| } |
| defer conn.Close() |
| |
| _, err = conn.Write([]byte(state)) |
| if err != nil { |
| return false, fmt.Errorf("error sending the message: %v", err) |
| } |
| return true, nil |
| } |
| |
| func main() { |
| flag.Parse() |
| |
| client, err := docker.NewEnvClient() |
| if err != nil { |
| log.Fatalln("cannot create new client", err) |
| } |
| defer client.Close() |
| |
| ctx := context.Background() |
| |
	config, err := loadConfig(ctx, configPath)
| if err != nil { |
| log.Fatalln("cannot read configuration", err) |
| } |
| |
| // Let systemd know that we're ready. |
| if _, err := sdNotify(false, "READY=1"); err != nil { |
| log.Printf("failed to notify systemd: %v", err) |
| } |
| |
| // Prune stale images. |
| if _, err := client.ImagesPrune(ctx, filters.Args{}); err != nil { |
| log.Fatalln("failed to prune images") |
| } |
| |
| // Read the credentials from the specified file. |
| bytes, err := ioutil.ReadFile(config.Credentials) |
| if err != nil { |
| log.Fatalln("failed to read credentials", err) |
| } |
| |
| // Try to pull the image if not already present. |
| registry := NewImage(client) |
| if err := registry.Pull(ctx, config.Project, config.ImageName, string(bytes)); err != nil { |
| log.Fatalln("failed to pull image", err) |
| } |
| |
| hostname, domainname, err := getHostDomain() |
| if err != nil { |
| log.Fatalln("failed to get hostname", err) |
| } |
| |
| // Prune stale containers. |
| if _, err := client.ContainersPrune(ctx, filters.Args{}); err != nil { |
| log.Fatalln("failed to prune containers") |
| } |
| |
	// Create the specified number of containers.
| containers := make([]*Container, len(config.Containers)) |
| for i, c := range config.Containers { |
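		// When "cpus" is set, pin each container to its own contiguous CPU
		// range; e.g. with "cpus": 4, container 0 gets "0-3" and container 1
		// gets "4-7".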
| cpuset := "" |
| if config.Cpus != 0 { |
| cpuset = fmt.Sprintf("%d-%d", i*config.Cpus, (i+1)*config.Cpus-1) |
| } |
| containers[i] = &Container{ |
| Name: c.Name, |
| Devices: c.Devices, |
| Mounts: c.Mounts, |
| EnvVars: c.EnvVars, |
| CapAdd: c.CapAdd, |
| hostname: fmt.Sprintf("%s--%s", hostname, c.Name), |
| domainname: domainname, |
| cpuset: cpuset, |
| memory: config.Memory, |
| } |
| } |
| |
| // Create a permissions writer. |
| pWriter := &dockerPermissionWriter{ |
| dockerPermissionsFolder: permissionsFolder, |
| } |
| // Create and start the pool. |
| pool := NewPool(client, pWriter) |
| for err := range pool.Create(ctx, config, containers) { |
| log.Fatalln("failed to create pool", err) |
| } |
| |
	// SIGTERM terminates the process with a graceful shutdown, draining the pool.
| signals := make(chan os.Signal, 1) |
| signal.Notify(signals, syscall.SIGTERM) |
| |
	// SIGINT and SIGHUP terminate the process, forcibly stopping containers.
| interrupts := make(chan os.Signal, 1) |
| signal.Notify(interrupts, syscall.SIGINT, syscall.SIGHUP) |
| |
| ctx, cancel := context.WithCancel(ctx) |
| errs := pool.Serve(ctx) |
| |
| loop: |
| for { |
| select { |
| case <-signals: |
| for err := range pool.Drain(ctx) { |
| log.Printf("failed to drain pool: %s\n", err) |
| } |
| break loop |
| case <-interrupts: |
| break loop |
| case err := <-errs: |
| log.Printf("error in the pool: %v", err) |
| } |
| } |
| |
| cancel() |
| |
	// Let systemd know that we have begun to shut down.
| if _, err := sdNotify(false, "STOPPING=1"); err != nil { |
| log.Printf("failed to notify systemd: %v", err) |
| } |
| |
| for err := range pool.Remove(ctx) { |
| log.Printf("failed to remove container: %v", err) |
| } |
| } |