// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This is a daemon for managing a pool of Docker containers on swarming bots.
// The daemon is event-driven, processing the stream of events from the Docker
// daemon.
//
// The daemon creates and starts a set number of containers from a specified
// image which is pulled from the Google container registry. These containers
// are then monitored and automatically restarted on exit (i.e. when
// swarming_bot itself has shut down the container by invoking /sbin/shutdown).
//
// When the daemon is sent the SIGTERM signal, it'll start draining the pool;
// it'll send a SIGTERM signal to every container which causes the swarming_bot
// to exit at the next opportunity (i.e. when not running a task). When all the
// containers have successfully exited, the daemon itself will terminate.
//
// When the daemon is sent the SIGINT signal, it'll immediately terminate all
// containers without any grace period.
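//
// Because the daemon reports readiness and shutdown over $NOTIFY_SOCKET, it is
// intended to run under a service manager such as systemd. A minimal unit
// sketch is shown below; the paths and values are illustrative only, not the
// actual deployment configuration:
//
//	[Service]
//	Type=notify
//	ExecStart=/usr/bin/swarm_docker -config /etc/swarm_docker/config.json
//	KillSignal=SIGTERM
//	TimeoutStopSec=300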
package main
import (
"context"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
docker "github.com/docker/docker/client"
"go.fuchsia.dev/tools/retry"
"golang.org/x/sync/errgroup"
)
const (
registryDomain = "gcr.io"
permissionsFolder = "/sys/fs/cgroup/devices/docker"
)
var (
config string // config file path
timeout time.Duration // default timeout for all operations
)
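// init registers the command line flags. A typical invocation of the daemon
// might look like the following (the binary name and flag values here are
// purely illustrative):
//
//	swarm_docker -config /etc/swarm_docker/config.json -timeout 2m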
func init() {
flag.StringVar(&config, "config", "/etc/swarm_docker/config.json", "config file path")
flag.DurationVar(&timeout, "timeout", 1*time.Minute, "default timeout")
}
// Config contains the service configuration.
type Config struct {
Memory int `json:"memory"`
Cpus int `json:"cpus"`
NetworkMode string `json:"network_mode"`
ImageName string `json:"image_name"`
Project string `json:"project"`
Credentials string `json:"credentials"`
Containers []Container `json:"containers"`
Cmd []string `json:"cmd,omitempty"`
}
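// As a rough sketch, a config file matching the struct above could look like
// the following (all values are invented for illustration; "memory" is
// interpreted as GiB by create below, and each "containers" entry has the
// shape of the Container type that follows):
//
//	{
//	  "memory": 4,
//	  "cpus": 2,
//	  "network_mode": "host",
//	  "image_name": "swarm_docker:latest",
//	  "project": "example-project",
//	  "credentials": "/etc/swarm_docker/key.json",
//	  "cmd": ["/usr/bin/start_swarming_bot"],
//	  "containers": [ ... ]
//	}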
// Container describes the container instance.
type Container struct {
Name string `json:"name"`
Devices []struct {
PathOnHost string `json:"path_on_host"`
PathInContainer string `json:"path_in_container"`
Permissions string `json:"permissions"`
} `json:"devices,omitempty"`
Mounts []struct {
Source string `json:"source"`
Target string `json:"target"`
ReadOnly bool `json:"readonly,omitempty"`
} `json:"mounts,omitempty"`
EnvVars map[string]interface{} `json:"env_vars,omitempty"`
CapAdd []string `json:"capadd,omitempty"`
hostname string
domainname string
cpuset string
memory int
}
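// A single "containers" entry corresponding to the struct above might look
// like this (device paths, mount paths, and values are purely illustrative):
//
//	{
//	  "name": "bot-0",
//	  "devices": [
//	    {"path_on_host": "/dev/ttyUSB0", "path_in_container": "/dev/ttyUSB0", "permissions": "rwm"}
//	  ],
//	  "mounts": [
//	    {"source": "/opt/swarming", "target": "/b/swarming", "readonly": false}
//	  ],
//	  "env_vars": {"SWARMING_BOT_ID": "bot-0"},
//	  "capadd": ["NET_ADMIN"]
//	}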
// stringsFlag implements flag.Value interface for array of strings.
type stringsFlag []string
func (s *stringsFlag) String() string {
return strings.Join(*s, ", ")
}
func (s *stringsFlag) Set(value string) error {
*s = append(*s, value)
return nil
}
type permissionWriter interface {
writePermissions(context.Context, string) error
}
type dockerPermissionWriter struct {
dockerPermissionsFolder string
}
// writePermissions gives the container with the given ID rwm permissions to
// all serial/fastboot/adb devices.
func (d *dockerPermissionWriter) writePermissions(ctx context.Context, id string) error {
path := fmt.Sprintf("%s/%s/devices.allow", d.dockerPermissionsFolder, id)
log.Printf("Attempting to write serial permissions to %s", path)
if err := ioutil.WriteFile(path, []byte("c 180:* rwm"), 0666); err != nil {
return err
}
if err := ioutil.WriteFile(path, []byte("c 188:* rwm"), 0666); err != nil {
return err
}
if err := ioutil.WriteFile(path, []byte("c 189:* rwm"), 0666); err != nil {
return err
}
return nil
}
// Pool is used to manage a static pool of container instances.
type Pool struct {
client docker.CommonAPIClient
containers map[string]Container
draining int32 // accessed atomically
started time.Time
lock sync.RWMutex
ctx context.Context
cancel context.CancelFunc
pWriter permissionWriter
}
// NewPool creates a new Pool instance.
func NewPool(client docker.CommonAPIClient, pWriter permissionWriter) *Pool {
return &Pool{
client: client,
containers: map[string]Container{},
pWriter: pWriter,
}
}
// Create instantiates all the containers but doesn't start them.
func (p *Pool) Create(ctx context.Context, cfg *Config, containers []*Container) <-chan error {
errc := make(chan error)
var wg sync.WaitGroup
p.lock.RLock()
for _, c := range containers {
wg.Add(1)
go func(c *Container) {
defer wg.Done()
log.Printf("creating container %s\n", c.Name)
if id, err := p.create(ctx, cfg, c); err != nil {
errc <- fmt.Errorf("failed to create container %s: %v", c.Name, err)
} else {
log.Printf("container %s created\n", id)
p.lock.Lock()
p.containers[id] = *c
p.lock.Unlock()
}
}(c)
}
p.lock.RUnlock()
go func() {
wg.Wait()
close(errc)
}()
return errc
}
// Serve runs the main loop responsible for managing containers, restarting
// them as needed and handling signals appropriately.
func (p *Pool) Serve(ctx context.Context) <-chan error {
// Buffered so that an error from the start loop below can be reported even
// before Serve has returned its channel to the caller.
errc := make(chan error, 1)
p.started = time.Now()
var eg errgroup.Group
p.lock.RLock()
for id := range p.containers {
id := id
eg.Go(func() error {
log.Printf("starting container %s\n", id)
if err := retry.Retry(ctx, retry.WithMaxRetries(retry.NewConstantBackoff(10*time.Second), 12), func() error { return p.client.ContainerStart(ctx, id, types.ContainerStartOptions{}) }, nil); err != nil {
log.Printf("failed to start container %s: %v\n", id, err)
p.lock.Lock()
delete(p.containers, id)
p.lock.Unlock()
} else {
log.Printf("container %s started\n", id)
if err := p.pWriter.writePermissions(ctx, id); err != nil {
return fmt.Errorf("failed to write permissions for container %s: %v", id, err)
}
}
return nil
})
}
p.lock.RUnlock()
if err := eg.Wait(); err != nil {
errc <- err
}
args := []filters.KeyValuePair{{Key: "type", Value: "container"}}
for id := range p.containers {
args = append(args, filters.KeyValuePair{Key: "container", Value: id})
}
filters := filters.NewArgs(args...)
go func() {
msgs, errs := p.client.Events(ctx, types.EventsOptions{
Filters: filters,
Since: p.started.Format(time.RFC3339),
})
for {
select {
case msg := <-msgs:
switch action := msg.Action; action {
case "die":
name := msg.Actor.Attributes["name"]
if atomic.LoadInt32(&p.draining) != 0 {
continue
}
go func() {
log.Printf("restart %s\n", name)
if err := retry.Retry(ctx, retry.WithMaxRetries(retry.NewConstantBackoff(10*time.Second), 12), func() error { return p.client.ContainerStart(ctx, msg.Actor.ID, types.ContainerStartOptions{}) }, nil); err != nil {
errc <- err
} else if err := p.pWriter.writePermissions(ctx, msg.Actor.ID); err != nil {
errc <- fmt.Errorf("failed to write permissions for container %s: %v", msg.Actor.ID, err)
}
}()
}
case <-ctx.Done():
if err := ctx.Err(); err != context.Canceled {
errc <- err
}
return
case err := <-errs:
if err != io.EOF {
errc <- err
}
return
}
}
}()
return errc
}
// Drain sends a SIGTERM signal to all containers and waits for their exit.
func (p *Pool) Drain(ctx context.Context) <-chan error {
atomic.AddInt32(&p.draining, 1)
var wg sync.WaitGroup
errc := make(chan error)
for id := range p.containers {
wg.Add(1)
go func(id string) {
defer wg.Done()
log.Printf("send termination to %s\n", id)
if err := p.client.ContainerKill(ctx, id, "TERM"); err != nil {
errc <- err
}
msgs, errs := p.client.ContainerWait(ctx, id, container.WaitConditionNotRunning)
select {
case body := <-msgs:
log.Printf("container %s exited (status %d)\n", id, body.StatusCode)
case err := <-errs:
errc <- err
}
}(id)
}
go func() {
wg.Wait()
atomic.AddInt32(&p.draining, -1)
close(errc)
}()
return errc
}
// Remove forcibly stops and removes all containers.
func (p *Pool) Remove(ctx context.Context) <-chan error {
ctx, cancel := context.WithTimeout(ctx, timeout)
atomic.AddInt32(&p.draining, 1)
var wg sync.WaitGroup
errc := make(chan error)
for id := range p.containers {
wg.Add(1)
go func(id string) {
defer wg.Done()
log.Printf("stop and remove %s\n", id)
if err := p.client.ContainerStop(ctx, id, nil); err != nil {
errc <- err
}
if err := p.client.ContainerRemove(ctx, id, types.ContainerRemoveOptions{Force: true}); err != nil {
errc <- err
}
}(id)
}
go func() {
wg.Wait()
atomic.AddInt32(&p.draining, -1)
cancel()
close(errc)
}()
return errc
}
// create creates a new swarming container with the appropriate configuration.
func (p *Pool) create(ctx context.Context, cfg *Config, c *Container) (string, error) {
config := container.Config{
Hostname: c.hostname,
Domainname: c.domainname, // should be same as host
Image: fmt.Sprintf("%s/%s/%s", registryDomain, cfg.Project, cfg.ImageName),
Cmd: cfg.Cmd,
}
for k, v := range c.EnvVars {
config.Env = append(config.Env, fmt.Sprintf("%s=%v", k, v))
}
// TODO(nmulcahey): sysctl isn't configurable via the config file.
// Don't forget to move this to Salt.
sysctlMap := make(map[string]string)
sysctlMap["net.ipv6.conf.all.disable_ipv6"] = "0"
hostConfig := container.HostConfig{
Resources: container.Resources{
Memory: int64(c.memory) * 1024 * 1024 * 1024,
CpusetCpus: c.cpuset,
},
NetworkMode: container.NetworkMode(cfg.NetworkMode),
Sysctls: sysctlMap,
}
for _, d := range c.Devices {
hostConfig.Resources.Devices = append(hostConfig.Resources.Devices, container.DeviceMapping{
PathOnHost: d.PathOnHost,
PathInContainer: d.PathInContainer,
CgroupPermissions: d.Permissions,
})
}
for _, m := range c.Mounts {
hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
Type: mount.TypeBind,
Source: m.Source,
Target: m.Target,
ReadOnly: m.ReadOnly,
})
}
hostConfig.CapAdd = append(hostConfig.CapAdd, c.CapAdd...)
networkingConfig := network.NetworkingConfig{}
res, err := p.client.ContainerCreate(ctx, &config, &hostConfig, &networkingConfig, c.Name)
if err != nil {
return "", fmt.Errorf("failed to create new container: %v", err)
}
return res.ID, nil
}
// Image represents a Docker image.
type Image struct {
client docker.ImageAPIClient
}
// NewImage creates a new Image instance.
func NewImage(client docker.ImageAPIClient) *Image {
return &Image{
client: client,
}
}
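// Exists reports whether the image is already present on the host; it returns
// a non-nil error if the image cannot be found or the image list cannot be
// retrieved.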
func (r *Image) Exists(ctx context.Context, project, imageName string) error {
reference := fmt.Sprintf("%s/%s/%s", registryDomain, project, imageName)
// Check whether we already have the image.
filters := filters.NewArgs(filters.KeyValuePair{
Key: "reference",
Value: reference,
})
summary, err := r.client.ImageList(ctx, types.ImageListOptions{Filters: filters})
if err != nil {
return fmt.Errorf("cannot list images: %v", err)
}
if len(summary) != 0 {
return nil
}
return fmt.Errorf("image %s not found", reference)
}
// Pull checks whether the image is present on the host and if not fetches
// it from the remote container registry using credentials for authentication.
func (r *Image) Pull(ctx context.Context, project, imageName, credentials string) error {
reference := fmt.Sprintf("%s/%s/%s", registryDomain, project, imageName)
buf, err := json.Marshal(types.AuthConfig{
Username: "_json_key",
Password: credentials,
ServerAddress: fmt.Sprintf("https://%s", registryDomain),
})
if err != nil {
return fmt.Errorf("failed to marshall auth: %v", err)
}
// Fetch the image from the remote container registry.
log.Printf("pulling the image %s\n", reference)
res, err := r.client.ImagePull(ctx, reference, types.ImagePullOptions{
RegistryAuth: base64.URLEncoding.EncodeToString(buf),
All: true,
})
if err != nil {
return fmt.Errorf("image pull failed: %v", err)
}
defer res.Close()
type JSONMessage struct {
Status string `json:"status,omitempty"`
Progress string `json:"progress,omitempty"`
Error string `json:"error,omitempty"`
}
// Report progress as the image is being downloaded.
dec := json.NewDecoder(res)
for dec.More() {
var m JSONMessage
err := dec.Decode(&m)
if err != nil {
return err
}
log.Printf("%s %s\n", m.Status, m.Progress)
if m.Error != "" {
log.Printf("%s\n", m.Error)
}
}
return nil
}
// getHostDomain returns the name and domain components of the FQDN.
func getHostDomain() (string, string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", "", err
}
strs := strings.SplitN(hostname, ".", 2)
if len(strs) == 2 {
return strs[0], strs[1], nil
}
return strs[0], "", nil
}
// loadConfig reads the service configuration from a file.
func loadConfig(ctx context.Context, path string) (*Config, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var config Config
if err := json.NewDecoder(file).Decode(&config); err != nil {
return nil, err
}
return &config, nil
}
// sdNotify sends a message to the systemd init daemon over the Unix domain
// socket referenced in the $NOTIFY_SOCKET environment variable.
func sdNotify(unsetEnvironment bool, state string) (sent bool, err error) {
addr := &net.UnixAddr{
Name: os.Getenv("NOTIFY_SOCKET"),
Net: "unixgram",
}
// NOTIFY_SOCKET not set
if addr.Name == "" {
return false, nil
}
if unsetEnvironment {
err = os.Unsetenv("NOTIFY_SOCKET")
}
if err != nil {
return false, err
}
conn, err := net.DialUnix(addr.Net, nil, addr)
if err != nil {
return false, fmt.Errorf("error connecting to NOTIFY_SOCKET: %v", err)
}
defer conn.Close()
_, err = conn.Write([]byte(state))
if err != nil {
return false, fmt.Errorf("error sending the message: %v", err)
}
return true, nil
}
func main() {
flag.Parse()
client, err := docker.NewEnvClient()
if err != nil {
log.Fatalln("cannot create new client", err)
}
defer client.Close()
ctx := context.Background()
config, err := loadConfig(ctx, config)
if err != nil {
log.Fatalln("cannot read configuration", err)
}
// Let systemd know that we're ready.
if _, err := sdNotify(false, "READY=1"); err != nil {
log.Printf("failed to notify systemd: %v", err)
}
// Prune stale images.
if _, err := client.ImagesPrune(ctx, filters.Args{}); err != nil {
log.Fatalln("failed to prune images")
}
// Read the credentials from the specified file.
bytes, err := ioutil.ReadFile(config.Credentials)
if err != nil {
log.Fatalln("failed to read credentials", err)
}
// Try to pull the image if not already present.
registry := NewImage(client)
if err := registry.Pull(ctx, config.Project, config.ImageName, string(bytes)); err != nil {
log.Fatalln("failed to pull image", err)
}
hostname, domainname, err := getHostDomain()
if err != nil {
log.Fatalln("failed to get hostname", err)
}
// Prune stale containers.
if _, err := client.ContainersPrune(ctx, filters.Args{}); err != nil {
log.Fatalln("failed to prune containers")
}
// Create the specified number of containers.
containers := make([]*Container, len(config.Containers))
for i, c := range config.Containers {
cpuset := ""
if config.Cpus != 0 {
cpuset = fmt.Sprintf("%d-%d", i*config.Cpus, (i+1)*config.Cpus-1)
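// For example, with "cpus": 4 in the config, container 0 would be pinned to
// CPUs "0-3", container 1 to "4-7", and so on.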
}
containers[i] = &Container{
Name: c.Name,
Devices: c.Devices,
Mounts: c.Mounts,
EnvVars: c.EnvVars,
CapAdd: c.CapAdd,
hostname: fmt.Sprintf("%s--%s", hostname, c.Name),
domainname: domainname,
cpuset: cpuset,
memory: config.Memory,
}
}
// Create a permissions writer.
pWriter := &dockerPermissionWriter{
dockerPermissionsFolder: permissionsFolder,
}
// Create and start the pool.
pool := NewPool(client, pWriter)
for err := range pool.Create(ctx, config, containers) {
log.Fatalln("failed to create pool", err)
}
// SIGTERM terminates the process with graceful shutdown draining the pool.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM)
// SIGINT and SIGHUP terminate the process forcibly stopping containers.
interrupts := make(chan os.Signal, 1)
signal.Notify(interrupts, syscall.SIGINT, syscall.SIGHUP)
ctx, cancel := context.WithCancel(ctx)
errs := pool.Serve(ctx)
loop:
for {
select {
case <-signals:
for err := range pool.Drain(ctx) {
log.Printf("failed to drain pool: %s\n", err)
}
break loop
case <-interrupts:
break loop
case err := <-errs:
log.Printf("error in the pool: %v", err)
}
}
cancel()
// Let systemd know that we have begun shutting down.
if _, err := sdNotify(false, "STOPPING=1"); err != nil {
log.Printf("failed to notify systemd: %v", err)
}
// ctx has been canceled above, so give Remove a fresh context; Remove applies
// its own timeout internally.
for err := range pool.Remove(context.Background()) {
log.Printf("failed to remove container: %v", err)
}
}