open-nomad/command/agent/fs_endpoint_test.go
Drew Bailey 6c788fdccd
Events/msgtype cleanup (#9117)
* use msgtype in upsert node

adds message type to signature for upsert node, update tests, remove placeholder method

* UpsertAllocs msg type test setup

* use upsertallocs with msg type in signature

update test usage of delete node

delete placeholder msgtype method

* add msgtype to upsert evals signature, update test call sites with test setup msg type

handle snapshot upsert eval outside of FSM and ignore eval event

remove placeholder upsertevalsmsgtype

handle job plan rpc and prevent event creation for plan

msgtype cleanup upsertnodeevents

updatenodedrain msgtype

msg type 0 is a node registration event, so set the default  to the ignore type

* fix named import

* fix signature ordering on upsertnode to match
2020-10-19 09:30:15 -04:00

626 lines
18 KiB
Go

package agent
import (
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
const (
defaultLoggerMockDriverStdout = "Hello from the other side"
xssLoggerMockDriverStdout = "<script>alert(document.domain);</script>"
)
var (
defaultLoggerMockDriver = map[string]interface{}{
"run_for": "2s",
"stdout_string": defaultLoggerMockDriverStdout,
}
xssLoggerMockDriver = map[string]interface{}{
"run_for": "2s",
"stdout_string": xssLoggerMockDriverStdout,
}
)
type clientAllocWaiter int
const (
noWaitClientAlloc clientAllocWaiter = iota
runningClientAlloc
terminalClientAlloc
)
func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAllocWaiter) {
require := require.New(agent.T)
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
node, err := agent.server.State().NodeByID(nil, agent.client.NodeID())
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("unknown node")
}
return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
}, func(err error) {
agent.T.Fatal(err)
})
// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc}))
if wait == noWaitClientAlloc {
return
}
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, alloc.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
expectation := alloc.ClientStatus == structs.AllocClientStatusComplete ||
alloc.ClientStatus == structs.AllocClientStatusFailed
if wait == runningClientAlloc {
expectation = expectation || alloc.ClientStatus == structs.AllocClientStatusRunning
}
if !expectation {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
agent.T.Fatal(err)
})
}
// mockFSAlloc returns a suitable mock alloc for testing the fs system. If
// config isn't provided, the defaultLoggerMockDriver config is used.
func mockFSAlloc(nodeID string, config map[string]interface{}) *structs.Allocation {
a := mock.Alloc()
a.NodeID = nodeID
a.Job.Type = structs.JobTypeBatch
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
if config != nil {
a.Job.TaskGroups[0].Tasks[0].Config = config
} else {
a.Job.TaskGroups[0].Tasks[0].Config = defaultLoggerMockDriver
}
return a
}
func TestHTTP_FS_List_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/ls/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.DirectoryListRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
})
}
func TestHTTP_FS_Stat_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stat/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
func TestHTTP_FS_ReadAt_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/readat/", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo?path=/path/to/file", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
})
}
func TestHTTP_FS_Cat_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/cat/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
func TestHTTP_FS_Stream_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
require.NoError(err)
respW := httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.Error(err)
require.Contains(err.Error(), "alloc lookup failed")
})
}
// TestHTTP_FS_Logs_MissingParams asserts proper error codes and messages are
// returned for incorrect parameters (eg missing tasks).
func TestHTTP_FS_Logs_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// AllocID Not Present
req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil)
require.NoError(err)
respW := httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), allocIDNotPresentErr.Error())
require.Equal(400, respW.Code)
// Task Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), taskNotPresentErr.Error())
require.Equal(400, respW.Code)
// Log Type Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), logTypeNotPresentErr.Error())
require.Equal(400, respW.Code)
// case where all parameters are set but alloc isn't found
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo&type=stdout", nil)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(500, respW.Code)
require.Contains(respW.Body.String(), "alloc lookup failed")
})
}
func TestHTTP_FS_List(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
req, err := http.NewRequest("GET", "/v1/client/fs/ls/"+a.ID, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.DirectoryListRequest(respW, req)
require.Nil(err)
files, ok := raw.([]*cstructs.AllocFileInfo)
require.True(ok)
require.NotEmpty(files)
require.True(files[0].IsDir)
})
}
func TestHTTP_FS_Stat(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("/v1/client/fs/stat/%s?path=alloc/", a.ID)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.FileStatRequest(respW, req)
require.Nil(err)
info, ok := raw.(*cstructs.AllocFileInfo)
require.True(ok)
require.NotNil(info)
require.True(info.IsDir)
})
}
func TestHTTP_FS_ReadAt(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 1
limit := 3
expectation := defaultLoggerMockDriverStdout[offset : offset+limit]
path := fmt.Sprintf("/v1/client/fs/readat/%s?path=alloc/logs/web.stdout.0&offset=%d&limit=%d",
a.ID, offset, limit)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileReadAtRequest(respW, req)
require.Nil(err)
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(expectation, output)
})
}
// TestHTTP_FS_ReadAt_XSS asserts that the readat API is safe from XSS.
func TestHTTP_FS_ReadAt_XSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), xssLoggerMockDriver)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("%s/v1/client/fs/readat/%s?path=alloc/logs/web.stdout.0&offset=0&limit=%d",
s.HTTPAddr(), a.ID, len(xssLoggerMockDriverStdout))
resp, err := http.DefaultClient.Get(path)
require.NoError(t, err)
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, xssLoggerMockDriverStdout, string(buf))
require.Equal(t, []string{"text/plain"}, resp.Header.Values("Content-Type"))
require.Equal(t, []string{"nosniff"}, resp.Header.Values("X-Content-Type-Options"))
require.Equal(t, []string{"1; mode=block"}, resp.Header.Values("X-XSS-Protection"))
require.Equal(t, []string{"default-src 'none'; style-src 'unsafe-inline'; sandbox"},
resp.Header.Values("Content-Security-Policy"))
})
}
func TestHTTP_FS_Cat(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("/v1/client/fs/cat/%s?path=alloc/logs/web.stdout.0", a.ID)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.Nil(err)
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(defaultLoggerMockDriverStdout, output)
})
}
// TestHTTP_FS_Cat_XSS asserts that the cat API is safe from XSS.
func TestHTTP_FS_Cat_XSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), xssLoggerMockDriver)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("%s/v1/client/fs/cat/%s?path=alloc/logs/web.stdout.0", s.HTTPAddr(), a.ID)
resp, err := http.DefaultClient.Get(path)
require.NoError(t, err)
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, xssLoggerMockDriverStdout, string(buf))
require.Equal(t, []string{"text/plain"}, resp.Header.Values("Content-Type"))
require.Equal(t, []string{"nosniff"}, resp.Header.Values("X-Content-Type-Options"))
require.Equal(t, []string{"1; mode=block"}, resp.Header.Values("X-XSS-Protection"))
require.Equal(t, []string{"default-src 'none'; style-src 'unsafe-inline'; sandbox"},
resp.Header.Values("Content-Security-Policy"))
})
}
func TestHTTP_FS_Stream_NoFollow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := base64.StdEncoding.EncodeToString(
[]byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]))
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end&follow=false",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
doneCh := make(chan struct{})
go func() {
_, err = s.Server.Stream(respW, req)
require.Nil(err)
close(doneCh)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case <-doneCh:
case <-time.After(1 * time.Second):
t.Fatal("should close but did not")
}
})
}
// TestHTTP_FS_Stream_NoFollow_XSS asserts that the stream API is safe from XSS.
func TestHTTP_FS_Stream_NoFollow_XSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), xssLoggerMockDriver)
addAllocToClient(s, a, terminalClientAlloc)
path := fmt.Sprintf("%s/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&follow=false",
s.HTTPAddr(), a.ID)
resp, err := http.DefaultClient.Get(path)
require.NoError(t, err)
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
expected := `{"Data":"PHNjcmlwdD5hbGVydChkb2N1bWVudC5kb21haW4pOzwvc2NyaXB0Pg==","File":"alloc/logs/web.stdout.0","Offset":40}`
require.Equal(t, expected, string(buf))
})
}
func TestHTTP_FS_Stream_Follow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := base64.StdEncoding.EncodeToString(
[]byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]))
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
doneCh := make(chan struct{})
go func() {
_, err = s.Server.Stream(respW, req)
require.Nil(err)
close(doneCh)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW.Body)
if err != nil {
return false, err
}
out += string(output)
return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case <-doneCh:
t.Fatal("shouldn't close")
case <-time.After(1 * time.Second):
}
})
}
func TestHTTP_FS_Logs(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
go func() {
_, err = s.Server.Logs(respW, req)
require.Nil(err)
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
})
}
// TestHTTP_FS_Logs_XSS asserts that the logs endpoint always returns
// text/plain or application/json content regardless of whether the logs are
// HTML+Javascript or not.
func TestHTTP_FS_Logs_XSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), xssLoggerMockDriver)
addAllocToClient(s, a, terminalClientAlloc)
// Must make a "real" request to ensure Go's default content
// type detection does not detect text/html
path := fmt.Sprintf("%s/v1/client/fs/logs/%s?type=stdout&task=web&plain=true", s.HTTPAddr(), a.ID)
resp, err := http.DefaultClient.Get(path)
require.NoError(t, err)
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, xssLoggerMockDriverStdout, string(buf))
require.Equal(t, []string{"text/plain"}, resp.Header.Values("Content-Type"))
})
}
func TestHTTP_FS_Logs_Follow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true&follow=true",
a.ID, offset)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
errCh := make(chan error, 1)
go func() {
_, err := s.Server.Logs(respW, req)
errCh <- err
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case err := <-errCh:
t.Fatalf("shouldn't exit: %v", err)
case <-time.After(1 * time.Second):
}
})
}
func TestHTTP_FS_Logs_PropagatesErrors(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=0&origin=end&plain=true",
uuid.Generate())
req, err := http.NewRequest("GET", path, nil)
require.NoError(t, err)
respW := testutil.NewResponseRecorder()
_, err = s.Server.Logs(respW, req)
require.Error(t, err)
_, ok := err.(HTTPCodedError)
require.Truef(t, ok, "expected a coded error but found: %#+v", err)
})
}