f0c3dca49c
Copy the updated version of freeport (sdk/freeport), and tweak it for use in Nomad tests. This means staying below port 10000 to avoid conflicts with the lib/freeport that is still transitively used by the old version of consul that we vendor. Also provide implementations to find ephemeral ports of macOS and Windows environments. Ports acquired through freeport are supposed to be returned to freeport, which this change now also introduces. Many tests are modified to include calls to a cleanup function for Server objects. This should help quite a bit with some flakey tests, but not all of them. Our port problems will not go away completely until we upgrade our vendor version of consul. With Go modules, we'll probably do a 'replace' to swap out other copies of freeport with the one now in 'nomad/helper/freeport'.
1985 lines
45 KiB
Go
1985 lines
45 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
"github.com/hashicorp/nomad/client/config"
|
|
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/helper/testlog"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
|
|
// should destroy the temp dir.
|
|
func tempAllocDir(t testing.TB) *allocdir.AllocDir {
|
|
dir, err := ioutil.TempDir("", "nomadtest")
|
|
if err != nil {
|
|
t.Fatalf("TempDir() failed: %v", err)
|
|
}
|
|
|
|
if err := os.Chmod(dir, 0777); err != nil {
|
|
t.Fatalf("failed to chmod dir: %v", err)
|
|
}
|
|
|
|
return allocdir.NewAllocDir(testlog.HCLogger(t), dir)
|
|
}
|
|
|
|
type nopWriteCloser struct {
|
|
io.Writer
|
|
}
|
|
|
|
func (n nopWriteCloser) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func TestFS_Stat_NoAlloc(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a client
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsStatRequest{
|
|
AllocID: uuid.Generate(),
|
|
Path: "foo",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
var resp cstructs.FsStatResponse
|
|
err := c.ClientRPC("FileSystem.Stat", req, &resp)
|
|
require.NotNil(err)
|
|
require.True(structs.IsErrUnknownAllocation(err))
|
|
}
|
|
|
|
func TestFS_Stat(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
// Create and add an alloc
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "500ms",
|
|
}
|
|
// Wait for alloc to be running
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsStatRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "/",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
var resp cstructs.FsStatResponse
|
|
err := c.ClientRPC("FileSystem.Stat", req, &resp)
|
|
require.Nil(err)
|
|
require.NotNil(resp.Info)
|
|
require.True(resp.Info.IsDir)
|
|
}
|
|
|
|
func TestFS_Stat_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Start a server
|
|
s, root, cleanupS := nomad.TestACLServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
client, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.ACLEnabled = true
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanup()
|
|
|
|
// Create a bad token
|
|
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
|
|
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
|
|
|
|
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
|
|
[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
|
|
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]
|
|
|
|
cases := []struct {
|
|
Name string
|
|
Token string
|
|
ExpectedError string
|
|
}{
|
|
{
|
|
Name: "bad token",
|
|
Token: tokenBad.SecretID,
|
|
ExpectedError: structs.ErrPermissionDenied.Error(),
|
|
},
|
|
{
|
|
Name: "good token",
|
|
Token: tokenGood.SecretID,
|
|
},
|
|
{
|
|
Name: "root token",
|
|
Token: root.SecretID,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(c.Name, func(t *testing.T) {
|
|
req := &cstructs.FsStatRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "/",
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
AuthToken: c.Token,
|
|
Namespace: structs.DefaultNamespace,
|
|
},
|
|
}
|
|
|
|
var resp cstructs.FsStatResponse
|
|
err := client.ClientRPC("FileSystem.Stat", req, &resp)
|
|
if c.ExpectedError == "" {
|
|
require.NoError(t, err)
|
|
} else {
|
|
require.NotNil(t, err)
|
|
require.Contains(t, err.Error(), c.ExpectedError)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFS_List_NoAlloc(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a client
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsListRequest{
|
|
AllocID: uuid.Generate(),
|
|
Path: "foo",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
var resp cstructs.FsListResponse
|
|
err := c.ClientRPC("FileSystem.List", req, &resp)
|
|
require.NotNil(err)
|
|
require.True(structs.IsErrUnknownAllocation(err))
|
|
}
|
|
|
|
func TestFS_List(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
// Create and add an alloc
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "500ms",
|
|
}
|
|
// Wait for alloc to be running
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsListRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "/",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
var resp cstructs.FsListResponse
|
|
err := c.ClientRPC("FileSystem.List", req, &resp)
|
|
require.Nil(err)
|
|
require.NotEmpty(resp.Files)
|
|
require.True(resp.Files[0].IsDir)
|
|
}
|
|
|
|
func TestFS_List_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Start a server
|
|
s, root, cleanupS := nomad.TestACLServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
client, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.ACLEnabled = true
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanup()
|
|
|
|
// Create a bad token
|
|
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
|
|
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
|
|
|
|
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
|
|
[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
|
|
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]
|
|
|
|
cases := []struct {
|
|
Name string
|
|
Token string
|
|
ExpectedError string
|
|
}{
|
|
{
|
|
Name: "bad token",
|
|
Token: tokenBad.SecretID,
|
|
ExpectedError: structs.ErrPermissionDenied.Error(),
|
|
},
|
|
{
|
|
Name: "good token",
|
|
Token: tokenGood.SecretID,
|
|
},
|
|
{
|
|
Name: "root token",
|
|
Token: root.SecretID,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(c.Name, func(t *testing.T) {
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsListRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "/",
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
AuthToken: c.Token,
|
|
Namespace: structs.DefaultNamespace,
|
|
},
|
|
}
|
|
|
|
var resp cstructs.FsListResponse
|
|
err := client.ClientRPC("FileSystem.List", req, &resp)
|
|
if c.ExpectedError == "" {
|
|
require.NoError(t, err)
|
|
} else {
|
|
require.EqualError(t, err, c.ExpectedError)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFS_Stream_NoAlloc(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a client
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsStreamRequest{
|
|
AllocID: uuid.Generate(),
|
|
Path: "foo",
|
|
Origin: "start",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(3 * time.Second)
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
t.Logf("Got msg %+v", msg)
|
|
if msg.Error == nil {
|
|
continue
|
|
}
|
|
|
|
if structs.IsErrUnknownAllocation(msg.Error) {
|
|
break OUTER
|
|
} else {
|
|
t.Fatalf("bad error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_Stream_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Start a server
|
|
s, root, cleanupS := nomad.TestACLServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
client, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.ACLEnabled = true
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanup()
|
|
|
|
// Create a bad token
|
|
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
|
|
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
|
|
|
|
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
|
|
[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
|
|
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]
|
|
|
|
cases := []struct {
|
|
Name string
|
|
Token string
|
|
ExpectedError string
|
|
}{
|
|
{
|
|
Name: "bad token",
|
|
Token: tokenBad.SecretID,
|
|
ExpectedError: structs.ErrPermissionDenied.Error(),
|
|
},
|
|
{
|
|
Name: "good token",
|
|
Token: tokenGood.SecretID,
|
|
},
|
|
{
|
|
Name: "root token",
|
|
Token: root.SecretID,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(c.Name, func(t *testing.T) {
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsStreamRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "foo",
|
|
Origin: "start",
|
|
QueryOptions: structs.QueryOptions{
|
|
Namespace: structs.DefaultNamespace,
|
|
Region: "global",
|
|
AuthToken: c.Token,
|
|
},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := client.StreamingRpcHandler("FileSystem.Stream")
|
|
require.Nil(t, err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.NoError(t, encoder.Encode(req))
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
eof := err == io.EOF || strings.Contains(err.Error(), "closed")
|
|
if c.ExpectedError == "" && eof {
|
|
// No error was expected!
|
|
return
|
|
}
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error == nil {
|
|
continue
|
|
}
|
|
|
|
if strings.Contains(msg.Error.Error(), c.ExpectedError) {
|
|
break OUTER
|
|
} else {
|
|
t.Fatalf("Bad error: %v", msg.Error)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFS_Stream(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
expected := "Hello from the other side"
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "2s",
|
|
"stdout_string": expected,
|
|
}
|
|
|
|
// Wait for alloc to be running
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsStreamRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "alloc/logs/web.stdout.0",
|
|
PlainText: true,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
// Wrap the pipe so we can check it is closed
|
|
pipeChecker := &ReadWriteCloseChecker{ReadWriteCloser: p2}
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(pipeChecker)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(3 * time.Second)
|
|
received := ""
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error != nil {
|
|
t.Fatalf("Got error: %v", msg.Error.Error())
|
|
}
|
|
|
|
// Add the payload
|
|
received += string(msg.Payload)
|
|
if received == expected {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
pipeChecker.l.Lock()
|
|
defer pipeChecker.l.Unlock()
|
|
|
|
return pipeChecker.Closed, nil
|
|
}, func(err error) {
|
|
t.Fatal("Pipe not closed")
|
|
})
|
|
}
|
|
|
|
type ReadWriteCloseChecker struct {
|
|
io.ReadWriteCloser
|
|
l sync.Mutex
|
|
Closed bool
|
|
}
|
|
|
|
func (r *ReadWriteCloseChecker) Close() error {
|
|
r.l.Lock()
|
|
r.Closed = true
|
|
r.l.Unlock()
|
|
return r.ReadWriteCloser.Close()
|
|
}
|
|
|
|
func TestFS_Stream_Follow(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
expectedBase := "Hello from the other side"
|
|
repeat := 10
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
"stdout_string": expectedBase,
|
|
"stdout_repeat": repeat,
|
|
"stdout_repeat_duration": "200ms",
|
|
}
|
|
|
|
// Wait for alloc to be running
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsStreamRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "alloc/logs/web.stdout.0",
|
|
PlainText: true,
|
|
Follow: true,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(20 * time.Second)
|
|
expected := strings.Repeat(expectedBase, repeat+1)
|
|
received := ""
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error != nil {
|
|
t.Fatalf("Got error: %v", msg.Error.Error())
|
|
}
|
|
|
|
// Add the payload
|
|
received += string(msg.Payload)
|
|
if received == expected {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_Stream_Limit(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanup()
|
|
|
|
var limit int64 = 5
|
|
full := "Hello from the other side"
|
|
expected := full[:limit]
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "2s",
|
|
"stdout_string": full,
|
|
}
|
|
|
|
// Wait for alloc to be running
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsStreamRequest{
|
|
AllocID: alloc.ID,
|
|
Path: "alloc/logs/web.stdout.0",
|
|
PlainText: true,
|
|
Limit: limit,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(3 * time.Second)
|
|
received := ""
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error != nil {
|
|
t.Fatalf("Got error: %v", msg.Error.Error())
|
|
}
|
|
|
|
// Add the payload
|
|
received += string(msg.Payload)
|
|
if received == expected {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_Logs_NoAlloc(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a client
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsLogsRequest{
|
|
AllocID: uuid.Generate(),
|
|
Task: "foo",
|
|
LogType: "stdout",
|
|
Origin: "start",
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(3 * time.Second)
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
t.Logf("Got msg %+v", msg)
|
|
if msg.Error == nil {
|
|
continue
|
|
}
|
|
|
|
if structs.IsErrUnknownAllocation(msg.Error) {
|
|
break OUTER
|
|
} else {
|
|
t.Fatalf("bad error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
|
|
// have not started returns a 404 error.
|
|
func TestFS_Logs_TaskPending(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"start_block_for": "10s",
|
|
}
|
|
|
|
// Register job
|
|
args := &structs.JobRegisterRequest{}
|
|
args.Job = job
|
|
args.WriteRequest.Region = "global"
|
|
args.Namespace = job.Namespace
|
|
var jobResp structs.JobRegisterResponse
|
|
require.NoError(s.RPC("Job.Register", args, &jobResp))
|
|
|
|
// Get the allocation ID
|
|
var allocID string
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
args := structs.AllocListRequest{}
|
|
args.Region = "global"
|
|
resp := structs.AllocListResponse{}
|
|
if err := s.RPC("Alloc.List", &args, &resp); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if len(resp.Allocations) != 1 {
|
|
return false, fmt.Errorf("expected 1 alloc, found %d", len(resp.Allocations))
|
|
}
|
|
|
|
allocID = resp.Allocations[0].ID
|
|
|
|
// wait for alloc runner to be created; otherwise, we get no alloc found error
|
|
if _, err := c.getAllocRunner(allocID); err != nil {
|
|
return false, fmt.Errorf("alloc runner was not created yet for %v", allocID)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("error getting alloc id: %v", err)
|
|
})
|
|
|
|
// Make the request
|
|
req := &cstructs.FsLogsRequest{
|
|
AllocID: allocID,
|
|
Task: job.TaskGroups[0].Tasks[0].Name,
|
|
LogType: "stdout",
|
|
Origin: "start",
|
|
PlainText: true,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatalf("unexpected stream error: %v", err)
|
|
case msg := <-streamMsg:
|
|
require.NotNil(msg.Error)
|
|
require.NotNil(msg.Error.Code)
|
|
require.EqualValues(404, *msg.Error.Code)
|
|
require.Contains(msg.Error.Message, "not started")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_Logs_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server
|
|
s, root, cleanupS := nomad.TestACLServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
client, cleanup := TestClient(t, func(c *config.Config) {
|
|
c.ACLEnabled = true
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanup()
|
|
|
|
// Create a bad token
|
|
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
|
|
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
|
|
|
|
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
|
|
[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
|
|
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]
|
|
|
|
cases := []struct {
|
|
Name string
|
|
Token string
|
|
ExpectedError string
|
|
}{
|
|
{
|
|
Name: "bad token",
|
|
Token: tokenBad.SecretID,
|
|
ExpectedError: structs.ErrPermissionDenied.Error(),
|
|
},
|
|
{
|
|
Name: "good token",
|
|
Token: tokenGood.SecretID,
|
|
},
|
|
{
|
|
Name: "root token",
|
|
Token: root.SecretID,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(c.Name, func(t *testing.T) {
|
|
// Make the request with bad allocation id
|
|
req := &cstructs.FsLogsRequest{
|
|
AllocID: alloc.ID,
|
|
Task: job.TaskGroups[0].Tasks[0].Name,
|
|
LogType: "stdout",
|
|
Origin: "start",
|
|
QueryOptions: structs.QueryOptions{
|
|
Namespace: structs.DefaultNamespace,
|
|
Region: "global",
|
|
AuthToken: c.Token,
|
|
},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := client.StreamingRpcHandler("FileSystem.Logs")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(5 * time.Second)
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
eof := err == io.EOF || strings.Contains(err.Error(), "closed")
|
|
if c.ExpectedError == "" && eof {
|
|
// No error was expected!
|
|
return
|
|
}
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error == nil {
|
|
continue
|
|
}
|
|
|
|
if strings.Contains(msg.Error.Error(), c.ExpectedError) {
|
|
// Ok! Error matched expectation.
|
|
break OUTER
|
|
} else {
|
|
t.Fatalf("Bad error: %v", msg.Error)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFS_Logs(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
expected := "Hello from the other side\n"
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "2s",
|
|
"stdout_string": expected,
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
testutil.WaitForRunning(t, s.RPC, job)
|
|
|
|
// Get the allocation ID
|
|
args := structs.AllocListRequest{}
|
|
args.Region = "global"
|
|
resp := structs.AllocListResponse{}
|
|
require.NoError(s.RPC("Alloc.List", &args, &resp))
|
|
require.Len(resp.Allocations, 1)
|
|
allocID := resp.Allocations[0].ID
|
|
|
|
// Make the request
|
|
req := &cstructs.FsLogsRequest{
|
|
AllocID: allocID,
|
|
Task: job.TaskGroups[0].Tasks[0].Name,
|
|
LogType: "stdout",
|
|
Origin: "start",
|
|
PlainText: true,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
|
|
require.Nil(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(3 * time.Second)
|
|
received := ""
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error != nil {
|
|
t.Fatalf("Got error: %v", msg.Error.Error())
|
|
}
|
|
|
|
// Add the payload
|
|
received += string(msg.Payload)
|
|
if received == expected {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_Logs_Follow(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
|
|
// Start a server and client
|
|
s, cleanupS := nomad.TestServer(t, nil)
|
|
defer cleanupS()
|
|
testutil.WaitForLeader(t, s.RPC)
|
|
|
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
|
c.Servers = []string{s.GetConfig().RPCAddr.String()}
|
|
})
|
|
defer cleanupC()
|
|
|
|
expectedBase := "Hello from the other side\n"
|
|
repeat := 10
|
|
|
|
job := mock.BatchJob()
|
|
job.TaskGroups[0].Count = 1
|
|
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
|
"run_for": "20s",
|
|
"stdout_string": expectedBase,
|
|
"stdout_repeat": repeat,
|
|
"stdout_repeat_duration": "200ms",
|
|
}
|
|
|
|
// Wait for client to be running job
|
|
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
|
|
|
|
// Make the request
|
|
req := &cstructs.FsLogsRequest{
|
|
AllocID: alloc.ID,
|
|
Task: job.TaskGroups[0].Tasks[0].Name,
|
|
LogType: "stdout",
|
|
Origin: "start",
|
|
PlainText: true,
|
|
Follow: true,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Get the handler
|
|
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
|
|
require.NoError(err)
|
|
|
|
// Create a pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
// Start the handler
|
|
go handler(p2)
|
|
|
|
// Start the decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("error decoding: %v", err)
|
|
}
|
|
|
|
streamMsg <- &msg
|
|
}
|
|
}()
|
|
|
|
// Send the request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
require.Nil(encoder.Encode(req))
|
|
|
|
timeout := time.After(20 * time.Second)
|
|
expected := strings.Repeat(expectedBase, repeat+1)
|
|
received := ""
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timeout")
|
|
case err := <-errCh:
|
|
t.Fatal(err)
|
|
case msg := <-streamMsg:
|
|
if msg.Error != nil {
|
|
t.Fatalf("Got error: %v", msg.Error.Error())
|
|
}
|
|
|
|
// Add the payload
|
|
received += string(msg.Payload)
|
|
if received == expected {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_findClosest(t *testing.T) {
|
|
task := "foo"
|
|
entries := []*cstructs.AllocFileInfo{
|
|
{
|
|
Name: "foo.stdout.0",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stdout.1",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stdout.2",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stdout.3",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stderr.0",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stderr.1",
|
|
Size: 100,
|
|
},
|
|
{
|
|
Name: "foo.stderr.2",
|
|
Size: 100,
|
|
},
|
|
}
|
|
|
|
cases := []struct {
|
|
Entries []*cstructs.AllocFileInfo
|
|
DesiredIdx int64
|
|
DesiredOffset int64
|
|
Task string
|
|
LogType string
|
|
ExpectedFile string
|
|
ExpectedIdx int64
|
|
ExpectedOffset int64
|
|
Error bool
|
|
}{
|
|
// Test error cases
|
|
{
|
|
Entries: nil,
|
|
DesiredIdx: 0,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
Error: true,
|
|
},
|
|
{
|
|
Entries: entries[0:3],
|
|
DesiredIdx: 0,
|
|
Task: task,
|
|
LogType: "stderr",
|
|
Error: true,
|
|
},
|
|
|
|
// Test beginning cases
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 0,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[0].Name,
|
|
ExpectedIdx: 0,
|
|
},
|
|
{
|
|
// Desired offset should be ignored at edges
|
|
Entries: entries,
|
|
DesiredIdx: 0,
|
|
DesiredOffset: -100,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[0].Name,
|
|
ExpectedIdx: 0,
|
|
ExpectedOffset: 0,
|
|
},
|
|
{
|
|
// Desired offset should be ignored at edges
|
|
Entries: entries,
|
|
DesiredIdx: 1,
|
|
DesiredOffset: -1000,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[0].Name,
|
|
ExpectedIdx: 0,
|
|
ExpectedOffset: 0,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 0,
|
|
Task: task,
|
|
LogType: "stderr",
|
|
ExpectedFile: entries[4].Name,
|
|
ExpectedIdx: 0,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 0,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[0].Name,
|
|
ExpectedIdx: 0,
|
|
},
|
|
|
|
// Test middle cases
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 1,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[1].Name,
|
|
ExpectedIdx: 1,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 1,
|
|
DesiredOffset: 10,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[1].Name,
|
|
ExpectedIdx: 1,
|
|
ExpectedOffset: 10,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 1,
|
|
DesiredOffset: 110,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[2].Name,
|
|
ExpectedIdx: 2,
|
|
ExpectedOffset: 10,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: 1,
|
|
Task: task,
|
|
LogType: "stderr",
|
|
ExpectedFile: entries[5].Name,
|
|
ExpectedIdx: 1,
|
|
},
|
|
// Test end cases
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: math.MaxInt64,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[3].Name,
|
|
ExpectedIdx: 3,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: math.MaxInt64,
|
|
DesiredOffset: math.MaxInt64,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[3].Name,
|
|
ExpectedIdx: 3,
|
|
ExpectedOffset: 100,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: math.MaxInt64,
|
|
DesiredOffset: -10,
|
|
Task: task,
|
|
LogType: "stdout",
|
|
ExpectedFile: entries[3].Name,
|
|
ExpectedIdx: 3,
|
|
ExpectedOffset: 90,
|
|
},
|
|
{
|
|
Entries: entries,
|
|
DesiredIdx: math.MaxInt64,
|
|
Task: task,
|
|
LogType: "stderr",
|
|
ExpectedFile: entries[6].Name,
|
|
ExpectedIdx: 2,
|
|
},
|
|
}
|
|
|
|
for i, c := range cases {
|
|
entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType)
|
|
if err != nil {
|
|
if !c.Error {
|
|
t.Fatalf("case %d: Unexpected error: %v", i, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
if entry.Name != c.ExpectedFile {
|
|
t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile)
|
|
}
|
|
if idx != c.ExpectedIdx {
|
|
t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx)
|
|
}
|
|
if offset != c.ExpectedOffset {
|
|
t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFS_streamFile_NoFile(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
frames := make(chan *sframer.StreamFrame, 32)
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
err := c.endpoints.FileSystem.streamFile(
|
|
context.Background(), 0, "foo", 0, ad, framer, nil)
|
|
require.NotNil(err)
|
|
if runtime.GOOS == "windows" {
|
|
require.Contains(err.Error(), "cannot find the file")
|
|
} else {
|
|
require.Contains(err.Error(), "no such file")
|
|
}
|
|
}
|
|
|
|
func TestFS_streamFile_Modify(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Get a temp alloc dir
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
// Create a file in the temp dir
|
|
streamFile := "stream_file"
|
|
f, err := os.Create(filepath.Join(ad.AllocDir, streamFile))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create file: %v", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
data := []byte("helloworld")
|
|
|
|
// Start the reader
|
|
resultCh := make(chan struct{})
|
|
frames := make(chan *sframer.StreamFrame, 4)
|
|
go func() {
|
|
var collected []byte
|
|
for {
|
|
frame := <-frames
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
collected = append(collected, frame.Data...)
|
|
if reflect.DeepEqual(data, collected) {
|
|
resultCh <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Write a few bytes
|
|
if _, err := f.Write(data[:3]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// Start streaming
|
|
go func() {
|
|
if err := c.endpoints.FileSystem.streamFile(
|
|
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
|
t.Fatalf("stream() failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Sleep a little before writing more. This lets us check if the watch
|
|
// is working.
|
|
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
|
if _, err := f.Write(data[3:]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case <-resultCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("failed to send new data")
|
|
}
|
|
}
|
|
|
|
func TestFS_streamFile_Truncate(t *testing.T) {
|
|
t.Parallel()
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Get a temp alloc dir
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
// Create a file in the temp dir
|
|
data := []byte("helloworld")
|
|
streamFile := "stream_file"
|
|
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
|
f, err := os.Create(streamFilePath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create file: %v", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
// Start the reader
|
|
truncateCh := make(chan struct{})
|
|
truncateClosed := false
|
|
dataPostTruncCh := make(chan struct{})
|
|
frames := make(chan *sframer.StreamFrame, 4)
|
|
go func() {
|
|
var collected []byte
|
|
for {
|
|
frame := <-frames
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
if frame.FileEvent == truncateEvent && !truncateClosed {
|
|
close(truncateCh)
|
|
truncateClosed = true
|
|
}
|
|
|
|
collected = append(collected, frame.Data...)
|
|
if reflect.DeepEqual(data, collected) {
|
|
close(dataPostTruncCh)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Write a few bytes
|
|
if _, err := f.Write(data[:3]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// Start streaming
|
|
go func() {
|
|
if err := c.endpoints.FileSystem.streamFile(
|
|
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
|
t.Fatalf("stream() failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Sleep a little before truncating. This lets us check if the watch
|
|
// is working.
|
|
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
|
if err := f.Truncate(0); err != nil {
|
|
t.Fatalf("truncate failed: %v", err)
|
|
}
|
|
if err := f.Sync(); err != nil {
|
|
t.Fatalf("sync failed: %v", err)
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
t.Fatalf("failed to close file: %v", err)
|
|
}
|
|
|
|
f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0)
|
|
if err != nil {
|
|
t.Fatalf("failed to reopen file: %v", err)
|
|
}
|
|
defer f2.Close()
|
|
if _, err := f2.Write(data[3:5]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case <-truncateCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive truncate")
|
|
}
|
|
|
|
// Sleep a little before writing more. This lets us check if the watch
|
|
// is working.
|
|
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
|
if _, err := f2.Write(data[5:]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case <-dataPostTruncCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive post truncate data")
|
|
}
|
|
}
|
|
|
|
func TestFS_streamImpl_Delete(t *testing.T) {
|
|
if runtime.GOOS == "windows" {
|
|
t.Skip("Windows does not allow us to delete a file while it is open")
|
|
}
|
|
t.Parallel()
|
|
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Get a temp alloc dir
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
// Create a file in the temp dir
|
|
data := []byte("helloworld")
|
|
streamFile := "stream_file"
|
|
streamFilePath := filepath.Join(ad.AllocDir, streamFile)
|
|
f, err := os.Create(streamFilePath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create file: %v", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
// Start the reader
|
|
deleteCh := make(chan struct{})
|
|
frames := make(chan *sframer.StreamFrame, 4)
|
|
go func() {
|
|
for {
|
|
frame, ok := <-frames
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
if frame.FileEvent == deleteEvent {
|
|
close(deleteCh)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Write a few bytes
|
|
if _, err := f.Write(data[:3]); err != nil {
|
|
t.Fatalf("write failed: %v", err)
|
|
}
|
|
|
|
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
|
|
framer.Run()
|
|
defer framer.Destroy()
|
|
|
|
// Start streaming
|
|
go func() {
|
|
if err := c.endpoints.FileSystem.streamFile(
|
|
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
|
|
t.Fatalf("stream() failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Sleep a little before deleting. This lets us check if the watch
|
|
// is working.
|
|
time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
|
|
if err := os.Remove(streamFilePath); err != nil {
|
|
t.Fatalf("delete failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case <-deleteCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive delete")
|
|
}
|
|
}
|
|
|
|
func TestFS_logsImpl_NoFollow(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Get a temp alloc dir and create the log dir
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
|
|
if err := os.MkdirAll(logDir, 0777); err != nil {
|
|
t.Fatalf("Failed to make log dir: %v", err)
|
|
}
|
|
|
|
// Create a series of log files in the temp dir
|
|
task := "foo"
|
|
logType := "stdout"
|
|
expected := []byte("012")
|
|
for i := 0; i < 3; i++ {
|
|
logFile := fmt.Sprintf("%s.%s.%d", task, logType, i)
|
|
logFilePath := filepath.Join(logDir, logFile)
|
|
err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create file: %v", err)
|
|
}
|
|
}
|
|
|
|
// Start the reader
|
|
resultCh := make(chan struct{})
|
|
frames := make(chan *sframer.StreamFrame, 4)
|
|
var received []byte
|
|
go func() {
|
|
for {
|
|
frame, ok := <-frames
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
received = append(received, frame.Data...)
|
|
if reflect.DeepEqual(received, expected) {
|
|
close(resultCh)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start streaming logs
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
defer cancel()
|
|
|
|
if err := c.endpoints.FileSystem.logsImpl(
|
|
ctx, false, false, 0,
|
|
OriginStart, task, logType, ad, frames); err != nil {
|
|
t.Fatalf("logsImpl failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case <-resultCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive data: got %q", string(received))
|
|
}
|
|
}
|
|
|
|
func TestFS_logsImpl_Follow(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, cleanup := TestClient(t, nil)
|
|
defer cleanup()
|
|
|
|
// Get a temp alloc dir and create the log dir
|
|
ad := tempAllocDir(t)
|
|
defer os.RemoveAll(ad.AllocDir)
|
|
|
|
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
|
|
if err := os.MkdirAll(logDir, 0777); err != nil {
|
|
t.Fatalf("Failed to make log dir: %v", err)
|
|
}
|
|
|
|
// Create a series of log files in the temp dir
|
|
task := "foo"
|
|
logType := "stdout"
|
|
expected := []byte("012345")
|
|
initialWrites := 3
|
|
|
|
writeToFile := func(index int, data []byte) {
|
|
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
|
|
logFilePath := filepath.Join(logDir, logFile)
|
|
err := ioutil.WriteFile(logFilePath, data, 777)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create file: %v", err)
|
|
}
|
|
}
|
|
for i := 0; i < initialWrites; i++ {
|
|
writeToFile(i, expected[i:i+1])
|
|
}
|
|
|
|
// Start the reader
|
|
firstResultCh := make(chan struct{})
|
|
fullResultCh := make(chan struct{})
|
|
frames := make(chan *sframer.StreamFrame, 4)
|
|
var received []byte
|
|
go func() {
|
|
for {
|
|
frame, ok := <-frames
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
received = append(received, frame.Data...)
|
|
if reflect.DeepEqual(received, expected[:initialWrites]) {
|
|
close(firstResultCh)
|
|
} else if reflect.DeepEqual(received, expected) {
|
|
close(fullResultCh)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start streaming logs
|
|
go c.endpoints.FileSystem.logsImpl(
|
|
context.Background(), true, false, 0,
|
|
OriginStart, task, logType, ad, frames)
|
|
|
|
select {
|
|
case <-firstResultCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive data: got %q", string(received))
|
|
}
|
|
|
|
// We got the first chunk of data, write out the rest to the next file
|
|
// at an index much ahead to check that it is following and detecting
|
|
// skips
|
|
skipTo := initialWrites + 10
|
|
writeToFile(skipTo, expected[initialWrites:])
|
|
|
|
select {
|
|
case <-fullResultCh:
|
|
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
|
|
t.Fatalf("did not receive data: got %q", string(received))
|
|
}
|
|
}
|