449 lines
13 KiB
Go
449 lines
13 KiB
Go
package swift
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/rand"
|
|
"crypto/sha1"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
gopath "path"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// NotLargeObject is returned if an operation is performed on an object which isn't large.
|
|
var NotLargeObject = errors.New("Not a large object")
|
|
|
|
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
|
|
var readAfterWriteTimeout = 15 * time.Second
|
|
|
|
// readAfterWriteWait defines the time to sleep between two retries
|
|
var readAfterWriteWait = 200 * time.Millisecond
|
|
|
|
// largeObjectCreateFile represents an open static or dynamic large object
|
|
type largeObjectCreateFile struct {
|
|
conn *Connection
|
|
container string
|
|
objectName string
|
|
currentLength int64
|
|
filePos int64
|
|
chunkSize int64
|
|
segmentContainer string
|
|
prefix string
|
|
contentType string
|
|
checkHash bool
|
|
segments []Object
|
|
headers Headers
|
|
minChunkSize int64
|
|
}
|
|
|
|
func swiftSegmentPath(path string) (string, error) {
|
|
checksum := sha1.New()
|
|
random := make([]byte, 32)
|
|
if _, err := rand.Read(random); err != nil {
|
|
return "", err
|
|
}
|
|
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
|
|
return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
|
|
}
|
|
|
|
func getSegment(segmentPath string, partNumber int) string {
|
|
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
|
|
}
|
|
|
|
func parseFullPath(manifest string) (container string, prefix string) {
|
|
components := strings.SplitN(manifest, "/", 2)
|
|
container = components[0]
|
|
if len(components) > 1 {
|
|
prefix = components[1]
|
|
}
|
|
return container, prefix
|
|
}
|
|
|
|
func (headers Headers) IsLargeObjectDLO() bool {
|
|
_, isDLO := headers["X-Object-Manifest"]
|
|
return isDLO
|
|
}
|
|
|
|
func (headers Headers) IsLargeObjectSLO() bool {
|
|
_, isSLO := headers["X-Static-Large-Object"]
|
|
return isSLO
|
|
}
|
|
|
|
func (headers Headers) IsLargeObject() bool {
|
|
return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
|
|
}
|
|
|
|
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
|
|
if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
|
|
segmentContainer, segmentPath := parseFullPath(manifest)
|
|
segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
|
|
return segmentContainer, segments, err
|
|
}
|
|
if headers.IsLargeObjectSLO() {
|
|
return c.getAllSLOSegments(container, path)
|
|
}
|
|
return "", nil, NotLargeObject
|
|
}
|
|
|
|
// LargeObjectOpts describes how a large object should be created
|
|
type LargeObjectOpts struct {
|
|
Container string // Name of container to place object
|
|
ObjectName string // Name of object
|
|
Flags int // Creation flags
|
|
CheckHash bool // If set Check the hash
|
|
Hash string // If set use this hash to check
|
|
ContentType string // Content-Type of the object
|
|
Headers Headers // Additional headers to upload the object with
|
|
ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set
|
|
MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info
|
|
SegmentContainer string // Name of the container to place segments
|
|
SegmentPrefix string // Prefix to use for the segments
|
|
NoBuffer bool // Prevents using a bufio.Writer to write segments
|
|
}
|
|
|
|
type LargeObjectFile interface {
|
|
io.Writer
|
|
io.Seeker
|
|
io.Closer
|
|
Size() int64
|
|
Flush() error
|
|
}
|
|
|
|
// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
|
|
//
|
|
// opts.Flags can have the following bits set
|
|
// os.TRUNC - remove the contents of the large object if it exists
|
|
// os.APPEND - write at the end of the large object
|
|
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
|
|
var (
|
|
segmentPath string
|
|
segmentContainer string
|
|
segments []Object
|
|
currentLength int64
|
|
err error
|
|
)
|
|
|
|
if opts.SegmentPrefix != "" {
|
|
segmentPath = opts.SegmentPrefix
|
|
} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
|
|
if opts.Flags&os.O_TRUNC != 0 {
|
|
c.LargeObjectDelete(opts.Container, opts.ObjectName)
|
|
} else {
|
|
currentLength = info.Bytes
|
|
if headers.IsLargeObject() {
|
|
segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(segments) > 0 {
|
|
segmentPath = gopath.Dir(segments[0].Name)
|
|
}
|
|
} else {
|
|
if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
|
|
return nil, err
|
|
}
|
|
segments = append(segments, info)
|
|
}
|
|
}
|
|
} else if err != ObjectNotFound {
|
|
return nil, err
|
|
}
|
|
|
|
// segmentContainer is not empty when the manifest already existed
|
|
if segmentContainer == "" {
|
|
if opts.SegmentContainer != "" {
|
|
segmentContainer = opts.SegmentContainer
|
|
} else {
|
|
segmentContainer = opts.Container + "_segments"
|
|
}
|
|
}
|
|
|
|
file := &largeObjectCreateFile{
|
|
conn: c,
|
|
checkHash: opts.CheckHash,
|
|
container: opts.Container,
|
|
objectName: opts.ObjectName,
|
|
chunkSize: opts.ChunkSize,
|
|
minChunkSize: opts.MinChunkSize,
|
|
headers: opts.Headers,
|
|
segmentContainer: segmentContainer,
|
|
prefix: segmentPath,
|
|
segments: segments,
|
|
currentLength: currentLength,
|
|
}
|
|
|
|
if file.chunkSize == 0 {
|
|
file.chunkSize = 10 * 1024 * 1024
|
|
}
|
|
|
|
if file.minChunkSize > file.chunkSize {
|
|
file.chunkSize = file.minChunkSize
|
|
}
|
|
|
|
if opts.Flags&os.O_APPEND != 0 {
|
|
file.filePos = currentLength
|
|
}
|
|
|
|
return file, nil
|
|
}
|
|
|
|
// LargeObjectDelete deletes the large object named by container, path
|
|
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
|
|
_, headers, err := c.Object(container, objectName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var objects [][]string
|
|
if headers.IsLargeObject() {
|
|
segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, obj := range segments {
|
|
objects = append(objects, []string{segmentContainer, obj.Name})
|
|
}
|
|
}
|
|
objects = append(objects, []string{container, objectName})
|
|
|
|
info, err := c.cachedQueryInfo()
|
|
if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
|
|
filenames := make([]string, len(objects))
|
|
for i, obj := range objects {
|
|
filenames[i] = obj[0] + "/" + obj[1]
|
|
}
|
|
_, err = c.doBulkDelete(filenames)
|
|
// Don't fail on ObjectNotFound because eventual consistency
|
|
// makes this situation normal.
|
|
if err != nil && err != Forbidden && err != ObjectNotFound {
|
|
return err
|
|
}
|
|
} else {
|
|
for _, obj := range objects {
|
|
if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// LargeObjectGetSegments returns all the segments that compose an object
|
|
// If the object is a Dynamic Large Object (DLO), it just returns the objects
|
|
// that have the prefix as indicated by the manifest.
|
|
// If the object is a Static Large Object (SLO), it retrieves the JSON content
|
|
// of the manifest and return all the segments of it.
|
|
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
|
|
_, headers, err := c.Object(container, path)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
return c.getAllSegments(container, path, headers)
|
|
}
|
|
|
|
// Seek sets the offset for the next write operation
|
|
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
|
|
switch whence {
|
|
case 0:
|
|
file.filePos = offset
|
|
case 1:
|
|
file.filePos += offset
|
|
case 2:
|
|
file.filePos = file.currentLength + offset
|
|
default:
|
|
return -1, fmt.Errorf("invalid value for whence")
|
|
}
|
|
if file.filePos < 0 {
|
|
return -1, fmt.Errorf("negative offset")
|
|
}
|
|
return file.filePos, nil
|
|
}
|
|
|
|
func (file *largeObjectCreateFile) Size() int64 {
|
|
return file.currentLength
|
|
}
|
|
|
|
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
|
|
endTimer := time.NewTimer(readAfterWriteTimeout)
|
|
defer endTimer.Stop()
|
|
waitingTime := readAfterWriteWait
|
|
for {
|
|
var headers Headers
|
|
var sz int64
|
|
if headers, sz, err = fn(); err == nil {
|
|
if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
|
|
return
|
|
}
|
|
} else {
|
|
return
|
|
}
|
|
waitTimer := time.NewTimer(waitingTime)
|
|
select {
|
|
case <-endTimer.C:
|
|
waitTimer.Stop()
|
|
err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
|
|
return
|
|
case <-waitTimer.C:
|
|
waitingTime *= 2
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
|
|
err = withLORetry(expectedSize, func() (Headers, int64, error) {
|
|
var info Object
|
|
var headers Headers
|
|
info, headers, err = c.objectBase(container, objectName)
|
|
if err != nil {
|
|
return headers, 0, err
|
|
}
|
|
return headers, info.Bytes, nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// Write satisfies the io.Writer interface
|
|
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
|
|
var sz int64
|
|
var relativeFilePos int
|
|
writeSegmentIdx := 0
|
|
for i, obj := range file.segments {
|
|
if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
|
|
relativeFilePos = int(file.filePos - sz)
|
|
break
|
|
}
|
|
writeSegmentIdx++
|
|
sz += obj.Bytes
|
|
}
|
|
sizeToWrite := len(buf)
|
|
for offset := 0; offset < sizeToWrite; {
|
|
newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if writeSegmentIdx < len(file.segments) {
|
|
file.segments[writeSegmentIdx] = *newSegment
|
|
} else {
|
|
file.segments = append(file.segments, *newSegment)
|
|
}
|
|
offset += n
|
|
writeSegmentIdx++
|
|
relativeFilePos = 0
|
|
}
|
|
file.filePos += int64(sizeToWrite)
|
|
file.currentLength = 0
|
|
for _, obj := range file.segments {
|
|
file.currentLength += obj.Bytes
|
|
}
|
|
return sizeToWrite, nil
|
|
}
|
|
|
|
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
|
|
var (
|
|
readers []io.Reader
|
|
existingSegment *Object
|
|
segmentSize int
|
|
)
|
|
segmentName := getSegment(file.prefix, writeSegmentIdx+1)
|
|
sizeToRead := int(file.chunkSize)
|
|
if writeSegmentIdx < len(file.segments) {
|
|
existingSegment = &file.segments[writeSegmentIdx]
|
|
if writeSegmentIdx != len(file.segments)-1 {
|
|
sizeToRead = int(existingSegment.Bytes)
|
|
}
|
|
if relativeFilePos > 0 {
|
|
headers := make(Headers)
|
|
headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
|
|
existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer existingSegmentReader.Close()
|
|
sizeToRead -= relativeFilePos
|
|
segmentSize += relativeFilePos
|
|
readers = []io.Reader{existingSegmentReader}
|
|
}
|
|
}
|
|
if sizeToRead > len(buf) {
|
|
sizeToRead = len(buf)
|
|
}
|
|
segmentSize += sizeToRead
|
|
readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
|
|
if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
|
|
headers := make(Headers)
|
|
headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
|
|
tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer tailSegmentReader.Close()
|
|
segmentSize = int(existingSegment.Bytes)
|
|
readers = append(readers, tailSegmentReader)
|
|
}
|
|
segmentReader := io.MultiReader(readers...)
|
|
headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
|
|
}
|
|
|
|
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
|
|
if !opts.NoBuffer {
|
|
return &bufferedLargeObjectFile{
|
|
LargeObjectFile: lo,
|
|
bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)),
|
|
}
|
|
}
|
|
return lo
|
|
}
|
|
|
|
type bufferedLargeObjectFile struct {
|
|
LargeObjectFile
|
|
bw *bufio.Writer
|
|
}
|
|
|
|
func (blo *bufferedLargeObjectFile) Close() error {
|
|
err := blo.bw.Flush()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return blo.LargeObjectFile.Close()
|
|
}
|
|
|
|
func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
|
|
return blo.bw.Write(p)
|
|
}
|
|
|
|
func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
|
|
err := blo.bw.Flush()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return blo.LargeObjectFile.Seek(offset, whence)
|
|
}
|
|
|
|
func (blo *bufferedLargeObjectFile) Size() int64 {
|
|
return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
|
|
}
|
|
|
|
func (blo *bufferedLargeObjectFile) Flush() error {
|
|
err := blo.bw.Flush()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return blo.LargeObjectFile.Flush()
|
|
}
|