blob: e33bacb683c4b8a6ed001df3c523929dbc026282 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package source
import (
	"context"
	"crypto/aes"
	"crypto/cipher"
	"crypto/tls"
	"crypto/x509"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"sync"
	"syscall"
	"time"

	"amber/atonce"

	"fidl/fuchsia/amber"

	"fuchsia.googlesource.com/sse"
	tuf "github.com/flynn/go-tuf/client"
	tuf_data "github.com/flynn/go-tuf/data"
)
const (
	// configFileName is the name of the per-source JSON config file stored
	// inside each source's directory (see LoadSourceConfigs).
	configFileName = "config.json"
)
// merklePat matches a 64-character lowercase hex string, the textual form of
// a merkle root used by MerkleFor to validate target metadata.
var merklePat = regexp.MustCompile("^[0-9a-f]{64}$")

// ErrNoUpdate is returned if no update is available.
var ErrNoUpdate = errors.New("amber/source: no update available")

// ErrUnknownPkg is returned if the Source doesn't have any data about any
// version of the package.
var ErrUnknownPkg = errors.New("amber/source: package not known")

// ErrNoUpdateContent is returned if the requested package content couldn't be
// retrieved.
var ErrNoUpdateContent = errors.New("amber/source: update content not available")
// tufSourceConfig pairs the FIDL-provided source configuration with the
// locally tracked enabled/disabled status. This is the shape that is
// serialized to a source's config.json (see saveLocked).
type tufSourceConfig struct {
	Config *amber.SourceConfig
	Status *SourceStatus
}

// SourceStatus records whether a source is enabled. Enabled is a pointer so
// that configs on disk which predate this field can be detected (nil) and
// defaulted by setEnabledStatus.
type SourceStatus struct {
	Enabled *bool
}
// LoadSourceConfigs loads source configs from a directory. The directory
// structure looks like:
//
//	$dir/source1/config.json
//	$dir/source2/config.json
//	...
//
// If an error is encountered loading any config, none are returned.
func LoadSourceConfigs(dir string) ([]*amber.SourceConfig, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	loaded := make([]*amber.SourceConfig, 0, len(entries))
	for _, entry := range entries {
		cfgPath := filepath.Join(dir, entry.Name(), configFileName)
		log.Printf("loading source config %s", cfgPath)
		cfg, err := LoadSourceConfigFromPath(cfgPath)
		if err != nil {
			return nil, err
		}
		loaded = append(loaded, cfg)
	}
	return loaded, nil
}
// LoadSourceConfigFromPath reads and decodes a single source config file.
// Configs written before the status config existed get the default
// (enabled) filled in.
func LoadSourceConfigFromPath(path string) (*amber.SourceConfig, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	cfg := &amber.SourceConfig{}
	if err := json.NewDecoder(f).Decode(cfg); err != nil {
		return nil, err
	}
	// Configs on disk may predate StatusConfig; treat a missing value as
	// enabled.
	if cfg.StatusConfig == nil {
		cfg.StatusConfig = &amber.StatusConfig{Enabled: true}
	}
	return cfg, nil
}
// newSourceConfig validates cfg (non-empty id, parseable repo URL, at least
// one root key) and wraps it with a SourceStatus derived from
// cfg.StatusConfig. A missing StatusConfig yields a disabled source.
func newSourceConfig(cfg *amber.SourceConfig) (tufSourceConfig, error) {
	if cfg.Id == "" {
		return tufSourceConfig{}, fmt.Errorf("tuf source id cannot be empty")
	}
	if _, err := url.ParseRequestURI(cfg.RepoUrl); err != nil {
		return tufSourceConfig{}, err
	}
	if len(cfg.RootKeys) == 0 {
		return tufSourceConfig{}, fmt.Errorf("no root keys provided")
	}
	enabled := cfg.StatusConfig != nil && cfg.StatusConfig.Enabled
	return tufSourceConfig{
		Config: cfg,
		Status: &SourceStatus{Enabled: &enabled},
	}, nil
}
// Source wraps a TUF Client into the Source interface
type Source struct {
	// mu guards all following fields
	mu sync.Mutex

	// cfg is the validated config plus enabled status; persisted by Save.
	cfg tufSourceConfig

	// dir is the directory holding this source's config.json and TUF state.
	dir string

	// We save a reference to the tuf local store so that when we update
	// the tuf client, we can reuse the store. This avoids us from having
	// multiple database connections to the same file, which could corrupt
	// the TUF database.
	localStore tuf.LocalStore

	// httpClient is rebuilt by initSource from cfg.Config.TransportConfig.
	httpClient *http.Client

	// keys are the TUF root keys parsed from cfg.Config.RootKeys.
	keys      []*tuf_data.Key
	tufClient *tuf.Client

	// TODO(raggi): can optimize startup by persisting this information, or by
	// loading the tuf metadata and inspecting the timestamp metadata.
	lastUpdate time.Time

	// ctx/cancel bound the lifetime of background work (AutoWatch); Close
	// calls cancel.
	ctx    context.Context
	cancel context.CancelFunc
}
// custom models the amber-specific "custom" JSON metadata attached to TUF
// targets, carrying the package merkle root and (optionally) its size.
type custom struct {
	// Merkle is the hex-encoded merkle root identifying the package content.
	Merkle string `json:"merkle"`
	// Size sometimes not set, as it is in the process of being introduced. We use
	// the pointer to determine set state.
	Size *int64 `json:"size"`
}
// RemoteStoreError wraps errors originating from the remote TUF repository
// connection (see initSource).
type RemoteStoreError struct {
	error
}

