blob: 01250181dc9e61af78df24cf18f07d885634d1c0 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"slices"
"strings"
"time"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/cipd/client/cipd"
"go.chromium.org/luci/cipd/common"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"golang.org/x/exp/maps"
"go.fuchsia.dev/infra/flagutil"
)
const (
	// cipdHost is the service URL handed to the CIPD client when resolving
	// packages.
	cipdHost = "https://chrome-infra-packages.appspot.com"

	// Substrings of the error text emitted by CIPD when a ref or tag does not
	// exist on a package.
	noSuchRefMessage = "no such ref"
	noSuchTagMessage = "no such tag"
)
// cmdResolve builds the "resolve" subcommand. Every invocation of the command
// gets a fresh resolveCmd whose flags are initialized with authOpts as the
// default auth options.
func cmdResolve(authOpts auth.Options) *subcommands.Command {
	const description = "Resolve common tags among many CIPD packages."

	newRun := func() subcommands.CommandRun {
		run := &resolveCmd{}
		run.Init(authOpts)
		return run
	}

	return &subcommands.Command{
		UsageLine:  "resolve -ref <ref> -tag <tag> [-flexible-pkg <package1>] [-strict-pkg <packages2>]",
		ShortDesc:  description,
		LongDesc:   description,
		CommandRun: newRun,
	}
}
// resolveCmd implements the "resolve" subcommand. Its fields hold the values
// of the command-line flags registered in Init.
type resolveCmd struct {
commonFlags
// ref is the value of -ref: the target ref to resolve (defaults to "latest").
ref string
// tagName is the value of -tag: only tags with this name are considered.
tagName string
// flexiblePackages collects every -flexible-pkg value: packages which need
// not be pinned to the ref.
flexiblePackages flagutil.RepeatedStringValue
// strictPackages collects every -strict-pkg value: packages which must all
// be pinned to the ref.
strictPackages flagutil.RepeatedStringValue
// jsonOutputPath is the value of -json-output: the file the resolved tags are
// written to; when empty the output goes to stdout.
jsonOutputPath string
// verbose is the value of -verbose: it lowers the log level from Error to
// Info.
verbose bool
}
// Init registers the common flags (seeded with defaultAuthOpts) and then every
// flag that is specific to the resolve subcommand.
func (c *resolveCmd) Init(defaultAuthOpts auth.Options) {
	c.commonFlags.Init(defaultAuthOpts)

	// What to resolve.
	c.Flags.StringVar(&c.tagName, "tag", "", "Only tags with this name will be considered.")
	c.Flags.StringVar(&c.ref, "ref", "latest", "Target ref to resolve.")

	// Which packages to resolve it for.
	c.Flags.Var(&c.flexiblePackages, "flexible-pkg", "Flexible packages which need not be pinned to the ref.")
	c.Flags.Var(&c.strictPackages, "strict-pkg", "Strict packages which must all be pinned to the ref.")

	// How to report the result.
	c.Flags.BoolVar(&c.verbose, "verbose", false, "Enable verbose output.")
	c.Flags.StringVar(&c.jsonOutputPath, "json-output", "", "Path to dump output to (defaults to stdout).")
}
// parseArgs parses the common flags and then validates the resolve-specific
// ones: at least one package must be given, and neither -ref nor -tag may be
// empty. The first violated requirement is returned as an error.
func (c *resolveCmd) parseArgs() error {
	if parseErr := c.commonFlags.Parse(); parseErr != nil {
		return parseErr
	}

	switch {
	case len(c.flexiblePackages) == 0 && len(c.strictPackages) == 0:
		return errors.New("at least one of -flexible-pkg or -strict-pkg is required")
	case c.ref == "":
		return errors.New("-ref is required")
	case c.tagName == "":
		return errors.New("-tag is required")
	}
	return nil
}
// Run is the subcommand entry point. It validates the flags, configures the
// log level (Info when -verbose is set, Error otherwise) and executes the
// command. The return value is the process exit code: 0 on success, 1 if
// argument parsing or the command itself failed.
func (c *resolveCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int {
	// fail logs err prefixed with the application name and yields the exit
	// code for a failed run.
	fail := func(ctx context.Context, err error) int {
		logging.Errorf(ctx, "%s: %s\n", a.GetName(), err)
		return 1
	}

	ctx := gologger.StdConfig.Use(context.Background())
	if err := c.parseArgs(); err != nil {
		return fail(ctx, err)
	}

	level := logging.Info
	if !c.verbose {
		level = logging.Error
	}
	ctx = logging.SetLevel(ctx, level)

	if err := c.main(ctx); err != nil {
		return fail(ctx, err)
	}
	return 0
}
// main runs the resolve command: it builds an authenticated CIPD client,
// resolves the common tags for the configured packages, and writes them as an
// indented JSON array of strings to the -json-output file (or to stdout when
// no path was given).
//
// It returns an error if authentication fails, the CIPD client cannot be
// created, resolution fails, or the output cannot be created, written or
// closed.
func (c *resolveCmd) main(ctx context.Context) error {
	authClient, err := auth.NewAuthenticator(ctx, auth.InteractiveLogin, c.parsedAuthOpts).Client()
	if err != nil {
		if errors.Is(err, auth.ErrLoginRequired) {
			fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
			fmt.Fprintf(os.Stderr, "  luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
		}
		return err
	}

	client, err := cipd.NewClient(cipd.ClientOptions{
		ServiceURL:          cipdHost,
		AuthenticatedClient: authClient,
	})
	if err != nil {
		return fmt.Errorf("failed to create CIPD client: %w", err)
	}

	resolver := cipdResolver{client: client, ref: c.ref, tagName: c.tagName}
	tags, err := resolver.resolve(ctx, c.strictPackages, c.flexiblePackages)
	if err != nil {
		return err
	}
	if tags == nil {
		// Emit an empty JSON array instead of null.
		tags = []string{}
	}

	// The output file is only created once resolution has succeeded, so a
	// failed run never leaves behind an empty or truncated file.
	outWriter := os.Stdout
	if c.jsonOutputPath != "" {
		outWriter, err = os.Create(c.jsonOutputPath)
		if err != nil {
			return fmt.Errorf("failed to open -json-output file: %w", err)
		}
	}

	enc := json.NewEncoder(outWriter)
	enc.SetIndent("", "  ")
	encodeErr := enc.Encode(tags)

	if outWriter != os.Stdout {
		// Close explicitly rather than via defer so that a failure to flush
		// the file is reported instead of silently dropped. An encoding error
		// takes precedence because it is the root cause.
		if closeErr := outWriter.Close(); closeErr != nil && encodeErr == nil {
			return fmt.Errorf("failed to close -json-output file: %w", closeErr)
		}
	}
	return encodeErr
}
// cipdClient is the subset of the CIPD client API that cipdResolver relies
// on. The resolver only ever talks to CIPD through these two methods.
type cipdClient interface {
// ResolveVersion maps a package name and a version string (the resolver
// passes either a ref or a tag) to the pin of the matching instance.
ResolveVersion(context.Context, string, string) (common.Pin, error)
// DescribeInstance returns the description of the pinned instance; the
// options select which extra details (e.g. refs and tags) are included.
DescribeInstance(context.Context, common.Pin, *cipd.DescribeInstanceOpts) (*cipd.InstanceDescription, error)
}
// cacheKey identifies one entry of cipdResolver's instance cache: a package
// name paired with a version string (a ref or a tag) of that package.
type cacheKey struct {
pkg string
version string
}
// cipdResolver resolves tags that are shared by a set of CIPD packages. See
// the resolve method for the exact contract.
type cipdResolver struct {
// client is used for all CIPD lookups.
client cipdClient
// ref is the ref whose instances anchor the resolution.
ref string
// tagName restricts resolution to tags of the form "<tagName>:<value>".
tagName string
// cache memoizes instance descriptions per (package, version). It is
// created lazily by listTags, so the zero value is ready to use.
cache map[cacheKey]*cipd.InstanceDescription
}
// resolve finds common tags to which a set of CIPD packages can be pinned to
// ensure mutual interoperability.
//
// Given a set of CIPD package names, it resolves a set of tags T such that:
// 1. For every package in `strictPackages`, there exists an instance I, with
// all the tags in T, which *at some point* had `ref` attached to it (we
// can't guarantee that it *currently* has `ref` attached because the ref may
// be moved to a different instance at any moment).
// 2. For each package P in `flexiblePackages`, there exists an instance I that has
// all the tags in T, although `ref` may not currently point to I or exist on
// any instance of P. If `strictPackages` is empty, then at least one package
// in `flexiblePackages` (the "anchor" package) will have an instance with
// all the tags in T and having had `ref` attached at some point.
// 3. T uniquely identifies a set of package instances; i.e. each tag in T
// points to the same instance for each package. This is important so that a
// roller calling this tool can check to see if the currently pinned version
// of a set of packages is up-to-date just by checking to see if it's present
// in the set of resolved tags.
// 4. T is the newest of all such sets of tags (as determined by the timestamps
// at which the tags are registered), which ensures that a roller using this
// tool will always roll to the latest possible version of a set of packages,
// and won't stall on an older version.
//
// resolve returns a slice of tags corresponding to T, sorted in approximately
// chronological order (oldest first). A client may safely pin all the packages
// to any of the returned tags, but it's strongly recommended to use only the
// oldest tag to avoid no-op rolls as new tags are attached to existing
// instances.
//
// An error is returned if a package is listed as both strict and flexible, if
// a strict package does not have `ref`, if no package has `ref`, if no common
// tags can be found, or if a CIPD lookup fails.
//
// Note that the resolution logic assumes that tags are immutable, which is true
// by convention but not guaranteed by CIPD's backend - a tag can be detached
// from one instance and reattached to another instance, or applied to two
// instances concurrently (in which case CIPD will not be able to resolve the
// tag).
func (cr *cipdResolver) resolve(ctx context.Context, strictPackages, flexiblePackages []string) ([]string, error) {
// A package can't be held to both the strict and the flexible contract.
for _, sp := range strictPackages {
if slices.Contains(flexiblePackages, sp) {
return nil, fmt.Errorf("package %q cannot be both strict and flexible", sp)
}
}
if len(strictPackages) > 0 {
// Strict mode: the candidates are the tags shared by the instances that
// `ref` currently points to in every strict package.
strictCommonTags := newTagSet(nil)
for _, pkg := range strictPackages {
tags, exists, err := cr.listTags(ctx, pkg, cr.ref)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("strict package %q does not have the %q ref", pkg, cr.ref)
}
if len(strictCommonTags) == 0 {
// Initialize. This is necessary because intersecting a set with an
// empty set always produces an empty set. The set can only be empty
// here on the first iteration, because an empty result below aborts
// the loop.
strictCommonTags = tags
} else {
strictCommonTags = strictCommonTags.intersection(tags)
}
if len(strictCommonTags) == 0 {
return nil, fmt.Errorf("strict packages have no common tags")
}
}
if len(flexiblePackages) == 0 {
return strictCommonTags.keys(), nil
}
// Narrow the strict candidates down to those that are also available for
// every flexible package.
commonTags, err := cr.filterCommonTags(ctx, flexiblePackages, strictCommonTags)
if err != nil {
return nil, err
}
if len(commonTags) == 0 {
return nil, fmt.Errorf(
"failed to find common tags; none of the strict packages "+
"with the %q ref is currently pinned to a version that is "+
"available for all packages", cr.ref)
}
return commonTags.keys(), nil
}
// Keep track of whether any package has the ref attached so we can report
// an error if none of them has it attached.
onePackageHasRef := false
// The tags that we've checked as potential candidates.
triedTags := newTagSet(nil)
// If there are no strict packages then we need to choose one of the
// flexible packages as the "anchor" to resolve the set of tags that are
// currently associated with the ref.
for i, anchorPkg := range flexiblePackages {
baseCommonTags, exists, err := cr.listTags(ctx, anchorPkg, cr.ref)
if err != nil {
return nil, err
}
if !exists {
// This package can't serve as the anchor; try the next one.
continue
}
onePackageHasRef = true
// Every flexible package except the anchor itself. A fresh slice is built
// so that appending never writes into flexiblePackages' backing array.
otherPackages := append([]string{}, flexiblePackages[:i]...)
otherPackages = append(otherPackages, flexiblePackages[i+1:]...)
// Filter out tags that we've tried already.
baseCommonTags = baseCommonTags.difference(triedTags)
triedTags = triedTags.union(baseCommonTags)
commonTags, err := cr.filterCommonTags(ctx, otherPackages, baseCommonTags)
if err != nil {
return nil, err
}
if len(commonTags) > 0 {
// The first anchor that yields a non-empty result wins.
return commonTags.keys(), nil
}
}
if !onePackageHasRef {
return nil, fmt.Errorf("none of the packages has the %q ref", cr.ref)
}
return nil, fmt.Errorf(
"none of the versions with the %q ref is currently available for all packages", cr.ref)
}
// filterCommonTags narrows candidateTags down to a subset S such that every
// package in pkgs has an instance carrying all the tags in S. Candidates are
// probed newest first and the first non-empty subset found is returned; a nil
// set means no candidate is available for all packages.
func (cr *cipdResolver) filterCommonTags(ctx context.Context, pkgs []string, candidateTags tagSet) (tagSet, error) {
	// Order the candidates newest first so that the most up-to-date set of
	// instances is preferred. This is best effort only: tags may be registered
	// out of order, so registration time need not match the semantic version
	// order. Equal timestamps are ordered by tag name to stay deterministic.
	candidates := maps.Values(candidateTags)
	slices.SortFunc(candidates, func(x, y cipd.TagInfo) int {
		byTime := unixTimeCmp(y.RegisteredTs, x.RegisteredTs)
		if byTime == 0 {
			return strings.Compare(x.Tag, y.Tag)
		}
		return byTime
	})

nextCandidate:
	for _, candidate := range candidates {
		shared := candidateTags.copy()
		for _, pkg := range pkgs {
			pkgTags, _, err := cr.listTags(ctx, pkg, candidate.Tag)
			if err != nil {
				return nil, err
			}
			shared = shared.intersection(pkgTags)
			if len(shared) == 0 {
				logging.Infof(ctx, "Rejected candidate tag %q; missing for package %q", candidate.Tag, pkg)
				continue nextCandidate
			}
		}
		if len(shared) == 0 {
			continue
		}
		return shared, nil
	}
	return nil, nil
}
// listTags returns the set of tags attached to the instance that `version` (a
// ref or a tag) of `pkg` points to, keeping only the tags whose name (the part
// before the first ":") equals cr.tagName.
//
// exists reports whether `version` could be resolved for `pkg`. A missing ref
// or tag is not treated as an error: it yields (nil, false, nil) and it's up
// to the caller to decide whether that should be considered an error. Any
// other lookup failure is returned as a non-nil err.
func (cr *cipdResolver) listTags(ctx context.Context, pkg, version string) (res tagSet, exists bool, err error) {
	inst, exists, err := cr.describeVersion(ctx, pkg, version)
	if err != nil || !exists {
		return nil, exists, err
	}

	// Ignore tags that have a name other than `tagName`.
	var filtered []cipd.TagInfo
	for _, t := range inst.Tags {
		if name, _, _ := strings.Cut(t.Tag, ":"); name == cr.tagName {
			filtered = append(filtered, t)
		}
	}
	return newTagSet(filtered), true, nil
}

// describeVersion returns the description (including refs and tags) of the
// instance that `version` of `pkg` points to, consulting and filling cr.cache.
// The boolean is false, with a nil error, when CIPD reports that the ref or
// tag does not exist; it is true when the version resolved, even if describing
// the instance subsequently failed.
func (cr *cipdResolver) describeVersion(ctx context.Context, pkg, version string) (*cipd.InstanceDescription, bool, error) {
	if cr.cache == nil {
		cr.cache = make(map[cacheKey]*cipd.InstanceDescription)
	}
	key := cacheKey{pkg: pkg, version: version}
	if cached, ok := cr.cache[key]; ok {
		return cached, true, nil
	}

	pin, err := cr.client.ResolveVersion(ctx, pkg, version)
	if err != nil {
		// CIPD signals a missing ref/tag only through the error text.
		msg := err.Error()
		if strings.Contains(msg, noSuchRefMessage) || strings.Contains(msg, noSuchTagMessage) {
			return nil, false, nil
		}
		return nil, false, fmt.Errorf("could not resolve %s@%s: %w", pkg, version, err)
	}

	opts := &cipd.DescribeInstanceOpts{DescribeTags: true, DescribeRefs: true}
	inst, err := cr.client.DescribeInstance(ctx, pin, opts)
	if err != nil {
		return nil, true, fmt.Errorf("could not describe %s@%s: %w", pkg, version, err)
	}

	// Always remember the version that was asked for, even if it does not show
	// up in the instance's own refs/tags, so a repeated lookup is a cache hit.
	cr.cache[key] = inst
	// Populate the cache with all of this instance's refs and tags so that
	// we'll get cache hits even for different tags that point to the same
	// instance.
	for _, ref := range inst.Refs {
		cr.cache[cacheKey{pkg: pkg, version: ref.Ref}] = inst
	}
	for _, tag := range inst.Tags {
		cr.cache[cacheKey{pkg: pkg, version: tag.Tag}] = inst
	}
	return inst, true, nil
}
// tagSet is a helper type for tracking and merging sets of CIPD tags. Entries
// are keyed by the full tag string (TagInfo.Tag) and keep the whole TagInfo so
// that the registration timestamp stays available for sorting.
type tagSet map[string]cipd.TagInfo
// newTagSet builds a tagSet holding every tag in `tags`. The result is never
// nil, even when `tags` is nil or empty. If the same tag string occurs more
// than once, the last occurrence wins.
func newTagSet(tags []cipd.TagInfo) tagSet {
	set := make(tagSet, len(tags))
	for i := range tags {
		set[tags[i].Tag] = tags[i]
	}
	return set
}
// union returns a new tagSet holding every tag that is in `ts`, in `other`, or
// in both. When a tag is in both, the entry from `other` is kept. Neither
// input is modified.
func (ts tagSet) union(other tagSet) tagSet {
	merged := make(tagSet, len(ts)+len(other))
	for name, info := range ts {
		merged[name] = info
	}
	for _, info := range other {
		merged[info.Tag] = info
	}
	return merged
}
// intersection returns a new tagSet holding only the tags that appear in both
// `ts` and `other`. The retained entries are taken from `ts`. Neither input is
// modified.
func (ts tagSet) intersection(other tagSet) tagSet {
	shared := tagSet{}
	for name, info := range ts {
		if _, inOther := other[name]; !inOther {
			continue
		}
		shared[info.Tag] = info
	}
	return shared
}
// difference returns a new tagSet holding the tags of `ts` that do not appear
// in `other`. Neither input is modified.
func (ts tagSet) difference(other tagSet) tagSet {
	remaining := tagSet{}
	for name, info := range ts {
		if _, inOther := other[name]; inOther {
			continue
		}
		remaining[info.Tag] = info
	}
	return remaining
}
// copy returns a new, non-nil tagSet with the same entries as `ts`, so that
// the result can be modified without affecting the original.
func (ts tagSet) copy() tagSet {
	dup := make(tagSet, len(ts))
	for name, info := range ts {
		dup[name] = info
	}
	return dup
}
// add inserts `t` into the set in place, keyed by its tag string. An existing
// entry with the same tag string is overwritten.
func (ts tagSet) add(t cipd.TagInfo) {
	name := t.Tag
	ts[name] = t
}
// contains reports whether the tag string `t` is a member of the set. It is
// safe to call on a nil tagSet, which contains nothing.
func (ts tagSet) contains(t string) bool {
	if _, found := ts[t]; found {
		return true
	}
	return false
}
// keys returns the tag strings of every entry in the set, ordered by
// registration time (oldest first). Tags registered at the same time are
// ordered by tag string so that the result is deterministic. An empty set
// yields a nil slice.
func (ts tagSet) keys() []string {
	if len(ts) == 0 {
		return nil
	}

	infos := maps.Values(ts)
	slices.SortFunc(infos, func(x, y cipd.TagInfo) int {
		byAge := unixTimeCmp(x.RegisteredTs, y.RegisteredTs)
		if byAge == 0 {
			return strings.Compare(x.Tag, y.Tag)
		}
		return byAge
	})

	names := make([]string, len(infos))
	for i := range infos {
		names[i] = infos[i].Tag
	}
	return names
}
// unixTimeCmp compares two CIPD timestamps. It returns -1 if `a` is before
// `b`, +1 if `a` is after `b`, and 0 if they denote the same instant.
func unixTimeCmp(a, b cipd.UnixTime) int {
	return time.Time(a).Compare(time.Time(b))
}