// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package symbolize

import (
	"context"
	"fmt"

	"go.fuchsia.dev/tools/lib/logger"
)

// TODO (jakehehrlich): Make as much of this private as possible.

type remuxer struct {
	seq chan (<-chan OutputLine)
	out chan<- OutputLine
}

func newRemuxer() *remuxer {
	return &remuxer{seq: make(chan (<-chan OutputLine), 1024)}
}

func (r *remuxer) sequence(in <-chan OutputLine) {
	r.seq <- in
}

func (r *remuxer) start(ctx context.Context) (<-chan OutputLine, error) {
	if r.out != nil {
		return nil, fmt.Errorf("Attempt to start an already running remuxer")
	}
	out := make(chan OutputLine)
	r.out = out
	go func() {
		defer close(r.out)
		for {
			select {
			case <-ctx.Done():
				return
			case in, ok := <-r.seq:
				if !ok {
					return
				}
				r.out <- (<-in)
			}
		}
	}()
	return out, nil
}

func (r *remuxer) stop() error {
	close(r.seq)
	if r.seq == nil {
		return fmt.Errorf("Attempt to stop a remuxer that hasn't been started")
	}
	return nil
}

type pipe struct {
	in  chan<- InputLine
	out <-chan OutputLine
}

// Demuxer demultiplexes incomming input lines, spins up new filters, and sends them input lines.
type Demuxer struct {
	// Spin up new filters as new procsses are found.
	filters map[LineSource]pipe
	// Use same symbolizer for all
	symbolizer Symbolizer
	// Use same repo for all
	repo Repository
}

// NewDemuxer creates a new demuxer.
func NewDemuxer(repo Repository, symbo Symbolizer) *Demuxer {
	return &Demuxer{
		repo:       repo,
		symbolizer: symbo,
		filters:    make(map[LineSource]pipe),
	}
}

// Start tells the demuxer to start consuming input and dispatching to the filters.
func (d *Demuxer) Start(ctx context.Context, input <-chan InputLine) <-chan OutputLine {
	// Create the remuxer.
	remux := newRemuxer()
	out, err := remux.start(ctx)
	if err != nil {
		logger.Fatalf(ctx, "Failed to start remuxer in case where that should never happen.")
	}
	go func() {
		// Clean up the channels/goroutines when we're done
		defer func() {
			for _, pipe := range d.filters {
				close(pipe.in)
			}
			err := remux.stop()
			if err != nil {
				logger.Warningf(ctx, "%v", err)
			}
		}()
		// Start multiplexing things out
		for {
			select {
			case <-ctx.Done():
				return
			case line, ok := <-input:
				if !ok {
					return
				}
				var fpipe pipe
				if fpipe, ok = d.filters[line.source]; !ok {
					fin := make(chan InputLine)
					// Spin up a new Filter for this process.
					filt := NewFilter(d.repo, d.symbolizer)
					fout := filt.Start(ctx, fin)
					fpipe = pipe{in: fin, out: fout}
					d.filters[line.source] = fpipe
				}
				// sequence a read from fpipe.out
				remux.sequence(fpipe.out)
				fpipe.in <- line
			}
		}
	}()
	return out
}
