// open-nomad/api/nodes_test.go

package api

import (
	"context"
	"fmt"
	"sort"
	"testing"
	"time"

	"github.com/hashicorp/nomad/api/internal/testutil"
	"github.com/shoenig/test/must"
	"github.com/shoenig/test/wait"
)
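
// queryNodeList is a test helper that lists nodes, retrying for up to ten
// seconds until at least one node has registered.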
func queryNodeList(t *testing.T, nodes *Nodes) ([]*NodeListStub, *QueryMeta) {
	var (
		nodeListStub []*NodeListStub
		queryMeta    *QueryMeta
		err          error
	)
	f := func() error {
		nodeListStub, queryMeta, err = nodes.List(nil)
		if err != nil {
			return fmt.Errorf("failed to list nodes: %w", err)
		}
		if len(nodeListStub) == 0 {
			return fmt.Errorf("no nodes yet")
		}
		return nil
	}
	must.Wait(t, wait.InitialSuccess(
		wait.ErrorFunc(f),
		wait.Timeout(10*time.Second),
		wait.Gap(1*time.Second),
	))
	return nodeListStub, queryMeta
}
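
// oneNodeFromNodeList is a test helper that waits for node registration,
// asserts exactly one node is present, and returns its list stub.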
func oneNodeFromNodeList(t *testing.T, nodes *Nodes) *NodeListStub {
	nodeListStub, _ := queryNodeList(t, nodes)
	must.Len(t, 1, nodeListStub, must.Sprint("expected 1 node"))
	return nodeListStub[0]
}
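
// TestNodes_List asserts that a dev-mode agent reports exactly one node.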
func TestNodes_List(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	nodeListStub, queryMeta := queryNodeList(t, nodes)
	must.Len(t, 1, nodeListStub)

	// Check that we got valid QueryMeta.
	assertQueryMeta(t, queryMeta)
}
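
// TestNodes_PrefixList asserts that nodes can be looked up by an ID prefix.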
func TestNodes_PrefixList(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Get the node ID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Find the node by a four-character ID prefix
	out, qm, err := nodes.PrefixList(nodeID[:4])
	must.NoError(t, err)
	must.Len(t, 1, out, must.Sprint("expected only 1 node"))

	// Check that we got valid QueryMeta.
	assertQueryMeta(t, qm)
}

// TestNodes_List_Resources asserts that ?resources=true includes allocated and
// reserved resources in the response.
func TestNodes_List_Resources(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	node := oneNodeFromNodeList(t, nodes)

	// By default resources should *not* be included
	must.Nil(t, node.NodeResources)
	must.Nil(t, node.ReservedResources)

	qo := &QueryOptions{
		Params: map[string]string{"resources": "true"},
	}
	out, _, err := nodes.List(qo)
	must.NoError(t, err)
	must.NotNil(t, out[0].NodeResources)
	must.NotNil(t, out[0].ReservedResources)
}
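
// TestNodes_Info asserts that node lookup by ID returns the expected node
// fields and errors on a nonexistent node.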
func TestNodes_Info(t *testing.T) {
	testutil.Parallel(t)
	startTime := time.Now().Unix()
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Retrieving a nonexistent node returns an error
	_, _, infoErr := nodes.Info("12345678-abcd-efab-cdef-123456789abc", nil)
	must.ErrorContains(t, infoErr, "not found")

	// Get the node ID and DC
	node := oneNodeFromNodeList(t, nodes)
	nodeID, dc := node.ID, node.Datacenter

	// Querying an existing node returns properly
	result, qm, err := nodes.Info(nodeID, nil)
	must.NoError(t, err)
	assertQueryMeta(t, qm)

	// Check that the result is what we expect
	must.Eq(t, nodeID, result.ID)
	must.Eq(t, dc, result.Datacenter)
	must.Eq(t, 20000, result.NodeResources.MinDynamicPort)
	must.Eq(t, 32000, result.NodeResources.MaxDynamicPort)

	// Check that the StatusUpdatedAt field is being populated correctly
	must.Less(t, result.StatusUpdatedAt, startTime)

	// Check that we have at least one event
	must.GreaterEq(t, 1, len(result.Events))
}
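
// TestNodes_NoSecretID asserts that the raw node API response never exposes
// the node's SecretID.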
func TestNodes_NoSecretID(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Get the node ID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Perform a raw HTTP call and check that:
	// - "ID" is populated, to make sure raw decoding is working correctly
	// - "SecretID" is not present in the response
	resp := make(map[string]interface{})
	_, err := c.query("/v1/node/"+nodeID, &resp, nil)
	must.NoError(t, err)
	must.Eq(t, nodeID, resp["ID"].(string))
	must.Eq(t, "", resp["SecretID"])
}
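
// TestNodes_ToggleDrain exercises draining a node and watches the event
// stream to observe the drain starting and completing.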
func TestNodes_ToggleDrain(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Wait for node registration and get the ID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Check for drain mode
	out, _, err := nodes.Info(nodeID, nil)
	must.NoError(t, err)
	must.False(t, out.Drain)
	must.Nil(t, out.LastDrain)

	// Toggle it on
	timeBeforeDrain := time.Now().Add(-1 * time.Second)
	spec := &DrainSpec{
		Deadline: 10 * time.Second,
	}
	drainMeta := map[string]string{
		"reason": "this node needs to go",
	}
	drainOut, err := nodes.UpdateDrainOpts(nodeID, &DrainOptions{
		DrainSpec:    spec,
		MarkEligible: false,
		Meta:         drainMeta,
	}, nil)
	must.NoError(t, err)
	assertWriteMeta(t, &drainOut.WriteMeta)

	// Drain may have completed before we can check, use event stream
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	streamCh, err := c.EventStream().Stream(ctx, map[Topic][]string{
		TopicNode: {nodeID},
	}, 0, nil)
	must.NoError(t, err)

	// We expect to see the node change to Drain:true and then back to
	// Drain:false + ineligible
	var sawDraining, sawDrainComplete uint64
	for sawDrainComplete == 0 {
		select {
		case events := <-streamCh:
			must.NoError(t, events.Err)
			for _, e := range events.Events {
				node, err := e.Node()
				must.NoError(t, err)
				must.Eq(t, node.DrainStrategy != nil, node.Drain)
				must.True(t, !node.Drain || node.SchedulingEligibility == NodeSchedulingIneligible) // node.Drain => "ineligible"
				if node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible {
					must.NotNil(t, node.LastDrain)
					must.Eq(t, DrainStatusDraining, node.LastDrain.Status)
					now := time.Now()
					must.False(t, node.LastDrain.StartedAt.Before(timeBeforeDrain))
					must.False(t, node.LastDrain.StartedAt.After(now))
					must.Eq(t, drainMeta, node.LastDrain.Meta)
					sawDraining = node.ModifyIndex
				} else if sawDraining != 0 && !node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible {
					must.NotNil(t, node.LastDrain)
					must.Eq(t, DrainStatusComplete, node.LastDrain.Status)
					must.True(t, !node.LastDrain.UpdatedAt.Before(node.LastDrain.StartedAt))
					must.Eq(t, drainMeta, node.LastDrain.Meta)
					sawDrainComplete = node.ModifyIndex
				}
			}
		case <-time.After(5 * time.Second):
			must.Unreachable(t, must.Sprint("waiting on stream event that never happened"))
		}
	}

	// Toggle off again
	drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil)
	must.NoError(t, err)
	assertWriteMeta(t, &drainOut.WriteMeta)

	// Check again
	out, _, err = nodes.Info(nodeID, nil)
	must.NoError(t, err)
	must.False(t, out.Drain)
	must.Nil(t, out.DrainStrategy)
	must.Eq(t, NodeSchedulingEligible, out.SchedulingEligibility)
}
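
// TestNodes_ToggleEligibility toggles a node's scheduling eligibility off and
// back on, verifying the change after each update.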
func TestNodes_ToggleEligibility(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Get node ID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Check for eligibility
	out, _, err := nodes.Info(nodeID, nil)
	must.NoError(t, err)
	must.Eq(t, NodeSchedulingEligible, out.SchedulingEligibility)

	// Toggle it off
	eligOut, err := nodes.ToggleEligibility(nodeID, false, nil)
	must.NoError(t, err)
	assertWriteMeta(t, &eligOut.WriteMeta)

	// Check again
	out, _, err = nodes.Info(nodeID, nil)
	must.NoError(t, err)
	must.Eq(t, NodeSchedulingIneligible, out.SchedulingEligibility)

	// Toggle on
	eligOut, err = nodes.ToggleEligibility(nodeID, true, nil)
	must.NoError(t, err)
	assertWriteMeta(t, &eligOut.WriteMeta)

	// Check again
	out, _, err = nodes.Info(nodeID, nil)
	must.NoError(t, err)
	must.Eq(t, NodeSchedulingEligible, out.SchedulingEligibility)
	must.Nil(t, out.DrainStrategy)
}
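
// TestNodes_Allocations asserts that looking up allocations for a nonexistent
// node returns an empty list rather than an error.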
func TestNodes_Allocations(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Looking up by a nonexistent node returns nothing. We don't check the
	// index here because it's possible the node has already registered, in
	// which case we will get a non-zero result anyways.
	allocations, _, err := nodes.Allocations("nope", nil)
	must.NoError(t, err)
	must.Len(t, 0, allocations)
}
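
// TestNodes_ForceEvaluate asserts that force evaluation fails for an unknown
// node and succeeds once a node has registered.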
func TestNodes_ForceEvaluate(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Force-eval on a nonexistent node fails
	_, _, err := nodes.ForceEvaluate("12345678-abcd-efab-cdef-123456789abc", nil)
	must.ErrorContains(t, err, "not found")

	// Wait for node registration and get the ID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Try force-eval again. We don't check the WriteMeta because there are
	// no allocations to process, so we would get an index of zero. Same goes
	// for the eval ID.
	_, _, err = nodes.ForceEvaluate(nodeID, nil)
	must.NoError(t, err)
}
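
// TestNodes_Sort asserts that NodeIndexSort orders nodes by CreateIndex,
// descending.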
func TestNodes_Sort(t *testing.T) {
	testutil.Parallel(t)
	nodes := []*NodeListStub{
		{CreateIndex: 2},
		{CreateIndex: 1},
		{CreateIndex: 5},
	}
	sort.Sort(NodeIndexSort(nodes))

	expect := []*NodeListStub{
		{CreateIndex: 5},
		{CreateIndex: 2},
		{CreateIndex: 1},
	}
	must.Eq(t, expect, nodes)
}

// TestNodes_MonitorDrain_Multiplex_Bad unit tests monitorDrainMultiplex when
// an error occurs.
func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) {
	testutil.Parallel(t)
	ctx := context.Background()
	multiplexCtx, cancel := context.WithCancel(ctx)

	// monitorDrainMultiplex doesn't require anything on *Nodes, so we
	// don't need to use a full Client
	var nodeClient *Nodes

	outCh := make(chan *MonitorMessage, 8)
	nodeCh := make(chan *MonitorMessage, 1)
	allocCh := make(chan *MonitorMessage, 8)
	exitedCh := make(chan struct{})
	go func() {
		defer close(exitedCh)
		nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
	}()

	// Fake an alloc update
	msg := Messagef(0, "alloc update")
	allocCh <- msg
	must.Eq(t, msg, <-outCh)

	// Fake a node update
	msg = Messagef(0, "node update")
	nodeCh <- msg
	must.Eq(t, msg, <-outCh)

	// Fake an error that should shut everything down
	msg = Messagef(MonitorMsgLevelError, "fake error")
	nodeCh <- msg
	must.Eq(t, msg, <-outCh)

	_, ok := <-exitedCh
	must.False(t, ok)

	_, ok = <-outCh
	must.False(t, ok)

	// Exiting should also cancel the context that would be passed to the
	// node & alloc watchers
	select {
	case <-multiplexCtx.Done():
	case <-time.After(100 * time.Millisecond):
		must.Unreachable(t, must.Sprint("multiplex context was not canceled"))
	}
}

// TestNodes_MonitorDrain_Multiplex_Good unit tests monitorDrainMultiplex when
// the drain finishes.
func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
	testutil.Parallel(t)
	ctx := context.Background()
	multiplexCtx, cancel := context.WithCancel(ctx)

	// monitorDrainMultiplex doesn't require anything on *Nodes, so we
	// don't need to use a full Client
	var nodeClient *Nodes

	outCh := make(chan *MonitorMessage, 8)
	nodeCh := make(chan *MonitorMessage, 1)
	allocCh := make(chan *MonitorMessage, 8)
	exitedCh := make(chan struct{})
	go func() {
		defer close(exitedCh)
		nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
	}()

	// Fake a node updating and finishing
	msg := Messagef(MonitorMsgLevelInfo, "node update")
	nodeCh <- msg
	close(nodeCh)
	must.Eq(t, msg, <-outCh)

	// Nothing else should have exited yet
	select {
	case badMsg, ok := <-outCh:
		must.False(t, ok, must.Sprintf("unexpected output %v", badMsg))
		must.Unreachable(t, must.Sprint("out channel closed unexpectedly"))
	case <-exitedCh:
		must.Unreachable(t, must.Sprint("multiplexer exited unexpectedly"))
	case <-multiplexCtx.Done():
		must.Unreachable(t, must.Sprint("multiplexer context canceled unexpectedly"))
	case <-time.After(10 * time.Millisecond):
		t.Logf("multiplexer still running as expected")
	}

	// Fake an alloc update coming in after the node monitor has finished
	msg = Messagef(0, "alloc update")
	allocCh <- msg
	must.Eq(t, msg, <-outCh)

	// Closing the allocCh should cause everything to exit
	close(allocCh)

	_, ok := <-exitedCh
	must.False(t, ok)

	_, ok = <-outCh
	must.False(t, ok)

	// Exiting should also cancel the context that would be passed to the
	// node & alloc watchers
	select {
	case <-multiplexCtx.Done():
	case <-time.After(100 * time.Millisecond):
		must.Unreachable(t, must.Sprint("context was not canceled"))
	}
}
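
// TestNodes_DrainStrategy_Equal verifies DrainStrategy equality semantics
// field by field, including the nil case.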
func TestNodes_DrainStrategy_Equal(t *testing.T) {
	testutil.Parallel(t)

	// nil
	var d *DrainStrategy
	must.Equal(t, nil, d)

	o := &DrainStrategy{}
	must.NotEqual(t, d, o)
	must.NotEqual(t, o, d)

	d = &DrainStrategy{}
	must.Equal(t, d, o)
	must.Equal(t, o, d)

	// ForceDeadline
	d.ForceDeadline = time.Now()
	must.NotEqual(t, d, o)
	o.ForceDeadline = d.ForceDeadline
	must.Equal(t, d, o)

	// Deadline
	d.Deadline = 1
	must.NotEqual(t, d, o)
	o.Deadline = 1
	must.Equal(t, d, o)

	// IgnoreSystemJobs
	d.IgnoreSystemJobs = true
	must.NotEqual(t, d, o)
	o.IgnoreSystemJobs = true
	must.Equal(t, d, o)
}
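
// TestNodes_Purge asserts that purging fails for an unknown node and succeeds
// for a registered one.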
func TestNodes_Purge(t *testing.T) {
	testutil.Parallel(t)
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()
	nodes := c.Nodes()

	// Purge on a nonexistent node fails.
	_, _, err := c.Nodes().Purge("12345678-abcd-efab-cdef-123456789abc", nil)
	must.ErrorContains(t, err, "not found")

	// Wait for nodeID
	nodeID := oneNodeFromNodeList(t, nodes).ID

	// Perform the node purge and check the response objects.
	out, meta, err := c.Nodes().Purge(nodeID, nil)
	must.NoError(t, err)
	must.NotNil(t, out)

	// We can't use assertQueryMeta here, as the RPC response does not
	// populate the known leader field.
	must.Positive(t, meta.LastIndex)
}
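
// TestNodeStatValueFormatting asserts that StatValue.String renders bool,
// string, float, and int values, with optional denominators and units.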
func TestNodeStatValueFormatting(t *testing.T) {
	testutil.Parallel(t)

	cases := []struct {
		expected string
		value    StatValue
	}{
		{
			"true",
			StatValue{BoolVal: pointerOf(true)},
		},
		{
			"false",
			StatValue{BoolVal: pointerOf(false)},
		},
		{
			"myvalue",
			StatValue{StringVal: pointerOf("myvalue")},
		},
		{
			"2.718",
			StatValue{
				FloatNumeratorVal: float64ToPtr(2.718),
			},
		},
		{
			"2.718 / 3.14",
			StatValue{
				FloatNumeratorVal:   float64ToPtr(2.718),
				FloatDenominatorVal: float64ToPtr(3.14),
			},
		},
		{
			"2.718 MHz",
			StatValue{
				FloatNumeratorVal: float64ToPtr(2.718),
				Unit:              "MHz",
			},
		},
		{
			"2.718 / 3.14 MHz",
			StatValue{
				FloatNumeratorVal:   float64ToPtr(2.718),
				FloatDenominatorVal: float64ToPtr(3.14),
				Unit:                "MHz",
			},
		},
		{
			"2",
			StatValue{
				IntNumeratorVal: pointerOf(int64(2)),
			},
		},
		{
			"2 / 3",
			StatValue{
				IntNumeratorVal:   pointerOf(int64(2)),
				IntDenominatorVal: pointerOf(int64(3)),
			},
		},
		{
			"2 MHz",
			StatValue{
				IntNumeratorVal: pointerOf(int64(2)),
				Unit:            "MHz",
			},
		},
		{
			"2 / 3 MHz",
			StatValue{
				IntNumeratorVal:   pointerOf(int64(2)),
				IntDenominatorVal: pointerOf(int64(3)),
				Unit:              "MHz",
			},
		},
	}

	for i, c := range cases {
		t.Run(fmt.Sprintf("case %d %v", i, c.expected), func(t *testing.T) {
			formatted := c.value.String()
			must.Eq(t, c.expected, formatted)
		})
	}
}