open-nomad/command/agent/fs_endpoint_test.go

474 lines
12 KiB
Go
Raw Normal View History

2016-01-13 19:19:45 +00:00
package agent
import (
2018-02-05 21:07:27 +00:00
"encoding/base64"
2016-07-20 20:06:05 +00:00
"fmt"
2018-02-05 21:07:27 +00:00
"io"
"io/ioutil"
2016-01-13 19:19:45 +00:00
"net/http"
"net/http/httptest"
2018-02-05 21:07:27 +00:00
"strings"
2016-01-13 19:19:45 +00:00
"testing"
"time"
2016-07-10 17:55:52 +00:00
2018-02-05 21:07:27 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
2017-05-03 19:38:49 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2018-02-05 21:07:27 +00:00
"github.com/hashicorp/nomad/testutil"
2018-02-01 01:35:21 +00:00
"github.com/stretchr/testify/require"
2016-01-13 19:19:45 +00:00
)
2018-02-05 21:07:27 +00:00
const (
defaultLoggerMockDriverStdout = "Hello from the other side"
)
var (
defaultLoggerMockDriver = map[string]interface{}{
"run_for": "2s",
"stdout_string": defaultLoggerMockDriverStdout,
}
)
type clientAllocWaiter int
const (
noWaitClientAlloc clientAllocWaiter = iota
runningClientAlloc
terminalClientAlloc
)
func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAllocWaiter) {
require := require.New(agent.T)
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
node, err := agent.server.State().NodeByID(nil, agent.client.NodeID())
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("unknown node")
}
return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
}, func(err error) {
agent.T.Fatal(err)
})
// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{alloc}))
if wait == noWaitClientAlloc {
return
}
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, alloc.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
expectation := alloc.ClientStatus == structs.AllocClientStatusComplete ||
alloc.ClientStatus == structs.AllocClientStatusFailed
if wait == runningClientAlloc {
expectation = expectation || alloc.ClientStatus == structs.AllocClientStatusRunning
}
if !expectation {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
agent.T.Fatal(err)
})
}
// mockFSAlloc returns a suitable mock alloc for testing the fs system. If
// config isn't provided, the defaultLoggerMockDriver config is used.
func mockFSAlloc(nodeID string, config map[string]interface{}) *structs.Allocation {
a := mock.Alloc()
a.NodeID = nodeID
a.Job.Type = structs.JobTypeBatch
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
if config != nil {
a.Job.TaskGroups[0].Tasks[0].Config = config
} else {
a.Job.TaskGroups[0].Tasks[0].Config = defaultLoggerMockDriver
}
return a
}
func TestHTTP_FS_List_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2016-01-13 19:49:39 +00:00
req, err := http.NewRequest("GET", "/v1/client/fs/ls/", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
2016-01-13 19:19:45 +00:00
respW := httptest.NewRecorder()
_, err = s.Server.DirectoryListRequest(respW, req)
2018-02-05 21:07:27 +00:00
require.EqualError(err, allocIDNotPresentErr.Error())
2016-01-13 19:19:45 +00:00
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Stat_MissingParams(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2016-01-13 19:19:45 +00:00
req, err := http.NewRequest("GET", "/v1/client/fs/stat/", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
2016-01-13 19:19:45 +00:00
respW := httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
2018-02-05 21:07:27 +00:00
require.EqualError(err, allocIDNotPresentErr.Error())
2016-01-13 19:19:45 +00:00
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
2016-01-13 19:19:45 +00:00
respW = httptest.NewRecorder()
_, err = s.Server.FileStatRequest(respW, req)
2018-02-05 21:07:27 +00:00
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_ReadAt_MissingParams(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/readat/", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
req, err = http.NewRequest("GET", "/v1/client/fs/readat/foo?path=/path/to/file", nil)
require.NoError(err)
_, err = s.Server.FileReadAtRequest(httptest.NewRecorder(), req)
require.Error(err)
2016-01-13 19:19:45 +00:00
})
}
2016-07-07 15:15:22 +00:00
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Cat_MissingParams(t *testing.T) {
t.Parallel()
2018-02-01 01:35:21 +00:00
require := require.New(t)
2018-02-05 21:07:27 +00:00
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/cat/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
2018-02-01 01:35:21 +00:00
2018-02-05 21:07:27 +00:00
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stat/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Stream_MissingParams(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
2018-02-05 21:07:27 +00:00
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
2018-02-05 21:07:27 +00:00
require.EqualError(err, fileNameNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil)
2018-02-05 21:07:27 +00:00
require.Nil(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
2018-02-05 21:07:27 +00:00
require.Nil(err)
})
}
// TestHTTP_FS_Logs_MissingParams asserts proper error codes and messages are
// returned for incorrect parameters (eg missing tasks).
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Logs_MissingParams(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// AllocID Not Present
2018-02-05 21:07:27 +00:00
req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil)
require.Nil(err)
respW := httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), allocIDNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
2016-07-20 20:06:05 +00:00
// Task Not Present
2018-02-05 21:07:27 +00:00
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), taskNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
// Log Type Not Present
2018-02-05 21:07:27 +00:00
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo", nil)
require.Nil(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), logTypeNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
// Ok
2018-02-05 21:07:27 +00:00
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo&type=stdout", nil)
require.Nil(err)
respW = httptest.NewRecorder()
2016-07-19 01:41:21 +00:00
s.Server.mux.ServeHTTP(respW, req)
require.Equal(200, respW.Code)
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_List(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2018-02-05 21:07:27 +00:00
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
2018-02-05 21:07:27 +00:00
req, err := http.NewRequest("GET", "/v1/client/fs/ls/"+a.ID, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.DirectoryListRequest(respW, req)
require.Nil(err)
2018-02-05 21:07:27 +00:00
files, ok := raw.([]*cstructs.AllocFileInfo)
require.True(ok)
require.NotEmpty(files)
require.True(files[0].IsDir)
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Stat(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2018-02-05 21:07:27 +00:00
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
2018-02-05 21:07:27 +00:00
path := fmt.Sprintf("/v1/client/fs/stat/%s?path=alloc/", a.ID)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
raw, err := s.Server.FileStatRequest(respW, req)
require.Nil(err)
2018-02-05 21:07:27 +00:00
info, ok := raw.(*cstructs.AllocFileInfo)
require.True(ok)
require.NotNil(info)
require.True(info.IsDir)
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_ReadAt(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2018-02-05 21:07:27 +00:00
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
2018-02-05 21:07:27 +00:00
offset := 1
limit := 3
expectation := defaultLoggerMockDriverStdout[offset : offset+limit]
path := fmt.Sprintf("/v1/client/fs/readat/%s?path=alloc/logs/web.stdout.0&offset=%d&limit=%d",
a.ID, offset, limit)
2018-02-05 21:07:27 +00:00
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileReadAtRequest(respW, req)
require.Nil(err)
2018-02-05 21:07:27 +00:00
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(expectation, output)
})
}
2016-07-19 22:58:02 +00:00
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Cat(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2018-02-05 21:07:27 +00:00
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
path := fmt.Sprintf("/v1/client/fs/cat/%s?path=alloc/logs/web.stdout.0", a.ID)
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.FileCatRequest(respW, req)
require.Nil(err)
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
output, err := ioutil.ReadAll(respW.Result().Body)
require.Nil(err)
require.EqualValues(defaultLoggerMockDriverStdout, output)
})
}
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Stream(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := base64.StdEncoding.EncodeToString(
[]byte(defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]))
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end",
a.ID, offset)
p, _ := io.Pipe()
2018-02-05 21:07:27 +00:00
req, err := http.NewRequest("GET", path, p)
require.Nil(err)
respW := httptest.NewRecorder()
doneCh := make(chan struct{})
2016-07-20 20:06:05 +00:00
go func() {
2018-02-05 21:07:27 +00:00
_, err = s.Server.Stream(respW, req)
require.Nil(err)
close(doneCh)
2016-07-20 20:06:05 +00:00
}()
2018-02-05 21:07:27 +00:00
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW.Body)
if err != nil {
return false, err
2016-07-20 20:06:05 +00:00
}
2018-02-05 21:07:27 +00:00
out += string(output)
return strings.Contains(out, expectation), fmt.Errorf("%q doesn't contain %q", out, expectation)
2016-07-20 20:06:05 +00:00
}, func(err error) {
2018-02-05 21:07:27 +00:00
t.Fatal(err)
2016-07-20 20:06:05 +00:00
})
select {
case <-doneCh:
t.Fatal("shouldn't close")
case <-time.After(1 * time.Second):
}
2018-02-05 21:07:27 +00:00
p.Close()
2016-07-20 20:06:05 +00:00
})
}
2018-02-05 21:07:27 +00:00
func TestHTTP_FS_Logs(t *testing.T) {
2017-07-20 05:42:15 +00:00
t.Parallel()
2018-02-05 21:07:27 +00:00
require := require.New(t)
2017-07-20 05:14:36 +00:00
httpTest(t, nil, func(s *TestAgent) {
2018-02-05 21:07:27 +00:00
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true",
a.ID, offset)
2016-07-20 20:06:05 +00:00
2018-02-05 21:07:27 +00:00
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
require.Nil(err)
respW := testutil.NewResponseRecorder()
2016-07-20 20:06:05 +00:00
go func() {
2018-02-05 21:07:27 +00:00
_, err = s.Server.Logs(respW, req)
require.Nil(err)
2016-07-20 20:06:05 +00:00
}()
2018-02-05 21:07:27 +00:00
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
2018-02-05 21:07:27 +00:00
if err != nil {
return false, err
2016-07-20 20:06:05 +00:00
}
2018-02-05 21:07:27 +00:00
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
2016-07-20 20:06:05 +00:00
}, func(err error) {
2018-02-05 21:07:27 +00:00
t.Fatal(err)
2016-07-20 20:06:05 +00:00
})
2018-02-05 21:07:27 +00:00
p.Close()
2016-07-20 20:06:05 +00:00
})
}
func TestHTTP_FS_Logs_Follow(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
a := mockFSAlloc(s.client.NodeID(), nil)
addAllocToClient(s, a, terminalClientAlloc)
offset := 4
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true&follow=true",
a.ID, offset)
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
require.Nil(err)
respW := testutil.NewResponseRecorder()
errCh := make(chan error)
go func() {
_, err := s.Server.Logs(respW, req)
errCh <- err
}()
out := ""
testutil.WaitForResult(func() (bool, error) {
output, err := ioutil.ReadAll(respW)
if err != nil {
return false, err
}
out += string(output)
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
}, func(err error) {
t.Fatal(err)
})
select {
case err := <-errCh:
t.Fatalf("shouldn't exit: %v", err)
case <-time.After(1 * time.Second):
}
p.Close()
})
}