open-nomad/client/fs_endpoint_test.go
Seth Hoenig f0c3dca49c tests: swap lib/freeport for tweaked helper/freeport
Copy the updated version of freeport (sdk/freeport), and tweak it for use
in Nomad tests. This means staying below port 10000 to avoid conflicts with
the lib/freeport that is still transitively used by the old version of
consul that we vendor. Also provide implementations to find ephemeral ports
of macOS and Windows environments.

Ports acquired through freeport are supposed to be returned to freeport,
which this change now also introduces. Many tests are modified to include
calls to a cleanup function for Server objects.

This should help quite a bit with some flakey tests, but not all of them.
Our port problems will not go away completely until we upgrade our vendor
version of consul. With Go modules, we'll probably do a 'replace' to swap
out other copies of freeport with the one now in 'nomad/helper/freeport'.
2019-12-09 08:37:32 -06:00

1985 lines
45 KiB
Go

package client
import (
"context"
"fmt"
"io"
"io/ioutil"
"math"
"net"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
// tempAllocDir returns a new alloc dir that is rooted in a freshly created
// temp dir whose mode has been set to 0777. The caller should destroy the temp
// dir. Any setup failure aborts the test via t.Fatalf.
func tempAllocDir(t testing.TB) *allocdir.AllocDir {
	t.Helper()

	dir, err := ioutil.TempDir("", "nomadtest")
	if err != nil {
		t.Fatalf("TempDir() failed: %v", err)
	}

	if err := os.Chmod(dir, 0777); err != nil {
		// The caller never receives dir on this path, so remove it here
		// (best effort) instead of leaking it.
		os.RemoveAll(dir)
		t.Fatalf("failed to chmod dir: %v", err)
	}

	return allocdir.NewAllocDir(testlog.HCLogger(t), dir)
}
// nopWriteCloser wraps an io.Writer and adds a Close method that does
// nothing, so the writer can be used where a Close method is also required.
type nopWriteCloser struct {
	io.Writer
}

// Close is a no-op and always returns nil.
func (nopWriteCloser) Close() error { return nil }
// TestFS_Stat_NoAlloc asserts that FileSystem.Stat returns an unknown
// allocation error when the request names an alloc ID the client does not
// have.
func TestFS_Stat_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with a freshly generated (and so unknown) allocation id
	req := &cstructs.FsStatRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsStatResponse
	err := c.ClientRPC("FileSystem.Stat", req, &resp)
	require.Error(err)
	require.True(structs.IsErrUnknownAllocation(err),
		"expected unknown allocation error, got: %v", err)
}
// TestFS_Stat asserts that FileSystem.Stat on the root path of a running
// allocation succeeds and reports a directory.
func TestFS_Stat(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Create and add an alloc
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "500ms",
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStatRequest{
		AllocID:      alloc.ID,
		Path:         "/",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsStatResponse
	err := c.ClientRPC("FileSystem.Stat", req, &resp)
	require.NoError(err)
	require.NotNil(resp.Info)
	require.True(resp.Info.IsDir)
}
// TestFS_Stat_ACL exercises FileSystem.Stat against an ACL-enabled client with
// three tokens: one whose policy only covers another namespace (must be
// denied), one with read-logs/read-fs on the default namespace, and the root
// token (both must succeed).
func TestFS_Stat_ACL(t *testing.T) {
	t.Parallel()

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token: its policy applies to a different namespace
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	// Create a good token with read access on the default namespace
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			req := &cstructs.FsStatRequest{
				AllocID: alloc.ID,
				Path:    "/",
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: structs.DefaultNamespace,
				},
			}

			var resp cstructs.FsStatResponse
			err := client.ClientRPC("FileSystem.Stat", req, &resp)
			if c.ExpectedError == "" {
				require.NoError(t, err)
				return
			}

			require.Error(t, err)
			require.Contains(t, err.Error(), c.ExpectedError)
		})
	}
}
// TestFS_List_NoAlloc asserts that FileSystem.List returns an unknown
// allocation error when the request names an alloc ID the client does not
// have.
func TestFS_List_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with a freshly generated (and so unknown) allocation id
	req := &cstructs.FsListRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsListResponse
	err := c.ClientRPC("FileSystem.List", req, &resp)
	require.Error(err)
	require.True(structs.IsErrUnknownAllocation(err),
		"expected unknown allocation error, got: %v", err)
}
// TestFS_List asserts that FileSystem.List on the root path of a running
// allocation succeeds, returns at least one entry, and that the first entry
// is a directory.
func TestFS_List(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Create and add an alloc
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "500ms",
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsListRequest{
		AllocID:      alloc.ID,
		Path:         "/",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsListResponse
	err := c.ClientRPC("FileSystem.List", req, &resp)
	require.NoError(err)
	require.NotEmpty(resp.Files)
	require.True(resp.Files[0].IsDir)
}
// TestFS_List_ACL runs FileSystem.List against an ACL-enabled client once per
// token: a token whose policy targets another namespace must be rejected with
// the permission denied error, while a token holding read-logs/read-fs on the
// default namespace and the root token must both succeed.
func TestFS_List_ACL(t *testing.T) {
	t.Parallel()

	// Bring up an ACL-enabled server and wait for a leader.
	srv, rootToken, stopServer := nomad.TestACLServer(t, nil)
	defer stopServer()
	testutil.WaitForLeader(t, srv.RPC)

	// Attach an ACL-enabled client to that server.
	client, stopClient := TestClient(t, func(cfg *config.Config) {
		cfg.ACLEnabled = true
		cfg.Servers = []string{srv.GetConfig().RPCAddr.String()}
	})
	defer stopClient()

	// Token that should be denied: its policy is scoped to namespace "other".
	deniedPolicy := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	deniedToken := mock.CreatePolicyAndToken(t, srv.State(), 1005, "invalid", deniedPolicy)

	// Token that should be allowed: read capabilities on the default namespace.
	allowedPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	allowedToken := mock.CreatePolicyAndToken(t, srv.State(), 1009, "valid2", allowedPolicy)

	batch := mock.BatchJob()
	batch.TaskGroups[0].Count = 1
	batch.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Block until the client reports the job's allocation as running.
	running := testutil.WaitForRunningWithToken(t, srv.RPC, batch, rootToken.SecretID)[0]

	type listCase struct {
		Name          string
		Token         string
		ExpectedError string
	}
	testCases := []listCase{
		{Name: "bad token", Token: deniedToken.SecretID, ExpectedError: structs.ErrPermissionDenied.Error()},
		{Name: "good token", Token: allowedToken.SecretID},
		{Name: "root token", Token: rootToken.SecretID},
	}

	for _, tc := range testCases {
		t.Run(tc.Name, func(t *testing.T) {
			// List the root of the running allocation using this case's token.
			opts := structs.QueryOptions{
				Region:    "global",
				AuthToken: tc.Token,
				Namespace: structs.DefaultNamespace,
			}
			listReq := &cstructs.FsListRequest{
				AllocID:      running.ID,
				Path:         "/",
				QueryOptions: opts,
			}

			var listResp cstructs.FsListResponse
			rpcErr := client.ClientRPC("FileSystem.List", listReq, &listResp)
			if tc.ExpectedError != "" {
				require.EqualError(t, rpcErr, tc.ExpectedError)
				return
			}
			require.NoError(t, rpcErr)
		})
	}
}
// TestFS_Stream_NoAlloc asserts that FileSystem.Stream reports an unknown
// allocation error on the stream when the request names an alloc ID the
// client does not have.
func TestFS_Stream_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsStreamRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		Origin:       "start",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			t.Logf("Got msg %+v", msg)
			if msg.Error == nil {
				continue
			}

			// Report the streamed error itself, not the handler lookup error.
			if !structs.IsErrUnknownAllocation(msg.Error) {
				t.Fatalf("bad error: %v", msg.Error)
			}
			break OUTER
		}
	}
}
// TestFS_Stream_ACL runs FileSystem.Stream against an ACL-enabled client once
// per token. A token whose policy only covers another namespace must receive
// a permission denied error on the stream; a token with read-logs/read-fs on
// the default namespace and the root token must not be denied.
func TestFS_Stream_ACL(t *testing.T) {
	t.Parallel()

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token: its policy applies to a different namespace
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	// Create a good token with read access on the default namespace
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			// Make the request against the running alloc with this case's token
			req := &cstructs.FsStreamRequest{
				AllocID: alloc.ID,
				Path:    "foo",
				Origin:  "start",
				QueryOptions: structs.QueryOptions{
					Namespace: structs.DefaultNamespace,
					Region:    "global",
					AuthToken: c.Token,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("FileSystem.Stream")
			require.NoError(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			// errCh is buffered so the decoder can deliver its final error
			// and exit after the subtest has stopped receiving.
			errCh := make(chan error, 1)
			streamMsg := make(chan *cstructs.StreamErrWrapper)

			// Start the handler
			go handler(p2)

			// Start the decoder
			go func() {
				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
				for {
					var msg cstructs.StreamErrWrapper
					if err := decoder.Decode(&msg); err != nil {
						errCh <- err
						return
					}

					streamMsg <- &msg
				}
			}()

			// Send the request
			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
			require.NoError(t, encoder.Encode(req))

			timeout := time.After(5 * time.Second)

		OUTER:
			for {
				select {
				case <-timeout:
					t.Fatal("timeout")
				case err := <-errCh:
					eof := err == io.EOF || strings.Contains(err.Error(), "closed")
					if c.ExpectedError == "" && eof {
						// No error was expected!
						return
					}
					t.Fatal(err)
				case msg := <-streamMsg:
					if msg.Error == nil {
						continue
					}

					got := msg.Error.Error()
					if c.ExpectedError == "" {
						// strings.Contains(got, "") is always true, so an
						// empty expectation needs its own check. A non-ACL
						// error is still accepted here (presumably "foo" may
						// not exist in the alloc dir), but an authorized
						// token must never be denied.
						if strings.Contains(got, structs.ErrPermissionDenied.Error()) {
							t.Fatalf("Bad error: %v", msg.Error)
						}
						break OUTER
					}

					if !strings.Contains(got, c.ExpectedError) {
						t.Fatalf("Bad error: %v", msg.Error)
					}
					break OUTER
				}
			}
		})
	}
}
// TestFS_Stream asserts that FileSystem.Stream delivers the task's stdout
// file contents in plain text and that the handler closes its side of the
// connection once streaming is done.
func TestFS_Stream(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expected := "Hello from the other side"
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": expected,
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// Wrap the pipe so we can check it is closed
	pipeChecker := &ReadWriteCloseChecker{ReadWriteCloser: p2}

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(pipeChecker)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}

	// The handler is expected to close the connection it was given.
	testutil.WaitForResult(func() (bool, error) {
		pipeChecker.l.Lock()
		defer pipeChecker.l.Unlock()

		return pipeChecker.Closed, nil
	}, func(err error) {
		t.Fatal("Pipe not closed")
	})
}
// ReadWriteCloseChecker wraps an io.ReadWriteCloser and records whether Close
// has been called on it. Closed is written while holding l, so readers should
// hold l as well.
type ReadWriteCloseChecker struct {
	io.ReadWriteCloser
	l      sync.Mutex
	Closed bool
}

