open-nomad/command/agent/fs_endpoint_test.go
2019-10-15 17:22:03 -04:00

528 lines
14 KiB
Go

package agent
import (
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
const (
defaultLoggerMockDriverStdout = "Hello from the other side"
)
var (
defaultLoggerMockDriver = map[string]interface{}{
"run_for": "2s",
"stdout_string": defaultLoggerMockDriverStdout,
}
)
type clientAllocWaiter int
const (
noWaitClientAlloc clientAllocWaiter = iota
runningClientAlloc
terminalClientAlloc
)
func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAllocWaiter) {
require := require.New(agent.T)
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
node, err := agent.server.State().NodeByID(nil, agent.client.NodeID())
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("unknown node")
}
return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
}, func(err error) {
agent.T.Fatal(err)
})
// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{alloc}))
if wait == noWaitClientAlloc {
return
}
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, alloc.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
expectation := alloc.ClientStatus == structs.AllocClientStatusComplete ||
alloc.ClientStatus == structs.AllocClientStatusFailed
if wait == runningClientAlloc {
expectation = expectation || alloc.ClientStatus == structs.AllocClientStatusRunning
}
if !expectation {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
agent.T.Fatal(err)
})
}
// mockFSAlloc returns a suitable mock alloc for testing the fs system. If
// config isn't provided, the defaultLoggerMockDriver config is used.
func mockFSAlloc(nodeID string, config map[string]interface{}) *structs.Allocation {
a := mock.Alloc()
a.NodeID = nodeID
a.Job.Type = structs.JobTypeBatch
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
if config != nil {
a.Job.TaskGroups[0].Tasks[0].Config = config
} else {
a.Job.TaskGroups[0].Tasks[0].Config = defaultLoggerMockDriver
}
return a
}
func TestHTTP_FS_List_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/ls/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.DirectoryListRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
})
}
func TestHTTP_FS_Stat_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stat/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
func TestHTTP_FS_ReadAt_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/readat/", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo?path=/path/to/file", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
})
}
func TestHTTP_FS_Cat_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/cat/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
func TestHTTP_FS_Stream_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
require.NoError(err)
respW := httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.Error(err)
require.Contains(err.Error(), "alloc lookup failed")
})
}
// TestHTTP_FS_Logs_MissingParams asserts proper error codes and messages are
// returned for incorrect parameters (eg missing tasks).
func TestHTTP_FS_Logs_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// AllocID Not Present
req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil)
require.NoError(err)
respW := httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), allocIDNotPresentErr.Error())
require.Equal(400, respW.Code)
// Task Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), taskNotPresentErr.Error())
require.Equal(400, respW.Code)
// Log Type Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), logTypeNotPresentErr.Error())
require.Equal(400, respW.Code)
// case where all parameters are set but alloc isn't found
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo&type=stdout", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(500, respW.Code)
require.Contains(respW.Body.String(), "alloc lookup failed")
})
}
func TestHTTP_FS_List(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
req, err := http.NewRequest("GET", "/v1/client/fs/ls/"+a.ID, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.DirectoryListRequest(respW, req)
require.Nil(err)
files, ok := raw.([]*cstructs.AllocFileInfo)
require.True(ok)
require.NotEmpty(files)
require.True(files[0].IsDir)
})
}
func TestHTTP_FS_Stat(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("/v1/client/fs/stat/%s?path=alloc/", a.ID)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.FileStatRequest(respW, req)
require.Nil(err)
info, ok := raw.(*cstructs.AllocFileInfo)
require.True(ok)
require.NotNil(info)
require.True(info.IsDir)
})
}
func TestHTTP_FS_ReadAt(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 1
limit := 3
expectation := defaultLoggerMockDriverStdout[offset : offset+limit]
path := fmt.Sprintf("/v1/client/fs/readat/%s?path=alloc/logs/web.stdout.0&offset=%d&limit=%d",
a.ID, offset, limit)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileReadAtRequest(respW, req)
require.Nil(err)
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(expectation, output)
})
}
func TestHTTP_FS_Cat(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("/v1/client/fs/cat/%s?path=alloc/logs/web.stdout.0", a.ID)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.Nil(err)
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(defaultLoggerMockDriverStdout, output)
})
}
func TestHTTP_FS_Stream_NoFollow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := base64.StdEncoding.EncodeToString(
[]byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]))
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end&follow=false",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
doneCh := make(chan struct{})
go func() {
_, err = s.Server.Stream(respW, req)
require.Nil(err)
close(doneCh)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case <-doneCh:
case <-time.After(1 * time.Second):
t.Fatal("should close but did not")
}
})
}
func TestHTTP_FS_Stream_Follow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := base64.StdEncoding.EncodeToString(
[]byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]))
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
doneCh := make(chan struct{})
go func() {
_, err = s.Server.Stream(respW, req)
require.Nil(err)
close(doneCh)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW.Body)
if err != nil {
return false, err
}
out += string(output)
return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case <-doneCh:
t.Fatal("shouldn't close")
case <-time.After(1 * time.Second):
}
})
}
func TestHTTP_FS_Logs(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
go func() {
_, err = s.Server.Logs(respW, req)
require.Nil(err)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
})
}
func TestHTTP_FS_Logs_Follow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true&follow=true",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
errCh := make(chan error)
go func() {
_, err := s.Server.Logs(respW, req)
errCh <- err
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case err := <-errCh:
t.Fatalf("shouldn't exit: %v", err)
case <-time.After(1 * time.Second):
}
})
}
func TestHTTP_FS_Logs_PropagatesErrors(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=0&origin=end&plain=true",
uuid.Generate())
req, err := http.NewRequest("GET", path, nil)
require.NoError(t, err)
respW := testutil.NewResponseRecorder()
_, err = s.Server.Logs(respW, req)
require.Error(t, err)
_, ok := err.(HTTPCodedError)
require.Truef(t, ok, "expected a coded error but found: %#+v", err)
})
}