blob: 8de9f1f4d55d032265edbc71c42a3a7ab8906563 [file] [log] [blame]
// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
// keyGetter returns the key to present to the hub, or an error if the key
// could not be obtained. It is invoked for every hub connect and sync request.
type keyGetter func() (string, error)
func pickGetter(key string) keyGetter {
if key != "" {
return func() (string, error) { return key, nil }
// Attempts oauth when the configured hub_key is empty.
tokenCache, err := auth.MakeCache(http.NewRequest, http.DefaultClient.Do)
if err != nil {
log.Fatalf("failed to make auth cache %v", err)
return func() (string, error) {
return tokenCache.Get(time.Now())
func (mgr *Manager) hubSyncLoop(keyGet keyGetter) {
hc := &HubConnector{
mgr: mgr,
cfg: mgr.cfg,
domain: mgr.cfg.TargetOS + "/" + mgr.cfg.HubDomain,
enabledCalls: mgr.targetEnabledSyscalls,
leak: mgr.checkFeatures[host.FeatureLeak].Enabled,
fresh: mgr.fresh,
hubReproQueue: mgr.externalReproQueue,
keyGet: keyGet,
statSendProgAdd: stats.Create("hub send prog add", "", stats.Graph("hub progs")),
statSendProgDel: stats.Create("hub send prog del", "", stats.Graph("hub progs")),
statRecvProg: stats.Create("hub recv prog", "", stats.Graph("hub progs")),
statRecvProgDrop: stats.Create("hub recv prog drop", "", stats.NoGraph),
statSendRepro: stats.Create("hub send repro", "", stats.Graph("hub repros")),
statRecvRepro: stats.Create("hub recv repro", "", stats.Graph("hub repros")),
statRecvReproDrop: stats.Create("hub recv repro drop", "", stats.NoGraph),
if mgr.cfg.Reproduce && mgr.dash != nil {
hc.needMoreRepros = mgr.needMoreRepros
type HubConnector struct {
mgr HubManagerView
cfg *mgrconfig.Config
target *prog.Target
domain string
enabledCalls map[*prog.Syscall]bool
leak bool
fresh bool
hubCorpus map[hash.Sig]bool
newRepros [][]byte
hubReproQueue chan *Crash
needMoreRepros chan chan bool
keyGet keyGetter
statSendProgAdd *stats.Val
statSendProgDel *stats.Val
statRecvProg *stats.Val
statRecvProgDrop *stats.Val
statSendRepro *stats.Val
statRecvRepro *stats.Val
statRecvReproDrop *stats.Val
// HubManagerView restricts interface between HubConnector and Manager.
type HubManagerView interface {
getMinimizedCorpus() (corpus, repros [][]byte)
addNewCandidates(candidates []fuzzer.Candidate)
func (hc *HubConnector) loop() {
var hub *rpctype.RPCClient
var doneOnce bool
for query := 0; ; time.Sleep(10 * time.Minute) {
corpus, repros := hc.mgr.getMinimizedCorpus()
hc.newRepros = append(hc.newRepros, repros...)
if hub == nil {
var err error
if hub, err = hc.connect(corpus); err != nil {
log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
} else {
log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
if hub != nil {
if err := hc.sync(hub, corpus); err != nil {
log.Logf(0, "hub sync failed: %v", err)
hub = nil
} else {
doneOnce = true
const maxAttempts = 3
if hub == nil && query >= maxAttempts && !doneOnce {
func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
key, err := hc.keyGet()
if err != nil {
return nil, err
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
if err != nil {
return nil, err
a := &rpctype.HubConnectArgs{
Client: hc.cfg.HubClient,
Key: key,
Manager: hc.cfg.Name,
Domain: hc.domain,
Fresh: hc.fresh,
for call := range hc.enabledCalls {
a.Calls = append(a.Calls, call.Name)
hubCorpus := make(map[hash.Sig]bool)
for _, inp := range corpus {
hubCorpus[hash.Hash(inp)] = true
a.Corpus = append(a.Corpus, inp)
// Never send more than this, this is never healthy but happens episodically
// due to various reasons: problems with fallback coverage, bugs in kcov,
// fuzzer exploiting our infrastructure, etc.
const max = 100 * 1000
if len(a.Corpus) > max {
a.Corpus = a.Corpus[:max]
err = hub.Call("Hub.Connect", a, nil)
// Hub.Connect request can be very large, so do it on a transient connection
// (rpc connection buffers never shrink).
if err != nil {
return nil, err
hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
if err != nil {
return nil, err
hc.hubCorpus = hubCorpus
hc.fresh = false
return hub, nil
func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
key, err := hc.keyGet()
if err != nil {
return err
a := &rpctype.HubSyncArgs{
Client: hc.cfg.HubClient,
Key: key,
Manager: hc.cfg.Name,
sigs := make(map[hash.Sig]bool)
for _, inp := range corpus {
sig := hash.Hash(inp)
sigs[sig] = true
if hc.hubCorpus[sig] {
hc.hubCorpus[sig] = true
a.Add = append(a.Add, inp)
for sig := range hc.hubCorpus {
if sigs[sig] {
delete(hc.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
if hc.needMoreRepros != nil {
needReproReply := make(chan bool)
hc.needMoreRepros <- needReproReply
a.NeedRepros = <-needReproReply
a.Repros = hc.newRepros
for {
r := new(rpctype.HubSyncRes)
if err := hub.Call("Hub.Sync", a, r); err != nil {
return err
minimized, smashed, progDropped := hc.processProgs(r.Inputs)
reproDropped := hc.processRepros(r.Repros)
hc.statRecvProg.Add(len(r.Inputs) - progDropped)
hc.statRecvRepro.Add(len(r.Repros) - reproDropped)
log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
" recv: progs %v (min %v, smash %v), repros %v; more %v",
len(a.Add), len(a.Del), len(a.Repros),
len(r.Inputs)-progDropped, minimized, smashed,
len(r.Repros)-reproDropped, r.More)
a.Add = nil
a.Del = nil
a.Repros = nil
a.NeedRepros = false
hc.newRepros = nil
if len(r.Inputs)+r.More == 0 {
return nil
func (hc *HubConnector) processProgs(inputs []rpctype.HubInput) (minimized, smashed, dropped int) {
candidates := make([]fuzzer.Candidate, 0, len(inputs))
for _, inp := range inputs {
p, disabled, bad := parseProgram(, hc.enabledCalls, inp.Prog)
if bad != nil || disabled {
log.Logf(0, "rejecting program from hub (bad=%v, disabled=%v):\n%s",
bad, disabled, inp)
min, smash := matchDomains(hc.domain, inp.Domain)
if min {
if smash {
candidates = append(candidates, fuzzer.Candidate{
Prog: p,
Minimized: min,
Smashed: smash,
func matchDomains(self, input string) (bool, bool) {
if self == "" || input == "" {
return true, true
min0, smash0 := splitDomains(self)
min1, smash1 := splitDomains(input)
min := min0 != min1
smash := min || smash0 != smash1
return min, smash
// splitDomains splits domain at its second '/' into a leading and a trailing
// part; the separating slash belongs to neither part.
//
// A domain with no '/' or with only one '/' is returned whole with an empty
// trailing part, except that a single '/' at the very end is stripped.
// For example "a/b/c" yields ("a/b", "c") and "a/" yields ("a", "").
func splitDomains(domain string) (string, string) {
	delim0 := strings.IndexByte(domain, '/')
	if delim0 == -1 {
		return domain, ""
	}
	if delim0 == len(domain)-1 {
		return domain[:delim0], ""
	}
	delim1 := strings.IndexByte(domain[delim0+1:], '/')
	if delim1 == -1 {
		return domain, ""
	}
	// delim1 is relative to the text after the first slash; convert it to an
	// index into domain.
	second := delim0 + 1 + delim1
	return domain[:second], domain[second+1:]
}
func (hc *HubConnector) processRepros(repros [][]byte) int {
dropped := 0
for _, repro := range repros {
_, disabled, bad := parseProgram(, hc.enabledCalls, repro)
if bad != nil || disabled {
log.Logf(0, "rejecting repro from hub (bad=%v, disabled=%v):\n%s",
bad, disabled, repro)
// On a leak instance we override repro type to leak,
// because otherwise repro package won't even enable leak detection
// and we won't reproduce leaks from other instances.
typ := crash.UnknownType
if hc.leak {
typ = crash.MemoryLeak
hc.hubReproQueue <- &Crash{
fromHub: true,
Report: &report.Report{
Title: "external repro",
Type: typ,
Output: repro,
return dropped