// Close records that the wrapper was closed and then closes the wrapped
// io.ReadWriteCloser, returning its error.
func (r *ReadWriteCloseChecker) Close() error {
	r.markClosed()
	return r.ReadWriteCloser.Close()
}

// markClosed sets Closed under the lock.
func (r *ReadWriteCloseChecker) markClosed() {
	r.l.Lock()
	defer r.l.Unlock()
	r.Closed = true
}
// TestFS_Stream_Follow asserts that FileSystem.Stream with Follow set keeps
// delivering data as the task appends to its stdout file, until the full
// repeated output has been received.
func TestFS_Stream_Follow(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expectedBase := "Hello from the other side"
	repeat := 10

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":                "20s",
		"stdout_string":          expectedBase,
		"stdout_repeat":          repeat,
		"stdout_repeat_duration": "200ms",
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		Follow:       true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(20 * time.Second)
	// The test expects the base string followed by `repeat` repetitions.
	expected := strings.Repeat(expectedBase, repeat+1)
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}
// TestFS_Stream_Limit asserts that FileSystem.Stream with Limit set delivers
// only the first Limit bytes of the task's stdout file.
func TestFS_Stream_Limit(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	var limit int64 = 5
	full := "Hello from the other side"
	expected := full[:limit]
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": full,
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		Limit:        limit,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}
// TestFS_Logs_NoAlloc asserts that FileSystem.Logs reports an unknown
// allocation error on the stream when the request names an alloc ID the
// client does not have.
func TestFS_Logs_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsLogsRequest{
		AllocID:      uuid.Generate(),
		Task:         "foo",
		LogType:      "stdout",
		Origin:       "start",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			t.Logf("Got msg %+v", msg)
			if msg.Error == nil {
				continue
			}

			// Report the streamed error itself, not the handler lookup error.
			if !structs.IsErrUnknownAllocation(msg.Error) {
				t.Fatalf("bad error: %v", msg.Error)
			}
			break OUTER
		}
	}
}
// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
// have not started returns a 404 error.
func TestFS_Logs_TaskPending(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// start_block_for is set so the request below is made before the task
	// has started.
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"start_block_for": "10s",
	}

	// Register job
	args := &structs.JobRegisterRequest{}
	args.Job = job
	args.WriteRequest.Region = "global"
	args.Namespace = job.Namespace
	var jobResp structs.JobRegisterResponse
	require.NoError(s.RPC("Job.Register", args, &jobResp))

	// Get the allocation ID
	var allocID string
	testutil.WaitForResult(func() (bool, error) {
		args := structs.AllocListRequest{}
		args.Region = "global"
		resp := structs.AllocListResponse{}
		if err := s.RPC("Alloc.List", &args, &resp); err != nil {
			return false, err
		}

		if len(resp.Allocations) != 1 {
			return false, fmt.Errorf("expected 1 alloc, found %d", len(resp.Allocations))
		}

		allocID = resp.Allocations[0].ID

		// wait for alloc runner to be created; otherwise, we get no alloc found error
		if _, err := c.getAllocRunner(allocID); err != nil {
			return false, fmt.Errorf("alloc runner was not created yet for %v", allocID)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error getting alloc id: %v", err)
	})

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      allocID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	// Every outcome ends the test, so a single select is enough: the first
	// streamed message must carry the 404 "not started" error.
	select {
	case <-time.After(3 * time.Second):
		t.Fatal("timeout")
	case err := <-errCh:
		t.Fatalf("unexpected stream error: %v", err)
	case msg := <-streamMsg:
		require.NotNil(msg.Error)
		require.NotNil(msg.Error.Code)
		require.EqualValues(404, *msg.Error.Code)
		require.Contains(msg.Error.Message, "not started")
	}
}
func TestFS_Logs_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root, cleanupS := nomad.TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
client, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer cleanup()
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "20s",
}
// Wait for client to be running job
alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
},
{
Name: "root token",
Token: root.SecretID,
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request with bad allocation id
req := &cstructs.FsLogsRequest{
AllocID: alloc.ID,
Task: job.TaskGroups[0].Tasks[0].Name,
LogType: "stdout",
Origin: "start",
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: "global",
AuthToken: c.Token,
},
}
// Get the handler
handler, err := client.StreamingRpcHandler("FileSystem.Logs")
require.Nil(err)
// Create a pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
// Start the handler
go handler(p2)
// Start the decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
errCh <- err
return
}
streamMsg <- &msg
}
}()
// Send the request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout")
case err := <-errCh:
eof := err == io.EOF || strings.Contains(err.Error(), "closed")
if c.ExpectedError == "" && eof {
// No error was expected!
return
}
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), c.ExpectedError) {
// Ok! Error matched expectation.
break OUTER
} else {
t.Fatalf("Bad error: %v", msg.Error)
}
}
}
})
}
}
// TestFS_Logs asserts that FileSystem.Logs streams the task's stdout, from
// the start and in plain text, matching what the task was configured to
// print.
func TestFS_Logs(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expected := "Hello from the other side\n"
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": expected,
	}

	// Wait for client to be running job
	testutil.WaitForRunning(t, s.RPC, job)

	// Get the allocation ID
	args := structs.AllocListRequest{}
	args.Region = "global"
	resp := structs.AllocListResponse{}
	require.NoError(s.RPC("Alloc.List", &args, &resp))
	require.Len(resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      allocID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}
