| package main |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "io/ioutil" |
| "log" |
| "os" |
| "os/signal" |
| "path/filepath" |
| "syscall" |
| |
| "cloud.google.com/go/logging" |
| "github.com/fsnotify/fsnotify" |
| "golang.org/x/sync/errgroup" |
| "google.golang.org/api/option" |
| ) |
| |
| const ( |
| logDir = "/var/log/" |
| ) |
| |
| var ( |
| stackdriverCreds string |
| cloudProject string |
| controller string |
| ) |
| |
| type logMsg struct { |
| // Message is the actual line in the log. |
| Message string `json:"message"` |
| // LogName is the name of the log within /var/log this came from. |
| LogName string `json:"log_name"` |
| // Controller is the name of the control server this message came from. |
| Controller string `json:"controller"` |
| } |
| |
| func init() { |
| flag.StringVar(&stackdriverCreds, "stackdriver-creds", "", "Path to the stackdriver credentials file") |
| flag.StringVar(&cloudProject, "cloud-project", "", "Name of the cloud project to stream logs to") |
| flag.StringVar(&controller, "controller", "", "Name of the control server logd is running on") |
| } |
| |
| func execute(ctx context.Context, logsToPersist []string) error { |
| // Create a stackdriver logging client. |
| client, err := logging.NewClient(ctx, cloudProject, option.WithCredentialsFile(stackdriverCreds)) |
| if err != nil { |
| return err |
| } |
| defer client.Close() |
| // Iterate through each of the logs passed in and: |
| // 1) Ensure that it is in /var/log/ |
| // 2) Spin up a goroutine to persist messages to cloud. |
| eg, ctx := errgroup.WithContext(ctx) |
| for _, name := range logsToPersist { |
| logName := name |
| logPath := filepath.Join(logDir, logName) |
| logger := client.Logger(fmt.Sprintf("%s-%s", controller, logName)) |
| watcher, err := fsnotify.NewWatcher() |
| if err != nil { |
| return err |
| } |
| defer watcher.Close() |
| |
| logFile, err := os.Open(logPath) |
| if err != nil { |
| return err |
| } |
| defer logFile.Close() |
| |
| if _, err := logFile.Seek(0, os.SEEK_END); err != nil { |
| return err |
| } |
| |
| eg.Go(func() error { |
| for { |
| select { |
| case event, ok := <-watcher.Events: |
| if !ok { |
| return nil |
| } |
| if event.Op&fsnotify.Write == fsnotify.Write { |
| data, err := ioutil.ReadAll(logFile) |
| if err != nil { |
| return err |
| } |
| logger.Log(logging.Entry{ |
| Payload: logMsg{ |
| Message: string(data), |
| LogName: logName, |
| Controller: controller, |
| }, |
| }) |
| |
| } |
| case err, ok := <-watcher.Errors: |
| if !ok { |
| return nil |
| } |
| return err |
| case <-ctx.Done(): |
| return nil |
| } |
| } |
| }) |
| if err := watcher.Add(logPath); err != nil { |
| return err |
| } |
| } |
| return eg.Wait() |
| } |
| |
| func main() { |
| flag.Parse() |
| logsToPersist := flag.Args() |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| signals := make(chan os.Signal) |
| signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) |
| |
| go func() { |
| select { |
| case <-signals: |
| cancel() |
| case <-ctx.Done(): |
| } |
| }() |
| |
| if err := execute(ctx, logsToPersist); err != nil { |
| log.Fatalf("failed to run logd: %v", err) |
| } |
| } |