open-nomad/nomad/client_agent_endpoint_test.go
Drew Bailey 9b63828658
serverID to target remote leader or server
handle the case where we request a server-id which is this current server

update docs, error on node and server id params

more accurate names for tests

use shared no leader err, formatting

rm bad comment

remove redundant variable
2019-11-14 10:07:35 -05:00

459 lines
9.6 KiB
Go

package nomad
import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
// TestMonitor_Monitor_Remote_Client issues an Agent.Monitor streaming request
// that carries a client node ID against server s1, while the client itself is
// only configured with s2's RPC address. The test passes once a "[DEBUG]" log
// line has been streamed back.
func TestMonitor_Monitor_Remote_Client(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// start server and client
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	c, cleanup := client.TestClient(t, func(c *config.Config) {
		c.Servers = []string{s2.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Wait until s2 reports exactly one connected client node.
	testutil.WaitForResult(func() (bool, error) {
		nodes := s2.connectedNodes()
		return len(nodes) == 1, nil
	}, func(err error) {
		t.Fatalf("should have a clients")
	})

	// Target the client by its node ID; the handler is taken from s1, which
	// is not the server the client was configured with.
	req := cstructs.MonitorRequest{
		LogLevel: "debug",
		NodeID:   c.NodeID(),
	}

	handler, err := s1.StreamingRpcHandler("Agent.Monitor")
	require.NoError(err)

	// create pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// doneCh is closed when the test returns so the decoder goroutine can
	// never stay blocked on a send nobody will receive.
	doneCh := make(chan struct{})
	defer close(doneCh)

	// Buffered so the decoder can report a single error and exit even if the
	// test has already stopped reading.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	go handler(p2)

	// Start decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// Stop here: msg was not decoded and must not be forwarded.
				return
			}

			select {
			case streamMsg <- &msg:
			case <-doneCh:
				return
			}
		}
	}()

	// send request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	expected := "[DEBUG]"
	received := ""

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout waiting for logs")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			var frame sframer.StreamFrame
			err := json.Unmarshal(msg.Payload, &frame)
			assert.NoError(t, err)

			// Frames may split a log line, so match against everything
			// received so far rather than a single frame.
			received += string(frame.Data)
			if strings.Contains(received, expected) {
				require.Nil(p2.Close())
				break OUTER
			}
		}
	}
}
// TestMonitor_Monitor_RemoteServer starts two joined servers and exercises
// Agent.Monitor with a ServerID set, covering four combinations: the request
// originates on the non-leader or the leader, and targets either "leader" or
// the non-leader's serf member name. Each subtest writes a distinctive WARN
// line through the target server's logger and waits for it on the stream.
func TestMonitor_Monitor_RemoteServer(t *testing.T) {
	t.Parallel()

	// start servers
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// determine leader and nonleader
	servers := []*Server{s1, s2}
	var nonLeader *Server
	var leader *Server
	for _, s := range servers {
		if !s.IsLeader() {
			nonLeader = s
		} else {
			leader = s
		}
	}

	// Fail with a clear message instead of a nil dereference while building
	// the case table below.
	require.NotNil(t, leader, "expected one of the servers to be the leader")
	require.NotNil(t, nonLeader, "expected one of the servers to be a non-leader")

	cases := []struct {
		desc        string
		serverID    string
		expectedLog string
		logger      hclog.InterceptLogger
		origin      *Server
	}{
		{
			desc:        "remote leader",
			serverID:    "leader",
			expectedLog: "leader log",
			logger:      leader.logger,
			origin:      nonLeader,
		},
		{
			desc:        "remote server",
			serverID:    nonLeader.serf.LocalMember().Name,
			expectedLog: "nonleader log",
			logger:      nonLeader.logger,
			origin:      leader,
		},
		{
			desc:        "serverID is current leader",
			serverID:    "leader",
			expectedLog: "leader log",
			logger:      leader.logger,
			origin:      leader,
		},
		{
			desc:        "serverID is current server",
			serverID:    nonLeader.serf.LocalMember().Name,
			expectedLog: "non leader log",
			logger:      nonLeader.logger,
			origin:      nonLeader,
		},
	}

	for _, tc := range cases {
		// Copy the loop variable: the goroutines below read tc and must not
		// observe the next iteration's value on toolchains before Go 1.22.
		tc := tc
		t.Run(tc.desc, func(t *testing.T) {
			require := require.New(t)

			// doneCh stops the helper goroutines on every exit path,
			// including t.Fatal, not only on success.
			doneCh := make(chan struct{})
			defer close(doneCh)

			// send some specific logs
			go func() {
				for {
					select {
					case <-doneCh:
						return
					default:
						tc.logger.Warn(tc.expectedLog)
						time.Sleep(10 * time.Millisecond)
					}
				}
			}()

			req := cstructs.MonitorRequest{
				LogLevel: "warn",
				ServerID: tc.serverID,
			}

			handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
			require.NoError(err)

			// create pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			// Buffered so the decoder can report one error and exit without
			// waiting for a receiver.
			errCh := make(chan error, 1)
			streamMsg := make(chan *cstructs.StreamErrWrapper)

			go handler(p2)

			// Start decoder
			go func() {
				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
				for {
					var msg cstructs.StreamErrWrapper
					if err := decoder.Decode(&msg); err != nil {
						if err == io.EOF || strings.Contains(err.Error(), "closed") {
							return
						}
						errCh <- fmt.Errorf("error decoding: %v", err)
						// msg was not decoded; do not forward it.
						return
					}

					select {
					case streamMsg <- &msg:
					case <-doneCh:
						return
					}
				}
			}()

			// send request
			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
			require.NoError(encoder.Encode(req))

			timeout := time.After(2 * time.Second)
			received := ""

		OUTER:
			for {
				select {
				case <-timeout:
					t.Fatal("timeout waiting for logs")
				case err := <-errCh:
					t.Fatal(err)
				case msg := <-streamMsg:
					if msg.Error != nil {
						t.Fatalf("Got error: %v", msg.Error.Error())
					}

					var frame sframer.StreamFrame
					err := json.Unmarshal(msg.Payload, &frame)
					assert.NoError(t, err)

					// Accumulate frames so a log line split across frames
					// still matches.
					received += string(frame.Data)
					if strings.Contains(received, tc.expectedLog) {
						require.Nil(p2.Close())
						break OUTER
					}
				}
			}
		})
	}
}
// TestMonitor_MonitorServer sends an Agent.Monitor request with only a log
// level set to a single server, emits debug logs through that server's
// logger, and passes once a "[DEBUG]" line arrives on the stream.
func TestMonitor_MonitorServer(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// start server
	s := TestServer(t, nil)
	defer s.Shutdown()
	testutil.WaitForLeader(t, s.RPC)

	// Neither NodeID nor ServerID is set on the request.
	// NOTE(review): presumably this makes the handler monitor the server it
	// runs on; confirm against the Agent.Monitor implementation.
	req := cstructs.MonitorRequest{
		LogLevel: "debug",
	}

	handler, err := s.StreamingRpcHandler("Agent.Monitor")
	require.NoError(err)

	// create pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// doneCh is closed when the test returns so that neither the decoder nor
	// the log producer outlives the test.
	doneCh := make(chan struct{})
	defer close(doneCh)

	// Buffered so the decoder can report one error and exit without waiting
	// for a receiver.
	errCh := make(chan error, 1)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	go handler(p2)

	// Start decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// msg was not decoded; do not forward it.
				return
			}

			select {
			case streamMsg <- &msg:
			case <-doneCh:
				return
			}
		}
	}()

	// send request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.NoError(encoder.Encode(req))

	timeout := time.After(1 * time.Second)
	expected := "[DEBUG]"
	received := ""

	// send logs until the test finishes
	go func() {
		for {
			select {
			case <-doneCh:
				return
			default:
				s.logger.Debug("test log")
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout waiting for logs")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			var frame sframer.StreamFrame
			err := json.Unmarshal(msg.Payload, &frame)
			assert.NoError(t, err)

			// Accumulate frames so a log line split across frames still
			// matches.
			received += string(frame.Data)
			if strings.Contains(received, expected) {
				require.Nil(p2.Close())
				break OUTER
			}
		}
	}
}
func TestMonitor_Monitor_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.AgentPolicy(acl.PolicyRead)
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid", policyGood)
cases := []struct {
Name string
Token string
ExpectedErr string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedErr: "Unknown log level",
},
{
Name: "root token",
Token: root.SecretID,
ExpectedErr: "Unknown log level",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
req := &cstructs.MonitorRequest{
LogLevel: "unknown",
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: "global",
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
t.Fatalf("Bad error: %v", msg.Error)
}
}
}
})
}
}