blob: 8b6bd9f6fa3411e38062fc2f4fe7e2552e2d3652 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package symbolize
import (
"context"
)
// PostProcessor modifies a stream of OutputLines.
//
// Process receives one line at a time together with a send-only channel on
// which it may send any resulting lines. The pipeline node that drives the
// processor (postProcessorNode.run) closes that channel itself, so
// implementations must not close it.
type PostProcessor interface {
	Process(line OutputLine, out chan<- OutputLine)
}
// PreProcessor modifies a stream of InputLines.
//
// Process receives one line at a time together with a send-only channel on
// which it may send any resulting lines. The pipeline node that drives the
// processor (preProcessorNode.run) closes that channel itself, so
// implementations must not close it.
type PreProcessor interface {
	Process(line InputLine, out chan<- InputLine)
}
// Processor transforms a stream of InputLines into a stream of OutputLines.
//
// Process receives one InputLine at a time together with a send-only channel
// on which it may send the resulting OutputLines. The pipeline node that
// drives the processor (processorNode.run) closes that channel itself, so
// implementations must not close it.
type Processor interface {
	Process(line InputLine, out chan<- OutputLine)
}
// postProcessorNode is the pipeline stage that drives a single PostProcessor;
// its run method feeds the processor from an input channel.
type postProcessorNode struct {
// postProc is the processor invoked by run for every line it receives.
postProc PostProcessor
}
// run starts a goroutine that hands every OutputLine received on input to the
// wrapped PostProcessor, and returns the channel the processor writes to.
//
// The goroutine exits, closing the returned channel, once input is closed or
// ctx is done. ctx is only checked between lines: Process is called
// synchronously and run itself does not interrupt a call in progress.
func (p postProcessorNode) run(ctx context.Context, input <-chan OutputLine) <-chan OutputLine {
	results := make(chan OutputLine)
	go func() {
		// Closing results tells the downstream stage that no more lines follow.
		defer close(results)
		for {
			var (
				line OutputLine
				open bool
			)
			select {
			case line, open = <-input:
			case <-ctx.Done():
				return
			}
			if !open {
				// Upstream finished; nothing left to process.
				return
			}
			p.postProc.Process(line, results)
		}
	}()
	return results
}
// preProcessorNode is the pipeline stage that drives a single PreProcessor;
// its run method feeds the processor from an input channel.
type preProcessorNode struct {
// preProc is the processor invoked by run for every line it receives.
preProc PreProcessor
}
// run starts a goroutine that hands every InputLine received on input to the
// wrapped PreProcessor, and returns the channel the processor writes to.
//
// The goroutine exits, closing the returned channel, once input is closed or
// ctx is done. ctx is only checked between lines: Process is called
// synchronously and run itself does not interrupt a call in progress.
func (p preProcessorNode) run(ctx context.Context, input <-chan InputLine) <-chan InputLine {
	results := make(chan InputLine)
	go func() {
		// Closing results tells the downstream stage that no more lines follow.
		defer close(results)
		for {
			var (
				line InputLine
				open bool
			)
			select {
			case line, open = <-input:
			case <-ctx.Done():
				return
			}
			if !open {
				// Upstream finished; nothing left to process.
				return
			}
			p.preProc.Process(line, results)
		}
	}()
	return results
}
// processorNode is the pipeline stage that drives a single Processor; its run
// method feeds the processor from an input channel.
type processorNode struct {
// proc is the processor invoked by run for every line it receives.
proc Processor
}
// run starts a goroutine that hands every InputLine received on input to the
// wrapped Processor, and returns the OutputLine channel the processor writes
// to.
//
// The goroutine exits, closing the returned channel, once input is closed or
// ctx is done. ctx is only checked between lines: Process is called
// synchronously and run itself does not interrupt a call in progress.
func (p processorNode) run(ctx context.Context, input <-chan InputLine) <-chan OutputLine {
	results := make(chan OutputLine)
	go func() {
		// Closing results tells the downstream stage that no more lines follow.
		defer close(results)
		for {
			var (
				line InputLine
				open bool
			)
			select {
			case line, open = <-input:
			case <-ctx.Done():
				return
			}
			if !open {
				// Upstream finished; nothing left to process.
				return
			}
			p.proc.Process(line, results)
		}
	}()
	return results
}
// Consume drains input, discarding every line it receives, and returns once
// the channel has been closed. Calling it on the tail of a pipeline keeps the
// calling goroutine alive until the pipeline has run to completion.
func Consume(input <-chan OutputLine) {
	for {
		if _, open := <-input; !open {
			return
		}
	}
}
// ComposePreProcessors chains pres onto input in the order given: each
// PreProcessor reads from the output of the one before it, and the first reads
// from input. It returns the output channel of the last stage, or input itself
// when pres is empty. Every stage runs in its own goroutine bound to ctx.
func ComposePreProcessors(ctx context.Context, input <-chan InputLine, pres ...PreProcessor) <-chan InputLine {
	stream := input
	for i := range pres {
		stage := preProcessorNode{preProc: pres[i]}
		stream = stage.run(ctx, stream)
	}
	return stream
}
// ComposePostProcessors chains posts onto input in the order given: each
// PostProcessor reads from the output of the one before it, and the first
// reads from input. It returns the output channel of the last stage, or input
// itself when posts is empty. Every stage runs in its own goroutine bound to
// ctx.
func ComposePostProcessors(ctx context.Context, input <-chan OutputLine, posts ...PostProcessor) <-chan OutputLine {
	stream := input
	for i := range posts {
		stage := postProcessorNode{postProc: posts[i]}
		stream = stage.run(ctx, stream)
	}
	return stream
}
// ComposeProcessors attaches a single Processor to input and starts it
// transforming InputLines into OutputLines in its own goroutine. It returns
// the channel on which the processor's output is delivered; that channel is
// closed when input is closed or ctx is done.
func ComposeProcessors(ctx context.Context, input <-chan InputLine, proc Processor) <-chan OutputLine {
	stage := processorNode{proc: proc}
	return stage.run(ctx, input)
}