blob: a0ba7cf0a8b34705cd38661bc2fb76e1a83384b6 [file] [log] [blame]
// Package connectionbroker is a layer on top of remotes that returns
// a gRPC connection to a manager. The connection may be a local connection
// using a local socket such as a UNIX socket.
package connectionbroker
import (
"sync"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
)
// Broker is a simple connection broker. It can either return a fresh
// connection to a remote manager selected with weighted randomization, or a
// local gRPC connection to the local manager.
type Broker struct {
mu sync.Mutex
remotes remotes.Remotes
localConn *grpc.ClientConn
}
// New creates a new connection broker.
func New(remotes remotes.Remotes) *Broker {
return &Broker{
remotes: remotes,
}
}
// SetLocalConn changes the local gRPC connection used by the connection broker.
func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
b.mu.Lock()
defer b.mu.Unlock()
b.localConn = localConn
}
// Select a manager from the set of available managers, and return a connection.
func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
b.mu.Lock()
localConn := b.localConn
b.mu.Unlock()
if localConn != nil {
return &Conn{
ClientConn: localConn,
isLocal: true,
}, nil
}
return b.SelectRemote(dialOpts...)
}
// SelectRemote chooses a manager from the remotes, and returns a TCP
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()
if err != nil {
return nil, err
}
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor))
cc, err := grpc.Dial(peer.Addr, dialOpts...)
if err != nil {
b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
return nil, err
}
return &Conn{
ClientConn: cc,
remotes: b.remotes,
peer: peer,
}, nil
}
// Remotes returns the remotes interface used by the broker, so the caller
// can make observations or see weights directly.
func (b *Broker) Remotes() remotes.Remotes {
return b.remotes
}
// Conn is a wrapper around a gRPC client connection.
type Conn struct {
*grpc.ClientConn
isLocal bool
remotes remotes.Remotes
peer api.Peer
}
// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
// Close is a noop.
func (c *Conn) Close(success bool) error {
if c.isLocal {
return nil
}
if success {
c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
} else {
c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
}
return c.ClientConn.Close()
}