| // Copyright 2012 Google Inc. All rights reserved. |
| // Use of this source code is governed by the Apache 2.0 |
| // license that can be found in the LICENSE file. |
| |
| package blobstore |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "sync" |
| |
| "github.com/golang/protobuf/proto" |
| "golang.org/x/net/context" |
| |
| "google.golang.org/appengine" |
| "google.golang.org/appengine/internal" |
| |
| blobpb "google.golang.org/appengine/internal/blobstore" |
| ) |
| |
| // openBlob returns a reader for a blob. It always succeeds; if the blob does |
| // not exist then an error will be reported upon first read. |
| func openBlob(c context.Context, blobKey appengine.BlobKey) Reader { |
| return &reader{ |
| c: c, |
| blobKey: blobKey, |
| } |
| } |
| |
| const readBufferSize = 256 * 1024 |
| |
| // reader is a blob reader. It implements the Reader interface. |
| type reader struct { |
| c context.Context |
| |
| // Either blobKey or filename is set: |
| blobKey appengine.BlobKey |
| filename string |
| |
| closeFunc func() // is nil if unavailable or already closed. |
| |
| // buf is the read buffer. r is how much of buf has been read. |
| // off is the offset of buf[0] relative to the start of the blob. |
| // An invariant is 0 <= r && r <= len(buf). |
| // Reads that don't require an RPC call will increment r but not off. |
| // Seeks may modify r without discarding the buffer, but only if the |
| // invariant can be maintained. |
| mu sync.Mutex |
| buf []byte |
| r int |
| off int64 |
| } |
| |
| func (r *reader) Close() error { |
| if f := r.closeFunc; f != nil { |
| f() |
| } |
| r.closeFunc = nil |
| return nil |
| } |
| |
| func (r *reader) Read(p []byte) (int, error) { |
| if len(p) == 0 { |
| return 0, nil |
| } |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| if r.r == len(r.buf) { |
| if err := r.fetch(r.off + int64(r.r)); err != nil { |
| return 0, err |
| } |
| } |
| n := copy(p, r.buf[r.r:]) |
| r.r += n |
| return n, nil |
| } |
| |
| func (r *reader) ReadAt(p []byte, off int64) (int, error) { |
| if len(p) == 0 { |
| return 0, nil |
| } |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| // Convert relative offsets to absolute offsets. |
| ab0 := r.off + int64(r.r) |
| ab1 := r.off + int64(len(r.buf)) |
| ap0 := off |
| ap1 := off + int64(len(p)) |
| // Check if we can satisfy the read entirely out of the existing buffer. |
| if r.off <= ap0 && ap1 <= ab1 { |
| // Convert off from an absolute offset to a relative offset. |
| rp0 := int(ap0 - r.off) |
| return copy(p, r.buf[rp0:]), nil |
| } |
| // Restore the original Read/Seek offset after ReadAt completes. |
| defer r.seek(ab0) |
| // Repeatedly fetch and copy until we have filled p. |
| n := 0 |
| for len(p) > 0 { |
| if err := r.fetch(off + int64(n)); err != nil { |
| return n, err |
| } |
| r.r = copy(p, r.buf) |
| n += r.r |
| p = p[r.r:] |
| } |
| return n, nil |
| } |
| |
| func (r *reader) Seek(offset int64, whence int) (ret int64, err error) { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| switch whence { |
| case os.SEEK_SET: |
| ret = offset |
| case os.SEEK_CUR: |
| ret = r.off + int64(r.r) + offset |
| case os.SEEK_END: |
| return 0, errors.New("seeking relative to the end of a blob isn't supported") |
| default: |
| return 0, fmt.Errorf("invalid Seek whence value: %d", whence) |
| } |
| if ret < 0 { |
| return 0, errors.New("negative Seek offset") |
| } |
| return r.seek(ret) |
| } |
| |
| // fetch fetches readBufferSize bytes starting at the given offset. On success, |
| // the data is saved as r.buf. |
| func (r *reader) fetch(off int64) error { |
| req := &blobpb.FetchDataRequest{ |
| BlobKey: proto.String(string(r.blobKey)), |
| StartIndex: proto.Int64(off), |
| EndIndex: proto.Int64(off + readBufferSize - 1), // EndIndex is inclusive. |
| } |
| res := &blobpb.FetchDataResponse{} |
| if err := internal.Call(r.c, "blobstore", "FetchData", req, res); err != nil { |
| return err |
| } |
| if len(res.Data) == 0 { |
| return io.EOF |
| } |
| r.buf, r.r, r.off = res.Data, 0, off |
| return nil |
| } |
| |
| // seek seeks to the given offset with an effective whence equal to SEEK_SET. |
| // It discards the read buffer if the invariant cannot be maintained. |
| func (r *reader) seek(off int64) (int64, error) { |
| delta := off - r.off |
| if delta >= 0 && delta < int64(len(r.buf)) { |
| r.r = int(delta) |
| return off, nil |
| } |
| r.buf, r.r, r.off = nil, 0, off |
| return off, nil |
| } |