| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package daemon |
| |
| import ( |
| "fmt" |
| "io/ioutil" |
| "log" |
| "net/url" |
| "os" |
| "path/filepath" |
| "sync" |
| "syscall/zx" |
| |
| "fidl/fuchsia/amber" |
| |
| "amber/atonce" |
| "amber/source" |
| ) |
| |
| const ( |
| DefaultPkgInstallDir = "/pkgfs/install/pkg" |
| DefaultBlobInstallDir = "/pkgfs/install/blob" |
| PackageGarbageDir = "/pkgfs/garbage" |
| ) |
| |
| type Daemon struct { |
| store string |
| pkgInstallDir string |
| blobInstallDir string |
| |
| muSrcs sync.Mutex |
| srcs map[string]*source.Source |
| |
| events *amber.EventsService |
| aw activationWatcher |
| } |
| |
| // NewDaemon creates a Daemon |
| func NewDaemon(store, pkgInstallDir, blobInstallDir string, events *amber.EventsService) (*Daemon, error) { |
| if pkgInstallDir == "" { |
| pkgInstallDir = DefaultPkgInstallDir |
| } |
| if blobInstallDir == "" { |
| blobInstallDir = DefaultBlobInstallDir |
| } |
| |
| d := &Daemon{ |
| store: store, |
| pkgInstallDir: pkgInstallDir, |
| blobInstallDir: blobInstallDir, |
| srcs: make(map[string]*source.Source), |
| events: events, |
| } |
| |
| // Ignore if the directory doesn't exist |
| srcs, err := loadSourcesFromPath(store) |
| if err != nil && !os.IsNotExist(err) { |
| return nil, err |
| } |
| |
| for _, src := range srcs { |
| if src.Enabled() { |
| if err = d.addSource(src); err != nil { |
| return nil, err |
| } |
| } |
| } |
| |
| return d, nil |
| } |
| |
| // loadSourcesFromPath loads sources from a directory, where each source gets |
| // it's own directory. The actual directory structure is source dependent. |
| func loadSourcesFromPath(dir string) ([]*source.Source, error) { |
| files, err := ioutil.ReadDir(dir) |
| if err != nil { |
| return nil, err |
| } |
| |
| srcs := make([]*source.Source, 0, len(files)) |
| for _, f := range files { |
| p := filepath.Join(dir, f.Name()) |
| src, err := source.LoadSourceFromPath(p) |
| if err != nil { |
| return nil, err |
| } |
| srcs = append(srcs, src) |
| } |
| |
| return srcs, nil |
| } |
| |
| func (d *Daemon) addToActiveSrcs(s *source.Source) { |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| id := s.GetId() |
| |
| if oldSource, ok := d.srcs[id]; ok { |
| log.Printf("overwriting active source: %s", id) |
| oldSource.Close() |
| } |
| |
| s.Start() |
| d.srcs[id] = s |
| |
| log.Printf("added source: %s", id) |
| } |
| |
| // AddSource is called to add a Source that can be used to get packages. |
| func (d *Daemon) AddSource(cfg *amber.SourceConfig) error { |
| // Make sure the id is safe to be written to disk. |
| store := filepath.Join(d.store, url.PathEscape(cfg.Id)) |
| |
| // if enabled/disabled is not set, default to disabled |
| if cfg.StatusConfig == nil { |
| cfg.StatusConfig = &amber.StatusConfig{Enabled: false} |
| } |
| src, err := source.NewSource(store, cfg) |
| if err != nil { |
| log.Printf("failed to create TUF source: %v: %s", cfg.Id, err) |
| return err |
| } |
| |
| // Save the config. |
| if err := src.Save(); err != nil { |
| log.Printf("failed to save TUF config %v: %s", cfg.Id, err) |
| return err |
| } |
| |
| if !src.Enabled() { |
| d.muSrcs.Lock() |
| if oldSource, ok := d.srcs[cfg.Id]; ok { |
| oldSource.Close() |
| delete(d.srcs, cfg.Id) |
| } |
| d.muSrcs.Unlock() |
| return nil |
| } |
| |
| return d.addSource(src) |
| } |
| |
| func (d *Daemon) DisableSource(srcID string) error { |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| // Nothing to do if the source is already disabled. |
| if _, ok := d.srcs[srcID]; !ok { |
| return nil |
| } |
| |
| // Persist the disabled bit. |
| if _, err := d.setSrcEnablementLocked(srcID, false); err != nil { |
| return err |
| } |
| |
| // Remove the source from the running service. |
| d.srcs[srcID].Close() |
| delete(d.srcs, srcID) |
| |
| return nil |
| } |
| |
| func (d *Daemon) EnableSource(srcID string) error { |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| // Nothing to do if the source is already enabled. |
| if _, ok := d.srcs[srcID]; ok { |
| return nil |
| } |
| |
| // Persist the enabled bit. |
| src, err := d.setSrcEnablementLocked(srcID, true) |
| if err != nil { |
| return err |
| } |
| |
| // Add the source to the running service. |
| src.Start() |
| d.srcs[srcID] = src |
| |
| return nil |
| } |
| |
| func (d *Daemon) setSrcEnablementLocked(srcID string, enabled bool) (*source.Source, error) { |
| src, err := d.getSourceLocked(srcID) |
| if err != nil { |
| return nil, err |
| } |
| |
| src.SetEnabled(enabled) |
| if err := src.Save(); err != nil { |
| return nil, err |
| } |
| return src, nil |
| } |
| |
| func (d *Daemon) addSource(src *source.Source) error { |
| cfg := src.GetConfig() |
| if cfg == nil { |
| return fmt.Errorf("source does not have a config") |
| } |
| |
| // If we made it to this point, we're ready to actually add the source. |
| d.addToActiveSrcs(src) |
| |
| log.Printf("added TUF source %s %v\n", cfg.Id, cfg.RepoUrl) |
| |
| return nil |
| } |
| |
| func (d *Daemon) RemoveSource(id string) (amber.Status, error) { |
| // If this method succeeds, the source should be removed from the |
| // running service and not be loaded after a service restart. Delete |
| // the config file before removing the source from the service to |
| // ensure this behavior. |
| s, err := d.removeSource(id) |
| if err != nil { |
| return amber.StatusErr, nil |
| } else if s == nil { |
| return amber.StatusErrNotFound, nil |
| } |
| |
| s.Close() |
| |
| err = s.Delete() |
| if err != nil { |
| log.Printf("unable to remove Source from disk: %v\n", err) |
| } |
| log.Printf("removed source: %s", id) |
| |
| return amber.StatusOk, nil |
| } |
| |
| func (d *Daemon) removeSource(id string) (*source.Source, error) { |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| s, err := d.getSourceLocked(id) |
| if err != nil { |
| log.Printf("source %q not found: %s", id, err) |
| return nil, nil |
| } |
| |
| err = s.DeleteConfig() |
| if err != nil { |
| log.Printf("unable to remove source config from disk: %v", err) |
| return nil, err |
| } |
| |
| delete(d.srcs, id) |
| |
| return s, nil |
| } |
| |
| // getSourceLocked gets a source, either an enabled or disabled one. It is the |
| // responsibility of the caller to hold muSrcs when calling. |
| func (d *Daemon) getSourceLocked(srcID string) (*source.Source, error) { |
| src, ok := d.srcs[srcID] |
| if ok { |
| return src, nil |
| } |
| |
| allSrcs, err := loadSourcesFromPath(d.store) |
| if err != nil { |
| return nil, err |
| } |
| |
| for _, src := range allSrcs { |
| if src.GetId() == srcID { |
| return src, nil |
| } |
| } |
| |
| return nil, os.ErrNotExist |
| } |
| |
| func (d *Daemon) Login(srcId string) (*amber.DeviceCode, error) { |
| log.Printf("logging into %s", srcId) |
| d.muSrcs.Lock() |
| src, err := d.getSourceLocked(srcId) |
| d.muSrcs.Unlock() |
| |
| if err != nil { |
| log.Printf("error getting source by ID: %s", err) |
| return nil, fmt.Errorf("unknown source: %s", err) |
| } |
| |
| return src.Login() |
| } |
| |
| func (d *Daemon) GetActiveSources() map[string]*source.Source { |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| srcs := make(map[string]*source.Source) |
| for key, value := range d.srcs { |
| srcs[key] = value |
| } |
| return srcs |
| } |
| |
| func (d *Daemon) GetSources() map[string]*source.Source { |
| srcs := d.GetActiveSources() |
| d.muSrcs.Lock() |
| defer d.muSrcs.Unlock() |
| |
| // load any sources from disk that we may know about, but are not currently |
| // using |
| allSrcs, err := loadSourcesFromPath(d.store) |
| if err != nil { |
| log.Printf("couldn't load sources from disk: %s", err) |
| } else { |
| // don't override any in-memory entries |
| for _, src := range allSrcs { |
| if _, ok := srcs[src.GetId()]; !ok { |
| srcs[src.GetId()] = src |
| } |
| } |
| } |
| |
| return srcs |
| } |
| |
| func (d *Daemon) Update() { |
| atonce.Do("daemon.Update", "", func() error { |
| for id, src := range d.GetActiveSources() { |
| if err := src.Update(); err != nil { |
| log.Printf("daemon: error updating source %s: %s", id, err) |
| } |
| } |
| return nil |
| }) |
| } |
| |
| func (d *Daemon) UpdateIfStale() { |
| atonce.Do("daemon.UpdateIfStale", "", func() error { |
| for id, src := range d.GetActiveSources() { |
| if err := src.UpdateIfStale(); err != nil { |
| log.Printf("daemon: error updating source %s: %s", id, err) |
| } |
| } |
| return nil |
| }) |
| } |
| |
| func (d *Daemon) MerkleFor(name, version, merkle string) (string, int64, error) { |
| // Temporary-ish solution to avoid failing/pulling incorrectly updated |
| // packages. We need an index into TUF metadata in order to capture appropriate |
| // length information. |
| if len(merkle) == 64 { |
| return merkle, -1, nil |
| } |
| |
| for _, src := range d.GetActiveSources() { |
| m, l, err := src.MerkleFor(name, version) |
| if err == source.ErrUnknownPkg { |
| continue |
| } |
| if err != nil { |
| log.Printf("daemon: error checking source for updates %s", err) |
| continue |
| } |
| return m, l, nil |
| } |
| return "", 0, fmt.Errorf("daemon: no update found for %s/%s/%s", name, version, merkle) |
| } |
| |
| func (d *Daemon) GetPkg(merkle string, length int64) error { |
| // TODO(raggi): the fetching of content should preference the source from which |
| // the update is sought so as to not unfairly bias fetching from an aribtrarily |
| // "first" source. |
| |
| err := d.fetchInto(merkle, length, d.pkgInstallDir) |
| if os.IsExist(err) { |
| // Packages that already exist are simply "successful" |
| d.aw.update(merkle, nil) |
| return nil |
| } |
| |
| if err != nil { |
| // errors fetching the package meta.far are terminal |
| d.aw.update(merkle, err) |
| log.Printf("error fetching pkg %q: %s", merkle, err) |
| } |
| |
| // In the non-error case, waiters are updated by activation. |
| // XXX(raggi): note this is a potentially unbounded wait. |
| return err |
| } |
| |
| // GetBlob is a blocking call which downloads the requested blob |
| func (d *Daemon) GetBlob(merkle string) error { |
| err := d.fetchInto(merkle, -1, d.blobInstallDir) |
| d.aw.update(merkle, err) |
| if err != nil && !os.IsExist(err) { |
| log.Printf("error fetching blob %q: %s", merkle, err) |
| } |
| return err |
| } |
| |
| func (d *Daemon) fetchInto(merkle string, length int64, outputDir string) error { |
| return atonce.Do("fetchInto", merkle, func() error { |
| var err error |
| for _, source := range d.GetActiveSources() { |
| err = source.FetchInto(merkle, length, outputDir) |
| if err == nil || os.IsExist(err) { |
| return err |
| } |
| if e, ok := err.(zx.Error); ok && e.Status == zx.ErrNoSpace { |
| for _, key := range d.events.BindingKeys() { |
| if p, ok := d.events.EventProxyFor(key); ok { |
| log.Printf("daemon: blobfs is out of space") |
| if err := p.OnOutOfSpace(); err != nil { |
| log.Printf("daemon: OnOutOfSpace failed: %v", err) |
| } |
| } |
| } |
| return err |
| } |
| } |
| return fmt.Errorf("not found in %d active sources. last error: %s", len(d.GetActiveSources()), err) |
| }) |
| } |
| |
| func (d *Daemon) AddWatch(merkle string, f func(string, error)) { |
| d.aw.addWatch(merkle, f) |
| } |
| |
| func (d *Daemon) Activated(merkle string) { |
| d.aw.update(merkle, nil) |
| } |
| |
| func (d *Daemon) Failed(merkle string, status zx.Status) { |
| d.aw.update(merkle, zx.Error{Status: status}) |
| } |
| |
| func (d *Daemon) GC() error { |
| // Garbage collection is done by trying to unlink a particular control |
| // file exposed by pkgfs. |
| return os.Remove(PackageGarbageDir) |
| } |