blob: 8de9f1f4d55d032265edbc71c42a3a7ab8906563 [file] [log] [blame]
// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"net/http"
"strings"
"time"
"github.com/google/syzkaller/pkg/auth"
"github.com/google/syzkaller/pkg/fuzzer"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/host"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/report"
"github.com/google/syzkaller/pkg/report/crash"
"github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/prog"
)
type keyGetter func() (string, error)
func pickGetter(key string) keyGetter {
if key != "" {
return func() (string, error) { return key, nil }
}
// Attempts oauth when the configured hub_key is empty.
tokenCache, err := auth.MakeCache(http.NewRequest, http.DefaultClient.Do)
if err != nil {
log.Fatalf("failed to make auth cache %v", err)
}
return func() (string, error) {
return tokenCache.Get(time.Now())
}
}
func (mgr *Manager) hubSyncLoop(keyGet keyGetter) {
hc := &HubConnector{
mgr: mgr,
cfg: mgr.cfg,
target: mgr.target,
domain: mgr.cfg.TargetOS + "/" + mgr.cfg.HubDomain,
enabledCalls: mgr.targetEnabledSyscalls,
leak: mgr.checkFeatures[host.FeatureLeak].Enabled,
fresh: mgr.fresh,
hubReproQueue: mgr.externalReproQueue,
keyGet: keyGet,
statSendProgAdd: stats.Create("hub send prog add", "", stats.Graph("hub progs")),
statSendProgDel: stats.Create("hub send prog del", "", stats.Graph("hub progs")),
statRecvProg: stats.Create("hub recv prog", "", stats.Graph("hub progs")),
statRecvProgDrop: stats.Create("hub recv prog drop", "", stats.NoGraph),
statSendRepro: stats.Create("hub send repro", "", stats.Graph("hub repros")),
statRecvRepro: stats.Create("hub recv repro", "", stats.Graph("hub repros")),
statRecvReproDrop: stats.Create("hub recv repro drop", "", stats.NoGraph),
}
if mgr.cfg.Reproduce && mgr.dash != nil {
hc.needMoreRepros = mgr.needMoreRepros
}
hc.loop()
}
type HubConnector struct {
mgr HubManagerView
cfg *mgrconfig.Config
target *prog.Target
domain string
enabledCalls map[*prog.Syscall]bool
leak bool
fresh bool
hubCorpus map[hash.Sig]bool
newRepros [][]byte
hubReproQueue chan *Crash
needMoreRepros chan chan bool
keyGet keyGetter
statSendProgAdd *stats.Val
statSendProgDel *stats.Val
statRecvProg *stats.Val
statRecvProgDrop *stats.Val
statSendRepro *stats.Val
statRecvRepro *stats.Val
statRecvReproDrop *stats.Val
}
// HubManagerView restricts interface between HubConnector and Manager.
type HubManagerView interface {
getMinimizedCorpus() (corpus, repros [][]byte)
addNewCandidates(candidates []fuzzer.Candidate)
hubIsUnreachable()
}
func (hc *HubConnector) loop() {
var hub *rpctype.RPCClient
var doneOnce bool
for query := 0; ; time.Sleep(10 * time.Minute) {
corpus, repros := hc.mgr.getMinimizedCorpus()
hc.newRepros = append(hc.newRepros, repros...)
if hub == nil {
var err error
if hub, err = hc.connect(corpus); err != nil {
log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
} else {
log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
}
}
if hub != nil {
if err := hc.sync(hub, corpus); err != nil {
log.Logf(0, "hub sync failed: %v", err)
hub.Close()
hub = nil
} else {
doneOnce = true
}
}
query++
const maxAttempts = 3
if hub == nil && query >= maxAttempts && !doneOnce {
hc.mgr.hubIsUnreachable()
}
}
}
func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
key, err := hc.keyGet()
if err != nil {
return nil, err
}
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
if err != nil {
return nil, err
}
a := &rpctype.HubConnectArgs{
Client: hc.cfg.HubClient,
Key: key,
Manager: hc.cfg.Name,
Domain: hc.domain,
Fresh: hc.fresh,
}
for call := range hc.enabledCalls {
a.Calls = append(a.Calls, call.Name)
}
hubCorpus := make(map[hash.Sig]bool)
for _, inp := range corpus {
hubCorpus[hash.Hash(inp)] = true
a.Corpus = append(a.Corpus, inp)
}
// Never send more than this, this is never healthy but happens episodically
// due to various reasons: problems with fallback coverage, bugs in kcov,
// fuzzer exploiting our infrastructure, etc.
const max = 100 * 1000
if len(a.Corpus) > max {
a.Corpus = a.Corpus[:max]
}
err = hub.Call("Hub.Connect", a, nil)
// Hub.Connect request can be very large, so do it on a transient connection
// (rpc connection buffers never shrink).
hub.Close()
if err != nil {
return nil, err
}
hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
if err != nil {
return nil, err
}
hc.hubCorpus = hubCorpus
hc.fresh = false
return hub, nil
}
func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
key, err := hc.keyGet()
if err != nil {
return err
}
a := &rpctype.HubSyncArgs{
Client: hc.cfg.HubClient,
Key: key,
Manager: hc.cfg.Name,
}
sigs := make(map[hash.Sig]bool)
for _, inp := range corpus {
sig := hash.Hash(inp)
sigs[sig] = true
if hc.hubCorpus[sig] {
continue
}
hc.hubCorpus[sig] = true
a.Add = append(a.Add, inp)
}
for sig := range hc.hubCorpus {
if sigs[sig] {
continue
}
delete(hc.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
if hc.needMoreRepros != nil {
needReproReply := make(chan bool)
hc.needMoreRepros <- needReproReply
a.NeedRepros = <-needReproReply
}
a.Repros = hc.newRepros
for {
r := new(rpctype.HubSyncRes)
if err := hub.Call("Hub.Sync", a, r); err != nil {
return err
}
minimized, smashed, progDropped := hc.processProgs(r.Inputs)
reproDropped := hc.processRepros(r.Repros)
hc.statSendProgAdd.Add(len(a.Add))
hc.statSendProgDel.Add(len(a.Del))
hc.statSendRepro.Add(len(a.Repros))
hc.statRecvProg.Add(len(r.Inputs) - progDropped)
hc.statRecvProgDrop.Add(progDropped)
hc.statRecvRepro.Add(len(r.Repros) - reproDropped)
hc.statRecvReproDrop.Add(reproDropped)
log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
" recv: progs %v (min %v, smash %v), repros %v; more %v",
len(a.Add), len(a.Del), len(a.Repros),
len(r.Inputs)-progDropped, minimized, smashed,
len(r.Repros)-reproDropped, r.More)
a.Add = nil
a.Del = nil
a.Repros = nil
a.NeedRepros = false
hc.newRepros = nil
if len(r.Inputs)+r.More == 0 {
return nil
}
}
}
func (hc *HubConnector) processProgs(inputs []rpctype.HubInput) (minimized, smashed, dropped int) {
candidates := make([]fuzzer.Candidate, 0, len(inputs))
for _, inp := range inputs {
p, disabled, bad := parseProgram(hc.target, hc.enabledCalls, inp.Prog)
if bad != nil || disabled {
log.Logf(0, "rejecting program from hub (bad=%v, disabled=%v):\n%s",
bad, disabled, inp)
dropped++
continue
}
min, smash := matchDomains(hc.domain, inp.Domain)
if min {
minimized++
}
if smash {
smashed++
}
candidates = append(candidates, fuzzer.Candidate{
Prog: p,
Minimized: min,
Smashed: smash,
})
}
hc.mgr.addNewCandidates(candidates)
return
}
func matchDomains(self, input string) (bool, bool) {
if self == "" || input == "" {
return true, true
}
min0, smash0 := splitDomains(self)
min1, smash1 := splitDomains(input)
min := min0 != min1
smash := min || smash0 != smash1
return min, smash
}
func splitDomains(domain string) (string, string) {
delim0 := strings.IndexByte(domain, '/')
if delim0 == -1 {
return domain, ""
}
if delim0 == len(domain)-1 {
return domain[:delim0], ""
}
delim1 := strings.IndexByte(domain[delim0+1:], '/')
if delim1 == -1 {
return domain, ""
}
return domain[:delim0+delim1+1], domain[delim0+delim1+2:]
}
func (hc *HubConnector) processRepros(repros [][]byte) int {
dropped := 0
for _, repro := range repros {
_, disabled, bad := parseProgram(hc.target, hc.enabledCalls, repro)
if bad != nil || disabled {
log.Logf(0, "rejecting repro from hub (bad=%v, disabled=%v):\n%s",
bad, disabled, repro)
dropped++
continue
}
// On a leak instance we override repro type to leak,
// because otherwise repro package won't even enable leak detection
// and we won't reproduce leaks from other instances.
typ := crash.UnknownType
if hc.leak {
typ = crash.MemoryLeak
}
hc.hubReproQueue <- &Crash{
fromHub: true,
Report: &report.Report{
Title: "external repro",
Type: typ,
Output: repro,
},
}
}
return dropped
}