| // Package connectionbroker is a layer on top of remotes that returns |
| // a gRPC connection to a manager. The connection may be a local connection |
| // using a local socket such as a UNIX socket. |
| package connectionbroker |
| |
| import ( |
| "sync" |
| |
| "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/remotes" |
| grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" |
| "google.golang.org/grpc" |
| ) |
| |
| // Broker is a simple connection broker. It can either return a fresh |
| // connection to a remote manager selected with weighted randomization, or a |
| // local gRPC connection to the local manager. |
| type Broker struct { |
| mu sync.Mutex |
| remotes remotes.Remotes |
| localConn *grpc.ClientConn |
| } |
| |
| // New creates a new connection broker. |
| func New(remotes remotes.Remotes) *Broker { |
| return &Broker{ |
| remotes: remotes, |
| } |
| } |
| |
| // SetLocalConn changes the local gRPC connection used by the connection broker. |
| func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) { |
| b.mu.Lock() |
| defer b.mu.Unlock() |
| |
| b.localConn = localConn |
| } |
| |
| // Select a manager from the set of available managers, and return a connection. |
| func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) { |
| b.mu.Lock() |
| localConn := b.localConn |
| b.mu.Unlock() |
| |
| if localConn != nil { |
| return &Conn{ |
| ClientConn: localConn, |
| isLocal: true, |
| }, nil |
| } |
| |
| return b.SelectRemote(dialOpts...) |
| } |
| |
| // SelectRemote chooses a manager from the remotes, and returns a TCP |
| // connection. |
| func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) { |
| peer, err := b.remotes.Select() |
| if err != nil { |
| return nil, err |
| } |
| |
| dialOpts = append(dialOpts, |
| grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), |
| grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor)) |
| |
| cc, err := grpc.Dial(peer.Addr, dialOpts...) |
| if err != nil { |
| b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight) |
| return nil, err |
| } |
| |
| return &Conn{ |
| ClientConn: cc, |
| remotes: b.remotes, |
| peer: peer, |
| }, nil |
| } |
| |
| // Remotes returns the remotes interface used by the broker, so the caller |
| // can make observations or see weights directly. |
| func (b *Broker) Remotes() remotes.Remotes { |
| return b.remotes |
| } |
| |
| // Conn is a wrapper around a gRPC client connection. |
| type Conn struct { |
| *grpc.ClientConn |
| isLocal bool |
| remotes remotes.Remotes |
| peer api.Peer |
| } |
| |
| // Close closes the client connection if it is a remote connection. It also |
| // records a positive experience with the remote peer if success is true, |
| // otherwise it records a negative experience. If a local connection is in use, |
| // Close is a noop. |
| func (c *Conn) Close(success bool) error { |
| if c.isLocal { |
| return nil |
| } |
| |
| if success { |
| c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight) |
| } else { |
| c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight) |
| } |
| |
| return c.ClientConn.Close() |
| } |