package agent
import (
"errors"
"sync"
"time"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
var (
dispatcherRPCTimeout = 5 * time.Second
errSessionDisconnect = errors.New("agent: session disconnect") // instructed to disconnect
errSessionClosed = errors.New("agent: session closed")
)
// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the manager is done through session. Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
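//
// A rough usage sketch (illustrative only; the real call sites live in the
// agent's event loop, and backoff/nodeDescription are placeholder names):
//
//	sess := newSession(ctx, agent, backoff, "", nodeDescription)
//	select {
//	case msg := <-sess.messages:
//		// apply the dispatcher updates carried by the SessionMessage
//		_ = msg
//	case err := <-sess.errs:
//		// any error ends the session: close it and establish a new one
//		sess.close()
//		_ = err
//	}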
type session struct {
conn *connectionbroker.Conn
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
subscriptions chan *api.SubscriptionMessage
cancel func() // this is assumed to be never nil, and set whenever a session is created
registered chan struct{} // closed once registration has completed
closed chan struct{}
closeOnce sync.Once
}
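// newSession selects a manager through the connection broker and, on success,
// launches the session's run loop after the supplied delay. If no connection
// can be established, the error is delivered on errs after the same delay so
// the caller's retry backoff is preserved.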
func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
sessionCtx, sessionCancel := context.WithCancel(ctx)
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
subscriptions: make(chan *api.SubscriptionMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
cancel: sessionCancel,
}
// TODO(stevvooe): Need to move connection management up a level or create
// independent connection for log broker client.
cc, err := agent.config.ConnBroker.Select(
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
// since we are returning without launching the session goroutine, we
// need to provide the delay that is guaranteed by calling this
// function. We launch a goroutine so that we only delay the retry and
// avoid blocking the main loop.
go func() {
time.Sleep(delay)
s.errs <- err
}()
return s
}
log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())
s.conn = cc
go s.run(sessionCtx, delay, description)
return s
}
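// run waits out the registration delay, registers the session with the
// dispatcher via start, and then launches the long-running session routines
// (heartbeat, watch, listen and logSubscriptions).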
func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
timer := time.NewTimer(delay) // delay before registering.
log.G(ctx).Infof("waiting %v before registering session", delay)
defer timer.Stop()
select {
case <-timer.C:
case <-ctx.Done():
return
}
if err := s.start(ctx, description); err != nil {
select {
case s.errs <- err:
case <-s.closed:
case <-ctx.Done():
}
return
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))
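// The routines below run for the lifetime of the session; the runctx helper
// forwards any error they return to s.errs so the agent can tear the session
// down and rebuild it.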
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
go runctx(ctx, s.closed, s.errs, s.logSubscriptions)
close(s.registered)
}
// start begins the session and dispatches the first SessionMessage to the agent.
func (s *session) start(ctx context.Context, description *api.NodeDescription) error {
log.G(ctx).Debugf("(*session).start")
errChan := make(chan error, 1)
var (
msg *api.SessionMessage
stream api.Dispatcher_SessionClient
err error
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
// it in the timeout case to make sure the goroutine completes.
// We also fork this context again from the `run` context, because on
// `dispatcherRPCTimeout`, we want to cancel establishing a session and
// return an error. If we cancel the `run` context instead of forking,
// then in `run` it's possible that we just terminate the function because
// `ctx` is done and hence fail to propagate the timeout error to the agent.
// If the error is not propagated to the agent, the agent will not close
// the session or rebuild a new session.
sessionCtx, cancelSession := context.WithCancel(ctx)
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn.ClientConn)
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
SessionID: s.sessionID,
})
if err != nil {
errChan <- err
return
}
msg, err = stream.Recv()
errChan <- err
}()
select {
case err := <-errChan:
if err != nil {
return err
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
return errors.New("session initiation timed out")
}
s.sessionID = msg.SessionID
s.session = stream
return s.handleSessionMessage(ctx, msg)
}
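// heartbeat sends periodic heartbeats to the dispatcher and resets its timer
// to the period returned in each response. A NotFound status from the manager
// indicates the node is no longer registered.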
func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.conn.ClientConn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()
fields := logrus.Fields{
"sessionID": s.sessionID,
"method": "(*session).heartbeat",
}
for {
select {
case <-heartbeat.C:
heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
// TODO(anshul) log manager info in all logs in this function.
log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
SessionID: s.sessionID,
})
cancel()
if err != nil {
log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
if grpc.Code(err) == codes.NotFound {
err = errNodeNotRegistered
}
return err
}
log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)
heartbeat.Reset(resp.Period)
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}
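// listen receives SessionMessages from the dispatcher stream and forwards
// them to the agent until the stream ends or the session is closed.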
func (s *session) listen(ctx context.Context) error {
defer s.session.CloseSend()
log.G(ctx).Debugf("(*session).listen")
for {
msg, err := s.session.Recv()
if err != nil {
return err
}
if err := s.handleSessionMessage(ctx, msg); err != nil {
return err
}
}
}
func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMessage) error {
select {
case s.messages <- msg:
return nil
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
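// logSubscriptions listens for log subscription requests from the manager's
// log broker and forwards them to the agent. Managers that predate the log
// broker report Unimplemented, in which case the stream is ignored for the
// remainder of the session.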
func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")
client := api.NewLogBrokerClient(s.conn.ClientConn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
}
defer subscriptions.CloseSend()
for {
resp, err := subscriptions.Recv()
if grpc.Code(err) == codes.Unimplemented {
log.Warning("manager does not support log subscriptions")
// Don't return, because returning would bounce the session
select {
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
if err != nil {
return err
}
select {
case s.subscriptions <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}
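// watch streams assignment changes from the dispatcher to the agent. It
// prefers the newer Assignments RPC and falls back to the legacy Tasks RPC
// when the manager reports Unimplemented, wrapping Tasks responses so the
// agent only ever sees AssignmentsMessages. The AppliesTo/ResultsIn
// references are tracked to detect gaps in the incremental stream, which
// trigger a full re-sync.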
func (s *session) watch(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")
var (
resp *api.AssignmentsMessage
assignmentWatch api.Dispatcher_AssignmentsClient
tasksWatch api.Dispatcher_TasksClient
streamReference string
tasksFallback bool
err error
)
client := api.NewDispatcherClient(s.conn.ClientConn)
for {
// If this is the first time through the loop, or the previous watch was
// dropped because of a reference mismatch, attempt to (re)establish the assignmentWatch.
if assignmentWatch == nil && !tasksFallback {
assignmentWatch, err = client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID})
if err != nil {
return err
}
}
// We have an assignmentWatch, let's try to receive an AssignmentsMessage
if assignmentWatch != nil {
// If we get a code = 12 desc = unknown method Assignments, try to use tasks
resp, err = assignmentWatch.Recv()
if err != nil {
if grpc.Code(err) != codes.Unimplemented {
return err
}
tasksFallback = true
assignmentWatch = nil
log.WithError(err).Infof("falling back to Tasks")
}
}
// This code is here for backwards compatibility (so that newer clients can use the
// older method Tasks)
if tasksWatch == nil && tasksFallback {
tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID})
if err != nil {
return err
}
}
if tasksWatch != nil {
// When falling back to Tasks because of an old manager, we wrap the tasks in assignments.
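// Each TasksMessage carries the full set of tasks assigned to the node, so
// it is converted into a COMPLETE AssignmentsMessage below.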
var taskResp *api.TasksMessage
var assignmentChanges []*api.AssignmentChange
taskResp, err = tasksWatch.Recv()
if err != nil {
return err
}
for _, t := range taskResp.Tasks {
taskChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Task{
Task: t,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
assignmentChanges = append(assignmentChanges, taskChange)
}
resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, Changes: assignmentChanges}
}
// If there seems to be a gap in the stream, drop the current watch so that
// the next iteration re-syncs (by calling Assignments again).
if streamReference != "" && streamReference != resp.AppliesTo {
assignmentWatch = nil
} else {
streamReference = resp.ResultsIn
}
select {
case s.assignments <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}
// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.conn.ClientConn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
{
TaskID: taskID,
Status: status,
},
},
}); err != nil {
// TODO(stevvooe): Dispatcher should not return this error. Status
// reports for unknown tasks should be ignored.
if grpc.Code(err) == codes.NotFound {
return errTaskUnknown
}
return err
}
return nil
}
func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTaskStatusRequest_TaskStatusUpdate) ([]*api.UpdateTaskStatusRequest_TaskStatusUpdate, error) {
if len(updates) < 1 {
return nil, nil
}
const batchSize = 1024
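// Block until the session has registered (or is closed, or the context
// ends): updates cannot be delivered before the session has a valid ID.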
select {
case <-s.registered:
select {
case <-s.closed:
return updates, ErrClosed
default:
}
case <-s.closed:
return updates, ErrClosed
case <-ctx.Done():
return updates, ctx.Err()
}
client := api.NewDispatcherClient(s.conn.ClientConn)
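// Send at most batchSize updates in a single RPC; the remainder is returned
// to the caller to be sent on a later call.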
n := batchSize
if len(updates) < n {
n = len(updates)
}
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: updates[:n],
}); err != nil {
log.G(ctx).WithError(err).Errorf("failed sending task status batch size of %d", len(updates[:n]))
return updates, err
}
return updates[n:], nil
}
// sendError is used to send errors to the errs channel and trigger session recreation
func (s *session) sendError(err error) {
select {
case s.errs <- err:
case <-s.closed:
}
}
// close closes the given session. It should be called only from the
// <-session.errs branch of the event loop, or when cleaning up the agent.
func (s *session) close() error {
s.closeOnce.Do(func() {
s.cancel()
if s.conn != nil {
s.conn.Close(false)
}
close(s.closed)
})
return nil
}