2018-07-06 16:09:34 +00:00
|
|
|
// Copyright 2016 Google LLC
|
2016-12-01 19:42:31 +00:00
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
2018-07-06 16:09:34 +00:00
|
|
|
"errors"
|
2017-04-17 15:17:06 +00:00
|
|
|
"fmt"
|
|
|
|
"hash/crc32"
|
2016-12-01 19:42:31 +00:00
|
|
|
"io"
|
2018-07-06 16:09:34 +00:00
|
|
|
"io/ioutil"
|
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"reflect"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"cloud.google.com/go/internal/trace"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
"google.golang.org/api/googleapi"
|
2016-12-01 19:42:31 +00:00
|
|
|
)
|
|
|
|
|
2017-04-17 15:17:06 +00:00
|
|
|
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
|
|
|
|
|
2018-07-06 16:09:34 +00:00
|
|
|
// NewReader creates a new Reader to read the contents of the
|
|
|
|
// object.
|
|
|
|
// ErrObjectNotExist will be returned if the object is not found.
|
|
|
|
//
|
|
|
|
// The caller must call Close on the returned Reader when done reading.
|
|
|
|
func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
|
|
|
|
return o.NewRangeReader(ctx, 0, -1)
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewRangeReader reads part of an object, reading at most length bytes
|
|
|
|
// starting at the given offset. If length is negative, the object is read
|
|
|
|
// until the end.
|
|
|
|
func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) {
|
|
|
|
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
|
|
|
|
defer func() { trace.EndSpan(ctx, err) }()
|
|
|
|
|
|
|
|
if err := o.validate(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if offset < 0 {
|
|
|
|
return nil, fmt.Errorf("storage: invalid offset %d < 0", offset)
|
|
|
|
}
|
|
|
|
if o.conds != nil {
|
|
|
|
if err := o.conds.validate("NewRangeReader"); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
u := &url.URL{
|
|
|
|
Scheme: "https",
|
|
|
|
Host: "storage.googleapis.com",
|
|
|
|
Path: fmt.Sprintf("/%s/%s", o.bucket, o.object),
|
|
|
|
RawQuery: conditionsQuery(o.gen, o.conds),
|
|
|
|
}
|
|
|
|
verb := "GET"
|
|
|
|
if length == 0 {
|
|
|
|
verb = "HEAD"
|
|
|
|
}
|
|
|
|
req, err := http.NewRequest(verb, u.String(), nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
req = withContext(req, ctx)
|
|
|
|
if o.userProject != "" {
|
|
|
|
req.Header.Set("X-Goog-User-Project", o.userProject)
|
|
|
|
}
|
|
|
|
if o.readCompressed {
|
|
|
|
req.Header.Set("Accept-Encoding", "gzip")
|
|
|
|
}
|
|
|
|
if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Define a function that initiates a Read with offset and length, assuming we
|
|
|
|
// have already read seen bytes.
|
|
|
|
reopen := func(seen int64) (*http.Response, error) {
|
|
|
|
start := offset + seen
|
|
|
|
if length < 0 && start > 0 {
|
|
|
|
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
|
|
|
|
} else if length > 0 {
|
|
|
|
// The end character isn't affected by how many bytes we've seen.
|
|
|
|
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
|
|
|
|
}
|
|
|
|
var res *http.Response
|
|
|
|
err = runWithRetry(ctx, func() error {
|
|
|
|
res, err = o.c.hc.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if res.StatusCode == http.StatusNotFound {
|
|
|
|
res.Body.Close()
|
|
|
|
return ErrObjectNotExist
|
|
|
|
}
|
|
|
|
if res.StatusCode < 200 || res.StatusCode > 299 {
|
|
|
|
body, _ := ioutil.ReadAll(res.Body)
|
|
|
|
res.Body.Close()
|
|
|
|
return &googleapi.Error{
|
|
|
|
Code: res.StatusCode,
|
|
|
|
Header: res.Header,
|
|
|
|
Body: string(body),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
|
|
|
|
res.Body.Close()
|
|
|
|
return errors.New("storage: partial request not satisfied")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
res, err := reopen(0)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var (
|
|
|
|
size int64 // total size of object, even if a range was requested.
|
|
|
|
checkCRC bool
|
|
|
|
crc uint32
|
|
|
|
)
|
|
|
|
if res.StatusCode == http.StatusPartialContent {
|
|
|
|
cr := strings.TrimSpace(res.Header.Get("Content-Range"))
|
|
|
|
if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
|
|
|
|
}
|
|
|
|
size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
size = res.ContentLength
|
|
|
|
// Check the CRC iff all of the following hold:
|
|
|
|
// - We asked for content (length != 0).
|
|
|
|
// - We got all the content (status != PartialContent).
|
|
|
|
// - The server sent a CRC header.
|
|
|
|
// - The Go http stack did not uncompress the file.
|
|
|
|
// - We were not served compressed data that was uncompressed on download.
|
|
|
|
// The problem with the last two cases is that the CRC will not match -- GCS
|
|
|
|
// computes it on the compressed contents, but we compute it on the
|
|
|
|
// uncompressed contents.
|
|
|
|
if length != 0 && !goHTTPUncompressed(res) && !uncompressedByServer(res) {
|
|
|
|
crc, checkCRC = parseCRC32c(res)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
remain := res.ContentLength
|
|
|
|
body := res.Body
|
|
|
|
if length == 0 {
|
|
|
|
remain = 0
|
|
|
|
body.Close()
|
|
|
|
body = emptyBody
|
|
|
|
}
|
|
|
|
return &Reader{
|
|
|
|
body: body,
|
|
|
|
size: size,
|
|
|
|
remain: remain,
|
|
|
|
contentType: res.Header.Get("Content-Type"),
|
|
|
|
contentEncoding: res.Header.Get("Content-Encoding"),
|
|
|
|
cacheControl: res.Header.Get("Cache-Control"),
|
|
|
|
wantCRC: crc,
|
|
|
|
checkCRC: checkCRC,
|
|
|
|
reopen: reopen,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func uncompressedByServer(res *http.Response) bool {
|
|
|
|
// If the data is stored as gzip but is not encoded as gzip, then it
|
|
|
|
// was uncompressed by the server.
|
|
|
|
return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" &&
|
|
|
|
res.Header.Get("Content-Encoding") != "gzip"
|
|
|
|
}
|
|
|
|
|
|
|
|
func parseCRC32c(res *http.Response) (uint32, bool) {
|
|
|
|
const prefix = "crc32c="
|
|
|
|
for _, spec := range res.Header["X-Goog-Hash"] {
|
|
|
|
if strings.HasPrefix(spec, prefix) {
|
|
|
|
c, err := decodeUint32(spec[len(prefix):])
|
|
|
|
if err == nil {
|
|
|
|
return c, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0, false
|
|
|
|
}
|
|
|
|
|
|
|
|
var emptyBody = ioutil.NopCloser(strings.NewReader(""))
|
|
|
|
|
2016-12-01 19:42:31 +00:00
|
|
|
// Reader reads a Cloud Storage object.
|
|
|
|
// It implements io.Reader.
|
2018-01-26 23:51:00 +00:00
|
|
|
//
|
|
|
|
// Typically, a Reader computes the CRC of the downloaded content and compares it to
|
|
|
|
// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
|
|
|
|
// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
|
2016-12-01 19:42:31 +00:00
|
|
|
type Reader struct {
|
2018-07-06 16:09:34 +00:00
|
|
|
body io.ReadCloser
|
|
|
|
seen, remain, size int64
|
|
|
|
contentType string
|
|
|
|
contentEncoding string
|
|
|
|
cacheControl string
|
|
|
|
checkCRC bool // should we check the CRC?
|
|
|
|
wantCRC uint32 // the CRC32c value the server sent in the header
|
|
|
|
gotCRC uint32 // running crc
|
|
|
|
reopen func(seen int64) (*http.Response, error)
|
2016-12-01 19:42:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the Reader. It must be called when done reading.
|
|
|
|
func (r *Reader) Close() error {
|
|
|
|
return r.body.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) Read(p []byte) (int, error) {
|
2018-07-06 16:09:34 +00:00
|
|
|
n, err := r.readWithRetry(p)
|
2016-12-01 19:42:31 +00:00
|
|
|
if r.remain != -1 {
|
|
|
|
r.remain -= int64(n)
|
|
|
|
}
|
2017-04-17 15:17:06 +00:00
|
|
|
if r.checkCRC {
|
|
|
|
r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
|
|
|
|
// Check CRC here. It would be natural to check it in Close, but
|
|
|
|
// everybody defers Close on the assumption that it doesn't return
|
|
|
|
// anything worth looking at.
|
2018-07-06 16:09:34 +00:00
|
|
|
if err == io.EOF {
|
2018-01-26 23:51:00 +00:00
|
|
|
if r.gotCRC != r.wantCRC {
|
|
|
|
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
|
|
|
|
r.gotCRC, r.wantCRC)
|
|
|
|
}
|
2017-04-17 15:17:06 +00:00
|
|
|
}
|
|
|
|
}
|
2016-12-01 19:42:31 +00:00
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
|
2018-07-06 16:09:34 +00:00
|
|
|
func (r *Reader) readWithRetry(p []byte) (int, error) {
|
|
|
|
n := 0
|
|
|
|
for len(p[n:]) > 0 {
|
|
|
|
m, err := r.body.Read(p[n:])
|
|
|
|
n += m
|
|
|
|
r.seen += int64(m)
|
|
|
|
if !shouldRetryRead(err) {
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
// Read failed, but we will try again. Send a ranged read request that takes
|
|
|
|
// into account the number of bytes we've already seen.
|
|
|
|
res, err := r.reopen(r.seen)
|
|
|
|
if err != nil {
|
|
|
|
// reopen already retries
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
r.body.Close()
|
|
|
|
r.body = res.Body
|
|
|
|
}
|
|
|
|
return n, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func shouldRetryRead(err error) bool {
|
|
|
|
if err == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2")
|
|
|
|
}
|
|
|
|
|
2016-12-01 19:42:31 +00:00
|
|
|
// Size returns the size of the object in bytes.
|
|
|
|
// The returned value is always the same and is not affected by
|
|
|
|
// calls to Read or Close.
|
|
|
|
func (r *Reader) Size() int64 {
|
|
|
|
return r.size
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remain returns the number of bytes left to read, or -1 if unknown.
|
|
|
|
func (r *Reader) Remain() int64 {
|
|
|
|
return r.remain
|
|
|
|
}
|
|
|
|
|
|
|
|
// ContentType returns the content type of the object.
|
|
|
|
func (r *Reader) ContentType() string {
|
|
|
|
return r.contentType
|
|
|
|
}
|
2017-09-05 22:06:47 +00:00
|
|
|
|
2018-01-26 23:51:00 +00:00
|
|
|
// ContentEncoding returns the content encoding of the object.
|
|
|
|
func (r *Reader) ContentEncoding() string {
|
|
|
|
return r.contentEncoding
|
|
|
|
}
|
|
|
|
|
2017-09-05 22:06:47 +00:00
|
|
|
// CacheControl returns the cache control of the object.
|
|
|
|
func (r *Reader) CacheControl() string {
|
|
|
|
return r.cacheControl
|
|
|
|
}
|