// IOError wraps errors originating from local storage I/O (see initSource).
type IOError struct {
	error
}
// NewSource creates a Source rooted at dir from the given config. The config
// is validated (see newSourceConfig) and the source is fully initialized
// (TUF local store, http client, TUF client) before being returned.
func NewSource(dir string, c *amber.SourceConfig) (*Source, error) {
	if dir == "" {
		return nil, fmt.Errorf("tuf source directory cannot be an empty string")
	}
	cfg, err := newSourceConfig(c)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := Source{
		cfg:    cfg,
		dir:    dir,
		ctx:    ctx,
		cancel: cancel,
	}
	if err := src.initSource(); err != nil {
		// Release the context on the failure path so it isn't leaked
		// (go vet: lostcancel).
		cancel()
		return nil, err
	}
	return &src, nil
}
// setEnabledStatus examines the config to see if the Status field exists and
// if enabled is set. If either is not present the field is added and set to
// enabled. If the sourceConfig is changed true is returned, otherwise false.
func setEnabledStatus(cfg *tufSourceConfig) bool {
	dirty := false
	enabled := true
	switch {
	case cfg.Status == nil:
		cfg.Status = &SourceStatus{&enabled}
		dirty = true
	case cfg.Status.Enabled == nil:
		cfg.Status.Enabled = &enabled
		dirty = true
	}
	// Configs on disk may also predate StatusConfig; fill in the default.
	if cfg.Config.StatusConfig == nil {
		cfg.Config.StatusConfig = &amber.StatusConfig{Enabled: true}
		dirty = true
	}
	return dirty
}
// LoadSourceFromPath loads a previously saved Source from dir (the inverse of
// Save), applying defaults for status fields missing from older configs and
// re-persisting the config when defaults were applied.
func LoadSourceFromPath(dir string) (*Source, error) {
	log.Printf("loading source from %s", dir)

	f, err := os.Open(filepath.Join(dir, configFileName))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var cfg tufSourceConfig
	if err := json.NewDecoder(f).Decode(&cfg); err != nil {
		return nil, err
	}
	dirty := setEnabledStatus(&cfg)

	ctx, cancel := context.WithCancel(context.Background())
	src := Source{
		cfg:    cfg,
		dir:    dir,
		ctx:    ctx,
		cancel: cancel,
	}
	if dirty {
		// Persisting the defaulted config is best-effort; the in-memory
		// source is still usable, so just log the failure.
		if err := src.Save(); err != nil {
			log.Printf("failed to save updated config for %s: %s", dir, err)
		}
	}
	if err := src.initSource(); err != nil {
		// Release the context on the failure path so it isn't leaked
		// (go vet: lostcancel).
		cancel()
		return nil, err
	}
	return &src, nil
}
// Enabled reports whether this source is currently enabled.
func (f *Source) Enabled() bool {
	f.mu.Lock()
	enabled := *f.cfg.Status.Enabled
	f.mu.Unlock()
	return enabled
}
// This function finishes initializing the Source by parsing out the config
// data to create the derived fields, like the TUFClient.
func (f *Source) initSource() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	keys, err := newTUFKeys(f.cfg.Config.RootKeys)
	if err != nil {
		return err
	}
	f.keys = keys

	// We might have multiple things in the store directory, so put tuf in
	// its own directory.
	localStore, err := NewFileStore(filepath.Join(f.dir, "tuf.json"))
	if err != nil {
		return IOError{fmt.Errorf("couldn't open datastore: %s", err)}
	}
	f.localStore = localStore

	// Before replacing the http client, close the old client's transport's
	// idle connections so we don't leave a bunch of sockets open.
	f.closeIdleConnections()
	f.httpClient, err = newHTTPClient(f.cfg.Config.TransportConfig)
	if err != nil {
		return err
	}

	// Create a new tuf client that uses the new http client.
	remoteStore, err := tuf.HTTPRemoteStore(f.cfg.Config.RepoUrl, nil, f.httpClient)
	if err != nil {
		return RemoteStoreError{fmt.Errorf("server address not understood: %s", err)}
	}
	f.tufClient = tuf.NewClient(f.localStore, remoteStore)
	return nil
}
// Start starts background operations associated with this Source, such as
// token fetching and update source monitoring. This method should only be
// called once per active source.
func (f *Source) Start() {
	// AutoWatch is a no-op unless the source is enabled and configured for
	// automatic updates; it spawns its own goroutine.
	f.AutoWatch()
}
// newHTTPClient builds an *http.Client from the optional transport config.
// A nil config yields a client with sensible defaults; otherwise each
// non-zero config field (in milliseconds) overrides the matching transport
// or client setting.
func newHTTPClient(cfg *amber.TransportConfig) (*http.Client, error) {
	// Start from default settings copied from Go's `http.DefaultTransport`.
	// We can't just copy the default itself because it contains mutexes,
	// and copying it may leave the transport in an inconsistent state.
	transport := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			DualStack: true,
		}).DialContext,
		MaxIdleConns: 100,
		// The following setting is non-default:
		MaxConnsPerHost:       50,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		TLSClientConfig:       nil,
		// A workaround for TC-243 where closed connections are not being
		// properly detected. This is a mitigation for PKG-400
		ResponseHeaderTimeout: 15 * time.Second,
	}

	if cfg == nil {
		return &http.Client{Transport: transport}, nil
	}

	tlsConfig, err := newTLSClientConfig(cfg.TlsClientConfig)
	if err != nil {
		return nil, err
	}
	transport.TLSClientConfig = tlsConfig

	// Apply per-field overrides; zero means "keep the default".
	if cfg.ConnectTimeout != 0 || cfg.KeepAlive != 0 {
		transport.DialContext = (&net.Dialer{
			Timeout:   time.Duration(cfg.ConnectTimeout) * time.Millisecond,
			KeepAlive: time.Duration(cfg.KeepAlive) * time.Millisecond,
			DualStack: true,
		}).DialContext
	}
	if cfg.MaxIdleConns != 0 {
		transport.MaxIdleConns = int(cfg.MaxIdleConns)
	}
	if cfg.MaxIdleConnsPerHost != 0 {
		transport.MaxIdleConnsPerHost = int(cfg.MaxIdleConnsPerHost)
	}
	if cfg.IdleConnTimeout != 0 {
		transport.IdleConnTimeout = time.Duration(cfg.IdleConnTimeout) * time.Millisecond
	}
	if cfg.ResponseHeaderTimeout != 0 {
		transport.ResponseHeaderTimeout = time.Duration(cfg.ResponseHeaderTimeout) * time.Millisecond
	}
	if cfg.TlsHandshakeTimeout != 0 {
		transport.TLSHandshakeTimeout = time.Duration(cfg.TlsHandshakeTimeout) * time.Millisecond
	}
	if cfg.ExpectContinueTimeout != 0 {
		transport.ExpectContinueTimeout = time.Duration(cfg.ExpectContinueTimeout) * time.Millisecond
	}

	client := &http.Client{Transport: transport}
	if cfg.RequestTimeout != 0 {
		client.Timeout = time.Duration(cfg.RequestTimeout) * time.Millisecond
	}
	return client, nil
}
// newTLSClientConfig builds a *tls.Config from the optional FIDL TLS config.
// A nil cfg returns (nil, nil), meaning "use default TLS settings". Each
// entry of cfg.RootCAs must be a PEM-encoded certificate.
func newTLSClientConfig(cfg *amber.TlsClientConfig) (*tls.Config, error) {
	if cfg == nil {
		return nil, nil
	}
	t := &tls.Config{
		InsecureSkipVerify: cfg.InsecureSkipVerify,
	}
	if len(cfg.RootCAs) != 0 {
		t.RootCAs = x509.NewCertPool()
		for _, ca := range cfg.RootCAs {
			if !t.RootCAs.AppendCertsFromPEM([]byte(ca)) {
				// Return the error rather than logging and returning it;
				// the caller decides how to report the failure.
				return nil, fmt.Errorf("failed to add certificate")
			}
		}
	}
	return t, nil
}
// initLocalStoreLocked runs TUF initialization against the local store if it
// has not yet been initialized. Note, the mutex should be held when this is
// called.
func (f *Source) initLocalStoreLocked() error {
	if !needsInit(f.localStore) {
		return nil
	}
	log.Print("initializing local TUF store")
	if err := f.tufClient.Init(f.keys, len(f.keys)); err != nil {
		return fmt.Errorf("TUF init failed: %s", err)
	}
	return nil
}
// newTUFKeys converts the hex-encoded ed25519 key configs into TUF key
// structures. Any key type other than "ed25519" is rejected.
func newTUFKeys(cfg []amber.KeyConfig) ([]*tuf_data.Key, error) {
	keys := make([]*tuf_data.Key, 0, len(cfg))
	for _, key := range cfg {
		if key.Type != "ed25519" {
			return nil, fmt.Errorf("unsupported key type %s", key.Type)
		}
		raw, err := hex.DecodeString(key.Value)
		if err != nil {
			return nil, fmt.Errorf("invalid key value: %s", err)
		}
		keys = append(keys, &tuf_data.Key{
			Type:  key.Type,
			Value: tuf_data.KeyValue{Public: tuf_data.HexBytes(raw)},
		})
	}
	return keys, nil
}
// GetId returns this source's configured identifier.
//
// NOTE(review): reads f.cfg without holding f.mu; assumes Config.Id is never
// mutated after construction — confirm.
func (f *Source) GetId() string {
	return f.cfg.Config.Id
}
// GetConfig returns this source's configuration.
//
// NOTE(review): returns the live pointer without holding f.mu; callers should
// treat the config as read-only — confirm.
func (f *Source) GetConfig() *amber.SourceConfig {
	return f.cfg.Config
}
// SetEnabled flips this source's enabled status. The change is in-memory
// only; call Save to persist it.
func (f *Source) SetEnabled(enabled bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.cfg.Status.Enabled = &enabled
}
// GetHttpClient returns the current http client for this source.
// http.Client itself is thread safe, but the member alias is not, so the
// read is guarded by the mutex.
func (f *Source) GetHttpClient() (*http.Client, error) {
	f.mu.Lock()
	client := f.httpClient
	f.mu.Unlock()
	return client, nil
}
// UpdateIfStale updates this source if the source has not recently updated.
// "Recently" is defined by the config's RatePeriod, in seconds.
func (f *Source) UpdateIfStale() error {
	f.mu.Lock()
	stale := time.Since(f.lastUpdate) > time.Duration(f.cfg.Config.RatePeriod)*time.Second
	f.mu.Unlock()

	if !stale {
		return nil
	}
	return f.Update()
}
// Update requests updated metadata from this source, returning any error that
// ocurred during the process. Concurrent calls for the same source id are
// coalesced via atonce. Disabled sources are a successful no-op.
func (f *Source) Update() error {
	return atonce.Do("source", f.cfg.Config.Id, func() error {
		f.mu.Lock()
		defer f.mu.Unlock()

		if !*f.cfg.Status.Enabled {
			return nil
		}
		if err := f.initLocalStoreLocked(); err != nil {
			return fmt.Errorf("tuf_source: source could not be initialized: %s", err)
		}
		_, err := f.tufClient.Update()
		// "Already at the latest snapshot" still counts as a successful,
		// fresh update.
		if err == nil || tuf.IsLatestSnapshot(err) {
			f.lastUpdate = time.Now()
			return nil
		}
		return err
	})
}
// MerkleFor looks up a package target from the available TUF targets, returning the merkleroot and plaintext object length, or an error.
// An empty version is treated as "0". Returns ErrUnknownPkg when the target
// is missing or carries no usable merkle metadata.
func (f *Source) MerkleFor(name, version string) (string, int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	m, err := f.tufClient.Targets()
	if err != nil {
		// Fixed typo in the error message: "tarets" -> "targets".
		return "", 0, fmt.Errorf("tuf_source: error reading TUF targets: %s", err)
	}

	if version == "" {
		version = "0"
	}
	target := fmt.Sprintf("/%s/%s", name, version)
	meta, ok := m[target]
	if !ok {
		return "", 0, ErrUnknownPkg
	}
	if meta.Custom == nil {
		return "", 0, ErrUnknownPkg
	}
	// Named cm to avoid shadowing the `custom` type.
	cm := custom{}
	if err := json.Unmarshal(*meta.Custom, &cm); err != nil {
		return "", 0, fmt.Errorf("error parsing merkle metadata: %s", err)
	}
	if !merklePat.MatchString(cm.Merkle) {
		log.Printf("tuf_source: found target %q, but has invalid merkle metadata: %q", target, cm.Merkle)
		return "", 0, ErrUnknownPkg
	}

	// If a blob is encrypted then the TUF recorded length includes the iv block,
	// but we want to return the plain text content size.
	length := meta.Length
	if cm.Size != nil {
		length = *cm.Size
	}
	return cm.Merkle, length, nil
}
// Save persists this source's config to $dir/config.json under the lock.
func (f *Source) Save() error {
	f.mu.Lock()
	err := f.saveLocked()
	f.mu.Unlock()
	return err
}
// DeleteConfig removes only this source's config file, leaving other state
// (such as the TUF store) in place.
func (f *Source) DeleteConfig() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	cfgPath := filepath.Join(f.dir, configFileName)
	return os.Remove(cfgPath)
}
// Delete removes the source's entire on-disk state, config and TUF store
// included.
func (f *Source) Delete() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	return os.RemoveAll(f.dir)
}
// Close cancels this source's context (stopping background work such as
// AutoWatch) and closes idle connections held by the http client's
// transport.
func (f *Source) Close() {
	f.cancel()
	f.closeIdleConnections()
}
// closeIdleConnections closes idle sockets held by the current http client's
// transport, falling back to http.DefaultTransport when none is set. A nil
// client or a non-*http.Transport transport is a no-op.
func (f *Source) closeIdleConnections() {
	if f.httpClient == nil {
		return
	}
	rt := f.httpClient.Transport
	if rt == nil {
		rt = http.DefaultTransport
	}
	if t, ok := rt.(*http.Transport); ok {
		t.CloseIdleConnections()
	}
}
// Actually save the config.
//
// NOTE: It's the responsibility of the caller to hold the mutex before calling
// this function.
func (f *Source) saveLocked() error {
	if err := os.MkdirAll(f.dir, os.ModePerm); err != nil {
		return err
	}

	p := filepath.Join(f.dir, configFileName)

	// We want to atomically write the config, so we'll first write it to a
	// temp file, then do an atomic rename to overwrite the target.
	file, err := ioutil.TempFile(f.dir, configFileName)
	if err != nil {
		return err
	}
	defer file.Close()

	// Make sure to clean up the temp file if there's an error.
	defer func() {
		if err != nil {
			os.Remove(file.Name())
		}
	}()

	// Encode the cfg as a pretty printed json.
	encoder := json.NewEncoder(file)
	encoder.SetIndent("", " ")
	if err = encoder.Encode(f.cfg); err != nil {
		return err
	}

	if err = file.Close(); err != nil {
		return err
	}

	// BUGFIX: use `=` rather than `:=` so the rename error reaches the
	// deferred cleanup above; the shadowed variable previously left the
	// temp file behind when the rename failed.
	if err = os.Rename(file.Name(), p); err != nil {
		return err
	}

	// We do not report an error back to the caller down the sync path, as our
	// state change did complete in the running system. The state of the filesystem
	// is undefined, and we can not provide meaningful information ourselves.
	d, err := os.OpenFile(f.dir, syscall.O_RDONLY|syscall.O_DIRECTORY, 0600)
	if err != nil {
		log.Printf("save config error: open dir for sync: %s", err)
		return nil
	}
	defer d.Close()
	if err := d.Sync(); err != nil {
		log.Printf("save config error: sync: %s", err)
		return nil
	}
	return nil
}
// needsInit reports whether the local TUF store has yet to be initialized,
// i.e. whether it lacks a root.json (or its metadata cannot be read at all).
func needsInit(s tuf.LocalStore) bool {
	meta, err := s.GetMeta()
	if err != nil {
		return true
	}
	_, hasRoot := meta["root.json"]
	return !hasRoot
}
// requestBlob issues a GET for the named blob against the source's blob
// repository, defaulting to $RepoUrl/blobs when no BlobRepoUrl is set. The
// caller owns the response body.
func (f *Source) requestBlob(blob string) (*http.Response, error) {
	client, err := f.GetHttpClient()
	if err != nil {
		return nil, err
	}
	blobUrl := f.GetConfig().BlobRepoUrl
	if blobUrl == "" {
		blobUrl = f.GetConfig().RepoUrl + "/blobs"
	}
	u, err := url.Parse(blobUrl)
	if err != nil {
		return nil, err
	}
	// URL paths are always slash-separated: use path.Join, not
	// filepath.Join (which is for OS file paths).
	u.Path = path.Join(u.Path, blob)
	return client.Get(u.String())
}
// FetchInto downloads the named blob into outputDir, decrypting it when the
// source has a BlobKey configured. length is the expected plaintext size, or
// -1 when unknown (in which case the server must supply a content length).
// The request is retried once on a temporary timeout error.
func (f *Source) FetchInto(blob string, length int64, outputDir string) error {
	dst, err := os.OpenFile(filepath.Join(outputDir, blob), os.O_CREATE|os.O_WRONLY, os.ModePerm)
	if err != nil {
		return err
	}
	defer dst.Close()

	var resp *http.Response
	for i := 0; i < 2; i++ {
		resp, err = f.requestBlob(blob)
		if err == nil {
			// BUGFIX: stop retrying on success. Previously the loop
			// continued, issuing a redundant second request and leaking
			// the first response body.
			break
		}
		// If we get an error that is temporary and is a timeout (most typically this
		// is a header timeout due to a tcp connection that was improperly torn down),
		// then we'll attempt the make a fresh request.
		if e, ok := err.(interface {
			Temporary() bool
			Timeout() bool
		}); ok && e.Temporary() && e.Timeout() {
			f.closeIdleConnections()
			continue
		}
		// Otherwise the error is of a permanent kind, so it's time to bail
		return err
	}
	if err != nil {
		// BUGFIX: both attempts failed with a retriable error; previously
		// execution fell through and dereferenced a nil resp.
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return fmt.Errorf("fetch %q failed with code %d", blob, resp.StatusCode)
	}
	if resp.ContentLength == -1 && length == -1 {
		return fmt.Errorf("unknown content length, can not write")
	}

	var src io.Reader = resp.Body
	gotLength := resp.ContentLength
	if f.cfg.Config.BlobKey != nil {
		// Encrypted blobs are AES-CTR with the IV prepended; the plaintext
		// is one block shorter than the wire length.
		gotLength -= aes.BlockSize
		block, err := aes.NewCipher(f.cfg.Config.BlobKey.Data[:])
		if err != nil {
			return err
		}
		iv := make([]byte, aes.BlockSize)
		if _, err := io.ReadFull(src, iv); err != nil {
			return err
		}
		stream := cipher.NewCTR(block, iv)
		src = cipher.StreamReader{
			S: stream,
			R: src,
		}
	}
	if gotLength > -1 && length > -1 && gotLength != length {
		return fmt.Errorf("bad content length: %d, expected %d", gotLength, length)
	}

	// Pre-size the destination and bound the copy to the expected length.
	if length > -1 {
		src = io.LimitReader(src, length)
		err = dst.Truncate(length)
	} else {
		src = io.LimitReader(src, gotLength)
		err = dst.Truncate(gotLength)
	}
	if err != nil {
		return err
	}

	written, err := io.Copy(dst, src)
	if err != nil {
		return err
	}
	if gotLength > -1 && written != gotLength {
		return fmt.Errorf("blob incomplete, only wrote %d out of %d bytes", written, gotLength)
	}
	if length > -1 && written != length {
		return fmt.Errorf("blob incomplete, only wrote %d out of %d bytes", written, length)
	}
	return nil
}
// AutoWatch subscribes to the repository's /auto server-sent-event stream in
// a background goroutine and triggers Update on each event. It is a no-op
// for sources that are disabled or not configured for automatic updates.
// The goroutine exits when the source is disabled or its context is
// canceled (see Close).
func (f *Source) AutoWatch() {
	if !f.Enabled() || !f.cfg.Config.Auto {
		return
	}
	go func() {
		for {
			if !f.Enabled() {
				return
			}
			req, err := http.NewRequest("GET", f.cfg.Config.RepoUrl+"/auto", nil)
			if err != nil {
				log.Printf("autowatch terminal error: %q: %s", f.cfg.Config.RepoUrl, err)
				return
			}
			req.Header.Add("Accept", "text/event-stream")
			cli, err := f.GetHttpClient()
			if err != nil {
				log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
				time.Sleep(time.Minute)
				continue
			}
			r, err := cli.Do(req.WithContext(f.ctx))
			if err != nil {
				if e, ok := err.(*url.Error); ok {
					if e.Err == context.Canceled {
						break
					}
				}
				log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
				time.Sleep(time.Minute)
				continue
			}
			c, err := sse.New(r)
			if err != nil {
				// BUGFIX: close the response body on every exit from the
				// event loop; previously it leaked on each of these paths.
				r.Body.Close()
				log.Printf("autowatch error for %q: %s", f.cfg.Config.RepoUrl, err)
				time.Sleep(time.Minute)
				continue
			}
			for {
				_, err := c.ReadEvent()
				if err != nil {
					break
				}
				if !f.Enabled() {
					r.Body.Close()
					return
				}
				f.Update()
			}
			r.Body.Close()
		}
	}()
}