From f5f43218f564a9148b7cad308edc38c186cb528a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 5 Feb 2018 13:07:27 -0800 Subject: [PATCH] HTTP and tests --- client/fs_endpoint.go | 23 + client/fs_endpoint_test.go | 462 ++++++++++++++- client/lib/streamframer/framer.go | 14 +- client/structs/structs.go | 2 + command/agent/fs_endpoint.go | 433 +++++--------- command/agent/fs_endpoint_test.go | 943 +++++++++++------------------- 6 files changed, 1001 insertions(+), 876 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 4c7e51552..0bc537ac4 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -190,6 +190,25 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { return } + // Calculate the offset + fileInfo, err := fs.Stat(req.Path) + if err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + if fileInfo.IsDir { + f.handleStreamResultError( + fmt.Errorf("file %q is a directory", req.Path), + helper.Int64ToPtr(400), encoder) + return + } + + // If offsetting from the end subtract from the size + if req.Origin == "end" { + req.Offset = fileInfo.Size - req.Offset + + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -218,6 +237,8 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { case <-ctx.Done(): } } + + framer.Destroy() }() // Create a goroutine to detect the remote side closing @@ -265,6 +286,8 @@ OUTER: streamErr = err break OUTER } + case <-ctx.Done(): + break OUTER } } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index f1ec4abbf..4256f16ce 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1,16 +1,24 @@ package client import ( + "context" "fmt" "io" + "io/ioutil" + "log" "math" "net" + "os" + "path/filepath" + "reflect" "strings" "testing" "time" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad" @@ -21,6 +29,29 @@ import ( "github.com/ugorji/go/codec" ) +// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller +// should destroy the temp dir. +func tempAllocDir(t testing.TB) *allocdir.AllocDir { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("TempDir() failed: %v", err) + } + + if err := os.Chmod(dir, 0777); err != nil { + t.Fatalf("failed to chmod dir: %v", err) + } + + return allocdir.NewAllocDir(log.New(os.Stderr, "", log.LstdFlags), dir) +} + +type nopWriteCloser struct { + io.Writer +} + +func (n nopWriteCloser) Close() error { + return nil +} + func TestFS_Stat_NoAlloc(t *testing.T) { t.Parallel() require := require.New(t) @@ -554,11 +585,14 @@ func TestFS_Stream(t *testing.T) { defer p1.Close() defer p2.Close() + // Wrap the pipe so we can check it is closed + pipeChecker := &ReadWriteCloseChecker{ReadWriteCloser: p2} + errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler - go handler(p2) + go handler(pipeChecker) // Start the decoder go func() { @@ -601,6 +635,22 @@ OUTER: } } } + + testutil.WaitForResult(func() (bool, error) { + return pipeChecker.Closed, nil + }, func(err error) { + t.Fatal("Pipe not closed") + }) +} + +type ReadWriteCloseChecker struct { + io.ReadWriteCloser + Closed bool +} + +func (r *ReadWriteCloseChecker) Close() error { + r.Closed = true + return r.ReadWriteCloser.Close() } func TestFS_Stream_Follow(t *testing.T) { @@ -1574,3 +1624,413 @@ func TestFS_findClosest(t *testing.T) { } } } + +func TestFS_streamFile_NoFile(t *testing.T) { + t.Parallel() + require := require.New(t) + c := TestClient(t, nil) + defer c.Shutdown() + + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + frames := make(chan *sframer.StreamFrame, 32) + framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + err := c.endpoints.FileSystem.streamFile( + context.Background(), 0, "foo", 0, ad, framer, nil) + require.NotNil(err) + require.Contains(err.Error(), "no such file") +} + +func TestFS_streamFile_Modify(t *testing.T) { + t.Parallel() + + c := TestClient(t, nil) + defer c.Shutdown() + + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + streamFile := "stream_file" + f, err := os.Create(filepath.Join(ad.AllocDir, streamFile)) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + data := []byte("helloworld") + + // Start the reader + resultCh := make(chan struct{}) + frames := make(chan *sframer.StreamFrame, 4) + go func() { + var collected []byte + for { + frame := <-frames + if frame.IsHeartbeat() { + continue + } + + collected = append(collected, frame.Data...) + if reflect.DeepEqual(data, collected) { + resultCh <- struct{}{} + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Start streaming + go func() { + if err := c.endpoints.FileSystem.streamFile( + context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before writing more. This lets us check if the watch + // is working. + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) + if _, err := f.Write(data[3:]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-resultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("failed to send new data") + } +} + +func TestFS_streamFile_Truncate(t *testing.T) { + t.Parallel() + c := TestClient(t, nil) + defer c.Shutdown() + + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + data := []byte("helloworld") + streamFile := "stream_file" + streamFilePath := filepath.Join(ad.AllocDir, streamFile) + f, err := os.Create(streamFilePath) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Start the reader + truncateCh := make(chan struct{}) + dataPostTruncCh := make(chan struct{}) + frames := make(chan *sframer.StreamFrame, 4) + go func() { + var collected []byte + for { + frame := <-frames + if frame.IsHeartbeat() { + continue + } + + if frame.FileEvent == truncateEvent { + close(truncateCh) + } + + collected = append(collected, frame.Data...) + if reflect.DeepEqual(data, collected) { + close(dataPostTruncCh) + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Start streaming + go func() { + if err := c.endpoints.FileSystem.streamFile( + context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before truncating. This lets us check if the watch + // is working. + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) + if err := f.Truncate(0); err != nil { + t.Fatalf("truncate failed: %v", err) + } + if err := f.Sync(); err != nil { + t.Fatalf("sync failed: %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("failed to close file: %v", err) + } + + f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0) + if err != nil { + t.Fatalf("failed to reopen file: %v", err) + } + defer f2.Close() + if _, err := f2.Write(data[3:5]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-truncateCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive truncate") + } + + // Sleep a little before writing more. This lets us check if the watch + // is working. + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) + if _, err := f2.Write(data[5:]); err != nil { + t.Fatalf("write failed: %v", err) + } + + select { + case <-dataPostTruncCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive post truncate data") + } +} + +func TestFS_streamImpl_Delete(t *testing.T) { + t.Parallel() + + c := TestClient(t, nil) + defer c.Shutdown() + + // Get a temp alloc dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + // Create a file in the temp dir + data := []byte("helloworld") + streamFile := "stream_file" + streamFilePath := filepath.Join(ad.AllocDir, streamFile) + f, err := os.Create(streamFilePath) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Start the reader + deleteCh := make(chan struct{}) + frames := make(chan *sframer.StreamFrame, 4) + go func() { + for { + frame := <-frames + if frame.IsHeartbeat() { + continue + } + + if frame.FileEvent == deleteEvent { + close(deleteCh) + return + } + } + }() + + // Write a few bytes + if _, err := f.Write(data[:3]); err != nil { + t.Fatalf("write failed: %v", err) + } + + framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Start streaming + go func() { + if err := c.endpoints.FileSystem.streamFile( + context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + t.Fatalf("stream() failed: %v", err) + } + }() + + // Sleep a little before deleting. This lets us check if the watch + // is working. + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) + if err := os.Remove(streamFilePath); err != nil { + t.Fatalf("delete failed: %v", err) + } + + select { + case <-deleteCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive delete") + } +} + +func TestFS_logsImpl_NoFollow(t *testing.T) { + t.Parallel() + + c := TestClient(t, nil) + defer c.Shutdown() + + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012") + for i := 0; i < 3; i++ { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, i) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + + // Start the reader + resultCh := make(chan struct{}) + frames := make(chan *sframer.StreamFrame, 4) + var received []byte + go func() { + for { + frame, ok := <-frames + if !ok { + return + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if reflect.DeepEqual(received, expected) { + close(resultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := c.endpoints.FileSystem.logsImpl( + context.Background(), false, false, 0, + OriginStart, task, logType, ad, frames); err != nil { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-resultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } +} + +func TestFS_logsImpl_Follow(t *testing.T) { + t.Parallel() + + c := TestClient(t, nil) + defer c.Shutdown() + + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012345") + initialWrites := 3 + + writeToFile := func(index int, data []byte) { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, data, 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + for i := 0; i < initialWrites; i++ { + writeToFile(i, expected[i:i+1]) + } + + // Start the reader + firstResultCh := make(chan struct{}) + fullResultCh := make(chan struct{}) + frames := make(chan *sframer.StreamFrame, 4) + var received []byte + go func() { + for { + frame, ok := <-frames + if !ok { + return + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if reflect.DeepEqual(received, expected[:initialWrites]) { + close(firstResultCh) + } else if reflect.DeepEqual(received, expected) { + close(fullResultCh) + return + } + } + }() + + // Start streaming logs + go c.endpoints.FileSystem.logsImpl( + context.Background(), true, false, 0, + OriginStart, task, logType, ad, frames) + + select { + case <-firstResultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + // We got the first chunk of data, write out the rest to the next file + // at an index much ahead to check that it is following and detecting + // skips + skipTo := initialWrites + 10 + writeToFile(skipTo, expected[initialWrites:]) + + select { + case <-fullResultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } +} diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index 6d24257c1..f2f928c65 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -64,6 +64,7 @@ type StreamFramer struct { heartbeat *time.Ticker flusher *time.Ticker + shutdown bool shutdownCh chan struct{} exitCh chan struct{} @@ -103,7 +104,14 @@ func NewStreamFramer(out chan<- *StreamFrame, // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() - close(s.shutdownCh) + + wasShutdown := s.shutdown + s.shutdown = true + + if !wasShutdown { + close(s.shutdownCh) + } + s.heartbeat.Stop() s.flusher.Stop() running := s.running @@ -113,7 +121,9 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } - close(s.out) + if !wasShutdown { + close(s.out) + } } // Run starts a long lived goroutine that handles sending data as well as diff --git a/client/structs/structs.go b/client/structs/structs.go index 548286039..30da186e8 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -1,5 +1,7 @@ package structs +//go:generate codecgen -d 102 -o structs.generated.go structs.go + import ( "crypto/md5" "io" diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 7fedec37a..7a651d9b1 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -1,7 +1,5 @@ package agent -//go:generate codecgen -d 101 -o fs_endpoint.generated.go fs_endpoint.go - import ( "bytes" "context" @@ -13,7 +11,6 @@ import ( "strings" "github.com/docker/docker/pkg/ioutils" - "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -29,20 +26,6 @@ var ( ) func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - var secret string - var namespace string - s.parseToken(req, &secret) - parseNamespace(req, &namespace) - - var aclObj *acl.ACL - if s.agent.client != nil { - var err error - aclObj, err = s.agent.Client().ResolveToken(secret) - if err != nil { - return nil, err - } - } - path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/") switch { case strings.HasPrefix(path, "ls/"): @@ -50,20 +33,11 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int case strings.HasPrefix(path, "stat/"): return s.FileStatRequest(resp, req) case strings.HasPrefix(path, "readat/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } return s.FileReadAtRequest(resp, req) case strings.HasPrefix(path, "cat/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } return s.FileCatRequest(resp, req) - //case strings.HasPrefix(path, "stream/"): - //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - //return nil, structs.ErrPermissionDenied - //} - //return s.Stream(resp, req) + case strings.HasPrefix(path, "stream/"): + return s.Stream(resp, req) case strings.HasPrefix(path, "logs/"): return s.Logs(resp, req) default: @@ -71,6 +45,32 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int } } +// rpcHandlerForAlloc is a helper that given an allocation ID returns whether to +// use the local clients RPC, the local clients remote RPC or the server on the +// agent. +func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClient, server bool) { + c := s.agent.Client() + srv := s.agent.Server() + + // See if the local client can handle the request. + localAlloc := false + if c != nil { + _, err := c.GetClientAlloc(allocID) + if err == nil { + localAlloc = true + } + } + + // Only use the client RPC to server if we don't have a server and the local + // client can't handle the call. + useClientRPC := c != nil && !localAlloc && srv == nil + + // Use the server as a last case. + useServerRPC := !localAlloc && !useClientRPC && srv != nil + + return localAlloc, useClientRPC, useServerRPC +} + func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string @@ -80,11 +80,38 @@ func (s *HTTPServer) DirectoryListRequest(resp http.ResponseWriter, req *http.Re if path = req.URL.Query().Get("path"); path == "" { path = "/" } - fs, err := s.agent.client.GetAllocFS(allocID) - if err != nil { - return nil, err + + // Create the request + args := &cstructs.FsListRequest{ + AllocID: allocID, + Path: path, } - return fs.List(path) + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + // Make the RPC + localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID) + + var reply cstructs.FsListResponse + var rpcErr error + if localClient { + rpcErr = s.agent.Client().ClientRPC("FileSystem.List", &args, &reply) + } else if remoteClient { + rpcErr = s.agent.Client().RPC("FileSystem.List", &args, &reply) + } else if localServer { + rpcErr = s.agent.Server().RPC("FileSystem.List", &args, &reply) + } + + if rpcErr != nil { + if structs.IsErrNoNodeConn(rpcErr) { + rpcErr = CodedError(404, rpcErr.Error()) + } else if strings.Contains(rpcErr.Error(), "unknown allocation") { + rpcErr = CodedError(404, rpcErr.Error()) + } + + return nil, rpcErr + } + + return reply.Files, nil } func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -95,11 +122,38 @@ func (s *HTTPServer) FileStatRequest(resp http.ResponseWriter, req *http.Request if path = req.URL.Query().Get("path"); path == "" { return nil, fileNameNotPresentErr } - fs, err := s.agent.client.GetAllocFS(allocID) - if err != nil { - return nil, err + + // Create the request + args := &cstructs.FsStatRequest{ + AllocID: allocID, + Path: path, } - return fs.Stat(path) + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + // Make the RPC + localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID) + + var reply cstructs.FsStatResponse + var rpcErr error + if localClient { + rpcErr = s.agent.Client().ClientRPC("FileSystem.Stat", &args, &reply) + } else if remoteClient { + rpcErr = s.agent.Client().RPC("FileSystem.Stat", &args, &reply) + } else if localServer { + rpcErr = s.agent.Server().RPC("FileSystem.Stat", &args, &reply) + } + + if rpcErr != nil { + if structs.IsErrNoNodeConn(rpcErr) { + rpcErr = CodedError(404, rpcErr.Error()) + } else if strings.Contains(rpcErr.Error(), "unknown allocation") { + rpcErr = CodedError(404, rpcErr.Error()) + } + + return nil, rpcErr + } + + return reply.Info, nil } func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -127,37 +181,23 @@ func (s *HTTPServer) FileReadAtRequest(resp http.ResponseWriter, req *http.Reque } } - fs, err := s.agent.client.GetAllocFS(allocID) - if err != nil { - return nil, err + // Create the request arguments + fsReq := &cstructs.FsStreamRequest{ + AllocID: allocID, + Path: path, + Offset: offset, + Origin: "start", + Limit: limit, + PlainText: true, } + s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions) - rc, err := fs.ReadAt(path, offset) - if limit > 0 { - rc = &ReadCloserWrapper{ - Reader: io.LimitReader(rc, limit), - Closer: rc, - } - } - - if err != nil { - return nil, err - } - - io.Copy(resp, rc) - return nil, rc.Close() -} - -// ReadCloserWrapper wraps a LimitReader so that a file is closed once it has been -// read -type ReadCloserWrapper struct { - io.Reader - io.Closer + // Make the request + return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID) } func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string - var err error q := req.URL.Query() @@ -167,29 +207,20 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) if path = q.Get("path"); path == "" { return nil, fileNameNotPresentErr } - fs, err := s.agent.client.GetAllocFS(allocID) - if err != nil { - return nil, err - } - fileInfo, err := fs.Stat(path) - if err != nil { - return nil, err - } - if fileInfo.IsDir { - return nil, fmt.Errorf("file %q is a directory", path) + // Create the request arguments + fsReq := &cstructs.FsStreamRequest{ + AllocID: allocID, + Path: path, + Origin: "start", + PlainText: true, } + s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions) - r, err := fs.ReadAt(path, int64(0)) - if err != nil { - return nil, err - } - io.Copy(resp, r) - return nil, r.Close() + // Make the request + return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID) } -/* - // Stream streams the content of a file blocking on EOF. // The parameters are: // * path: path to file to stream. @@ -198,7 +229,6 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) // applied. Defaults to "start". func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, path string - var err error q := req.URL.Query() @@ -228,176 +258,19 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf return nil, invalidOrigin } - fs, err := s.agent.client.GetAllocFS(allocID) - if err != nil { - return nil, err + // Create the request arguments + fsReq := &cstructs.FsStreamRequest{ + AllocID: allocID, + Path: path, + Origin: origin, + Offset: offset, } + s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions) - fileInfo, err := fs.Stat(path) - if err != nil { - return nil, err - } - if fileInfo.IsDir { - return nil, fmt.Errorf("file %q is a directory", path) - } - - // If offsetting from the end subtract from the size - if origin == "end" { - offset = fileInfo.Size - offset - - } - - // Create an output that gets flushed on every write - output := ioutils.NewWriteFlusher(resp) - - // Create the framer - framer := sframer.NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - - err = s.stream(offset, path, fs, framer, nil) - if err != nil && err != syscall.EPIPE { - return nil, err - } - - return nil, nil + // Make the request + return s.fsStreamImpl(resp, req, "FileSystem.Stream", fsReq, fsReq.AllocID) } -// parseFramerErr takes an error and returns an error. The error will -// potentially change if it was caused by the connection being closed. -func parseFramerErr(err error) error { - if err == nil { - return nil - } - - errMsg := err.Error() - - if strings.Contains(errMsg, io.ErrClosedPipe.Error()) { - // The pipe check is for tests - return syscall.EPIPE - } - - // The connection was closed by our peer - if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) { - return syscall.EPIPE - } - - // Windows version of ECONNRESET - //XXX(schmichael) I could find no existing error or constant to - // compare this against. - if strings.Contains(errMsg, "forcibly closed") { - return syscall.EPIPE - } - - return err -} - -// stream is the internal method to stream the content of a file. eofCancelCh is -// used to cancel the stream if triggered while at EOF. If the connection is -// broken an EPIPE error is returned -func (s *HTTPServer) stream(offset int64, path string, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, - eofCancelCh chan error) error { - - // Get the reader - f, err := fs.ReadAt(path, offset) - if err != nil { - return err - } - defer f.Close() - - // Create a tomb to cancel watch events - t := tomb.Tomb{} - defer func() { - t.Kill(nil) - t.Done() - }() - - // Create a variable to allow setting the last event - var lastEvent string - - // Only create the file change watcher once. But we need to do it after we - // read and reach EOF. - var changes *watch.FileChanges - - // Start streaming the data - data := make([]byte, streamFrameSize) -OUTER: - for { - // Read up to the max frame size - n, readErr := f.Read(data) - - // Update the offset - offset += int64(n) - - // Return non-EOF errors - if readErr != nil && readErr != io.EOF { - return readErr - } - - // Send the frame - if n != 0 || lastEvent != "" { - if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { - return parseFramerErr(err) - } - } - - // Clear the last event - if lastEvent != "" { - lastEvent = "" - } - - // Just keep reading - if readErr == nil { - continue - } - - // If EOF is hit, wait for a change to the file - if changes == nil { - changes, err = fs.ChangeEvents(path, offset, &t) - if err != nil { - return err - } - } - - for { - select { - case <-changes.Modified: - continue OUTER - case <-changes.Deleted: - return parseFramerErr(framer.Send(path, deleteEvent, nil, offset)) - case <-changes.Truncated: - // Close the current reader - if err := f.Close(); err != nil { - return err - } - - // Get a new reader at offset zero - offset = 0 - var err error - f, err = fs.ReadAt(path, offset) - if err != nil { - return err - } - defer f.Close() - - // Store the last event - lastEvent = truncateEvent - continue OUTER - case <-framer.ExitCh(): - return parseFramerErr(framer.Err()) - case err, ok := <-eofCancelCh: - if !ok { - return nil - } - - return err - } - } - } -} -*/ - // Logs streams the content of a log blocking on EOF. The parameters are: // * task: task name to stream logs for. // * type: stdout/stderr to stream. @@ -456,43 +329,6 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, invalidOrigin } - // Create an output that gets flushed on every write - output := ioutils.NewWriteFlusher(resp) - - localClient := s.agent.Client() - localServer := s.agent.Server() - - // See if the local client can handle the request. - localAlloc := false - if localClient != nil { - _, err := localClient.GetClientAlloc(allocID) - if err == nil { - localAlloc = true - } - } - - // Only use the client RPC to server if we don't have a server and the local - // client can't handle the call. - useClientRPC := localClient != nil && !localAlloc && localServer == nil - - // Use the server as a last case. - useServerRPC := localServer != nil - - // Get the correct handler - var handler structs.StreamingRpcHandler - var handlerErr error - if localAlloc { - handler, handlerErr = localClient.StreamingRpcHandler("FileSystem.Logs") - } else if useClientRPC { - handler, handlerErr = localClient.RemoteStreamingRpcHandler("FileSystem.Logs") - } else if useServerRPC { - handler, handlerErr = localServer.StreamingRpcHandler("FileSystem.Logs") - } - - if handlerErr != nil { - return nil, CodedError(500, handlerErr.Error()) - } - // Create the request arguments fsReq := &cstructs.FsLogsRequest{ AllocID: allocID, @@ -505,6 +341,32 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac } s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions) + // Make the request + return s.fsStreamImpl(resp, req, "FileSystem.Logs", fsReq, fsReq.AllocID) +} + +// fsStreamImpl is used to make a streaming filesystem call that serializes the +// args and then expects a stream of StreamErrWrapper results where the payload +// is copied to the response body. +func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, + req *http.Request, method string, args interface{}, allocID string) (interface{}, error) { + + // Get the correct handler + localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID) + var handler structs.StreamingRpcHandler + var handlerErr error + if localClient { + handler, handlerErr = s.agent.Client().StreamingRpcHandler(method) + } else if remoteClient { + handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method) + } else if localServer { + handler, handlerErr = s.agent.Server().StreamingRpcHandler(method) + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + p1, p2 := net.Pipe() decoder := codec.NewDecoder(p1, structs.MsgpackHandle) encoder := codec.NewEncoder(p1, structs.MsgpackHandle) @@ -516,11 +378,14 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac p1.Close() }() + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + // Create a channel that decodes the results errCh := make(chan HTTPCodedError) go func() { // Send the request - if err := encoder.Encode(fsReq); err != nil { + if err := encoder.Encode(args); err != nil { errCh <- CodedError(500, err.Error()) cancel() return diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 5d0b07e49..b67e9379d 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -1,691 +1,456 @@ package agent import ( + "encoding/base64" "fmt" + "io" + "io/ioutil" "net/http" "net/http/httptest" + "strings" "testing" - "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) -func TestAllocDirFS_List_MissingParams(t *testing.T) { - t.Parallel() - httpTest(t, nil, func(s *TestAgent) { - req, err := http.NewRequest("GET", "/v1/client/fs/ls/", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() +const ( + defaultLoggerMockDriverStdout = "Hello from the other side" +) - _, err = s.Server.DirectoryListRequest(respW, req) - if err != allocIDNotPresentErr { - t.Fatalf("expected err: %v, actual: %v", allocIDNotPresentErr, err) +var ( + defaultLoggerMockDriver = map[string]interface{}{ + "run_for": "2s", + "stdout_string": defaultLoggerMockDriverStdout, + } +) + +type clientAllocWaiter int + +const ( + noWaitClientAlloc clientAllocWaiter = iota + runningClientAlloc + terminalClientAlloc +) + +func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAllocWaiter) { + require := require.New(agent.T) + + // Wait for the client to connect + testutil.WaitForResult(func() (bool, error) { + node, err := agent.server.State().NodeByID(nil, agent.client.NodeID()) + if err != nil { + return false, err } + if node == nil { + return false, fmt.Errorf("unknown node") + } + + return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status") + }, func(err error) { + agent.T.Fatal(err) + }) + + // Upsert the allocation + state := agent.server.State() + require.Nil(state.UpsertJob(999, alloc.Job)) + require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{alloc})) + + if wait == noWaitClientAlloc { + return + } + + // Wait for the client to run the allocation + testutil.WaitForResult(func() (bool, error) { + alloc, err := state.AllocByID(nil, alloc.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + + expectation := alloc.ClientStatus == structs.AllocClientStatusComplete || + alloc.ClientStatus == structs.AllocClientStatusFailed + if wait == runningClientAlloc { + expectation = expectation || alloc.ClientStatus == structs.AllocClientStatusRunning + } + + if !expectation { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + agent.T.Fatal(err) }) } -func TestAllocDirFS_Stat_MissingParams(t *testing.T) { - t.Parallel() - httpTest(t, nil, func(s *TestAgent) { - req, err := http.NewRequest("GET", "/v1/client/fs/stat/", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() +// mockFSAlloc returns a suitable mock alloc for testing the fs system. If +// config isn't provided, the defaultLoggerMockDriver config is used. +func mockFSAlloc(nodeID string, config map[string]interface{}) *structs.Allocation { + a := mock.Alloc() + a.NodeID = nodeID + a.Job.Type = structs.JobTypeBatch + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" - _, err = s.Server.FileStatRequest(respW, req) - if err != allocIDNotPresentErr { - t.Fatalf("expected err: %v, actual: %v", allocIDNotPresentErr, err) - } + if config != nil { + a.Job.TaskGroups[0].Tasks[0].Config = config + } else { + a.Job.TaskGroups[0].Tasks[0].Config = defaultLoggerMockDriver + } - req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW = httptest.NewRecorder() - - _, err = s.Server.FileStatRequest(respW, req) - if err != fileNameNotPresentErr { - t.Fatalf("expected err: %v, actual: %v", allocIDNotPresentErr, err) - } - - }) + return a } -func TestAllocDirFS_ReadAt_MissingParams(t *testing.T) { - t.Parallel() - httpTest(t, nil, func(s *TestAgent) { - req, err := http.NewRequest("GET", "/v1/client/fs/readat/", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() - - _, err = s.Server.FileReadAtRequest(respW, req) - if err == nil { - t.Fatal("expected error") - } - - req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW = httptest.NewRecorder() - - _, err = s.Server.FileReadAtRequest(respW, req) - if err == nil { - t.Fatal("expected error") - } - - req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo?path=/path/to/file", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW = httptest.NewRecorder() - - _, err = s.Server.FileReadAtRequest(respW, req) - if err == nil { - t.Fatal("expected error") - } - }) -} - -func TestAllocDirFS_ACL(t *testing.T) { +func TestHTTP_FS_rpcHandlerForAlloc(t *testing.T) { t.Parallel() require := require.New(t) + agent := NewTestAgent(t, t.Name(), nil) - // TODO This whole thing can go away since the ACLs should be tested in the - // RPC test - //for _, endpoint := range []string{"ls", "stat", "readat", "cat", "stream"} { - for _, endpoint := range []string{"ls", "stat", "readat", "cat"} { - t.Run(endpoint, func(t *testing.T) { + a := mockFSAlloc(agent.client.NodeID(), nil) + addAllocToClient(agent, a, terminalClientAlloc) - httpACLTest(t, nil, func(s *TestAgent) { - state := s.Agent.server.State() + // Case 1: Client has allocation + // Outcome: Use local client + lc, rc, s := agent.Server.rpcHandlerForAlloc(a.ID) + require.True(lc) + require.False(rc) + require.False(s) - req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/fs/%s/", endpoint), nil) - require.Nil(err) + // Case 2: Client doesn't have allocation and there is a server + // Outcome: Use server + lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate()) + require.False(lc) + require.False(rc) + require.True(s) - // Try request without a token and expect failure - { - respW := httptest.NewRecorder() - _, err := s.Server.FsRequest(respW, req) - require.NotNil(err) - require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } + // Case 3: Client doesn't have allocation and there is no server + // Outcome: Use client RPC to server + srv := agent.server + agent.server = nil + lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate()) + require.False(lc) + require.True(rc) + require.False(s) + agent.server = srv - // Try request with an invalid token and expect failure - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs}) - token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - require.NotNil(err) - require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } - - // Try request with a valid token - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) - token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - require.NotNil(err) - require.Equal(allocIDNotPresentErr, err) - } - - // Try request with a management token - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - setToken(req, s.RootToken) - _, err := s.Server.FsRequest(respW, req) - require.NotNil(err) - require.Equal(allocIDNotPresentErr, err) - } - }) - }) - } + // Case 4: No client + // Outcome: Use server + client := agent.client + agent.client = nil + lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate()) + require.False(lc) + require.False(rc) + require.True(s) + agent.client = client } -/* -func TestHTTP_Stream_MissingParams(t *testing.T) { +func TestHTTP_FS_List_MissingParams(t *testing.T) { t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + req, err := http.NewRequest("GET", "/v1/client/fs/ls/", nil) + require.Nil(err) + respW := httptest.NewRecorder() + _, err = s.Server.DirectoryListRequest(respW, req) + require.EqualError(err, allocIDNotPresentErr.Error()) + }) +} + +func TestHTTP_FS_Stat_MissingParams(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + req, err := http.NewRequest("GET", "/v1/client/fs/stat/", nil) + require.Nil(err) + respW := httptest.NewRecorder() + + _, err = s.Server.FileStatRequest(respW, req) + require.EqualError(err, allocIDNotPresentErr.Error()) + + req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.FileStatRequest(respW, req) + require.EqualError(err, fileNameNotPresentErr.Error()) + }) +} + +func TestHTTP_FS_ReadAt_MissingParams(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + req, err := http.NewRequest("GET", "/v1/client/fs/readat/", nil) + require.Nil(err) + respW := httptest.NewRecorder() + + _, err = s.Server.FileReadAtRequest(respW, req) + require.NotNil(err) + + req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.FileReadAtRequest(respW, req) + require.NotNil(err) + + req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo?path=/path/to/file", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.FileReadAtRequest(respW, req) + require.NotNil(err) + }) +} + +func TestHTTP_FS_Cat_MissingParams(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + req, err := http.NewRequest("GET", "/v1/client/fs/cat/", nil) + require.Nil(err) + respW := httptest.NewRecorder() + + _, err = s.Server.FileCatRequest(respW, req) + require.EqualError(err, allocIDNotPresentErr.Error()) + + req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.FileCatRequest(respW, req) + require.EqualError(err, fileNameNotPresentErr.Error()) + }) +} + +func TestHTTP_FS_Stream_MissingParams(t *testing.T) { + t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) - if err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(err) respW := httptest.NewRecorder() _, err = s.Server.Stream(respW, req) - if err == nil { - t.Fatal("expected error") - } + require.EqualError(err, allocIDNotPresentErr.Error()) req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil) - if err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(err) respW = httptest.NewRecorder() _, err = s.Server.Stream(respW, req) - if err == nil { - t.Fatal("expected error") - } + require.EqualError(err, fileNameNotPresentErr.Error()) req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil) - if err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(err) respW = httptest.NewRecorder() _, err = s.Server.Stream(respW, req) - if err == nil { - t.Fatal("expected error") - } + require.Nil(err) }) } -// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller -// should destroy the temp dir. -func tempAllocDir(t testing.TB) *allocdir.AllocDir { - dir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatalf("TempDir() failed: %v", err) - } - - if err := os.Chmod(dir, 0777); err != nil { - t.Fatalf("failed to chmod dir: %v", err) - } - - return allocdir.NewAllocDir(log.New(os.Stderr, "", log.LstdFlags), dir) -} - -type nopWriteCloser struct { - io.Writer -} - -func (n nopWriteCloser) Close() error { - return nil -} - -func TestHTTP_Stream_NoFile(t *testing.T) { +func TestHTTP_FS_Logs_MissingParams(t *testing.T) { t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) + req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil) + require.Nil(err) + respW := httptest.NewRecorder() - framer := sframer.NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() + _, err = s.Server.Logs(respW, req) + require.EqualError(err, allocIDNotPresentErr.Error()) - if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil { - t.Fatalf("expected an error when streaming unknown file") - } + req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.Logs(respW, req) + require.EqualError(err, taskNotPresentErr.Error()) + + req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.Logs(respW, req) + require.EqualError(err, logTypeNotPresentErr.Error()) + + req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo&type=stdout", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + _, err = s.Server.Logs(respW, req) + require.Nil(err) }) } -func TestHTTP_Stream_Modify(t *testing.T) { +func TestHTTP_FS_List(t *testing.T) { t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) - // Create a file in the temp dir - streamFile := "stream_file" - f, err := os.Create(filepath.Join(ad.AllocDir, streamFile)) - if err != nil { - t.Fatalf("Failed to create file: %v", err) - } - defer f.Close() + req, err := http.NewRequest("GET", "/v1/client/fs/ls/"+a.ID, nil) + require.Nil(err) + respW := httptest.NewRecorder() + raw, err := s.Server.DirectoryListRequest(respW, req) + require.Nil(err) - // Create a decoder - r, w := io.Pipe() - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) - - data := []byte("helloworld") - - // Start the reader - resultCh := make(chan struct{}) - go func() { - var collected []byte - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - collected = append(collected, frame.Data...) - if reflect.DeepEqual(data, collected) { - resultCh <- struct{}{} - return - } - } - }() - - // Write a few bytes - if _, err := f.Write(data[:3]); err != nil { - t.Fatalf("write failed: %v", err) - } - - framer := sframer.NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - - // Start streaming - go func() { - if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { - t.Fatalf("stream() failed: %v", err) - } - }() - - // Sleep a little before writing more. This lets us check if the watch - // is working. - time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) - if _, err := f.Write(data[3:]); err != nil { - t.Fatalf("write failed: %v", err) - } - - select { - case <-resultCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("failed to send new data") - } + files, ok := raw.([]*cstructs.AllocFileInfo) + require.True(ok) + require.NotEmpty(files) + require.True(files[0].IsDir) }) } -func TestHTTP_Stream_Truncate(t *testing.T) { +func TestHTTP_FS_Stat(t *testing.T) { t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) - // Create a file in the temp dir - streamFile := "stream_file" - streamFilePath := filepath.Join(ad.AllocDir, streamFile) - f, err := os.Create(streamFilePath) - if err != nil { - t.Fatalf("Failed to create file: %v", err) - } - defer f.Close() + path := fmt.Sprintf("/v1/client/fs/stat/%s?path=alloc/", a.ID) + req, err := http.NewRequest("GET", path, nil) + require.Nil(err) + respW := httptest.NewRecorder() + raw, err := s.Server.FileStatRequest(respW, req) + require.Nil(err) - // Create a decoder - r, w := io.Pipe() - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) - - data := []byte("helloworld") - - // Start the reader - truncateCh := make(chan struct{}) - dataPostTruncCh := make(chan struct{}) - go func() { - var collected []byte - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - if frame.FileEvent == truncateEvent { - close(truncateCh) - } - - collected = append(collected, frame.Data...) - if reflect.DeepEqual(data, collected) { - close(dataPostTruncCh) - return - } - } - }() - - // Write a few bytes - if _, err := f.Write(data[:3]); err != nil { - t.Fatalf("write failed: %v", err) - } - - framer := sframer.NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - - // Start streaming - go func() { - if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { - t.Fatalf("stream() failed: %v", err) - } - }() - - // Sleep a little before truncating. This lets us check if the watch - // is working. - time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) - if err := f.Truncate(0); err != nil { - t.Fatalf("truncate failed: %v", err) - } - if err := f.Sync(); err != nil { - t.Fatalf("sync failed: %v", err) - } - if err := f.Close(); err != nil { - t.Fatalf("failed to close file: %v", err) - } - - f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0) - if err != nil { - t.Fatalf("failed to reopen file: %v", err) - } - defer f2.Close() - if _, err := f2.Write(data[3:5]); err != nil { - t.Fatalf("write failed: %v", err) - } - - select { - case <-truncateCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive truncate") - } - - // Sleep a little before writing more. This lets us check if the watch - // is working. - time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) - if _, err := f2.Write(data[5:]); err != nil { - t.Fatalf("write failed: %v", err) - } - - select { - case <-dataPostTruncCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive post truncate data") - } + info, ok := raw.(*cstructs.AllocFileInfo) + require.True(ok) + require.NotNil(info) + require.True(info.IsDir) }) } -func TestHTTP_Stream_Delete(t *testing.T) { +func TestHTTP_FS_ReadAt(t *testing.T) { t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) - // Create a file in the temp dir - streamFile := "stream_file" - streamFilePath := filepath.Join(ad.AllocDir, streamFile) - f, err := os.Create(streamFilePath) - if err != nil { - t.Fatalf("Failed to create file: %v", err) - } - defer f.Close() + offset := 1 + limit := 3 + expectation := defaultLoggerMockDriverStdout[offset : offset+limit] + path := fmt.Sprintf("/v1/client/fs/readat/%s?path=alloc/logs/web.stdout.0&offset=%d&limit=%d", + a.ID, offset, limit) - // Create a decoder - r, w := io.Pipe() - wrappedW := &WriteCloseChecker{WriteCloser: w} - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) + req, err := http.NewRequest("GET", path, nil) + require.Nil(err) + respW := httptest.NewRecorder() + _, err = s.Server.FileReadAtRequest(respW, req) + require.Nil(err) - data := []byte("helloworld") + output, err := ioutil.ReadAll(respW.Result().Body) + require.Nil(err) + require.EqualValues(expectation, output) + }) +} - // Start the reader - deleteCh := make(chan struct{}) +func TestHTTP_FS_Cat(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) + + path := fmt.Sprintf("/v1/client/fs/cat/%s?path=alloc/logs/web.stdout.0", a.ID) + + req, err := http.NewRequest("GET", path, nil) + require.Nil(err) + respW := httptest.NewRecorder() + _, err = s.Server.FileCatRequest(respW, req) + require.Nil(err) + + output, err := ioutil.ReadAll(respW.Result().Body) + require.Nil(err) + require.EqualValues(defaultLoggerMockDriverStdout, output) + }) +} + +func TestHTTP_FS_Stream(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) + + offset := 4 + expectation := base64.StdEncoding.EncodeToString( + []byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:])) + path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end", + a.ID, offset) + + p, _ := io.Pipe() + req, err := http.NewRequest("GET", path, p) + require.Nil(err) + respW := httptest.NewRecorder() go func() { - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - if frame.FileEvent == deleteEvent { - close(deleteCh) - return - } - } + _, err = s.Server.Stream(respW, req) + require.Nil(err) }() - // Write a few bytes - if _, err := f.Write(data[:3]); err != nil { - t.Fatalf("write failed: %v", err) - } - - framer := sframer.NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - - // Start streaming - go func() { - if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { - t.Fatalf("stream() failed: %v", err) - } - }() - - // Sleep a little before deleting. This lets us check if the watch - // is working. - time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) - if err := os.Remove(streamFilePath); err != nil { - t.Fatalf("delete failed: %v", err) - } - - select { - case <-deleteCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive delete") - } - - framer.Destroy() + out := "" testutil.WaitForResult(func() (bool, error) { - return wrappedW.Closed, nil - }, func(err error) { - t.Fatalf("connection not closed") - }) - - }) -} - -func TestHTTP_Logs_NoFollow(t *testing.T) { - t.Parallel() - httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir and create the log dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) - - logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) - if err := os.MkdirAll(logDir, 0777); err != nil { - t.Fatalf("Failed to make log dir: %v", err) - } - - // Create a series of log files in the temp dir - task := "foo" - logType := "stdout" - expected := []byte("012") - for i := 0; i < 3; i++ { - logFile := fmt.Sprintf("%s.%s.%d", task, logType, i) - logFilePath := filepath.Join(logDir, logFile) - err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777) + output, err := ioutil.ReadAll(respW.Body) if err != nil { - t.Fatalf("Failed to create file: %v", err) + return false, err } - } - // Create a decoder - r, w := io.Pipe() - wrappedW := &WriteCloseChecker{WriteCloser: w} - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) - - var received []byte - - // Start the reader - resultCh := make(chan struct{}) - go func() { - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - if err == io.EOF { - t.Logf("EOF") - return - } - - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - received = append(received, frame.Data...) - if reflect.DeepEqual(received, expected) { - close(resultCh) - return - } - } - }() - - // Start streaming logs - go func() { - if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { - t.Fatalf("logs() failed: %v", err) - } - }() - - select { - case <-resultCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive data: got %q", string(received)) - } - - testutil.WaitForResult(func() (bool, error) { - return wrappedW.Closed, nil + out += string(output) + return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation) }, func(err error) { - t.Fatalf("connection not closed") + t.Fatal(err) }) + p.Close() }) } -func TestHTTP_Logs_Follow(t *testing.T) { +func TestHTTP_FS_Logs(t *testing.T) { t.Parallel() + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { - // Get a temp alloc dir and create the log dir - ad := tempAllocDir(t) - defer os.RemoveAll(ad.AllocDir) + a := mockFSAlloc(s.client.NodeID(), nil) + addAllocToClient(s, a, terminalClientAlloc) - logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) - if err := os.MkdirAll(logDir, 0777); err != nil { - t.Fatalf("Failed to make log dir: %v", err) - } + offset := 4 + expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:] + path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true", + a.ID, offset) - // Create a series of log files in the temp dir - task := "foo" - logType := "stdout" - expected := []byte("012345") - initialWrites := 3 - - writeToFile := func(index int, data []byte) { - logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) - logFilePath := filepath.Join(logDir, logFile) - err := ioutil.WriteFile(logFilePath, data, 777) - if err != nil { - t.Fatalf("Failed to create file: %v", err) - } - } - for i := 0; i < initialWrites; i++ { - writeToFile(i, expected[i:i+1]) - } - - // Create a decoder - r, w := io.Pipe() - wrappedW := &WriteCloseChecker{WriteCloser: w} - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) - - var received []byte - - // Start the reader - firstResultCh := make(chan struct{}) - fullResultCh := make(chan struct{}) + p, _ := io.Pipe() + req, err := http.NewRequest("GET", path, p) + require.Nil(err) + respW := httptest.NewRecorder() go func() { - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - if err == io.EOF { - t.Logf("EOF") - return - } - - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - received = append(received, frame.Data...) - if reflect.DeepEqual(received, expected[:initialWrites]) { - close(firstResultCh) - } else if reflect.DeepEqual(received, expected) { - close(fullResultCh) - return - } - } + _, err = s.Server.Logs(respW, req) + require.Nil(err) }() - // Start streaming logs - go func() { - if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { - t.Fatalf("logs() failed: %v", err) - } - }() - - select { - case <-firstResultCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive data: got %q", string(received)) - } - - // We got the first chunk of data, write out the rest to the next file - // at an index much ahead to check that it is following and detecting - // skips - skipTo := initialWrites + 10 - writeToFile(skipTo, expected[initialWrites:]) - - select { - case <-fullResultCh: - case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): - t.Fatalf("did not receive data: got %q", string(received)) - } - - // Close the reader - r.Close() - + out := "" testutil.WaitForResult(func() (bool, error) { - return wrappedW.Closed, nil + output, err := ioutil.ReadAll(respW.Body) + if err != nil { + return false, err + } + + out += string(output) + return out == expectation, fmt.Errorf("%q != %q", out, expectation) }, func(err error) { - t.Fatalf("connection not closed") + t.Fatal(err) }) + + p.Close() }) } -*/