blob: dc6edeca040683c2ea11d646666208cb77339b59 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package source
import (
tuf ""
tuf_data ""
const (
configFileName = "config.json"
var merklePat = regexp.MustCompile("^[0-9a-f]{64}$")
// ErrNoUpdate is returned if no update is available.
var ErrNoUpdate = errors.New("amber/source: no update available")
// ErrUnknownPkg is returned if the Source doesn't have any data about any
// version of the package.
var ErrUnknownPkg = errors.New("amber/source: package not known")
// ErrNoUpdateContent is returned if the requested package content couldn't be
// retrieved.
var ErrNoUpdateContent = errors.New("amber/source: update content not available")
type tufSourceConfig struct {
Config *amber.SourceConfig
Oauth2Token *oauth2.Token
Status *SourceStatus
type SourceStatus struct {
Enabled *bool
// LoadSourceConfigs loads source configs from a directory. The directory
// structure looks like:
// $dir/source1/config.json
// $dir/source2/config.json
// ...
// If an error is encountered loading any config, none are returned.
func LoadSourceConfigs(dir string) ([]*amber.SourceConfig, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
configs := make([]*amber.SourceConfig, 0, len(files))
for _, file := range files {
p := filepath.Join(dir, file.Name(), configFileName)
log.Printf("loading source config %s", p)
cfg, err := LoadSourceConfigFromPath(p)
if err != nil {
return nil, err
configs = append(configs, cfg)
return configs, nil
func LoadSourceConfigFromPath(path string) (*amber.SourceConfig, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
defer f.Close()
var cfg amber.SourceConfig
if err := json.NewDecoder(f).Decode(&cfg); err != nil {
return nil, err
// it is possible we encounter a config on disk that does not have
// this value set, set the defaults
if cfg.StatusConfig == nil {
cfg.StatusConfig = &amber.StatusConfig{Enabled: true}
return &cfg, nil
func newSourceConfig(cfg *amber.SourceConfig) (tufSourceConfig, error) {
if cfg.Id == "" {
return tufSourceConfig{}, fmt.Errorf("tuf source id cannot be empty")
if _, err := url.ParseRequestURI(cfg.RepoUrl); err != nil {
return tufSourceConfig{}, err
if len(cfg.RootKeys) == 0 {
return tufSourceConfig{}, fmt.Errorf("no root keys provided")
status := false
srcStatus := &SourceStatus{&status}
if cfg.StatusConfig != nil && cfg.StatusConfig.Enabled {
*srcStatus.Enabled = true
return tufSourceConfig{
Config: cfg,
Status: srcStatus,
}, nil
// Source wraps a TUF Client into the Source interface
type Source struct {
// mu guards all following fields
mu sync.Mutex
cfg tufSourceConfig
dir string
// We save a reference to the tuf local store so that when we update
// the tuf client, we can reuse the store. This avoids us from having
// multiple database connections to the same file, which could corrupt
// the TUF database.
localStore tuf.LocalStore
tokenSource oauth2.TokenSource
httpClient *http.Client
keys []*tuf_data.Key
tufClient *tuf.Client
// TODO(raggi): can optimize startup by persisting this information, or by
// loading the tuf metadata and inspecting the timestamp metadata.
lastUpdate time.Time
ctx context.Context
cancel context.CancelFunc
type custom struct {
Merkle string `json:"merkle"`
type RemoteStoreError struct {
type IOError struct {
func NewSource(dir string, c *amber.SourceConfig) (*Source, error) {
if dir == "" {
return nil, fmt.Errorf("tuf source directory cannot be an empty string")
cfg, err := newSourceConfig(c)
if err != nil {
return nil, err
ctx, cancel := context.WithCancel(context.Background())
src := Source{
cfg: cfg,
dir: dir,
ctx: ctx,
cancel: cancel,
if err := src.initSource(); err != nil {
return nil, err
return &src, nil
// setEnabledStatus examines the config to see if the Status field exists and
// if enabled is set. If either is not present the field is added and set to
// enabled. If the sourceConfig is changed true is returned, otherwise false.
func setEnabledStatus(cfg *tufSourceConfig) bool {
dirty := false
if cfg.Status == nil {
enabled := true
cfg.Status = &SourceStatus{&enabled}
dirty = true
} else if cfg.Status.Enabled == nil {
enabled := true
cfg.Status.Enabled = &enabled
dirty = true
// it is possible we encounter a config on disk that does not have
// this value set, set the defaults
if cfg.Config.StatusConfig == nil {
cfg.Config.StatusConfig = &amber.StatusConfig{Enabled: true}
dirty = true
return dirty
func LoadSourceFromPath(dir string) (*Source, error) {
log.Printf("loading source from %s", dir)
f, err := os.Open(filepath.Join(dir, configFileName))
if err != nil {
return nil, err
defer f.Close()
var cfg tufSourceConfig
if err := json.NewDecoder(f).Decode(&cfg); err != nil {
return nil, err
dirty := setEnabledStatus(&cfg)
ctx, cancel := context.WithCancel(context.Background())
src := Source{
cfg: cfg,
dir: dir,
ctx: ctx,
cancel: cancel,
if dirty {
if err := src.initSource(); err != nil {
return nil, err
return &src, nil
func (f *Source) Enabled() bool {
return *f.cfg.Status.Enabled
// This function finishes initializing the Source by parsing out the config
// data to create the derived fields, like the TUFClient.
func (f *Source) initSource() error {
keys, err := newTUFKeys(f.cfg.Config.RootKeys)
if err != nil {
return err
f.keys = keys
// We might have multiple things in the store directory, so put tuf in
// it's own directory.
localStore, err := NewFileStore(filepath.Join(f.dir, "tuf.json"))
if err != nil {
return IOError{fmt.Errorf("couldn't open datastore: %s", err)}
f.localStore = localStore
return f.updateTUFClientLocked()
// Start starts background operations associated with this Source, such as
// token fetching and update source monitoring. This method should only be
// called once per active source.
func (f *Source) Start() {
func (f *Source) processTokenUpdate(clientId, token string) {
if !f.Enabled() {
if clientId != f.cfg.Config.Oauth2Config.ClientId {
log.Println("source: client ID is unexpected, dropping token")
f.setAuthTokenLocked(&oauth2.Token{RefreshToken: token, TokenType: "Bearer", Expiry: time.Now()})
func oauth2Config(c *amber.OAuth2Config) *oauth2.Config {
if c == nil {
return nil
return &oauth2.Config{
ClientID: c.ClientId,
ClientSecret: c.ClientSecret,
Endpoint: oauth2.Endpoint{
AuthURL: c.AuthUrl,
TokenURL: c.TokenUrl,
Scopes: c.Scopes,
func oauth2deviceConfig(c *amber.OAuth2Config) *oauth2device.Config {
if c == nil {
return nil
return &oauth2device.Config{
Config: oauth2Config(c),
DeviceEndpoint: oauth2device.DeviceEndpoint{
CodeURL: c.DeviceCodeUrl,
func oauth2HttpClient(httpClient *http.Client, cfg *oauth2.Config,
token *oauth2.Token) (*http.Client, oauth2.TokenSource) {
if cfg == nil {
return httpClient, nil
// If we have oauth2 configured, we need to wrap the client in order to
// inject the authentication header.
var tokenSource oauth2.TokenSource
// Store the client in the context so oauth2 can use it to
// fetch the token. This client's transport will also be used
// as the base of the client oauth2 returns to us, except for
// the request timeout, which we manually have to copy over.
ctx := context.WithValue(context.Background(), oauth2.HTTPClient, httpClient)
timeout := httpClient.Timeout
transport := httpClient.Transport
tokenSource = cfg.TokenSource(ctx, token)
httpClient = oauth2.NewClient(ctx, tokenSource)
httpClient.Timeout = timeout
httpClient.Transport = transport
return httpClient, tokenSource
// Initialize (or reinitialize) the TUFClient. This is especially useful when
// logging in, or modifying any of the http settings since there's no way to
// change settings in place, so we need to replace it with a new tuf.Client.
// NOTE: It's the responsibility of the caller to hold the mutex before calling
// this function.
func (f *Source) updateTUFClientLocked() error {
httpClient, err := newHTTPClient(f.cfg.Config.TransportConfig)
if err != nil {
return err
authConfig := oauth2Config(f.cfg.Config.Oauth2Config)
httpClient, tokenSource := oauth2HttpClient(httpClient, authConfig, f.cfg.Oauth2Token)
// Create a new tuf client that uses the new http client.
remoteStore, err := tuf.HTTPRemoteStore(f.cfg.Config.RepoUrl, nil, httpClient)
if err != nil {
return RemoteStoreError{fmt.Errorf("server address not understood: %s", err)}
tufClient := tuf.NewClient(f.localStore, remoteStore)
// We got our tuf client ready to go. Before we store the client in our
// source, make sure to close the old client's transport's idle
// connections so we don't leave a bunch of sockets open.
// We're done! Save the clients for the next time we update our source.
f.tokenSource = tokenSource
f.httpClient = httpClient
f.tufClient = tufClient
return nil
func newHTTPClient(cfg *amber.TransportConfig) (*http.Client, error) {
// Create our transport with default settings copied from Go's
// `http.DefaultTransport`. We can't just copy the default because it
// contains some mutexes, and copying it may leave the transport in an
// inconsistent state.
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
MaxIdleConns: 100,
// The following setting is non-default:
MaxConnsPerHost: 50,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: nil,
if cfg == nil {
return &http.Client{Transport: t}, nil
tlsClientConfig, err := newTLSClientConfig(cfg.TlsClientConfig)
if err != nil {
return nil, err
t.TLSClientConfig = tlsClientConfig
if cfg.ConnectTimeout != 0 || cfg.KeepAlive != 0 {
t.DialContext = (&net.Dialer{
Timeout: time.Duration(cfg.ConnectTimeout) * time.Millisecond,
KeepAlive: time.Duration(cfg.KeepAlive) * time.Millisecond,
DualStack: true,
if cfg.MaxIdleConns != 0 {
t.MaxIdleConns = int(cfg.MaxIdleConns)
if cfg.MaxIdleConnsPerHost != 0 {
t.MaxIdleConnsPerHost = int(cfg.MaxIdleConnsPerHost)
if cfg.IdleConnTimeout != 0 {
t.IdleConnTimeout = time.Duration(cfg.IdleConnTimeout) * time.Millisecond
if cfg.ResponseHeaderTimeout != 0 {
t.ResponseHeaderTimeout = time.Duration(cfg.ResponseHeaderTimeout) * time.Millisecond
if cfg.TlsHandshakeTimeout != 0 {
t.TLSHandshakeTimeout = time.Duration(cfg.TlsHandshakeTimeout) * time.Millisecond
if cfg.ExpectContinueTimeout != 0 {
t.ExpectContinueTimeout = time.Duration(cfg.ExpectContinueTimeout) * time.Millisecond
c := &http.Client{
Transport: t,
if cfg.RequestTimeout != 0 {
c.Timeout = time.Duration(cfg.RequestTimeout) * time.Millisecond
return c, nil
func newTLSClientConfig(cfg *amber.TlsClientConfig) (*tls.Config, error) {
if cfg == nil {
return nil, nil
t := &tls.Config{
InsecureSkipVerify: cfg.InsecureSkipVerify,
if len(cfg.RootCAs) != 0 {
t.RootCAs = x509.NewCertPool()
for _, ca := range cfg.RootCAs {
if !t.RootCAs.AppendCertsFromPEM([]byte(ca)) {
log.Printf("failed to add cert")
return nil, fmt.Errorf("failed to add certificate")
return t, nil
// Note, the mutex should be held when this is called.
func (f *Source) initLocalStoreLocked() error {
if needsInit(f.localStore) {
log.Print("initializing local TUF store")
err := f.tufClient.Init(f.keys, len(f.keys))
if err != nil {
return fmt.Errorf("TUF init failed: %s", err)
return nil
func newTUFKeys(cfg []amber.KeyConfig) ([]*tuf_data.Key, error) {
keys := make([]*tuf_data.Key, len(cfg))
for i, key := range cfg {
if key.Type != "ed25519" {
return nil, fmt.Errorf("unsupported key type %s", key.Type)
keyHex, err := hex.DecodeString(key.Value)
if err != nil {
return nil, fmt.Errorf("invalid key value: %s", err)
keys[i] = &tuf_data.Key{
Type: key.Type,
Value: tuf_data.KeyValue{Public: tuf_data.HexBytes(keyHex)},
return keys, nil
func (f *Source) GetId() string {
return f.cfg.Config.Id
func (f *Source) GetConfig() *amber.SourceConfig {
return f.cfg.Config
func (f *Source) SetEnabled(enabled bool) {
f.cfg.Status.Enabled = &enabled
// try to start a refresh token update. This should not be called while holding
// the struct mutex as it will be locked during this call
func (f *Source) updateRefreshToken() {
if !f.Enabled() {
if f.cfg.Config.Oauth2Config == nil {
go defaultTokenLoader.ReadToken(f.processTokenUpdate, f.cfg.Config.Oauth2Config.ClientId)
func (f *Source) GetHttpClient() (*http.Client, error) {
if err := f.refreshOauth2TokenLocked(); err != nil {
return nil, fmt.Errorf("failed to refresh oauth2 token: %s", err)
// http.Client itself is thread safe, but the member alias is not, so is
// guarded here.
return f.httpClient, nil
// Check if the token has refreshed. If so, save a new token
func (f *Source) refreshOauth2TokenLocked() error {
if f.cfg.Oauth2Token == nil {
return nil
// Grab the latest token from the token source. If the token has
// expired, it will automatically refresh it in the background and give
// us a new access token.
newToken, err := f.tokenSource.Token()
if err != nil {
return err
if newToken.AccessToken != f.cfg.Oauth2Token.AccessToken {
log.Printf("refreshed oauth2 token for: %s", f.cfg.Config.Id)
f.cfg.Oauth2Token = newToken
return nil
func (f *Source) Login() (*amber.DeviceCode, error) {
log.Printf("logging into tuf source: %s", f.cfg.Config.Id)
c := oauth2deviceConfig(f.cfg.Config.Oauth2Config)
if c == nil {
log.Printf("source is not configured for oauth2")
return nil, fmt.Errorf("source is not configured for oauth2")
code, err := oauth2device.RequestDeviceCode(http.DefaultClient, c)
if err != nil {
log.Printf("failed to get device code: %s", err)
return nil, fmt.Errorf("failed to get device code: %s", err)
// Wait for the device authorization in a separate thread so we don't
// block the response. This thread will eventually time out if the user
// does not enter in the code.
go f.waitForDeviceAuthorization(c, code)
return &amber.DeviceCode{
VerificationUrl: code.VerificationURL,
UserCode: code.UserCode,
ExpiresIn: code.ExpiresIn,
}, nil
// Wait for the oauth2 server to authorize us to access the TUF repository. If
// we are denied access, we will log the error, but otherwise do nothing.
func (f *Source) waitForDeviceAuthorization(config *oauth2device.Config, code *oauth2device.DeviceCode) {
log.Printf("waiting for device authorization: %s", f.cfg.Config.Id)
token, err := oauth2device.WaitForDeviceAuthorization(http.DefaultClient, config, code)
if err != nil {
log.Printf("failed to get device authorization token: %s", err)
log.Printf("token received for remote store: %s", f.cfg.Config.Id)
// Now that we have a token, grab the lock, and update our client.
if err := f.setAuthTokenLocked(token); err != nil {
log.Printf("remote store updated: %s", f.cfg.Config.Id)
func (f *Source) setAuthTokenLocked(token *oauth2.Token) error {
f.cfg.Oauth2Token = token
if err := f.updateTUFClientLocked(); err != nil {
log.Printf("failed to update tuf client: %s", err)
return err
if err := f.saveLocked(); err != nil {
log.Printf("failed to save config: %s", err)
return err
return nil
// UpdateIfStale updates this source if the source has not recently updated.
func (f *Source) UpdateIfStale() error {
maxAge := time.Duration(f.cfg.Config.RatePeriod) * time.Second
needsUpdate := time.Since(f.lastUpdate) > maxAge
if needsUpdate {
return f.Update()
return nil
// Update requests updated metadata from this source, returning any error that
// ocurred during the process.
func (f *Source) Update() error {
return atonce.Do("source", f.cfg.Config.Id, func() error {
if !*f.cfg.Status.Enabled {
return nil
// update any relevant auth token before initializing the tuf client
// because the tuf client will try to use the HTTP context when
// initializing
if err := f.refreshOauth2TokenLocked(); err != nil {
return fmt.Errorf("tuf_source: failed to refresh oauth2 token: %s", err)
if err := f.initLocalStoreLocked(); err != nil {
return fmt.Errorf("tuf_source: source could not be initialized: %s", err)
_, err := f.tufClient.Update()
if tuf.IsLatestSnapshot(err) || err == nil {
f.lastUpdate = time.Now()
err = nil
return err
func (f *Source) MerkleFor(name, version string) (string, int64, error) {
m, err := f.tufClient.Targets()
if err != nil {
return "", 0, fmt.Errorf("tuf_source: error reading TUF tarets: %s", err)
if version == "" {
version = "0"
target := fmt.Sprintf("/%s/%s", name, version)
meta, ok := m[target]
if !ok {
return "", 0, ErrUnknownPkg
if meta.Custom == nil {
return "", 0, ErrUnknownPkg
custom := custom{}
if err := json.Unmarshal(*meta.Custom, &custom); err != nil {
return "", 0, fmt.Errorf("error parsing merkle metadata: %s", err)
if !merklePat.MatchString(custom.Merkle) {
log.Printf("tuf_source: found target %q, but has invalid merkle metadata: %q", target, custom.Merkle)
return "", 0, ErrUnknownPkg
return custom.Merkle, meta.Length, nil
func (f *Source) Save() error {
return f.saveLocked()
func (f *Source) DeleteConfig() error {
return os.Remove(filepath.Join(f.dir, configFileName))
func (f *Source) Delete() error {
return os.RemoveAll(f.dir)
func (f *Source) Close() {
func (f *Source) closeIdleConnections() {
if f.httpClient != nil {
transport := f.httpClient.Transport
if transport == nil {
transport = http.DefaultTransport
if transport, ok := transport.(*http.Transport); ok {
// Actually save the config.
// NOTE: It's the responsibility of the caller to hold the mutex before calling
// this function.
func (f *Source) saveLocked() error {
err := os.MkdirAll(f.dir, os.ModePerm)
if err != nil {
return err
p := filepath.Join(f.dir, configFileName)
// We want to atomically write the config, so we'll first write it to a
// temp file, then do an atomic rename to overwrite the target.
file, err := ioutil.TempFile(f.dir, configFileName)
if err != nil {
return err
defer file.Close()
// Make sure to clean up the temp file if there's an error.
defer func() {
if err != nil {
// Encode the cfg as a pretty printed json.
encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
if err = encoder.Encode(f.cfg); err != nil {
return err
if err = file.Close(); err != nil {
return err
return os.Rename(file.Name(), p)
func needsInit(s tuf.LocalStore) bool {
meta, err := s.GetMeta()
if err != nil {
return true
_, found := meta["root.json"]
return !found
func (f *Source) requestBlob(blob string) (*http.Response, error) {
client, err := f.GetHttpClient()
if err != nil {
return nil, err
blobUrl := f.GetConfig().BlobRepoUrl
if blobUrl == "" {
blobUrl = f.GetConfig().RepoUrl + "/blobs"
u, err := url.Parse(blobUrl)
if err != nil {
return nil, err
u.Path = filepath.Join(u.Path, blob)
return client.Get(u.String())
func (f *Source) FetchInto(blob string, length int64, outputDir string) error {
dst, err := os.OpenFile(filepath.Join(outputDir, blob), os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
defer dst.Close()
resp, err := f.requestBlob(blob)
if err != nil {
return err
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("fetch %q failed with code %d", blob, resp.StatusCode)
if resp.ContentLength == -1 && length == -1 {
return fmt.Errorf("unknown content length, can not write")
if resp.ContentLength > -1 && length > -1 && resp.ContentLength != length {
return fmt.Errorf("bad content length: %d, expected %d", resp.ContentLength, length)
var src io.Reader = resp.Body
if length > -1 {
src = io.LimitReader(resp.Body, length)
err = dst.Truncate(length)
} else {
err = dst.Truncate(resp.ContentLength)
if err != nil {
return err
written, err := io.Copy(dst, src)
if err != nil {
return err
if resp.ContentLength != -1 && written != resp.ContentLength {
return fmt.Errorf("blob incomplete, only wrote %d out of %d bytes", written, resp.ContentLength)
return nil
func (f *Source) AutoWatch() {
if !f.Enabled() || !f.cfg.Config.Auto {
go func() {
for {
if !f.Enabled() {
req, err := http.NewRequest("GET", f.cfg.Config.RepoUrl+"/auto", nil)
if err != nil {
log.Printf("autowatch terminal error: %q: %s", f.cfg.Config.RepoUrl, err)
req.Header.Add("Accept", "text/event-stream")
cli, err := f.GetHttpClient()
if err != nil {
log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
r, err := cli.Do(req.WithContext(f.ctx))
if err != nil {
if e, ok := err.(*url.Error); ok {
if e.Err == context.Canceled {
log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
c, err := sse.New(r)
if err != nil {
log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
for {
_, err := c.ReadEvent()
if err != nil {
if !f.Enabled() {