| package content |
| |
| import ( |
| "context" |
| "io" |
| |
| contentapi "github.com/containerd/containerd/api/services/content/v1" |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/errdefs" |
| protobuftypes "github.com/gogo/protobuf/types" |
| digest "github.com/opencontainers/go-digest" |
| ) |
| |
| type remoteStore struct { |
| client contentapi.ContentClient |
| } |
| |
| func NewStoreFromClient(client contentapi.ContentClient) content.Store { |
| return &remoteStore{ |
| client: client, |
| } |
| } |
| |
| func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { |
| resp, err := rs.client.Info(ctx, &contentapi.InfoRequest{ |
| Digest: dgst, |
| }) |
| if err != nil { |
| return content.Info{}, errdefs.FromGRPC(err) |
| } |
| |
| return infoFromGRPC(resp.Info), nil |
| } |
| |
| func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { |
| session, err := rs.client.List(ctx, &contentapi.ListContentRequest{ |
| Filters: filters, |
| }) |
| if err != nil { |
| return errdefs.FromGRPC(err) |
| } |
| |
| for { |
| msg, err := session.Recv() |
| if err != nil { |
| if err != io.EOF { |
| return errdefs.FromGRPC(err) |
| } |
| |
| break |
| } |
| |
| for _, info := range msg.Info { |
| if err := fn(infoFromGRPC(info)); err != nil { |
| return err |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error { |
| if _, err := rs.client.Delete(ctx, &contentapi.DeleteContentRequest{ |
| Digest: dgst, |
| }); err != nil { |
| return errdefs.FromGRPC(err) |
| } |
| |
| return nil |
| } |
| |
| func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { |
| i, err := rs.Info(ctx, dgst) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &remoteReaderAt{ |
| ctx: ctx, |
| digest: dgst, |
| size: i.Size, |
| client: rs.client, |
| }, nil |
| } |
| |
| func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status, error) { |
| resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{ |
| Ref: ref, |
| }) |
| if err != nil { |
| return content.Status{}, errdefs.FromGRPC(err) |
| } |
| |
| status := resp.Status |
| return content.Status{ |
| Ref: status.Ref, |
| StartedAt: status.StartedAt, |
| UpdatedAt: status.UpdatedAt, |
| Offset: status.Offset, |
| Total: status.Total, |
| Expected: status.Expected, |
| }, nil |
| } |
| |
| func (rs *remoteStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { |
| resp, err := rs.client.Update(ctx, &contentapi.UpdateRequest{ |
| Info: infoToGRPC(info), |
| UpdateMask: &protobuftypes.FieldMask{ |
| Paths: fieldpaths, |
| }, |
| }) |
| if err != nil { |
| return content.Info{}, errdefs.FromGRPC(err) |
| } |
| return infoFromGRPC(resp.Info), nil |
| } |
| |
| func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { |
| resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{ |
| Filters: filters, |
| }) |
| if err != nil { |
| return nil, errdefs.FromGRPC(err) |
| } |
| |
| var statuses []content.Status |
| for _, status := range resp.Statuses { |
| statuses = append(statuses, content.Status{ |
| Ref: status.Ref, |
| StartedAt: status.StartedAt, |
| UpdatedAt: status.UpdatedAt, |
| Offset: status.Offset, |
| Total: status.Total, |
| Expected: status.Expected, |
| }) |
| } |
| |
| return statuses, nil |
| } |
| |
| func (rs *remoteStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { |
| wrclient, offset, err := rs.negotiate(ctx, ref, size, expected) |
| if err != nil { |
| return nil, errdefs.FromGRPC(err) |
| } |
| |
| return &remoteWriter{ |
| ref: ref, |
| client: wrclient, |
| offset: offset, |
| }, nil |
| } |
| |
| // Abort implements asynchronous abort. It starts a new write session on the ref l |
| func (rs *remoteStore) Abort(ctx context.Context, ref string) error { |
| if _, err := rs.client.Abort(ctx, &contentapi.AbortRequest{ |
| Ref: ref, |
| }); err != nil { |
| return errdefs.FromGRPC(err) |
| } |
| |
| return nil |
| } |
| |
| func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) { |
| wrclient, err := rs.client.Write(ctx) |
| if err != nil { |
| return nil, 0, err |
| } |
| |
| if err := wrclient.Send(&contentapi.WriteContentRequest{ |
| Action: contentapi.WriteActionStat, |
| Ref: ref, |
| Total: size, |
| Expected: expected, |
| }); err != nil { |
| return nil, 0, err |
| } |
| |
| resp, err := wrclient.Recv() |
| if err != nil { |
| return nil, 0, err |
| } |
| |
| return wrclient, resp.Offset, nil |
| } |
| |
| func infoToGRPC(info content.Info) contentapi.Info { |
| return contentapi.Info{ |
| Digest: info.Digest, |
| Size_: info.Size, |
| CreatedAt: info.CreatedAt, |
| UpdatedAt: info.UpdatedAt, |
| Labels: info.Labels, |
| } |
| } |
| |
| func infoFromGRPC(info contentapi.Info) content.Info { |
| return content.Info{ |
| Digest: info.Digest, |
| Size: info.Size_, |
| CreatedAt: info.CreatedAt, |
| UpdatedAt: info.UpdatedAt, |
| Labels: info.Labels, |
| } |
| } |