blob: 559efb57b4e25a50b0301e89e52deeb78847474f [file] [log] [blame]
package main
import (
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"cloud.google.com/go/logging"
"github.com/fsnotify/fsnotify"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
)
// logDir is the directory that every log name given on the command line is
// joined onto to form the path of the file to tail.
const logDir = "/var/log/"
// Flag-backed settings; each is populated by flag.Parse.

// stackdriverCreds is the path to the stackdriver credentials file
// (-stackdriver-creds).
var stackdriverCreds string

// cloudProject is the name of the cloud project to stream logs to
// (-cloud-project).
var cloudProject string

// controller is the name of the control server logd is running on
// (-controller).
var controller string
// logMsg is the structured payload attached to each log entry sent to the
// cloud. Its JSON tags define the field names of the serialized payload.
type logMsg struct {
	// Message holds the text read from the log file.
	Message string `json:"message"`

	// LogName identifies which log under /var/log the text was read from.
	LogName string `json:"log_name"`

	// Controller names the control server that produced the message.
	Controller string `json:"controller"`
}
// init registers logd's string flags on the default flag set. Every flag
// defaults to the empty string.
func init() {
	stringFlags := []struct {
		dst   *string
		name  string
		usage string
	}{
		{&stackdriverCreds, "stackdriver-creds", "Path to the stackdriver credentials file"},
		{&cloudProject, "cloud-project", "Name of the cloud project to stream logs to"},
		{&controller, "controller", "Name of the control server logd is running on"},
	}
	for _, f := range stringFlags {
		flag.StringVar(f.dst, f.name, "", f.usage)
	}
}
// execute tails each named log under logDir and forwards newly appended data
// to Stackdriver until ctx is cancelled or one of the logs fails.
//
// logsToPersist holds log names relative to logDir. Each log is set up and
// tailed in its own goroutine inside an errgroup, so a failure in any one of
// them cancels the rest and every goroutine has finished (and released its
// watcher and file) before execute returns.
//
// It returns nil when ctx is cancelled, otherwise the first error reported
// while creating the client or tailing a log.
func execute(ctx context.Context, logsToPersist []string) error {
	// Create a stackdriver logging client.
	client, err := logging.NewClient(ctx, cloudProject, option.WithCredentialsFile(stackdriverCreds))
	if err != nil {
		return fmt.Errorf("creating stackdriver client: %w", err)
	}
	// logger.Log only buffers entries; Close is what flushes them, so its
	// error is reported instead of being silently dropped.
	defer func() {
		if err := client.Close(); err != nil {
			log.Printf("closing stackdriver client: %v", err)
		}
	}()

	eg, ctx := errgroup.WithContext(ctx)
	for _, name := range logsToPersist {
		name := name // per-iteration copy for the closure (required before Go 1.22)
		eg.Go(func() error {
			return tailLog(ctx, client, name)
		})
	}
	return eg.Wait()
}

// resolveLogPath maps a log name to its path under logDir. It rejects names
// that are empty or that resolve outside logDir (for example via "..").
// The check is purely lexical; symlinks are not resolved.
func resolveLogPath(name string) (string, error) {
	root := filepath.Clean(logDir) + string(filepath.Separator)
	path := filepath.Join(logDir, name) // Join also cleans the result
	if len(path) <= len(root) || path[:len(root)] != root {
		return "", fmt.Errorf("log %q does not resolve to a file inside %s", name, logDir)
	}
	return path, nil
}

// tailLog watches a single log file for writes and sends whatever was
// appended since the previous read to the cloud as one entry. It blocks until
// ctx is cancelled or the watcher is closed (returning nil), or until setup,
// reading or watching fails (returning that error).
func tailLog(ctx context.Context, client *logging.Client, name string) error {
	path, err := resolveLogPath(name)
	if err != nil {
		return err
	}
	logger := client.Logger(fmt.Sprintf("%s-%s", controller, name))

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("creating watcher for %s: %w", path, err)
	}
	defer watcher.Close()

	file, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("opening log: %w", err)
	}
	defer file.Close()

	// Skip existing content: only data appended from now on is forwarded.
	// NOTE(review): os.SEEK_END is deprecated in favor of io.SeekEnd; it is
	// kept here so this change needs no new import.
	if _, err := file.Seek(0, os.SEEK_END); err != nil {
		return fmt.Errorf("seeking to end of %s: %w", path, err)
	}
	if err := watcher.Add(path); err != nil {
		return fmt.Errorf("watching %s: %w", path, err)
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return nil
			}
			if event.Op&fsnotify.Write == 0 {
				continue
			}
			// ReadAll consumes from the current offset to EOF, i.e. exactly
			// the bytes appended since the last read.
			data, err := ioutil.ReadAll(file)
			if err != nil {
				return fmt.Errorf("reading %s: %w", path, err)
			}
			if len(data) == 0 {
				// An earlier read already consumed this write; do not emit
				// an empty entry.
				continue
			}
			logger.Log(logging.Entry{
				Payload: logMsg{
					Message:    string(data),
					LogName:    name,
					Controller: controller,
				},
			})
		case err, ok := <-watcher.Errors:
			if !ok {
				return nil
			}
			return fmt.Errorf("watching %s: %w", path, err)
		case <-ctx.Done():
			return nil
		}
	}
}
// main parses the command-line flags, treats the remaining arguments as the
// names of the logs to persist, and runs execute until it fails or the
// process receives SIGTERM or SIGINT.
func main() {
	flag.Parse()
	logsToPersist := flag.Args()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// signal.Notify never blocks when delivering a signal, so with an
	// unbuffered channel a signal that arrives before the goroutine below is
	// receiving would be dropped. A buffer of one is enough for a one-shot
	// shutdown notification.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		select {
		case <-signals:
			cancel()
		case <-ctx.Done():
		}
	}()

	if err := execute(ctx, logsToPersist); err != nil {
		log.Fatalf("failed to run logd: %v", err)
	}
}