backport of commit 14a38bee7bc4386e74157f6a99f3db7382d7e6a5 (#18275)

Co-authored-by: Luiz Aoqui <luiz@hashicorp.com>
This commit is contained in:
hc-github-team-nomad-core 2023-08-21 15:34:32 -05:00 committed by GitHub
parent 27a14e4da1
commit 621bce1da2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 237 additions and 31 deletions

3
.changelog/18232.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
client: return 404 instead of 500 when trying to access logs and files from allocations that have been garbage collected
```

View File

@ -169,32 +169,42 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
encoder := codec.NewEncoder(conn, structs.MsgpackHandle) encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&req); err != nil { if err := decoder.Decode(&req); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder) handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return return
} }
if req.AllocID == "" { if req.AllocID == "" {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
alloc, err := f.c.GetAlloc(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder)
return return
} }
ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
return
}
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
pointer.Of(int64(http.StatusNotFound)),
encoder,
)
return
}
alloc := ar.Alloc()
// Check read permissions // Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
handleStreamResultError(err, pointer.Of(int64(403)), encoder) handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder)
return return
} else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) { } else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder)
return return
} }
// Validate the arguments // Validate the arguments
if req.Path == "" { if req.Path == "" {
handleStreamResultError(pathNotPresentErr, pointer.Of(int64(400)), encoder) handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
switch req.Origin { switch req.Origin {
@ -202,15 +212,15 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
case "": case "":
req.Origin = "start" req.Origin = "start"
default: default:
handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
fs, err := f.c.GetAllocFS(req.AllocID) fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil { if err != nil {
code := pointer.Of(int64(500)) code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) { if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404)) code = pointer.Of(int64(http.StatusNotFound))
} }
handleStreamResultError(err, code, encoder) handleStreamResultError(err, code, encoder)
@ -220,13 +230,13 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
// Calculate the offset // Calculate the offset
fileInfo, err := fs.Stat(req.Path) fileInfo, err := fs.Stat(req.Path)
if err != nil { if err != nil {
handleStreamResultError(err, pointer.Of(int64(400)), encoder) handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
if fileInfo.IsDir { if fileInfo.IsDir {
handleStreamResultError( handleStreamResultError(
fmt.Errorf("file %q is a directory", req.Path), fmt.Errorf("file %q is a directory", req.Path),
pointer.Of(int64(400)), encoder) pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
@ -328,7 +338,7 @@ OUTER:
} }
if streamErr != nil { if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return return
} }
} }
@ -344,19 +354,29 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
encoder := codec.NewEncoder(conn, structs.MsgpackHandle) encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&req); err != nil { if err := decoder.Decode(&req); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder) handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return return
} }
if req.AllocID == "" { if req.AllocID == "" {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
alloc, err := f.c.GetAlloc(req.AllocID)
ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil { if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
return return
} }
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
pointer.Of(int64(http.StatusNotFound)),
encoder,
)
return
}
alloc := ar.Alloc()
// Check read permissions // Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
@ -373,13 +393,13 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
// Validate the arguments // Validate the arguments
if req.Task == "" { if req.Task == "" {
handleStreamResultError(taskNotPresentErr, pointer.Of(int64(400)), encoder) handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
switch req.LogType { switch req.LogType {
case "stdout", "stderr": case "stdout", "stderr":
default: default:
handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(400)), encoder) handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
switch req.Origin { switch req.Origin {
@ -387,15 +407,15 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
case "": case "":
req.Origin = "start" req.Origin = "start"
default: default:
handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
return return
} }
fs, err := f.c.GetAllocFS(req.AllocID) fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil { if err != nil {
code := pointer.Of(int64(500)) code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) { if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404)) code = pointer.Of(int64(http.StatusNotFound))
} }
handleStreamResultError(err, code, encoder) handleStreamResultError(err, code, encoder)
@ -404,9 +424,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
allocState, err := f.c.GetAllocState(req.AllocID) allocState, err := f.c.GetAllocState(req.AllocID)
if err != nil { if err != nil {
code := pointer.Of(int64(500)) code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) { if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404)) code = pointer.Of(int64(http.StatusNotFound))
} }
handleStreamResultError(err, code, encoder) handleStreamResultError(err, code, encoder)
@ -418,7 +438,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
if taskState == nil { if taskState == nil {
handleStreamResultError( handleStreamResultError(
fmt.Errorf("unknown task name %q", req.Task), fmt.Errorf("unknown task name %q", req.Task),
pointer.Of(int64(400)), pointer.Of(int64(http.StatusBadRequest)),
encoder) encoder)
return return
} }
@ -426,7 +446,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
if taskState.StartedAt.IsZero() { if taskState.StartedAt.IsZero() {
handleStreamResultError( handleStreamResultError(
fmt.Errorf("task %q not started yet. No logs available", req.Task), fmt.Errorf("task %q not started yet. No logs available", req.Task),
pointer.Of(int64(404)), pointer.Of(int64(http.StatusNotFound)),
encoder) encoder)
return return
} }
@ -512,7 +532,7 @@ OUTER:
if streamErr != nil { if streamErr != nil {
// If error has a Code, use it // If error has a Code, use it
var code int64 = 500 var code int64 = http.StatusInternalServerError
if codedErr, ok := streamErr.(interface{ Code() int }); ok { if codedErr, ok := streamErr.(interface{ Code() int }); ok {
code = int64(codedErr.Code()) code = int64(codedErr.Code())
} }

View File

@ -9,6 +9,7 @@ import (
"io" "io"
"math" "math"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -31,6 +32,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -394,6 +396,96 @@ OUTER:
} }
} }
// TestFS_Stream_GC asserts that reading files from an alloc that has been
// GC'ed from the client returns a 404 error.
func TestFS_Stream_GC(t *testing.T) {
ci.Parallel(t)
// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)
c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))
// Build the request.
req := &cstructs.FsStreamRequest{
AllocID: alloc.ID,
Path: "alloc/logs/web.stdout.0",
PlainText: true,
Follow: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
must.NoError(t, err)
// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
// Start the handler.
go handler(p2)
// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// Send the request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))
for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "not found on client")
must.Eq(t, http.StatusNotFound, *msg.Error.Code)
return
}
}
}
func TestFS_Stream_ACL(t *testing.T) { func TestFS_Stream_ACL(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
@ -1015,13 +1107,104 @@ func TestFS_Logs_TaskPending(t *testing.T) {
case msg := <-streamMsg: case msg := <-streamMsg:
require.NotNil(msg.Error) require.NotNil(msg.Error)
require.NotNil(msg.Error.Code) require.NotNil(msg.Error.Code)
require.EqualValues(404, *msg.Error.Code) require.EqualValues(http.StatusNotFound, *msg.Error.Code)
require.Contains(msg.Error.Message, "not started") require.Contains(msg.Error.Message, "not started")
return return
} }
} }
} }
// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed
// from the client returns a 404 error.
func TestFS_Logs_GC(t *testing.T) {
ci.Parallel(t)
// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)
c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))
// Build the request.
req := &cstructs.FsLogsRequest{
AllocID: alloc.ID,
Task: job.TaskGroups[0].Tasks[0].Name,
LogType: "stdout",
Origin: "start",
PlainText: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
must.NoError(t, err)
// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
// Start the handler.
go handler(p2)
// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// Send the request.
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))
for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatalf("unexpected stream error: %v", err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "not found on client")
must.Eq(t, http.StatusNotFound, *msg.Error.Code)
return
}
}
}
func TestFS_Logs_ACL(t *testing.T) { func TestFS_Logs_ACL(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
require := require.New(t) require := require.New(t)