// TestFS_Logs_Follow asserts that FileSystem.Logs with Follow set keeps
// delivering stdout as the task writes it, until the full repeated output has
// been received.
func TestFS_Logs_Follow(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expectedBase := "Hello from the other side\n"
	repeat := 10

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":                "20s",
		"stdout_string":          expectedBase,
		"stdout_repeat":          repeat,
		"stdout_repeat_duration": "200ms",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      alloc.ID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		Follow:       true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// errCh is buffered so the decoder can report one failure and exit even
	// if the test has already stopped receiving.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it or keep reading.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(20 * time.Second)
	// The test expects the base string followed by `repeat` repetitions.
	expected := strings.Repeat(expectedBase, repeat+1)
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}
// TestFS_findClosest is a table-driven test of findClosest. Each case passes
// a list of log file entries plus a desired index/offset, task and log type,
// and checks the returned file name, index and offset, or that an error is
// returned when Error is set.
func TestFS_findClosest(t *testing.T) {
	task := "foo"
	// Four stdout files followed by three stderr files, 100 bytes each.
	entries := []*cstructs.AllocFileInfo{
		{
			Name: "foo.stdout.0",
			Size: 100,
		},
		{
			Name: "foo.stdout.1",
			Size: 100,
		},
		{
			Name: "foo.stdout.2",
			Size: 100,
		},
		{
			Name: "foo.stdout.3",
			Size: 100,
		},
		{
			Name: "foo.stderr.0",
			Size: 100,
		},
		{
			Name: "foo.stderr.1",
			Size: 100,
		},
		{
			Name: "foo.stderr.2",
			Size: 100,
		},
	}

	cases := []struct {
		Entries        []*cstructs.AllocFileInfo
		DesiredIdx     int64
		DesiredOffset  int64
		Task           string
		LogType        string
		ExpectedFile   string
		ExpectedIdx    int64
		ExpectedOffset int64
		Error          bool
	}{
		// Test error cases
		{
			Entries:    nil,
			DesiredIdx: 0,
			Task:       task,
			LogType:    "stdout",
			Error:      true,
		},
		{
			Entries:    entries[0:3],
			DesiredIdx: 0,
			Task:       task,
			LogType:    "stderr",
			Error:      true,
		},

		// Test beginning cases
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[0].Name,
			ExpectedIdx:  0,
		},
		{
			// Desired offset should be ignored at edges
			Entries:        entries,
			DesiredIdx:     0,
			DesiredOffset:  -100,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[0].Name,
			ExpectedIdx:    0,
			ExpectedOffset: 0,
		},
		{
			// Desired offset should be ignored at edges
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  -1000,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[0].Name,
			ExpectedIdx:    0,
			ExpectedOffset: 0,
		},
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[4].Name,
			ExpectedIdx:  0,
		},
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[0].Name,
			ExpectedIdx:  0,
		},

		// Test middle cases
		{
			Entries:      entries,
			DesiredIdx:   1,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[1].Name,
			ExpectedIdx:  1,
		},
		{
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  10,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[1].Name,
			ExpectedIdx:    1,
			ExpectedOffset: 10,
		},
		{
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  110,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[2].Name,
			ExpectedIdx:    2,
			ExpectedOffset: 10,
		},
		{
			Entries:      entries,
			DesiredIdx:   1,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[5].Name,
			ExpectedIdx:  1,
		},

		// Test end cases
		{
			Entries:      entries,
			DesiredIdx:   math.MaxInt64,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[3].Name,
			ExpectedIdx:  3,
		},
		{
			Entries:        entries,
			DesiredIdx:     math.MaxInt64,
			DesiredOffset:  math.MaxInt64,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[3].Name,
			ExpectedIdx:    3,
			ExpectedOffset: 100,
		},
		{
			Entries:        entries,
			DesiredIdx:     math.MaxInt64,
			DesiredOffset:  -10,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[3].Name,
			ExpectedIdx:    3,
			ExpectedOffset: 90,
		},
		{
			Entries:      entries,
			DesiredIdx:   math.MaxInt64,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[6].Name,
			ExpectedIdx:  2,
		},
	}

	for i, c := range cases {
		entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType)
		if err != nil {
			if !c.Error {
				t.Fatalf("case %d: Unexpected error: %v", i, err)
			}
			continue
		}

		// An error case that returned no error must fail explicitly rather
		// than fall through to the result checks below.
		if c.Error {
			t.Fatalf("case %d: Expected an error but got none", i)
		}

		if entry.Name != c.ExpectedFile {
			t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile)
		}
		if idx != c.ExpectedIdx {
			t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx)
		}
		if offset != c.ExpectedOffset {
			t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset)
		}
	}
}
// TestFS_streamFile_NoFile asserts that asking streamFile for a path that was
// never created in the alloc dir yields a non-nil error carrying the
// operating system's "missing file" message.
func TestFS_streamFile_NoFile(t *testing.T) {
	t.Parallel()
	req := require.New(t)

	client, cleanupClient := TestClient(t, nil)
	defer cleanupClient()

	allocDir := tempAllocDir(t)
	defer os.RemoveAll(allocDir.AllocDir)

	frameCh := make(chan *sframer.StreamFrame, 32)
	framer := sframer.NewStreamFramer(frameCh, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Windows words the missing-file error differently from other platforms.
	wantMsg := "no such file"
	if runtime.GOOS == "windows" {
		wantMsg = "cannot find the file"
	}

	err := client.endpoints.FileSystem.streamFile(
		context.Background(), 0, "foo", 0, allocDir, framer, nil)
	req.NotNil(err)
	req.Contains(err.Error(), wantMsg)
}
// TestFS_streamFile_Modify asserts that bytes appended to a file while
// streamFile is running are delivered on the frames channel: the test writes
// the first three bytes, starts the stream, waits, writes the rest, and
// expects the concatenated frame payloads to equal the full data.
func TestFS_streamFile_Modify(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	streamFile := "stream_file"
	f, err := os.Create(filepath.Join(ad.AllocDir, streamFile))
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	data := []byte("helloworld")

	// Start the reader. resultCh is closed (rather than sent on) so the
	// reader never blocks if the test has already given up waiting.
	resultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		var collected []byte
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			collected = append(collected, frame.Data...)
			if reflect.DeepEqual(data, collected) {
				close(resultCh)
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Cancel the stream's context when the test returns so the streaming
	// goroutine is asked to stop instead of running on context.Background().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start streaming. t.Fatalf may only be called from the goroutine running
	// the test, so a stream failure is handed back over a buffered channel and
	// reported from the select below.
	streamErrCh := make(chan error, 1)
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			ctx, 0, streamFile, 0, ad, framer, nil); err != nil {
			streamErrCh <- err
		}
	}()

	// Sleep a little before writing more. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if _, err := f.Write(data[3:]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-resultCh:
	case err := <-streamErrCh:
		t.Fatalf("stream() failed: %v", err)
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("failed to send new data")
	}
}
// TestFS_streamFile_Truncate asserts that streamFile reports a truncate event
// when the streamed file is truncated, and that data written after the
// truncation is still delivered. The test writes data[:3], truncates, writes
// data[3:5] and then data[5:], and expects the concatenated frame payloads to
// equal data.
func TestFS_streamFile_Truncate(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	data := []byte("helloworld")
	streamFile := "stream_file"
	streamFilePath := filepath.Join(ad.AllocDir, streamFile)
	f, err := os.Create(streamFilePath)
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	// Start the reader
	truncateCh := make(chan struct{})
	dataPostTruncCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		var collected []byte
		// Guards truncateCh against a double close if more than one frame
		// carries the truncate event.
		truncateClosed := false
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			if frame.FileEvent == truncateEvent && !truncateClosed {
				close(truncateCh)
				truncateClosed = true
			}

			collected = append(collected, frame.Data...)
			if reflect.DeepEqual(data, collected) {
				close(dataPostTruncCh)
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Cancel the stream's context when the test returns so the streaming
	// goroutine is asked to stop instead of running on context.Background().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start streaming. t.Fatalf may only be called from the goroutine running
	// the test, so a stream failure is handed back over a buffered channel and
	// reported from the selects below.
	streamErrCh := make(chan error, 1)
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			ctx, 0, streamFile, 0, ad, framer, nil); err != nil {
			streamErrCh <- err
		}
	}()

	// Sleep a little before truncating. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if err := f.Truncate(0); err != nil {
		t.Fatalf("truncate failed: %v", err)
	}
	if err := f.Sync(); err != nil {
		t.Fatalf("sync failed: %v", err)
	}
	if err := f.Close(); err != nil {
		t.Fatalf("failed to close file: %v", err)
	}

	f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0)
	if err != nil {
		t.Fatalf("failed to reopen file: %v", err)
	}
	defer f2.Close()
	if _, err := f2.Write(data[3:5]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-truncateCh:
	case err := <-streamErrCh:
		t.Fatalf("stream() failed: %v", err)
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive truncate")
	}

	// Sleep a little before writing more. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if _, err := f2.Write(data[5:]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-dataPostTruncCh:
	case err := <-streamErrCh:
		t.Fatalf("stream() failed: %v", err)
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive post truncate data")
	}
}
// TestFS_streamImpl_Delete asserts that streamFile emits a frame carrying the
// delete event when the file being streamed is removed. The test is skipped
// on Windows.
func TestFS_streamImpl_Delete(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("Windows does not allow us to delete a file while it is open")
	}
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	data := []byte("helloworld")
	streamFile := "stream_file"
	streamFilePath := filepath.Join(ad.AllocDir, streamFile)
	f, err := os.Create(streamFilePath)
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	// Start the reader
	deleteCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			if frame.FileEvent == deleteEvent {
				close(deleteCh)
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Cancel the stream's context when the test returns so the streaming
	// goroutine is asked to stop instead of running on context.Background().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start streaming. t.Fatalf may only be called from the goroutine running
	// the test, so a stream failure is handed back over a buffered channel and
	// reported from the select below. Only non-nil errors are sent, so a
	// normal return of streamFile does not race with the delete frame.
	streamErrCh := make(chan error, 1)
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			ctx, 0, streamFile, 0, ad, framer, nil); err != nil {
			streamErrCh <- err
		}
	}()

	// Sleep a little before deleting. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if err := os.Remove(streamFilePath); err != nil {
		t.Fatalf("delete failed: %v", err)
	}

	select {
	case <-deleteCh:
	case err := <-streamErrCh:
		t.Fatalf("stream() failed: %v", err)
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive delete")
	}
}
// TestFS_logsImpl_NoFollow writes three single-byte stdout log files
// (indexes 0-2) for a task and asserts that a synchronous logsImpl call
// delivers their concatenated contents ("012") on the frames channel.
func TestFS_logsImpl_NoFollow(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir and create the log dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
	if err := os.MkdirAll(logDir, 0777); err != nil {
		t.Fatalf("Failed to make log dir: %v", err)
	}

	// Create a series of log files in the temp dir
	task := "foo"
	logType := "stdout"
	expected := []byte("012")
	for i := 0; i < 3; i++ {
		logFile := fmt.Sprintf("%s.%s.%d", task, logType, i)
		logFilePath := filepath.Join(logDir, logFile)
		// The mode must be an octal literal: decimal 777 is 01411, which
		// yields permission bits 0411 rather than the intended 0777.
		err := ioutil.WriteFile(logFilePath, expected[i:i+1], 0777)
		if err != nil {
			t.Fatalf("Failed to create file: %v", err)
		}
	}

	// Start the reader
	resultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	var received []byte
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			received = append(received, frame.Data...)
			if reflect.DeepEqual(received, expected) {
				close(resultCh)
				return
			}
		}
	}()

	// Start streaming logs
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := c.endpoints.FileSystem.logsImpl(
		ctx, false, false, 0,
		OriginStart, task, logType, ad, frames); err != nil {
		t.Fatalf("logsImpl failed: %v", err)
	}

	select {
	case <-resultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}
}
// TestFS_logsImpl_Follow writes three single-byte stdout log files, starts
// logsImpl in a background goroutine, and asserts that the initial contents
// ("012") arrive on the frames channel. It then writes the remaining bytes to
// a log file at a much higher index and asserts that those are delivered as
// well, i.e. that the stream keeps following and tolerates index gaps.
func TestFS_logsImpl_Follow(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir and create the log dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
	if err := os.MkdirAll(logDir, 0777); err != nil {
		t.Fatalf("Failed to make log dir: %v", err)
	}

	// Create a series of log files in the temp dir
	task := "foo"
	logType := "stdout"
	expected := []byte("012345")
	initialWrites := 3

	// writeToFile creates the log file for the given rotation index. It is
	// only called from the test goroutine, so t.Fatalf is safe here.
	writeToFile := func(index int, data []byte) {
		logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
		logFilePath := filepath.Join(logDir, logFile)
		// The mode must be an octal literal: decimal 777 is 01411, which
		// yields permission bits 0411 rather than the intended 0777.
		err := ioutil.WriteFile(logFilePath, data, 0777)
		if err != nil {
			t.Fatalf("Failed to create file: %v", err)
		}
	}
	for i := 0; i < initialWrites; i++ {
		writeToFile(i, expected[i:i+1])
	}

	// Start the reader
	firstResultCh := make(chan struct{})
	fullResultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	var received []byte
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			received = append(received, frame.Data...)
			if reflect.DeepEqual(received, expected[:initialWrites]) {
				close(firstResultCh)
			} else if reflect.DeepEqual(received, expected) {
				close(fullResultCh)
				return
			}
		}
	}()

	// Start streaming logs. The context is cancelled when the test returns so
	// the background logsImpl call is asked to stop rather than being left
	// running on context.Background(). Its return value is not inspected; the
	// test observes progress through the frames channel.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.endpoints.FileSystem.logsImpl(
		ctx, true, false, 0,
		OriginStart, task, logType, ad, frames)

	select {
	case <-firstResultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}

	// We got the first chunk of data, write out the rest to the next file
	// at an index much ahead to check that it is following and detecting
	// skips
	skipTo := initialWrites + 10
	writeToFile(skipTo, expected[initialWrites:])

	select {
	case <-fullResultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}
}