blob: 352336dc68b9367b255624b4d05f97ba3bd68bef [file] [log] [blame]
package sideband
import (
"errors"
"fmt"
"io"
"gopkg.in/src-d/go-git.v4/plumbing/format/pktline"
)
// ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded
var ErrMaxPackedExceeded = errors.New("max. packed size exceeded")
// Progress where the progress information is stored
type Progress interface {
io.Writer
}
// Demuxer demultiplexes the progress reports and error info interleaved with the
// packfile itself.
//
// A sideband has three different channels the main one, called PackData, contains
// the packfile data; the ErrorMessage channel, that contains server errors; and
// the last one, ProgressMessage channel, containing information about the ongoing
// task happening in the server (optional, can be suppressed sending NoProgress
// or Quiet capabilities to the server)
//
// In order to demultiplex the data stream, method `Read` should be called to
// retrieve the PackData channel, the incoming data from the ProgressMessage is
// written at `Progress` (if any), if any message is retrieved from the
// ErrorMessage channel an error is returned and we can assume that the
// connection has been closed.
type Demuxer struct {
t Type
r io.Reader
s *pktline.Scanner
max int
pending []byte
// Progress is where the progress messages are stored
Progress Progress
}
// NewDemuxer returns a new Demuxer for the given t and read from r
func NewDemuxer(t Type, r io.Reader) *Demuxer {
max := MaxPackedSize64k
if t == Sideband {
max = MaxPackedSize
}
return &Demuxer{
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
}
}
// Read reads up to len(p) bytes from the PackData channel into p, an error can
// be return if an error happens when reading or if a message is sent in the
// ErrorMessage channel.
//
// When a ProgressMessage is read, is not copy to b, instead of this is written
// to the Progress
func (d *Demuxer) Read(b []byte) (n int, err error) {
var read, req int
req = len(b)
for read < req {
n, err := d.doRead(b[read:req])
read += n
if err != nil {
return read, err
}
}
return read, nil
}
func (d *Demuxer) doRead(b []byte) (int, error) {
read, err := d.nextPackData()
size := len(read)
wanted := len(b)
if size > wanted {
d.pending = read[wanted:]
}
if wanted > size {
wanted = size
}
size = copy(b, read[:wanted])
return size, err
}
func (d *Demuxer) nextPackData() ([]byte, error) {
content := d.getPending()
if len(content) != 0 {
return content, nil
}
if !d.s.Scan() {
if err := d.s.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}
content = d.s.Bytes()
size := len(content)
if size == 0 {
return nil, nil
} else if size > d.max {
return nil, ErrMaxPackedExceeded
}
switch Channel(content[0]) {
case PackData:
return content[1:], nil
case ProgressMessage:
if d.Progress != nil {
_, err := d.Progress.Write(content[1:])
return nil, err
}
case ErrorMessage:
return nil, fmt.Errorf("unexpected error: %s", content[1:])
default:
return nil, fmt.Errorf("unknown channel %s", content)
}
return nil, nil
}
func (d *Demuxer) getPending() (b []byte) {
if len(d.pending) == 0 {
return nil
}
content := d.pending
d.pending = nil
return content
}