| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // Package index implements a basic index of packages and their relative |
| // installation states, as well as their various top level metadata properties. |
| package index |
| |
| import ( |
| "io/ioutil" |
| "log" |
| "os" |
| "path/filepath" |
| "sort" |
| "sync" |
| "syscall/zx" |
| |
| "fuchsia.googlesource.com/pm/pkg" |
| "fuchsia.googlesource.com/pmd/amberer" |
| ) |
| |
| // DynamicIndex provides concurrency safe access to a dynamic index of packages and package metadata |
| type DynamicIndex struct { |
| root string |
| |
| static *StaticIndex |
| |
| // client to connect to amber |
| amberClient amberer.AmberClient |
| |
| // mu protects all following fields |
| mu sync.Mutex |
| |
| // installing is a map of merkleroot -> package name/version |
| installing map[string]pkg.Package |
| |
| // needs is a map of blob merkleroot -> set[package merkleroot] for packages that need blobs |
| needs map[string]map[string]struct{} |
| |
| // waiting is a map of package merkleroot -> set[blob merkleroots] |
| waiting map[string]map[string]struct{} |
| } |
| |
| // NewDynamic initializes an DynamicIndex with the given root path. |
| func NewDynamic(root string, static *StaticIndex, am amberer.AmberClient) *DynamicIndex { |
| // TODO(PKG-14): error is deliberately ignored. This should not be fatal to boot. |
| _ = os.MkdirAll(root, os.ModePerm) |
| return &DynamicIndex{ |
| root: root, |
| static: static, |
| installing: make(map[string]pkg.Package), |
| needs: make(map[string]map[string]struct{}), |
| waiting: make(map[string]map[string]struct{}), |
| amberClient: am, |
| } |
| } |
| |
| // List returns a list of all known packages in byte-lexical order. |
| func (idx *DynamicIndex) List() ([]pkg.Package, error) { |
| paths, err := filepath.Glob(idx.PackageVersionPath("*", "*")) |
| if err != nil { |
| return nil, err |
| } |
| sort.Strings(paths) |
| pkgs := make([]pkg.Package, len(paths)) |
| for i, path := range paths { |
| pkgs[i].Version = filepath.Base(path) |
| pkgs[i].Name = filepath.Base(filepath.Dir(path)) |
| } |
| return pkgs, nil |
| } |
| |
| // Get looks up a package in the dynamic index, returning it if found. |
| func (idx *DynamicIndex) Get(p pkg.Package) (string, bool) { |
| bmerkle, err := ioutil.ReadFile(idx.PackageVersionPath(p.Name, p.Version)) |
| return string(bmerkle), err == nil |
| } |
| |
| // Add adds a package to the index |
| func (idx *DynamicIndex) Add(p pkg.Package, root string) error { |
| if _, found := idx.static.GetRoot(root); found { |
| return os.ErrExist |
| } |
| |
| if _, found := idx.static.Get(p); found { |
| // TODO(PKG-19): this needs to be removed as the static package set should not |
| // be updated dynamically in future. |
| err := idx.static.Set(p, root) |
| |
| idx.Notify(root) |
| return err |
| } |
| |
| if err := os.MkdirAll(idx.PackagePath(p.Name), os.ModePerm); err != nil { |
| return err |
| } |
| |
| path := idx.PackageVersionPath(p.Name, p.Version) |
| if bmerkle, err := ioutil.ReadFile(path); err == nil && string(bmerkle) == root { |
| return os.ErrExist |
| } |
| |
| if err := ioutil.WriteFile(path, []byte(root), os.ModePerm); err != nil { |
| return err |
| } |
| |
| idx.Notify(root) |
| return nil |
| } |
| |
| func (idx *DynamicIndex) PackagePath(name string) string { |
| return filepath.Join(idx.PackagesDir(), name) |
| } |
| |
| func (idx *DynamicIndex) PackageVersionPath(name, version string) string { |
| return filepath.Join(idx.PackagesDir(), name, version) |
| } |
| |
| func (idx *DynamicIndex) PackagesDir() string { |
| dir := filepath.Join(idx.root, "packages") |
| // TODO(PKG-14): refactor out the initialization logic so that we can do this |
| // once, at an appropriate point in the runtime. |
| _ = os.MkdirAll(dir, os.ModePerm) |
| return dir |
| } |
| |
| // Installing marks the given package as being in the process of installing. The |
| // package identity is not yet known, and can be updated later using |
| // UpdateInstalling. |
| func (idx *DynamicIndex) Installing(root string) { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| idx.installing[root] = pkg.Package{} |
| } |
| |
| // UpdateInstalling updates the installing index for the given package with an |
| // identity once known (that is, once the package meta.far has been able to be |
| // opened, so the packages identity is known). |
| func (idx *DynamicIndex) UpdateInstalling(root string, p pkg.Package) { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| idx.installing[root] = p |
| } |
| |
| // InstallingFailedForBlob notifies amber that blob install failed for a package. |
| // It does not actually stop the package's installation for the other blobs; the choice |
| // to retry the package installation is left up to amber. |
| func (idx *DynamicIndex) InstallingFailedForBlob(blobRoot string, err error) { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| pkgRoots := []string{} |
| for p := range idx.needs[blobRoot] { |
| pkgRoots = append(pkgRoots, p) |
| } |
| var status zx.Status |
| if e, ok := err.(zx.Error); ok { |
| status = e.Status |
| } else { |
| // "err" should be a zx.Error, but fall back to a generic error code |
| // in case it's not. |
| status = zx.ErrInternal |
| } |
| idx.amberClient.PackagesFailed(pkgRoots, status, blobRoot) |
| } |
| |
| // InstallingFailedForPackage removes an entry from the package installation index, |
| // this is called when the package meta.far blob is not readable, or the package is |
| // not valid. |
| func (idx *DynamicIndex) InstallingFailedForPackage(pkgRoot string) { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| p := idx.installing[pkgRoot] |
| log.Printf("package failed %s/%s (%s)", p.Name, p.Version, pkgRoot) |
| delete(idx.installing, pkgRoot) |
| } |
| |
| // AddNeeds updates the index about the blobs required in order to activate an |
| // installing package. It is possible for the addition of needs to race |
| // fulfillment that is happening in other concurrent processes. When that |
| // occurs, this method will return os.ErrExist. |
| func (idx *DynamicIndex) AddNeeds(root string, needs map[string]struct{}) error { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| if _, found := idx.installing[root]; !found { |
| return os.ErrExist |
| } |
| |
| for blob := range needs { |
| if _, found := idx.needs[blob]; found { |
| idx.needs[blob][root] = struct{}{} |
| } else { |
| idx.needs[blob] = map[string]struct{}{root: struct{}{}} |
| } |
| } |
| // We wait on all of the "needs", that is, all blobs that were not found on the |
| // system at the time of import. |
| idx.waiting[root] = needs |
| |
| // create a copy of the needs so we're concurrency safe |
| cn := make([]string, 0, len(needs)) |
| for root := range needs { |
| cn = append(cn, root) |
| } |
| |
| log.Printf("asking amber to fetch %d needed blobs", len(cn)) |
| go func() { |
| for _, root := range cn { |
| idx.amberClient.GetBlob(root) |
| } |
| }() |
| return nil |
| } |
| |
| // Fulfill processes the signal that a blob need has been fulfilled. meta.far's |
| // are also published through this path, but a meta.far fulfillment does not |
| // mean that the package is activated, only that its blob has been written. When |
| // a packages 'waiting' set has been emptied, fulfill will call Add, which is |
| // the point of activation. |
| func (idx *DynamicIndex) Fulfill(need string) { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| packageRoots := idx.needs[need] |
| delete(idx.needs, need) |
| |
| for pkgRoot := range packageRoots { |
| waiting := idx.waiting[pkgRoot] |
| delete(waiting, need) |
| if len(waiting) == 0 { |
| delete(idx.waiting, pkgRoot) |
| p := idx.installing[pkgRoot] |
| if err := idx.Add(p, pkgRoot); err != nil { |
| if os.IsExist(err) { |
| log.Printf("package already exists at fulfillment: %s", err) |
| } else { |
| log.Printf("unexpected error adding package after fulfillment: %s", err) |
| } |
| } else { |
| log.Printf("package activated %s/%s (%s)", p.Name, p.Version, pkgRoot) |
| } |
| delete(idx.installing, pkgRoot) |
| } |
| } |
| } |
| |
| func (idx *DynamicIndex) HasNeed(root string) bool { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| _, found := idx.needs[root] |
| return found |
| } |
| |
| func (idx *DynamicIndex) NeedsList() []string { |
| idx.mu.Lock() |
| defer idx.mu.Unlock() |
| |
| names := make([]string, 0, len(idx.needs)) |
| for name := range idx.needs { |
| names = append(names, name) |
| } |
| |
| return names |
| } |
| |
| func (idx *DynamicIndex) Notify(roots ...string) { |
| if len(roots) == 0 { |
| return |
| } |
| |
| idx.amberClient.PackagesActivated(roots) |
| } |
| |
| // PackageBlobs returns the list of blobs which are meta FARs backing packages |
| // in the dynamic and static indices. |
| func (idx *DynamicIndex) PackageBlobs() []string { |
| packageBlobs := idx.static.PackageBlobs() |
| paths, err := filepath.Glob(idx.PackageVersionPath("*", "*")) |
| if err != nil { |
| log.Printf("glob all extant dynamic packages: %s", err) |
| return packageBlobs |
| } |
| |
| for _, path := range paths { |
| merkle, err := ioutil.ReadFile(path) |
| if err != nil { |
| log.Printf("read dynamic package index %s: %s", path, err) |
| continue |
| } |
| packageBlobs = append(packageBlobs, string(merkle)) |
| } |
| return packageBlobs |
| } |
| |
| // AllPackageBlobs aggregates all installing, dynamic and static index package |
| // meta.far blobs into a single list. Any errors encountered along the way are |
| // logged, but otherwise the best available list is generated under a single |
| // lock, to provide a relatively consistent view of objects that must be |
| // maintained. This function is intended for use by the GC and the versions |
| // directory. The list will not contain duplicates. |
| func (idx *DynamicIndex) AllPackageBlobs() []string { |
| allPackageBlobs := make(map[string]struct{}) |
| idx.mu.Lock() |
| for blob := range idx.installing { |
| allPackageBlobs[blob] = struct{}{} |
| } |
| idx.mu.Unlock() |
| |
| for _, blob := range idx.PackageBlobs() { |
| allPackageBlobs[blob] = struct{}{} |
| } |
| |
| blobList := make([]string, 0, len(allPackageBlobs)) |
| for blob := range allPackageBlobs { |
| blobList = append(blobList, blob) |
| } |
| |
| return blobList |
| } |