HTTP and tests
This commit is contained in:
parent
be081b461f
commit
f5f43218f5
|
@ -190,6 +190,25 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
|
|||
return
|
||||
}
|
||||
|
||||
// Calculate the offset
|
||||
fileInfo, err := fs.Stat(req.Path)
|
||||
if err != nil {
|
||||
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
if fileInfo.IsDir {
|
||||
f.handleStreamResultError(
|
||||
fmt.Errorf("file %q is a directory", req.Path),
|
||||
helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
// If offsetting from the end subtract from the size
|
||||
if req.Origin == "end" {
|
||||
req.Offset = fileInfo.Size - req.Offset
|
||||
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
@ -218,6 +237,8 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
|
|||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
framer.Destroy()
|
||||
}()
|
||||
|
||||
// Create a goroutine to detect the remote side closing
|
||||
|
@ -265,6 +286,8 @@ OUTER:
|
|||
streamErr = err
|
||||
break OUTER
|
||||
}
|
||||
case <-ctx.Done():
|
||||
break OUTER
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,16 +1,24 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
|
@ -21,6 +29,29 @@ import (
|
|||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
|
||||
// should destroy the temp dir.
|
||||
func tempAllocDir(t testing.TB) *allocdir.AllocDir {
|
||||
dir, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
t.Fatalf("TempDir() failed: %v", err)
|
||||
}
|
||||
|
||||
if err := os.Chmod(dir, 0777); err != nil {
|
||||
t.Fatalf("failed to chmod dir: %v", err)
|
||||
}
|
||||
|
||||
return allocdir.NewAllocDir(log.New(os.Stderr, "", log.LstdFlags), dir)
|
||||
}
|
||||
|
||||
type nopWriteCloser struct {
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (n nopWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestFS_Stat_NoAlloc(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
@ -554,11 +585,14 @@ func TestFS_Stream(t *testing.T) {
|
|||
defer p1.Close()
|
||||
defer p2.Close()
|
||||
|
||||
// Wrap the pipe so we can check it is closed
|
||||
pipeChecker := &ReadWriteCloseChecker{ReadWriteCloser: p2}
|
||||
|
||||
errCh := make(chan error)
|
||||
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
||||
|
||||
// Start the handler
|
||||
go handler(p2)
|
||||
go handler(pipeChecker)
|
||||
|
||||
// Start the decoder
|
||||
go func() {
|
||||
|
@ -601,6 +635,22 @@ OUTER:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return pipeChecker.Closed, nil
|
||||
}, func(err error) {
|
||||
t.Fatal("Pipe not closed")
|
||||
})
|
||||
}
|
||||
|
||||
type ReadWriteCloseChecker struct {
|
||||
io.ReadWriteCloser
|
||||
Closed bool
|
||||
}
|
||||
|
||||
func (r *ReadWriteCloseChecker) Close() error {
|
||||
r.Closed = true
|
||||
return r.ReadWriteCloser.Close()
|
||||
}
|
||||
|
||||
func TestFS_Stream_Follow(t *testing.T) {
|
||||
|
@ -1574,3 +1624,413 @@ func TestFS_findClosest(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFS_streamFile_NoFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
frames := make(chan *sframer.StreamFrame, 32)
|
||||
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
err := c.endpoints.FileSystem.streamFile(
|
||||
context.Background(), 0, "foo", 0, ad, framer, nil)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "no such file")
|
||||
}
|
||||
|
||||
func TestFS_streamFile_Modify(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
streamFile := "stream_file"
|
||||
f, err := os.Create(filepath.Join(ad.AllocDir, streamFile))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
data := []byte("helloworld")
|
||||
|
||||
// Start the reader
|
||||
resultCh := make(chan struct{})
|
||||
frames := make(chan *sframer.StreamFrame, 4)
|
||||
go func() {
|
||||
var collected []byte
|
||||
for {
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
collected = append(collected, frame.Data...)
|
||||
if reflect.DeepEqual(data, collected) {
|
||||
resultCh <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := c.endpoints.FileSystem.streamFile(
|
||||
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before writing more. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
||||
if _, err := f.Write(data[3:]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-resultCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("failed to send new data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFS_streamFile_Truncate(t *testing.T) {
|
||||
t.Parallel()
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
data := []byte("helloworld")
|
||||
streamFile := "stream_file"
|
||||
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
||||
f, err := os.Create(streamFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Start the reader
|
||||
truncateCh := make(chan struct{})
|
||||
dataPostTruncCh := make(chan struct{})
|
||||
frames := make(chan *sframer.StreamFrame, 4)
|
||||
go func() {
|
||||
var collected []byte
|
||||
for {
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
if frame.FileEvent == truncateEvent {
|
||||
close(truncateCh)
|
||||
}
|
||||
|
||||
collected = append(collected, frame.Data...)
|
||||
if reflect.DeepEqual(data, collected) {
|
||||
close(dataPostTruncCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := c.endpoints.FileSystem.streamFile(
|
||||
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before truncating. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
||||
if err := f.Truncate(0); err != nil {
|
||||
t.Fatalf("truncate failed: %v", err)
|
||||
}
|
||||
if err := f.Sync(); err != nil {
|
||||
t.Fatalf("sync failed: %v", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatalf("failed to close file: %v", err)
|
||||
}
|
||||
|
||||
f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to reopen file: %v", err)
|
||||
}
|
||||
defer f2.Close()
|
||||
if _, err := f2.Write(data[3:5]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-truncateCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive truncate")
|
||||
}
|
||||
|
||||
// Sleep a little before writing more. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
||||
if _, err := f2.Write(data[5:]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-dataPostTruncCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive post truncate data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFS_streamImpl_Delete(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
// Get a temp alloc dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
// Create a file in the temp dir
|
||||
data := []byte("helloworld")
|
||||
streamFile := "stream_file"
|
||||
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
||||
f, err := os.Create(streamFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Start the reader
|
||||
deleteCh := make(chan struct{})
|
||||
frames := make(chan *sframer.StreamFrame, 4)
|
||||
go func() {
|
||||
for {
|
||||
frame := <-frames
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
if frame.FileEvent == deleteEvent {
|
||||
close(deleteCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Write a few bytes
|
||||
if _, err := f.Write(data[:3]); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
// Start streaming
|
||||
go func() {
|
||||
if err := c.endpoints.FileSystem.streamFile(
|
||||
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
||||
t.Fatalf("stream() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Sleep a little before deleting. This lets us check if the watch
|
||||
// is working.
|
||||
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
||||
if err := os.Remove(streamFilePath); err != nil {
|
||||
t.Fatalf("delete failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-deleteCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive delete")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFS_logsImpl_NoFollow(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
// Get a temp alloc dir and create the log dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
|
||||
if err := os.MkdirAll(logDir, 0777); err != nil {
|
||||
t.Fatalf("Failed to make log dir: %v", err)
|
||||
}
|
||||
|
||||
// Create a series of log files in the temp dir
|
||||
task := "foo"
|
||||
logType := "stdout"
|
||||
expected := []byte("012")
|
||||
for i := 0; i < 3; i++ {
|
||||
logFile := fmt.Sprintf("%s.%s.%d", task, logType, i)
|
||||
logFilePath := filepath.Join(logDir, logFile)
|
||||
err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start the reader
|
||||
resultCh := make(chan struct{})
|
||||
frames := make(chan *sframer.StreamFrame, 4)
|
||||
var received []byte
|
||||
go func() {
|
||||
for {
|
||||
frame, ok := <-frames
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
received = append(received, frame.Data...)
|
||||
if reflect.DeepEqual(received, expected) {
|
||||
close(resultCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start streaming logs
|
||||
go func() {
|
||||
if err := c.endpoints.FileSystem.logsImpl(
|
||||
context.Background(), false, false, 0,
|
||||
OriginStart, task, logType, ad, frames); err != nil {
|
||||
t.Fatalf("logs() failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-resultCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive data: got %q", string(received))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFS_logsImpl_Follow(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := TestClient(t, nil)
|
||||
defer c.Shutdown()
|
||||
|
||||
// Get a temp alloc dir and create the log dir
|
||||
ad := tempAllocDir(t)
|
||||
defer os.RemoveAll(ad.AllocDir)
|
||||
|
||||
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
|
||||
if err := os.MkdirAll(logDir, 0777); err != nil {
|
||||
t.Fatalf("Failed to make log dir: %v", err)
|
||||
}
|
||||
|
||||
// Create a series of log files in the temp dir
|
||||
task := "foo"
|
||||
logType := "stdout"
|
||||
expected := []byte("012345")
|
||||
initialWrites := 3
|
||||
|
||||
writeToFile := func(index int, data []byte) {
|
||||
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
|
||||
logFilePath := filepath.Join(logDir, logFile)
|
||||
err := ioutil.WriteFile(logFilePath, data, 777)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file: %v", err)
|
||||
}
|
||||
}
|
||||
for i := 0; i < initialWrites; i++ {
|
||||
writeToFile(i, expected[i:i+1])
|
||||
}
|
||||
|
||||
// Start the reader
|
||||
firstResultCh := make(chan struct{})
|
||||
fullResultCh := make(chan struct{})
|
||||
frames := make(chan *sframer.StreamFrame, 4)
|
||||
var received []byte
|
||||
go func() {
|
||||
for {
|
||||
frame, ok := <-frames
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if frame.IsHeartbeat() {
|
||||
continue
|
||||
}
|
||||
|
||||
received = append(received, frame.Data...)
|
||||
if reflect.DeepEqual(received, expected[:initialWrites]) {
|
||||
close(firstResultCh)
|
||||
} else if reflect.DeepEqual(received, expected) {
|
||||
close(fullResultCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start streaming logs
|
||||
go c.endpoints.FileSystem.logsImpl(
|
||||
context.Background(), true, false, 0,
|
||||
OriginStart, task, logType, ad, frames)
|
||||
|
||||
select {
|
||||
case <-firstResultCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive data: got %q", string(received))
|
||||
}
|
||||
|
||||
// We got the first chunk of data, write out the rest to the next file
|
||||
// at an index much ahead to check that it is following and detecting
|
||||
// skips
|
||||
skipTo := initialWrites + 10
|
||||
writeToFile(skipTo, expected[initialWrites:])
|
||||
|
||||
select {
|
||||
case <-fullResultCh:
|
||||
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
||||
t.Fatalf("did not receive data: got %q", string(received))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ type StreamFramer struct {
|
|||
heartbeat *time.Ticker
|
||||
flusher *time.Ticker
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
exitCh chan struct{}
|
||||
|
||||
|
@ -103,7 +104,14 @@ func NewStreamFramer(out chan<- *StreamFrame,
|
|||
// Destroy is used to cleanup the StreamFramer and flush any pending frames
|
||||
func (s *StreamFramer) Destroy() {
|
||||
s.l.Lock()
|
||||
close(s.shutdownCh)
|
||||
|
||||
wasShutdown := s.shutdown
|
||||
s.shutdown = true
|
||||
|
||||
if !wasShutdown {
|
||||
close(s.shutdownCh)
|
||||
}
|
||||
|
||||
s.heartbeat.Stop()
|
||||
s.flusher.Stop()
|
||||
running := s.running
|
||||
|
@ -113,7 +121,9 @@ func (s *StreamFramer) Destroy() {
|
|||
if running {
|
||||
<-s.exitCh
|
||||
}
|
||||
close(s.out)
|
||||
if !wasShutdown {
|
||||
close(s.out)
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts a long lived goroutine that handles sending data as well as
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package structs
|
||||
|
||||
//go:generate codecgen -d 102 -o structs.generated.go structs.go
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"io"
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package agent
|
||||
|
||||
//go:generate codecgen -d 101 -o fs_endpoint.generated.go fs_endpoint.go
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
@ -13,7 +11,6 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/docker/docker/pkg/ioutils"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/ugorji/go/codec"
|
||||
|
@ -29,20 +26,6 @@ var (
|
|||
)
|
||||
|
||||
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var secret string
|
||||
var namespace string
|
||||
s.parseToken(req, &secret)
|
||||
parseNamespace(req, &namespace)
|
||||
|
||||
var aclObj *acl.ACL
|
||||
if s.agent.client != nil {
|
||||
var err error
|
||||
aclObj, err = s.agent.Client().ResolveToken(secret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/")
|
||||
switch {
|
||||
case strings.HasPrefix(path, "ls/"):
|
||||
|
@ -50,20 +33,11 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int
|
|||
case strings.HasPrefix(path, "stat/"):
|
||||
return s.FileStatRequest(resp, req)
|
||||
case strings.HasPrefix(path, "readat/"):
|
||||
if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) {
|
||||
return nil, structs.ErrPermissionDenied
|
||||
}
|
||||
return s.FileReadAtRequest(resp, req)
|
||||
case strings.HasPrefix(path, "cat/"):
|
||||
if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) {
|
||||
return nil, structs.ErrPermissionDenied
|
||||
}
|
||||
return s.FileCatRequest(resp, req)
|
||||
//case strings.HasPrefix(path, "stream/"):
|
||||
//if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) {
|
||||
//return nil, structs.ErrPermissionDenied
|
||||
//}
|
||||
//return s.Stream(resp, req)
|
||||
case strings.HasPrefix(path, "stream/"):
|
||||
return s.Stream(resp, req)
|
||||
case strings.HasPrefix(path, "logs/"):
|
||||
return s.Logs(resp, req)
|
||||
default:
|
||||
|
@ -71,6 +45,32 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int
|
|||
}
|
||||
}
|
||||
|
||||
// rpcHandlerForAlloc is a helper that given an allocation ID returns whether to
|
||||
// use the local clients RPC, the local clients remote RPC or the server on the
|
||||
// agent.
|
||||
func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClient, server bool) {
|
||||
c := s.agent.Client()
|
||||
srv := s.agent.Server()
|
||||
|
||||
// See if the local client can handle the request.
|
||||
localAlloc := false
|
||||
if c != nil {
|
||||
_, err := c.GetClientAlloc(allocID)
|
||||
if err == nil {
|
||||
localAlloc = true
|
||||
}
|
||||
}
|
||||
|
||||
// Only use the client RPC to server if we don't have a server and the local
|
||||
// client can't handle the call.
|
||||
useClientRPC := c != nil && !localAlloc && srv == nil
|
||||
|
||||
// Use the server as a last case.
|
||||
useServerRPC := !localAlloc && !useClientRPC && srv != nil
|
||||
|
||||
return localAlloc, useClientRPC, useServerRPC
|
||||
}
|
||||
|
||||
func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var allocID, path string
|
||||
|
||||
|
@ -80,11 +80,38 @@ func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Re
|
|||
if path = req.URL.Query().Get("path"); path == "" {
|
||||
path = "/"
|
||||
}
|
||||
fs, err := s.agent.client.GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
// Create the request
|
||||
args := &cstructs.FsListRequest{
|
||||
AllocID: allocID,
|
||||
Path: path,
|
||||
}
|
||||
return fs.List(path)
|
||||
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
||||
|
||||
// Make the RPC
|
||||
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
||||
|
||||
var reply cstructs.FsListResponse
|
||||
var rpcErr error
|
||||
if localClient {
|
||||
rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply)
|
||||
} else if remoteClient {
|
||||
rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply)
|
||||
} else if localServer {
|
||||
rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply)
|
||||
}
|
||||
|
||||
if rpcErr != nil {
|
||||
if structs.IsErrNoNodeConn(rpcErr) {
|
||||
rpcErr = CodedError(404, rpcErr.Error())
|
||||
} else if strings.Contains(rpcErr.Error(), "unknown allocation") {
|
||||
rpcErr = CodedError(404, rpcErr.Error())
|
||||
}
|
||||
|
||||
return nil, rpcErr
|
||||
}
|
||||
|
||||
return reply.Files, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -95,11 +122,38 @@ func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request
|
|||
if path = req.URL.Query().Get("path"); path == "" {
|
||||
return nil, fileNameNotPresentErr
|
||||
}
|
||||
fs, err := s.agent.client.GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
// Create the request
|
||||
args := &cstructs.FsStatRequest{
|
||||
AllocID: allocID,
|
||||
Path: path,
|
||||
}
|
||||
return fs.Stat(path)
|
||||
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
||||
|
||||
// Make the RPC
|
||||
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
||||
|
||||
var reply cstructs.FsStatResponse
|
||||
var rpcErr error
|
||||
if localClient {
|
||||
rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply)
|
||||
} else if remoteClient {
|
||||
rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply)
|
||||
} else if localServer {
|
||||
rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply)
|
||||
}
|
||||
|
||||
if rpcErr != nil {
|
||||
if structs.IsErrNoNodeConn(rpcErr) {
|
||||
rpcErr = CodedError(404, rpcErr.Error())
|
||||
} else if strings.Contains(rpcErr.Error(), "unknown allocation") {
|
||||
rpcErr = CodedError(404, rpcErr.Error())
|
||||
}
|
||||
|
||||
return nil, rpcErr
|
||||
}
|
||||
|
||||
return reply.Info, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -127,37 +181,23 @@ func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Reque
|
|||
}
|
||||
}
|
||||
|
||||
fs, err := s.agent.client.GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Create the request arguments
|
||||
fsReq := &cstructs.FsStreamRequest{
|
||||
AllocID: allocID,
|
||||
Path: path,
|
||||
Offset: offset,
|
||||
Origin: "start",
|
||||
Limit: limit,
|
||||
PlainText: true,
|
||||
}
|
||||
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
||||
|
||||
rc, err := fs.ReadAt(path, offset)
|
||||
if limit > 0 {
|
||||
rc = &ReadCloserWrapper{
|
||||
Reader: io.LimitReader(rc, limit),
|
||||
Closer: rc,
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
io.Copy(resp, rc)
|
||||
return nil, rc.Close()
|
||||
}
|
||||
|
||||
// ReadCloserWrapper wraps a LimitReader so that a file is closed once it has been
|
||||
// read
|
||||
type ReadCloserWrapper struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
// Make the request
|
||||
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
||||
}
|
||||
|
||||
func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var allocID, path string
|
||||
var err error
|
||||
|
||||
q := req.URL.Query()
|
||||
|
||||
|
@ -167,29 +207,20 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
|
|||
if path = q.Get("path"); path == "" {
|
||||
return nil, fileNameNotPresentErr
|
||||
}
|
||||
fs, err := s.agent.client.GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fileInfo, err := fs.Stat(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fileInfo.IsDir {
|
||||
return nil, fmt.Errorf("file %q is a directory", path)
|
||||
// Create the request arguments
|
||||
fsReq := &cstructs.FsStreamRequest{
|
||||
AllocID: allocID,
|
||||
Path: path,
|
||||
Origin: "start",
|
||||
PlainText: true,
|
||||
}
|
||||
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
||||
|
||||
r, err := fs.ReadAt(path, int64(0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
io.Copy(resp, r)
|
||||
return nil, r.Close()
|
||||
// Make the request
|
||||
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
// Stream streams the content of a file blocking on EOF.
|
||||
// The parameters are:
|
||||
// * path: path to file to stream.
|
||||
|
@ -198,7 +229,6 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
|
|||
// applied. Defaults to "start".
|
||||
func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var allocID, path string
|
||||
var err error
|
||||
|
||||
q := req.URL.Query()
|
||||
|
||||
|
@ -228,176 +258,19 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
|
|||
return nil, invalidOrigin
|
||||
}
|
||||
|
||||
fs, err := s.agent.client.GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Create the request arguments
|
||||
fsReq := &cstructs.FsStreamRequest{
|
||||
AllocID: allocID,
|
||||
Path: path,
|
||||
Origin: origin,
|
||||
Offset: offset,
|
||||
}
|
||||
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
||||
|
||||
fileInfo, err := fs.Stat(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fileInfo.IsDir {
|
||||
return nil, fmt.Errorf("file %q is a directory", path)
|
||||
}
|
||||
|
||||
// If offsetting from the end subtract from the size
|
||||
if origin == "end" {
|
||||
offset = fileInfo.Size - offset
|
||||
|
||||
}
|
||||
|
||||
// Create an output that gets flushed on every write
|
||||
output := ioutils.NewWriteFlusher(resp)
|
||||
|
||||
// Create the framer
|
||||
framer := sframer.NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
err = s.stream(offset, path, fs, framer, nil)
|
||||
if err != nil && err != syscall.EPIPE {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
// Make the request
|
||||
return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID)
|
||||
}
|
||||
|
||||
// parseFramerErr takes an error and returns an error. The error will
|
||||
// potentially change if it was caused by the connection being closed.
|
||||
func parseFramerErr(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := err.Error()
|
||||
|
||||
if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
|
||||
// The pipe check is for tests
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
// The connection was closed by our peer
|
||||
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
// Windows version of ECONNRESET
|
||||
//XXX(schmichael) I could find no existing error or constant to
|
||||
// compare this against.
|
||||
if strings.Contains(errMsg, "forcibly closed") {
|
||||
return syscall.EPIPE
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// stream is the internal method to stream the content of a file. eofCancelCh is
|
||||
// used to cancel the stream if triggered while at EOF. If the connection is
|
||||
// broken an EPIPE error is returned
|
||||
func (s *HTTPServer) stream(offset int64, path string,
|
||||
fs allocdir.AllocDirFS, framer *sframer.StreamFramer,
|
||||
eofCancelCh chan error) error {
|
||||
|
||||
// Get the reader
|
||||
f, err := fs.ReadAt(path, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Create a tomb to cancel watch events
|
||||
t := tomb.Tomb{}
|
||||
defer func() {
|
||||
t.Kill(nil)
|
||||
t.Done()
|
||||
}()
|
||||
|
||||
// Create a variable to allow setting the last event
|
||||
var lastEvent string
|
||||
|
||||
// Only create the file change watcher once. But we need to do it after we
|
||||
// read and reach EOF.
|
||||
var changes *watch.FileChanges
|
||||
|
||||
// Start streaming the data
|
||||
data := make([]byte, streamFrameSize)
|
||||
OUTER:
|
||||
for {
|
||||
// Read up to the max frame size
|
||||
n, readErr := f.Read(data)
|
||||
|
||||
// Update the offset
|
||||
offset += int64(n)
|
||||
|
||||
// Return non-EOF errors
|
||||
if readErr != nil && readErr != io.EOF {
|
||||
return readErr
|
||||
}
|
||||
|
||||
// Send the frame
|
||||
if n != 0 || lastEvent != "" {
|
||||
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
|
||||
return parseFramerErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the last event
|
||||
if lastEvent != "" {
|
||||
lastEvent = ""
|
||||
}
|
||||
|
||||
// Just keep reading
|
||||
if readErr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// If EOF is hit, wait for a change to the file
|
||||
if changes == nil {
|
||||
changes, err = fs.ChangeEvents(path, offset, &t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-changes.Modified:
|
||||
continue OUTER
|
||||
case <-changes.Deleted:
|
||||
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
|
||||
case <-changes.Truncated:
|
||||
// Close the current reader
|
||||
if err := f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get a new reader at offset zero
|
||||
offset = 0
|
||||
var err error
|
||||
f, err = fs.ReadAt(path, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Store the last event
|
||||
lastEvent = truncateEvent
|
||||
continue OUTER
|
||||
case <-framer.ExitCh():
|
||||
return parseFramerErr(framer.Err())
|
||||
case err, ok := <-eofCancelCh:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Logs streams the content of a log blocking on EOF. The parameters are:
|
||||
// * task: task name to stream logs for.
|
||||
// * type: stdout/stderr to stream.
|
||||
|
@ -456,43 +329,6 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
|
|||
return nil, invalidOrigin
|
||||
}
|
||||
|
||||
// Create an output that gets flushed on every write
|
||||
output := ioutils.NewWriteFlusher(resp)
|
||||
|
||||
localClient := s.agent.Client()
|
||||
localServer := s.agent.Server()
|
||||
|
||||
// See if the local client can handle the request.
|
||||
localAlloc := false
|
||||
if localClient != nil {
|
||||
_, err := localClient.GetClientAlloc(allocID)
|
||||
if err == nil {
|
||||
localAlloc = true
|
||||
}
|
||||
}
|
||||
|
||||
// Only use the client RPC to server if we don't have a server and the local
|
||||
// client can't handle the call.
|
||||
useClientRPC := localClient != nil && !localAlloc && localServer == nil
|
||||
|
||||
// Use the server as a last case.
|
||||
useServerRPC := localServer != nil
|
||||
|
||||
// Get the correct handler
|
||||
var handler structs.StreamingRpcHandler
|
||||
var handlerErr error
|
||||
if localAlloc {
|
||||
handler, handlerErr = localClient.StreamingRpcHandler("FileSystem.Logs")
|
||||
} else if useClientRPC {
|
||||
handler, handlerErr = localClient.RemoteStreamingRpcHandler("FileSystem.Logs")
|
||||
} else if useServerRPC {
|
||||
handler, handlerErr = localServer.StreamingRpcHandler("FileSystem.Logs")
|
||||
}
|
||||
|
||||
if handlerErr != nil {
|
||||
return nil, CodedError(500, handlerErr.Error())
|
||||
}
|
||||
|
||||
// Create the request arguments
|
||||
fsReq := &cstructs.FsLogsRequest{
|
||||
AllocID: allocID,
|
||||
|
@ -505,6 +341,32 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
|
|||
}
|
||||
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
||||
|
||||
// Make the request
|
||||
return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID)
|
||||
}
|
||||
|
||||
// fsStreamImpl is used to make a streaming filesystem call that serializes the
|
||||
// args and then expects a stream of StreamErrWrapper results where the payload
|
||||
// is copied to the response body.
|
||||
func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
|
||||
req *http.Request, method string, args interface{}, allocID string) (interface{}, error) {
|
||||
|
||||
// Get the correct handler
|
||||
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
|
||||
var handler structs.StreamingRpcHandler
|
||||
var handlerErr error
|
||||
if localClient {
|
||||
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
|
||||
} else if remoteClient {
|
||||
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
|
||||
} else if localServer {
|
||||
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
|
||||
}
|
||||
|
||||
if handlerErr != nil {
|
||||
return nil, CodedError(500, handlerErr.Error())
|
||||
}
|
||||
|
||||
p1, p2 := net.Pipe()
|
||||
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
||||
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
||||
|
@ -516,11 +378,14 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
|
|||
p1.Close()
|
||||
}()
|
||||
|
||||
// Create an output that gets flushed on every write
|
||||
output := ioutils.NewWriteFlusher(resp)
|
||||
|
||||
// Create a channel that decodes the results
|
||||
errCh := make(chan HTTPCodedError)
|
||||
go func() {
|
||||
// Send the request
|
||||
if err := encoder.Encode(fsReq); err != nil {
|
||||
if err := encoder.Encode(args); err != nil {
|
||||
errCh <- CodedError(500, err.Error())
|
||||
cancel()
|
||||
return
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue