package nomad import ( "fmt" "io" "net" "strings" "testing" "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" codec "github.com/ugorji/go/codec" ) func TestClientFS_List_Local(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Make the request without having a node-id req := &cstructs.FsListRequest{ Path: "/", QueryOptions: structs.QueryOptions{Region: "global"}, } // Fetch the response var resp cstructs.FsListResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp) require.NotNil(err) require.Contains(err.Error(), "missing") // Fetch the response setting the alloc id req.AllocID = a.ID var resp2 cstructs.FsListResponse err = msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp2) require.Nil(err) require.NotEmpty(resp2.Files) } func TestClientFS_List_ACL(t *testing.T) { t.Parallel() // Start a server s, root, cleanupS := TestACLServer(t, nil) defer cleanupS() codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) // Create a bad token policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny}) tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) // Upsert the allocation state := s.State() alloc := mock.Alloc() require.NoError(t, state.UpsertJob(1010, alloc.Job)) require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) cases := []struct { Name string Token string ExpectedError string }{ { Name: "bad token", Token: tokenBad.SecretID, ExpectedError: structs.ErrPermissionDenied.Error(), }, { Name: "good token", Token: tokenGood.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, { Name: "root token", Token: root.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { // Make the request req := &cstructs.FsListRequest{ AllocID: alloc.ID, Path: "/", QueryOptions: structs.QueryOptions{ Region: "global", Namespace: structs.DefaultNamespace, AuthToken: c.Token, }, } // Fetch the response var resp cstructs.FsListResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp) require.NotNil(t, err) require.Contains(t, err.Error(), c.ExpectedError) }) } } func TestClientFS_List_Remote(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state1 := s1.State() state2 := s2.State() require.Nil(state1.UpsertJob(999, a.Job)) require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a})) require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request without having a node-id req := &cstructs.FsListRequest{ AllocID: a.ID, Path: "/", QueryOptions: structs.QueryOptions{Region: "global"}, } // Fetch the response var resp cstructs.FsListResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp) require.Nil(err) require.NotEmpty(resp.Files) } func TestClientFS_Stat_OldNode(t *testing.T) { t.Parallel() require := require.New(t) // Start a server s, cleanupS := TestServer(t, nil) defer cleanupS() state := s.State() codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) // Test for an old version error node := mock.Node() node.Attributes["nomad.version"] = "0.7.1" require.Nil(state.UpsertNode(1005, node)) alloc := mock.Alloc() alloc.NodeID = node.ID require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) req := &cstructs.FsStatRequest{ AllocID: alloc.ID, Path: "/", QueryOptions: structs.QueryOptions{Region: "global"}, } var resp cstructs.FsStatResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp) require.True(structs.IsErrNodeLacksRpc(err), err.Error()) } func TestClientFS_Stat_Local(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Make the request without having a node-id req := &cstructs.FsStatRequest{ Path: "/", QueryOptions: structs.QueryOptions{Region: "global"}, } // Fetch the response var resp cstructs.FsStatResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp) require.NotNil(err) require.Contains(err.Error(), "missing") // Fetch the response setting the alloc id req.AllocID = a.ID var resp2 cstructs.FsStatResponse err = msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp2) require.Nil(err) require.NotNil(resp2.Info) } func TestClientFS_Stat_ACL(t *testing.T) { t.Parallel() // Start a server s, root, cleanupS := TestACLServer(t, nil) defer cleanupS() codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) // Create a bad token policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny}) tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) // Upsert the allocation state := s.State() alloc := mock.Alloc() require.NoError(t, state.UpsertJob(1010, alloc.Job)) require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) cases := []struct { Name string Token string ExpectedError string }{ { Name: "bad token", Token: tokenBad.SecretID, ExpectedError: structs.ErrPermissionDenied.Error(), }, { Name: "good token", Token: tokenGood.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, { Name: "root token", Token: root.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { // Make the request req := &cstructs.FsStatRequest{ AllocID: alloc.ID, Path: "/", QueryOptions: structs.QueryOptions{ Region: "global", Namespace: structs.DefaultNamespace, AuthToken: c.Token, }, } // Fetch the response var resp cstructs.FsStatResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp) require.NotNil(t, err) require.Contains(t, err.Error(), c.ExpectedError) }) } } func TestClientFS_Stat_Remote(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) defer cleanup() // Force an allocation onto the node a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state1 := s1.State() state2 := s2.State() require.Nil(state1.UpsertJob(999, a.Job)) require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a})) require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request without having a node-id req := &cstructs.FsStatRequest{ AllocID: a.ID, Path: "/", QueryOptions: structs.QueryOptions{Region: "global"}, } // Fetch the response var resp cstructs.FsStatResponse err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp) require.Nil(err) require.NotNil(resp.Info) } func TestClientFS_Streaming_NoAlloc(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) // Make the request with bad allocation id req := &cstructs.FsStreamRequest{ AllocID: uuid.Generate(), QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Stream") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error == nil { continue } if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } } } } func TestClientFS_Streaming_ACL(t *testing.T) { t.Parallel() // Start a server s, root, cleanupS := TestACLServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS}) tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) // Upsert the allocation state := s.State() alloc := mock.Alloc() require.NoError(t, state.UpsertJob(1010, alloc.Job)) require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) cases := []struct { Name string Token string ExpectedError string }{ { Name: "bad token", Token: tokenBad.SecretID, ExpectedError: structs.ErrPermissionDenied.Error(), }, { Name: "good token", Token: tokenGood.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, { Name: "root token", Token: root.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { // Make the request with bad allocation id req := &cstructs.FsStreamRequest{ AllocID: alloc.ID, QueryOptions: structs.QueryOptions{ Namespace: structs.DefaultNamespace, Region: "global", AuthToken: c.Token, }, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Stream") require.NoError(t, err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.NoError(t, encoder.Encode(req)) timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error == nil { continue } if strings.Contains(msg.Error.Error(), c.ExpectedError) { break OUTER } else { t.Fatalf("Bad error: %v", msg.Error) } } } }) } } func TestClientFS_Streaming_Local(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Make the request req := &cstructs.FsStreamRequest{ AllocID: a.ID, Path: "alloc/logs/web.stdout.0", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Stream") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Streaming_Local_Follow(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node expectedBase := "Hello from the other side" repeat := 10 a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "3s", "stdout_string": expectedBase, "stdout_repeat": repeat, "stdout_repeat_duration": "200ms", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusRunning { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) }) // Make the request req := &cstructs.FsStreamRequest{ AllocID: a.ID, Path: "alloc/logs/web.stdout.0", Origin: "start", PlainText: true, Follow: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Stream") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(20 * time.Second) expected := strings.Repeat(expectedBase, repeat+1) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Streaming_Remote_Server(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state1 := s1.State() state2 := s2.State() require.Nil(state1.UpsertJob(999, a.Job)) require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a})) require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request req := &cstructs.FsStreamRequest{ AllocID: a.ID, Path: "alloc/logs/web.stdout.0", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s1.StreamingRpcHandler("FileSystem.Stream") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Streaming_Remote_Region(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, nil) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.Region = "two" }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} c.Region = "two" }) defer cleanupC() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a client") }) // Upsert the allocation state2 := s2.State() require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request req := &cstructs.FsStreamRequest{ AllocID: a.ID, Path: "alloc/logs/web.stdout.0", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "two"}, } // Get the handler handler, err := s1.StreamingRpcHandler("FileSystem.Stream") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Logs_NoAlloc(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) // Make the request with bad allocation id req := &cstructs.FsLogsRequest{ AllocID: uuid.Generate(), QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error == nil { continue } if structs.IsErrUnknownAllocation(msg.Error) { break OUTER } } } } func TestClientFS_Logs_OldNode(t *testing.T) { t.Parallel() require := require.New(t) // Start a server s, cleanupS := TestServer(t, nil) defer cleanupS() state := s.State() testutil.WaitForLeader(t, s.RPC) // Test for an old version error node := mock.Node() node.Attributes["nomad.version"] = "0.7.1" require.Nil(state.UpsertNode(1005, node)) alloc := mock.Alloc() alloc.NodeID = node.ID require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) req := &cstructs.FsLogsRequest{ AllocID: alloc.ID, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error == nil { continue } if structs.IsErrNodeLacksRpc(msg.Error) { break OUTER } } } } func TestClientFS_Logs_ACL(t *testing.T) { t.Parallel() // Start a server s, root, cleanupS := TestACLServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS}) tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) // Upsert the allocation state := s.State() alloc := mock.Alloc() require.NoError(t, state.UpsertJob(1010, alloc.Job)) require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) cases := []struct { Name string Token string ExpectedError string }{ { Name: "bad token", Token: tokenBad.SecretID, ExpectedError: structs.ErrPermissionDenied.Error(), }, { Name: "good token", Token: tokenGood.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, { Name: "root token", Token: root.SecretID, ExpectedError: structs.ErrUnknownNodePrefix, }, } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { // Make the request with bad allocation id req := &cstructs.FsLogsRequest{ AllocID: alloc.ID, QueryOptions: structs.QueryOptions{ Namespace: structs.DefaultNamespace, Region: "global", AuthToken: c.Token, }, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Logs") require.NoError(t, err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.NoError(t, encoder.Encode(req)) timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error == nil { continue } if strings.Contains(msg.Error.Error(), c.ExpectedError) { break OUTER } else { t.Fatalf("Bad error: %v", msg.Error) } } } }) } } func TestClientFS_Logs_Local(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) c, cleanupC := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanupC() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Make the request req := &cstructs.FsLogsRequest{ AllocID: a.ID, Task: a.Job.TaskGroups[0].Tasks[0].Name, LogType: "stdout", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Logs_Local_Follow(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s, cleanupS := TestServer(t, nil) defer cleanupS() testutil.WaitForLeader(t, s.RPC) c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) defer cleanup() // Force an allocation onto the node expectedBase := "Hello from the other side" repeat := 10 a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "20s", "stdout_string": expectedBase, "stdout_repeat": repeat, "stdout_repeat_duration": "200ms", }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state := s.State() require.Nil(state.UpsertJob(999, a.Job)) require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusRunning { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) }) // Make the request req := &cstructs.FsLogsRequest{ AllocID: a.ID, Task: a.Job.TaskGroups[0].Tasks[0].Name, LogType: "stdout", Origin: "start", PlainText: true, Follow: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(20 * time.Second) expected := strings.Repeat(expectedBase, repeat+1) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Logs_Remote_Server(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a clients") }) // Upsert the allocation state1 := s1.State() state2 := s2.State() require.Nil(state1.UpsertJob(999, a.Job)) require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a})) require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request req := &cstructs.FsLogsRequest{ AllocID: a.ID, Task: a.Job.TaskGroups[0].Tasks[0].Name, LogType: "stdout", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "global"}, } // Get the handler handler, err := s1.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } } func TestClientFS_Logs_Remote_Region(t *testing.T) { t.Parallel() require := require.New(t) // Start a server and client s1, cleanupS1 := TestServer(t, nil) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { c.Region = "two" }) defer cleanupS2() TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} c.Region = "two" }) defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" a := mock.Alloc() a.Job.Type = structs.JobTypeBatch a.NodeID = c.NodeID() a.Job.TaskGroups[0].Count = 1 a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ "run_for": "2s", "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, }, } // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() return len(nodes) == 1, nil }, func(err error) { t.Fatalf("should have a client") }) // Upsert the allocation state2 := s2.State() require.Nil(state2.UpsertJob(999, a.Job)) require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) // Wait for the client to run the allocation testutil.WaitForResult(func() (bool, error) { alloc, err := state2.AllocByID(nil, a.ID) if err != nil { return false, err } if alloc == nil { return false, fmt.Errorf("unknown alloc") } if alloc.ClientStatus != structs.AllocClientStatusComplete { return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) } return true, nil }, func(err error) { t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) }) // Force remove the connection locally in case it exists s1.nodeConnsLock.Lock() delete(s1.nodeConns, c.NodeID()) s1.nodeConnsLock.Unlock() // Make the request req := &cstructs.FsLogsRequest{ AllocID: a.ID, Task: a.Job.TaskGroups[0].Tasks[0].Name, LogType: "stdout", Origin: "start", PlainText: true, QueryOptions: structs.QueryOptions{Region: "two"}, } // Get the handler handler, err := s1.StreamingRpcHandler("FileSystem.Logs") require.Nil(err) // Create a pipe p1, p2 := net.Pipe() defer p1.Close() defer p2.Close() errCh := make(chan error) streamMsg := make(chan *cstructs.StreamErrWrapper) // Start the handler go handler(p2) // Start the decoder go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { var msg cstructs.StreamErrWrapper if err := decoder.Decode(&msg); err != nil { if err == io.EOF || strings.Contains(err.Error(), "closed") { return } errCh <- fmt.Errorf("error decoding: %v", err) } streamMsg <- &msg } }() // Send the request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) timeout := time.After(3 * time.Second) received := "" OUTER: for { select { case <-timeout: t.Fatal("timeout") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } // Add the payload received += string(msg.Payload) if received == expected { break OUTER } } } }