blob: 64d23f1fa5dba6c75726953d0f2cdc3dbd2b2e8f [file] [log] [blame] [edit]
// Copyright 2022 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"errors"
"fmt"
"net/http"
"sort"
"time"
"github.com/google/syzkaller/dashboard/dashapi"
"github.com/google/syzkaller/pkg/asset"
"github.com/google/syzkaller/sys/targets"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
"google.golang.org/appengine/v2"
db "google.golang.org/appengine/v2/datastore"
"google.golang.org/appengine/v2/log"
)
// appendBuildAssets attaches assets to the build ns/buildID and saves the
// build inside a datastore transaction.
//
// The call succeeds as long as at least one asset could be appended; assets
// rejected by Build.AppendAsset are skipped. If no asset could be appended
// (or none was supplied), an error is returned and nothing is saved.
// On success the updated build is returned.
//
// TODO: decide if we want to save job-related assets.
func appendBuildAssets(c context.Context, ns, buildID string, assets []Asset) (*Build, error) {
	if len(assets) == 0 {
		// Without this guard the "nothing was appended" branch below would
		// wrap a nil error with %w and produce a garbled message.
		return nil, errors.New("failed to append all assets, no assets were provided")
	}
	var retBuild *Build
	tx := func(c context.Context) error {
		build, err := loadBuild(c, ns, buildID)
		if err != nil {
			return err
		}
		retBuild = build
		appended := 0
		var lastErr error
		for _, newAsset := range assets {
			if err := build.AppendAsset(newAsset); err != nil {
				lastErr = err
				continue
			}
			appended++
		}
		// It took quite a number of resources to upload the files, so we return success
		// even if we managed to save at least one of the new assets.
		if appended == 0 {
			return fmt.Errorf("failed to append all assets, last error %w", lastErr)
		}
		if _, err := db.Put(c, buildKey(c, ns, buildID), build); err != nil {
			return fmt.Errorf("failed to put build: %w", err)
		}
		log.Infof(c, "updated build: %#v", build)
		return nil
	}
	if err := db.RunInTransaction(c, tx, &db.TransactionOptions{}); err != nil {
		return nil, err
	}
	return retBuild, nil
}
// ErrAssetDuplicated is returned by Build.AppendAsset when the build already
// holds an asset of a type that does not allow multiple instances.
var ErrAssetDuplicated = errors.New("an asset of this type is already present")
// AppendAsset adds addAsset to the end of the build's asset list.
//
// It returns an error if the asset type has no known description, and
// ErrAssetDuplicated if the type does not allow multiple instances and the
// build already holds an asset of that type.
func (build *Build) AppendAsset(addAsset Asset) error {
	descr := asset.GetTypeDescription(addAsset.Type)
	switch {
	case descr == nil:
		return fmt.Errorf("unknown asset type")
	case descr.AllowMultiple:
		// Any number of assets of this type may coexist.
	default:
		for i := range build.Assets {
			if build.Assets[i].Type == addAsset.Type {
				return ErrAssetDuplicated
			}
		}
	}
	build.Assets = append(build.Assets, addAsset)
	return nil
}
// queryNeededAssets collects the download URLs referenced by builds and by
// crashes. The two lookups run concurrently; if either of them fails, the
// error is returned and no response is produced.
func queryNeededAssets(c context.Context) (*dashapi.NeededAssetsResp, error) {
	var (
		fromBuilds  []string
		fromCrashes []string
	)
	group, _ := errgroup.WithContext(c)
	group.Go(func() (err error) {
		fromBuilds, err = neededBuildURLs(c)
		return err
	})
	group.Go(func() (err error) {
		fromCrashes, err = neededCrashURLs(c)
		return err
	})
	if err := group.Wait(); err != nil {
		return nil, err
	}
	resp := &dashapi.NeededAssetsResp{
		DownloadURLs: append(fromBuilds, fromCrashes...),
	}
	return resp, nil
}
// neededBuildURLs returns the non-empty asset download URLs found on Build
// entities.
// nolint: dupl
func neededBuildURLs(c context.Context) ([]string, error) {
	query := db.NewQuery("Build").
		Filter("Assets.DownloadURL>", "").
		Project("Assets.DownloadURL")
	var builds []*Build
	if _, err := query.GetAll(c, &builds); err != nil {
		return nil, fmt.Errorf("failed to query builds: %w", err)
	}
	log.Infof(c, "queried %v builds with assets", len(builds))
	urls := []string{}
	for _, build := range builds {
		for i := range build.Assets {
			urls = append(urls, build.Assets[i].DownloadURL)
		}
	}
	return urls, nil
}
// neededCrashURLs returns the non-empty asset download URLs found on Crash
// entities.
// nolint: dupl
func neededCrashURLs(c context.Context) ([]string, error) {
	query := db.NewQuery("Crash").
		Filter("Assets.DownloadURL>", "").
		Project("Assets.DownloadURL")
	var crashes []*Crash
	if _, err := query.GetAll(c, &crashes); err != nil {
		return nil, fmt.Errorf("failed to query assets: %w", err)
	}
	log.Infof(c, "queried %v crashes with assets", len(crashes))
	urls := []string{}
	for _, crash := range crashes {
		for i := range crash.Assets {
			urls = append(urls, crash.Assets[i].DownloadURL)
		}
	}
	return urls, nil
}
// handleDeprecateAssets is an HTTP handler that runs asset deprecation for
// every configured namespace and then for crashes. Failures are only logged,
// so a problem in one namespace does not stop the others from being processed.
func handleDeprecateAssets(w http.ResponseWriter, r *http.Request) {
	c := appengine.NewContext(r)
	for ns := range getConfig(c).Namespaces {
		if err := deprecateNamespaceAssets(c, ns); err != nil {
			log.Errorf(c, "deprecateNamespaceAssets failed for ns=%v: %v", ns, err)
		}
	}
	if err := deprecateCrashAssets(c); err != nil {
		log.Errorf(c, "deprecateCrashAssets failed: %v", err)
	}
}
// deprecateCrashAssets processes one fixed-size batch of crashes and drops
// the crash assets that are no longer needed.
func deprecateCrashAssets(c context.Context) error {
	const crashBatchSize = 16
	deprecator := crashAssetDeprecator{c: c}
	return deprecator.batchProcessCrashes(crashBatchSize)
}
// deprecateNamespaceAssets processes one fixed-size batch of builds from the
// namespace ns and drops the build assets that are no longer needed.
func deprecateNamespaceAssets(c context.Context, ns string) error {
	const buildBatchSize = 16
	deprecator := buildAssetDeprecator{
		ns:         ns,
		c:          c,
		lastBuilds: map[string]*Build{},
	}
	if err := deprecator.batchProcessBuilds(buildBatchSize); err != nil {
		return fmt.Errorf("build batch processing failed: %w", err)
	}
	return nil
}
// buildAssetDeprecator decides which assets attached to the builds of a
// single namespace are still needed and removes the rest.
type buildAssetDeprecator struct {
	// ns is the namespace whose builds are processed.
	ns string
	// c is the context passed to datastore calls, logging and timeNow.
	c context.Context
	// bugsQueried is consulted by queryBugs to skip repeated bug queries.
	bugsQueried bool
	// relevantBugs holds the string form of the keys of open and recently
	// closed bugs; it is filled by queryBugs.
	relevantBugs map[string]bool
	// lastBuilds caches the result of lastManagerBuild per manager name.
	lastBuilds map[string]*Build
}
// keepAssetsForClosedBugs is how long after a bug has been closed (30 days)
// it still counts as relevant for keeping assets.
const keepAssetsForClosedBugs = time.Hour * 24 * 30
// lastBuild returns the result of lastManagerBuild for the given manager,
// caching successful lookups in ad.lastBuilds.
func (ad *buildAssetDeprecator) lastBuild(manager string) (*Build, error) {
	if cached, found := ad.lastBuilds[manager]; found {
		return cached, nil
	}
	fetched, err := lastManagerBuild(ad.c, ad.ns, manager)
	if err != nil {
		return nil, err
	}
	ad.lastBuilds[manager] = fetched
	return fetched, nil
}
// queryBugs fills ad.relevantBugs with the keys of the namespace's open bugs
// and of the bugs closed within the last keepAssetsForClosedBugs.
//
// The two datastore queries run concurrently. After the first successful call
// the result is cached and subsequent calls return immediately; a failed call
// leaves the cache unset so that the next call retries.
func (ad *buildAssetDeprecator) queryBugs() error {
	if ad.bugsQueried {
		return nil
	}
	var openBugKeys []*db.Key
	var closedBugKeys []*db.Key
	g, _ := errgroup.WithContext(context.Background())
	g.Go(func() error {
		// Query open bugs.
		var err error
		openBugKeys, err = db.NewQuery("Bug").
			Filter("Namespace=", ad.ns).
			Filter("Status=", BugStatusOpen).
			KeysOnly().
			GetAll(ad.c, nil)
		if err != nil {
			return fmt.Errorf("failed to fetch open bugs: %w", err)
		}
		return nil
	})
	g.Go(func() error {
		// Query recently closed bugs.
		var err error
		closedBugKeys, err = db.NewQuery("Bug").
			Filter("Namespace=", ad.ns).
			Filter("Closed>", timeNow(ad.c).Add(-keepAssetsForClosedBugs)).
			KeysOnly().
			GetAll(ad.c, nil)
		if err != nil {
			return fmt.Errorf("failed to fetch closed bugs: %w", err)
		}
		return nil
	})
	if err := g.Wait(); err != nil {
		return fmt.Errorf("failed to query bugs: %w", err)
	}
	relevant := make(map[string]bool, len(openBugKeys)+len(closedBugKeys))
	for _, key := range openBugKeys {
		relevant[key.String()] = true
	}
	for _, key := range closedBugKeys {
		relevant[key.String()] = true
	}
	ad.relevantBugs = relevant
	// Remember that the (expensive) queries have been done; without this the
	// early return at the top would never trigger.
	ad.bugsQueried = true
	return nil
}
// buildArchivePolicy reports whether asset should be kept for archival.
//
// It looks up the first build (ordered by asset creation date) of the same
// namespace and manager that matches the "same type" and "created later"
// filters. The asset is dropped only if that build holds a newer asset of the
// same type created in the same ISO week; otherwise it is kept.
func (ad *buildAssetDeprecator) buildArchivePolicy(build *Build, asset *Asset) (bool, error) {
	query := db.NewQuery("Build").
		Filter("Namespace=", ad.ns).
		Filter("Manager=", build.Manager).
		Filter("Assets.Type=", asset.Type).
		Filter("Assets.CreateDate>", asset.CreateDate).
		Limit(1).
		Order("Assets.CreateDate")
	var newer []*Build
	if _, err := query.GetAll(ad.c, &newer); err != nil {
		return false, fmt.Errorf("failed to query newer assets: %w", err)
	}
	log.Infof(ad.c, "running archive policy for %s, date %s; queried %d builds",
		asset.DownloadURL, asset.CreateDate, len(newer))
	if len(newer) == 0 {
		return true, nil
	}
	year, week := asset.CreateDate.ISOWeek()
	for _, candidate := range newer[0].Assets {
		// Only strictly newer assets of the same type are of interest.
		if candidate.Type != asset.Type || !candidate.CreateDate.After(asset.CreateDate) {
			continue
		}
		if y, w := candidate.CreateDate.ISOWeek(); y == year && w == week {
			log.Infof(ad.c, "found a newer asset: %s, date %s",
				candidate.DownloadURL, candidate.CreateDate)
			return false, nil
		}
	}
	return true, nil
}
// buildBugStatusPolicy reports whether the assets of build are still needed
// because of bugs: either a crash recorded for this build belongs to an open
// or recently closed bug, or the build is the manager's latest one.
func (ad *buildAssetDeprecator) buildBugStatusPolicy(build *Build) (bool, error) {
	if err := ad.queryBugs(); err != nil {
		return false, fmt.Errorf("failed to query bugs: %w", err)
	}
	crashQuery := db.NewQuery("Crash").
		Filter("BuildID=", build.ID).
		KeysOnly()
	crashKeys, err := crashQuery.GetAll(ad.c, nil)
	if err != nil {
		return false, fmt.Errorf("failed to query crashes: %w", err)
	}
	for _, crashKey := range crashKeys {
		// The parent of a crash key is the key of the bug it belongs to.
		parent := crashKey.Parent().String()
		if _, relevant := ad.relevantBugs[parent]; relevant {
			// At least one crash is related to an opened/recently closed bug.
			return true, nil
		}
	}
	// If there are no crashes, but it's the latest build, they may still appear.
	latest, err := ad.lastBuild(build.Manager)
	if err != nil {
		// NOTE(review): the lookup error is dropped here and the build is
		// reported as not needed -- confirm this is intended.
		return false, nil
	}
	return latest.ID == build.ID, nil
}
// needThisBuildAsset reports whether buildAsset of build must be kept.
//
// Assets younger than 14 days are always kept. Older HTML coverage reports
// follow the archive policy, and older assets of normal/failed builds follow
// the bug status policy. Any other combination yields an error.
func (ad *buildAssetDeprecator) needThisBuildAsset(build *Build, buildAsset *Asset) (bool, error) {
	const alwaysKeepPeriod = time.Hour * 24 * 14
	keepAfter := timeNow(ad.c).Add(-alwaysKeepPeriod)
	switch {
	case buildAsset.CreateDate.After(keepAfter):
		// If the asset is reasonably new, we always keep it.
		return true, nil
	case buildAsset.Type == dashapi.HTMLCoverageReport:
		// We want to keep coverage reports forever, not just
		// while there are any open bugs. But we don't want to
		// keep all coverage reports, just a share of them.
		return ad.buildArchivePolicy(build, buildAsset)
	case build.Type == BuildNormal || build.Type == BuildFailed:
		// A build-related asset, keep it only while there are open bugs with crashes
		// related to this build.
		return ad.buildBugStatusPolicy(build)
	default:
		// TODO: fix this once this is no longer the case.
		return false, fmt.Errorf("job-related assets are not supported yet")
	}
}
// filterOutAssets returns a new slice with the elements of assets whose
// DownloadURL is not listed in deleteList, in their original order. The input
// slice is not modified and the result is never nil.
func filterOutAssets(assets []Asset, deleteList []string) []Asset {
	doomed := make(map[string]struct{}, len(deleteList))
	for _, url := range deleteList {
		doomed[url] = struct{}{}
	}
	kept := []Asset{}
	for i := range assets {
		if _, drop := doomed[assets[i].DownloadURL]; drop {
			continue
		}
		kept = append(kept, assets[i])
	}
	return kept
}
// updateBuild removes the assets whose download URLs are listed in
// urlsToDelete from the build buildID and stamps its AssetsLastCheck with the
// current time. The read-modify-write runs in a datastore transaction.
//
// A build that cannot be loaded is silently skipped; a failure to save the
// build or to commit the transaction is returned as an error.
func (ad *buildAssetDeprecator) updateBuild(buildID string, urlsToDelete []string) error {
	tx := func(c context.Context) error {
		// Datastore calls must go through the transaction context c; issuing
		// them on ad.c would execute them outside of the transaction.
		build, err := loadBuild(c, ad.ns, buildID)
		if build == nil || err != nil {
			// Assume the DB has been updated in the meanwhile.
			return nil
		}
		build.Assets = filterOutAssets(build.Assets, urlsToDelete)
		build.AssetsLastCheck = timeNow(ad.c)
		if _, err := db.Put(c, buildKey(c, ad.ns, buildID), build); err != nil {
			return fmt.Errorf("failed to save build: %w", err)
		}
		return nil
	}
	if err := db.RunInTransaction(ad.c, tx, nil); err != nil {
		return fmt.Errorf("failed to update build: %w", err)
	}
	return nil
}
// batchProcessBuilds checks the assets of up to count builds of the
// namespace, taking the builds with the oldest AssetsLastCheck first, and
// removes the assets that are no longer needed. Processing stops at the first
// error.
func (ad *buildAssetDeprecator) batchProcessBuilds(count int) error {
	// We cannot query only the Build with non-empty Assets array and yet sort
	// by AssetsLastCheck. The datastore returns "The first sort property must
	// be the same as the property to which the inequality filter is applied.
	// In your query the first sort property is AssetsLastCheck but the inequality
	// filter is on Assets.DownloadURL."
	// So we have to omit Filter("Assets.DownloadURL>", "") here.
	query := db.NewQuery("Build").
		Filter("Namespace=", ad.ns).
		Order("AssetsLastCheck").
		Limit(count)
	var builds []*Build
	if _, err := query.GetAll(ad.c, &builds); err != nil {
		return fmt.Errorf("failed to fetch builds: %w", err)
	}
	for _, build := range builds {
		var obsolete []string
		for i := range build.Assets {
			current := build.Assets[i]
			needed, err := ad.needThisBuildAsset(build, &current)
			if err != nil {
				return fmt.Errorf("failed to test asset: %w", err)
			}
			if !needed {
				obsolete = append(obsolete, current.DownloadURL)
			}
		}
		// The build is updated even when nothing is removed, so that its
		// AssetsLastCheck timestamp moves forward.
		if err := ad.updateBuild(build.ID, obsolete); err != nil {
			return err
		}
	}
	return nil
}
// crashAssetDeprecator decides which assets attached to crashes are still
// needed and removes the rest.
type crashAssetDeprecator struct {
	// c is the context passed to datastore calls and timeNow.
	c context.Context
}
// batchProcessCrashes checks the assets of up to count crashes, taking the
// crashes with the oldest AssetsLastCheck first, and removes the assets that
// are no longer needed. Processing stops at the first error.
func (ad *crashAssetDeprecator) batchProcessCrashes(count int) error {
	// Unfortunately we cannot only query the crashes with assets.
	// See the explanation in batchProcessBuilds().
	query := db.NewQuery("Crash").
		Order("AssetsLastCheck").
		Limit(count)
	var crashes []*Crash
	crashKeys, err := query.GetAll(ad.c, &crashes)
	if err != nil {
		return fmt.Errorf("failed to fetch crashes: %w", err)
	}
	for idx := range crashes {
		crashKey := crashKeys[idx]
		var obsolete []string
		for j := range crashes[idx].Assets {
			current := crashes[idx].Assets[j]
			needed, err := ad.needThisCrashAsset(crashKey, &current)
			if err != nil {
				return fmt.Errorf("failed to test crash asset: %w", err)
			}
			if !needed {
				obsolete = append(obsolete, current.DownloadURL)
			}
		}
		if idx != 0 {
			// Sleep for one second to prevent the "API error 2 (datastore_v3:
			// CONCURRENT_TRANSACTION): too much contention on these datastore
			// entities. please try again." error.
			time.Sleep(time.Second)
		}
		if err := ad.updateCrash(crashKey, obsolete); err != nil {
			return err
		}
	}
	return nil
}
// needThisCrashAsset reports whether crashAsset of the crash identified by
// crashKey must be kept. Only dashapi.MountInRepro assets have a policy; any
// other asset type results in an error.
func (ad *crashAssetDeprecator) needThisCrashAsset(crashKey *db.Key, crashAsset *Asset) (bool, error) {
	if crashAsset.Type != dashapi.MountInRepro {
		return false, fmt.Errorf("no deprecation policy for %s", crashAsset.Type)
	}
	// We keep mount images from reproducers for as long as the bug is still relevant.
	// They're not that big to set stricter limits.
	return ad.bugStatusPolicy(crashKey, crashAsset)
}
// bugStatusPolicy reports whether the bug owning the crash (the parent of
// crashKey) is open or was closed within the last keepAssetsForClosedBugs.
// crashAsset is not consulted.
func (ad *crashAssetDeprecator) bugStatusPolicy(crashKey *db.Key, crashAsset *Asset) (bool, error) {
	bug := &Bug{}
	if err := db.Get(ad.c, crashKey.Parent(), bug); err != nil {
		return false, fmt.Errorf("failed to query bug: %w", err)
	}
	if bug.Status == BugStatusOpen {
		return true, nil
	}
	cutoff := timeNow(ad.c).Add(-keepAssetsForClosedBugs)
	return bug.Closed.After(cutoff), nil
}
// updateCrash removes the assets whose download URLs are listed in
// urlsToDelete from the crash crashKey and stamps its AssetsLastCheck with
// the current time. The read-modify-write runs in a datastore transaction
// with up to 10 attempts.
//
// A crash that cannot be loaded is silently skipped; a failure to save the
// crash or to commit the transaction is returned as an error.
func (ad *crashAssetDeprecator) updateCrash(crashKey *db.Key, urlsToDelete []string) error {
	tx := func(c context.Context) error {
		crash := new(Crash)
		if err := db.Get(c, crashKey, crash); err != nil {
			// Assume the DB has been updated in the meanwhile.
			return nil
		}
		crash.Assets = filterOutAssets(crash.Assets, urlsToDelete)
		crash.AssetsLastCheck = timeNow(ad.c)
		// The write must use the transaction context c, like the read above;
		// issuing it on ad.c would execute it outside of the transaction.
		if _, err := db.Put(c, crashKey, crash); err != nil {
			return fmt.Errorf("failed to save crash: %w", err)
		}
		return nil
	}
	if err := db.RunInTransaction(ad.c, tx, &db.TransactionOptions{Attempts: 10}); err != nil {
		return fmt.Errorf("failed to update crash: %w", err)
	}
	return nil
}
// queryLatestManagerAssets returns, per manager name, an asset of type
// assetType taken from builds having such an asset created within the last
// period. Builds are visited in ascending asset creation date order and later
// matches overwrite earlier ones. An empty ns means all namespaces.
func queryLatestManagerAssets(c context.Context, ns string, assetType dashapi.AssetType,
	period time.Duration) (map[string]Asset, error) {
	startTime := timeNow(c).Add(-period)
	query := db.NewQuery("Build")
	if ns != "" {
		query = query.Filter("Namespace=", ns)
	}
	query = query.
		Filter("Assets.Type=", assetType).
		Filter("Assets.CreateDate>", startTime).
		Order("Assets.CreateDate")
	var builds []*Build
	if _, err := query.GetAll(c, &builds); err != nil {
		return nil, err
	}
	latest := map[string]Asset{}
	for _, build := range builds {
		for i := range build.Assets {
			if build.Assets[i].Type == assetType {
				latest[build.Manager] = build.Assets[i]
			}
		}
	}
	return latest, nil
}
// createAssetList builds the list of assets to report for a crash: the
// assets of build followed by the assets of crash, without the ones whose
// type is unknown or marked NoReporting. The list is stably sorted by the
// type's ReportingPrio and repeated titles are numbered.
// The result is never nil.
func createAssetList(build *Build, crash *Crash) []dashapi.Asset {
	// Combine into a fresh slice: appending crash.Assets directly onto
	// build.Assets could write into spare capacity of the build's backing
	// array, which is shared with every other holder of that slice.
	combined := make([]Asset, 0, len(build.Assets)+len(crash.Assets))
	combined = append(combined, build.Assets...)
	combined = append(combined, crash.Assets...)

	assetList := []dashapi.Asset{}
	for _, reportAsset := range combined {
		typeDescr := asset.GetTypeDescription(reportAsset.Type)
		if typeDescr == nil || typeDescr.NoReporting {
			continue
		}
		assetList = append(assetList, dashapi.Asset{
			Title:       typeDescr.GetTitle(targets.Get(build.OS, build.Arch)),
			DownloadURL: reportAsset.DownloadURL,
			Type:        reportAsset.Type,
		})
	}
	// Every remaining entry has a known type description (unknown ones were
	// skipped above), so the lookups in the comparator cannot yield nil.
	sort.SliceStable(assetList, func(i, j int) bool {
		return asset.GetTypeDescription(assetList[i].Type).ReportingPrio <
			asset.GetTypeDescription(assetList[j].Type).ReportingPrio
	})
	handleDupAssetTitles(assetList)
	return assetList
}
// handleDupAssetTitles numbers repeated titles in place, e.g. it converts
// {"Mounted image", "Mounted image"} to {"Mounted image #1", "Mounted image #2"}.
// Titles that occur only once are left untouched.
func handleDupAssetTitles(assetList []dashapi.Asset) {
	// First pass: count how often every title occurs.
	occurrences := make(map[string]int, len(assetList))
	for i := range assetList {
		occurrences[assetList[i].Title]++
	}
	// Second pass: append a running number to every repeated title.
	seen := map[string]int{}
	for i := range assetList {
		title := assetList[i].Title
		if occurrences[title] < 2 {
			continue
		}
		seen[title]++
		assetList[i].Title = fmt.Sprintf("%s #%d", title, seen[title])
	}
}