removed deprecated fields from Drain structs and API
node drain: use msgtype on txn so that events are emitted wip: encoding extension to add Node.Drain field back to API responses new approach for hiding Node.SecretID in the API, using `json` tag documented this approach in the contributing guide refactored the JSON handlers with extensions modified event stream encoding to use the go-msgpack encoders with the extensions
This commit is contained in:
parent
fa25e048b2
commit
dd291e69f4
|
@ -206,9 +206,7 @@ func TestNodes_ToggleDrain(t *testing.T) {
|
|||
// Check for drain mode
|
||||
out, _, err := nodes.Info(nodeID, nil)
|
||||
require.Nil(err)
|
||||
if out.Drain {
|
||||
t.Fatalf("drain mode should be off")
|
||||
}
|
||||
require.False(out.Drain)
|
||||
|
||||
// Toggle it on
|
||||
spec := &DrainSpec{
|
||||
|
@ -221,9 +219,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
|
|||
// Check again
|
||||
out, _, err = nodes.Info(nodeID, nil)
|
||||
require.Nil(err)
|
||||
if out.SchedulingEligibility != NodeSchedulingIneligible {
|
||||
t.Fatalf("bad eligibility: %v vs %v", out.SchedulingEligibility, NodeSchedulingIneligible)
|
||||
}
|
||||
// NOTE: this is potentially flaky; drain may have already completed; if problems occur, switch to event stream
|
||||
require.True(out.Drain)
|
||||
require.Equal(NodeSchedulingIneligible, out.SchedulingEligibility)
|
||||
|
||||
// Toggle off again
|
||||
drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil)
|
||||
|
@ -233,15 +231,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
|
|||
// Check again
|
||||
out, _, err = nodes.Info(nodeID, nil)
|
||||
require.Nil(err)
|
||||
if out.Drain {
|
||||
t.Fatalf("drain mode should be off")
|
||||
}
|
||||
if out.DrainStrategy != nil {
|
||||
t.Fatalf("drain strategy should be unset")
|
||||
}
|
||||
if out.SchedulingEligibility != NodeSchedulingEligible {
|
||||
t.Fatalf("should be eligible")
|
||||
}
|
||||
require.False(out.Drain)
|
||||
require.Nil(out.DrainStrategy)
|
||||
require.Equal(NodeSchedulingEligible, out.SchedulingEligibility)
|
||||
}
|
||||
|
||||
func TestNodes_ToggleEligibility(t *testing.T) {
|
||||
|
|
|
@ -20,10 +20,11 @@ import (
|
|||
"github.com/hashicorp/go-connlimit"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/rs/cors"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/noxssrw"
|
||||
"github.com/hashicorp/nomad/helper/tlsutil"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -500,7 +501,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
|
|||
buf.Write([]byte("\n"))
|
||||
}
|
||||
} else {
|
||||
enc := codec.NewEncoder(&buf, structs.JsonHandle)
|
||||
enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions)
|
||||
err = enc.Encode(obj)
|
||||
}
|
||||
if err != nil {
|
||||
|
|
|
@ -2,9 +2,7 @@ package agent
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/api"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -119,31 +117,9 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request
|
|||
|
||||
var drainRequest api.NodeUpdateDrainRequest
|
||||
|
||||
// COMPAT: Remove in 0.10. Allow the old style enable query param.
|
||||
// Get the enable parameter
|
||||
enableRaw := req.URL.Query().Get("enable")
|
||||
var enable bool
|
||||
if enableRaw != "" {
|
||||
var err error
|
||||
enable, err = strconv.ParseBool(enableRaw)
|
||||
if err != nil {
|
||||
return nil, CodedError(400, "invalid enable value")
|
||||
}
|
||||
|
||||
// Use the force drain to have it keep the same behavior as old clients.
|
||||
if enable {
|
||||
drainRequest.DrainSpec = &api.DrainSpec{
|
||||
Deadline: -1 * time.Second,
|
||||
}
|
||||
} else {
|
||||
// If drain is disabled on an old client, mark the node as eligible for backwards compatibility
|
||||
drainRequest.MarkEligible = true
|
||||
}
|
||||
} else {
|
||||
err := decodeBody(req, &drainRequest)
|
||||
if err != nil {
|
||||
return nil, CodedError(400, err.Error())
|
||||
}
|
||||
err := decodeBody(req, &drainRequest)
|
||||
if err != nil {
|
||||
return nil, CodedError(400, err.Error())
|
||||
}
|
||||
|
||||
args := structs.NodeUpdateDrainRequest{
|
||||
|
|
|
@ -284,11 +284,9 @@ func TestHTTP_NodeDrain(t *testing.T) {
|
|||
out, err := state.NodeByID(nil, node.ID)
|
||||
require.Nil(err)
|
||||
|
||||
// the node must either be in drain mode or in elligible
|
||||
// the node must either be in drain mode or ineligible
|
||||
// once the node is recognize as not having any running allocs
|
||||
if out.Drain {
|
||||
require.True(out.Drain)
|
||||
require.NotNil(out.DrainStrategy)
|
||||
if out.DrainStrategy != nil {
|
||||
require.Equal(10*time.Second, out.DrainStrategy.Deadline)
|
||||
} else {
|
||||
require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility)
|
||||
|
@ -307,7 +305,6 @@ func TestHTTP_NodeDrain(t *testing.T) {
|
|||
|
||||
out, err = state.NodeByID(nil, node.ID)
|
||||
require.Nil(err)
|
||||
require.False(out.Drain)
|
||||
require.Nil(out.DrainStrategy)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -15,10 +15,12 @@
|
|||
* Implement and test other logical methods
|
||||
* [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go`
|
||||
* Add test for conversion
|
||||
* msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558);
|
||||
the `json` tag is available for customizing API output when encoding `structs` objects
|
||||
* [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go`
|
||||
* Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go`
|
||||
* Add test for diff of new structs/fields
|
||||
* [ ] Add change detection for new structs/feilds in `scheduler/util.go/tasksUpdated`
|
||||
* [ ] Add change detection for new structs/fields in `scheduler/util.go/tasksUpdated`
|
||||
* Might be covered by `.Equals` but might not be, check.
|
||||
* Should return true if the task must be replaced as a result of the change.
|
||||
|
||||
|
|
14
nomad/fsm.go
14
nomad/fsm.go
|
@ -429,20 +429,6 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
// COMPAT Remove in version 0.10
|
||||
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
|
||||
// drain strategy but we need to handle the upgrade path where the Raft log
|
||||
// contains drain updates with just the drain boolean being manipulated.
|
||||
if req.Drain && req.DrainStrategy == nil {
|
||||
// Mark the drain strategy as a force to imitate the old style drain
|
||||
// functionality.
|
||||
req.DrainStrategy = &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: -1 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
|
||||
n.logger.Error("UpdateNodeDrain failed", "error", err)
|
||||
return err
|
||||
|
|
|
@ -180,35 +180,6 @@ func TestFSM_UpsertNode(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestFSM_UpsertNode_Canonicalize(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
fsm := testFSM(t)
|
||||
fsm.blockedEvals.SetEnabled(true)
|
||||
|
||||
// Setup a node without eligibility
|
||||
node := mock.Node()
|
||||
node.SchedulingEligibility = ""
|
||||
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
}
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
require.Nil(err)
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
// Verify we are registered
|
||||
ws := memdb.NewWatchSet()
|
||||
n, err := fsm.State().NodeByID(ws, req.Node.ID)
|
||||
require.Nil(err)
|
||||
require.NotNil(n)
|
||||
require.EqualValues(1, n.CreateIndex)
|
||||
require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility)
|
||||
}
|
||||
|
||||
func TestFSM_DeregisterNode(t *testing.T) {
|
||||
t.Parallel()
|
||||
fsm := testFSM(t)
|
||||
|
@ -353,7 +324,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
|
|||
ws := memdb.NewWatchSet()
|
||||
node, err = fsm.State().NodeByID(ws, req.Node.ID)
|
||||
require.Nil(err)
|
||||
require.True(node.Drain)
|
||||
require.Equal(node.DrainStrategy, strategy)
|
||||
require.Len(node.Events, 2)
|
||||
}
|
||||
|
@ -397,46 +367,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
|
|||
ws := memdb.NewWatchSet()
|
||||
node, err = fsm.State().NodeByID(ws, req.Node.ID)
|
||||
require.Nil(err)
|
||||
require.True(node.Drain)
|
||||
require.Equal(node.DrainStrategy, strategy)
|
||||
require.Len(node.Events, 2)
|
||||
}
|
||||
|
||||
func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
// Force a node into the state store without eligiblity
|
||||
node := mock.Node()
|
||||
node.SchedulingEligibility = ""
|
||||
require.Nil(fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1, node))
|
||||
|
||||
// Do an old style drain
|
||||
req := structs.NodeUpdateDrainRequest{
|
||||
NodeID: node.ID,
|
||||
Drain: true,
|
||||
}
|
||||
buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req)
|
||||
require.Nil(err)
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
// Verify we have upgraded to a force drain
|
||||
ws := memdb.NewWatchSet()
|
||||
node, err = fsm.State().NodeByID(ws, req.NodeID)
|
||||
require.Nil(err)
|
||||
require.True(node.Drain)
|
||||
|
||||
expected := &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: -1 * time.Second,
|
||||
},
|
||||
}
|
||||
require.Equal(expected, node.DrainStrategy)
|
||||
}
|
||||
|
||||
func TestFSM_UpdateNodeEligibility(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
@ -2495,25 +2429,15 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
|
|||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
node1 := mock.Node()
|
||||
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)
|
||||
|
||||
// Upgrade this node
|
||||
node2 := mock.Node()
|
||||
node2.SchedulingEligibility = ""
|
||||
state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)
|
||||
node := mock.Node()
|
||||
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
out1, _ := state2.NodeByID(nil, node1.ID)
|
||||
out2, _ := state2.NodeByID(nil, node2.ID)
|
||||
node2.SchedulingEligibility = structs.NodeSchedulingEligible
|
||||
if !reflect.DeepEqual(node1, out1) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out1, node1)
|
||||
}
|
||||
if !reflect.DeepEqual(node2, out2) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out2, node2)
|
||||
out, _ := state2.NodeByID(nil, node.ID)
|
||||
if !reflect.DeepEqual(node, out) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out, node)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -118,6 +118,14 @@ func Node() *structs.Node {
|
|||
return node
|
||||
}
|
||||
|
||||
func DrainNode() *structs.Node {
|
||||
node := Node()
|
||||
node.DrainStrategy = &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{},
|
||||
}
|
||||
return node
|
||||
}
|
||||
|
||||
// NvidiaNode returns a node with two instances of an Nvidia GPU
|
||||
func NvidiaNode() *structs.Node {
|
||||
n := Node()
|
||||
|
|
|
@ -548,16 +548,6 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
|
|||
// Update the timestamp of when the node status was updated
|
||||
args.UpdatedAt = now.Unix()
|
||||
|
||||
// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
|
||||
// format.
|
||||
if args.Drain && args.DrainStrategy == nil {
|
||||
args.DrainStrategy = &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: -1 * time.Second, // Force drain
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Setup drain strategy
|
||||
if args.DrainStrategy != nil {
|
||||
// Mark start time for the drain
|
||||
|
@ -811,9 +801,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
|
|||
|
||||
// Setup the output
|
||||
if out != nil {
|
||||
// Clear the secret ID
|
||||
reply.Node = out.Copy()
|
||||
reply.Node.SecretID = ""
|
||||
reply.Node = out
|
||||
reply.Index = out.ModifyIndex
|
||||
} else {
|
||||
// Use the last index that affected the nodes table
|
||||
|
|
|
@ -914,7 +914,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
|
|||
ws := memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
require.Nil(err)
|
||||
require.True(out.Drain)
|
||||
require.NotNil(out.DrainStrategy)
|
||||
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
|
||||
require.Len(out.Events, 2)
|
||||
require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)
|
||||
|
@ -1314,7 +1314,6 @@ func TestClientEndpoint_GetNode(t *testing.T) {
|
|||
|
||||
// Update the status updated at value
|
||||
node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
|
||||
node.SecretID = ""
|
||||
node.Events = resp2.Node.Events
|
||||
if !reflect.DeepEqual(node, resp2.Node) {
|
||||
t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
|
||||
|
|
|
@ -648,7 +648,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
|
|||
return false, "node is not ready for placements", nil
|
||||
} else if node.SchedulingEligibility == structs.NodeSchedulingIneligible {
|
||||
return false, "node is not eligible for draining", nil
|
||||
} else if node.Drain {
|
||||
} else if node.DrainStrategy != nil {
|
||||
// Deprecate in favor of scheduling eligibility and remove post-0.8
|
||||
return false, "node is draining", nil
|
||||
}
|
||||
|
|
|
@ -715,7 +715,12 @@ func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) {
|
|||
t.Parallel()
|
||||
state := testStateStore(t)
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node.DrainStrategy = &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: 0,
|
||||
IgnoreSystemJobs: false,
|
||||
},
|
||||
}
|
||||
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
||||
snap, _ := state.Snapshot()
|
||||
|
||||
|
|
|
@ -80,15 +80,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
|||
return structs.Event{}, false
|
||||
}
|
||||
|
||||
// Node secret ID should not be included
|
||||
node := before.Copy()
|
||||
node.SecretID = ""
|
||||
|
||||
return structs.Event{
|
||||
Topic: structs.TopicNode,
|
||||
Key: node.ID,
|
||||
Key: before.ID,
|
||||
Payload: &structs.NodeStreamEvent{
|
||||
Node: node,
|
||||
Node: before,
|
||||
},
|
||||
}, true
|
||||
}
|
||||
|
@ -179,15 +175,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
|||
return structs.Event{}, false
|
||||
}
|
||||
|
||||
// Node secret ID should not be included
|
||||
node := after.Copy()
|
||||
node.SecretID = ""
|
||||
|
||||
return structs.Event{
|
||||
Topic: structs.TopicNode,
|
||||
Key: node.ID,
|
||||
Key: after.ID,
|
||||
Payload: &structs.NodeStreamEvent{
|
||||
Node: node,
|
||||
Node: after,
|
||||
},
|
||||
}, true
|
||||
case "deployment":
|
||||
|
|
|
@ -120,9 +120,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) {
|
|||
out := eventsFromChanges(s.db.ReadTxn(), changes)
|
||||
require.Len(t, out.Events, 1)
|
||||
|
||||
nodeEvent, ok := out.Events[0].Payload.(*structs.NodeStreamEvent)
|
||||
_, ok := out.Events[0].Payload.(*structs.NodeStreamEvent)
|
||||
require.True(t, ok)
|
||||
require.Empty(t, nodeEvent.Node.SecretID)
|
||||
// TODO: cgbaker: do we really want to remove this check?
|
||||
// require.Empty(t, nodeEvent.Node.SecretID)
|
||||
|
||||
// Delete
|
||||
changes = Changes{
|
||||
|
@ -140,9 +141,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) {
|
|||
out2 := eventsFromChanges(s.db.ReadTxn(), changes)
|
||||
require.Len(t, out2.Events, 1)
|
||||
|
||||
nodeEvent2, ok := out2.Events[0].Payload.(*structs.NodeStreamEvent)
|
||||
_, ok = out2.Events[0].Payload.(*structs.NodeStreamEvent)
|
||||
require.True(t, ok)
|
||||
require.Empty(t, nodeEvent2.Node.SecretID)
|
||||
// TODO: cgbaker: do we really want to remove this check?
|
||||
// require.Empty(t, nodeEvent2.Node.SecretID)
|
||||
}
|
||||
|
||||
func TestEventsFromChanges_DeploymentUpdate(t *testing.T) {
|
||||
|
|
|
@ -832,7 +832,6 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
|
|||
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
|
||||
}
|
||||
|
||||
node.Drain = exist.Drain // Retain the drain mode
|
||||
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
|
||||
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
|
||||
} else {
|
||||
|
@ -951,7 +950,8 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
|
|||
return nil
|
||||
}
|
||||
|
||||
// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
|
||||
// BatchUpdateNodeDrain is used to update the drain of a node set of nodes.
|
||||
// This is only called when node drain is completed by the drainer.
|
||||
func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
|
||||
txn := s.db.WriteTxnMsgT(msgType, index)
|
||||
defer txn.Abort()
|
||||
|
@ -966,9 +966,10 @@ func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uin
|
|||
// UpdateNodeDrain is used to update the drain of a node
|
||||
func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {
|
||||
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(msgType, index)
|
||||
defer txn.Abort()
|
||||
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
return txn.Commit()
|
||||
|
@ -997,7 +998,6 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
|
|||
}
|
||||
|
||||
// Update the drain in the copy
|
||||
copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.10
|
||||
copyNode.DrainStrategy = drain
|
||||
if drain != nil {
|
||||
copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible
|
||||
|
|
|
@ -963,7 +963,6 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
|
|||
for _, id := range []string{n1.ID, n2.ID} {
|
||||
out, err := state.NodeByID(ws, id)
|
||||
require.Nil(err)
|
||||
require.True(out.Drain)
|
||||
require.NotNil(out.DrainStrategy)
|
||||
require.Equal(out.DrainStrategy, expectedDrain)
|
||||
require.Len(out.Events, 2)
|
||||
|
@ -1008,7 +1007,6 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
|
|||
ws = memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
require.Nil(err)
|
||||
require.True(out.Drain)
|
||||
require.NotNil(out.DrainStrategy)
|
||||
require.Equal(out.DrainStrategy, expectedDrain)
|
||||
require.Len(out.Events, 2)
|
||||
|
@ -1152,7 +1150,6 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
|
|||
ws = memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
require.Nil(err)
|
||||
require.False(out.Drain)
|
||||
require.Nil(out.DrainStrategy)
|
||||
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
|
||||
require.Len(out.Events, 3)
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
|
@ -71,7 +73,9 @@ func (n *JsonStream) Send(v interface{}) error {
|
|||
return n.ctx.Err()
|
||||
}
|
||||
|
||||
buf, err := json.Marshal(v)
|
||||
var buf bytes.Buffer
|
||||
enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions)
|
||||
err := enc.Encode(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error marshaling json for stream: %w", err)
|
||||
}
|
||||
|
@ -79,7 +83,7 @@ func (n *JsonStream) Send(v interface{}) error {
|
|||
select {
|
||||
case <-n.ctx.Done():
|
||||
return fmt.Errorf("error stream is no longer running: %w", err)
|
||||
case n.outCh <- &structs.EventJson{Data: buf}:
|
||||
case n.outCh <- &structs.EventJson{Data: buf.Bytes()}:
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package structs
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
)
|
||||
|
||||
// Special encoding for structs.Node, to perform the following:
|
||||
// 1. provide backwards compatibility for the following fields:
|
||||
// * Node.Drain
|
||||
type nodeExt struct{}
|
||||
|
||||
// ConvertExt converts a structs.Node to a struct with the extra field, Drain
|
||||
func (n nodeExt) ConvertExt(v interface{}) interface{} {
|
||||
node := v.(*Node)
|
||||
if node == nil {
|
||||
return nil
|
||||
}
|
||||
type NodeAlias Node
|
||||
return &struct {
|
||||
*NodeAlias
|
||||
Drain bool
|
||||
}{
|
||||
NodeAlias: (*NodeAlias)(node),
|
||||
Drain: node.DrainStrategy != nil,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateExt is not used
|
||||
func (n nodeExt) UpdateExt(_ interface{}, _ interface{}) {}
|
||||
|
||||
func RegisterJSONEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle {
|
||||
h.SetInterfaceExt(reflect.TypeOf(Node{}), 1, nodeExt{})
|
||||
return h
|
||||
}
|
|
@ -508,12 +508,6 @@ type NodeUpdateDrainRequest struct {
|
|||
NodeID string
|
||||
DrainStrategy *DrainStrategy
|
||||
|
||||
// COMPAT Remove in version 0.10
|
||||
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
|
||||
// drain strategy but we need to handle the upgrade path where the Raft log
|
||||
// contains drain updates with just the drain boolean being manipulated.
|
||||
Drain bool
|
||||
|
||||
// MarkEligible marks the node as eligible if removing the drain strategy.
|
||||
MarkEligible bool
|
||||
|
||||
|
@ -1817,7 +1811,7 @@ type Node struct {
|
|||
// SecretID is an ID that is only known by the Node and the set of Servers.
|
||||
// It is not accessible via the API and is used to authenticate nodes
|
||||
// conducting privileged activities.
|
||||
SecretID string
|
||||
SecretID string `json:"-"`
|
||||
|
||||
// Datacenter for this node
|
||||
Datacenter string
|
||||
|
@ -1875,15 +1869,7 @@ type Node struct {
|
|||
// attributes and capabilities.
|
||||
ComputedClass string
|
||||
|
||||
// COMPAT: Remove in Nomad 0.9
|
||||
// Drain is controlled by the servers, and not the client.
|
||||
// If true, no jobs will be scheduled to this node, and existing
|
||||
// allocations will be drained. Superseded by DrainStrategy in Nomad
|
||||
// 0.8 but kept for backward compat.
|
||||
Drain bool
|
||||
|
||||
// DrainStrategy determines the node's draining behavior. Will be nil
|
||||
// when Drain=false.
|
||||
// DrainStrategy determines the node's draining behavior.
|
||||
DrainStrategy *DrainStrategy
|
||||
|
||||
// SchedulingEligibility determines whether this node will receive new
|
||||
|
@ -1922,8 +1908,7 @@ type Node struct {
|
|||
|
||||
// Ready returns true if the node is ready for running allocations
|
||||
func (n *Node) Ready() bool {
|
||||
// Drain is checked directly to support pre-0.8 Node data
|
||||
return n.Status == NodeStatusReady && !n.Drain && n.SchedulingEligibility == NodeSchedulingEligible
|
||||
return n.Status == NodeStatusReady && n.DrainStrategy == nil && n.SchedulingEligibility == NodeSchedulingEligible
|
||||
}
|
||||
|
||||
func (n *Node) Canonicalize() {
|
||||
|
@ -1931,17 +1916,6 @@ func (n *Node) Canonicalize() {
|
|||
return
|
||||
}
|
||||
|
||||
// COMPAT Remove in 0.10
|
||||
// In v0.8.0 we introduced scheduling eligibility, so we need to set it for
|
||||
// upgrading nodes
|
||||
if n.SchedulingEligibility == "" {
|
||||
if n.Drain {
|
||||
n.SchedulingEligibility = NodeSchedulingIneligible
|
||||
} else {
|
||||
n.SchedulingEligibility = NodeSchedulingEligible
|
||||
}
|
||||
}
|
||||
|
||||
// COMPAT remove in 1.0
|
||||
// In v0.12.0 we introduced a separate node specific network resource struct
|
||||
// so we need to covert any pre 0.12 clients to the correct struct
|
||||
|
@ -1965,6 +1939,14 @@ func (n *Node) Canonicalize() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if n.SchedulingEligibility == "" {
|
||||
if n.DrainStrategy != nil {
|
||||
n.SchedulingEligibility = NodeSchedulingIneligible
|
||||
} else {
|
||||
n.SchedulingEligibility = NodeSchedulingEligible
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Node) Copy() *Node {
|
||||
|
@ -2128,7 +2110,7 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {
|
|||
Name: n.Name,
|
||||
NodeClass: n.NodeClass,
|
||||
Version: n.Attributes["nomad.version"],
|
||||
Drain: n.Drain,
|
||||
Drain: n.DrainStrategy != nil,
|
||||
SchedulingEligibility: n.SchedulingEligibility,
|
||||
Status: n.Status,
|
||||
StatusDescription: n.StatusDescription,
|
||||
|
@ -10602,13 +10584,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle {
|
|||
var (
|
||||
// JsonHandle and JsonHandlePretty are the codec handles to JSON encode
|
||||
// structs. The pretty handle will add indents for easier human consumption.
|
||||
// JsonHandleWithExtensions and JsonHandlePretty include extensions for
|
||||
// encoding structs objects with API-specific fields
|
||||
JsonHandle = &codec.JsonHandle{
|
||||
HTMLCharsAsIs: true,
|
||||
}
|
||||
JsonHandlePretty = &codec.JsonHandle{
|
||||
JsonHandleWithExtensions = RegisterJSONEncodingExtensions(&codec.JsonHandle{
|
||||
HTMLCharsAsIs: true,
|
||||
})
|
||||
JsonHandlePretty = RegisterJSONEncodingExtensions(&codec.JsonHandle{
|
||||
HTMLCharsAsIs: true,
|
||||
Indent: 4,
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
|
|
|
@ -5483,7 +5483,11 @@ func TestNode_Canonicalize(t *testing.T) {
|
|||
require.Equal(NodeSchedulingEligible, node.SchedulingEligibility)
|
||||
|
||||
node = &Node{
|
||||
Drain: true,
|
||||
DrainStrategy: &DrainStrategy{
|
||||
DrainSpec: DrainSpec{
|
||||
Deadline: 30000,
|
||||
},
|
||||
},
|
||||
}
|
||||
node.Canonicalize()
|
||||
require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility)
|
||||
|
|
|
@ -2996,8 +2996,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register a draining node
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
// Create some nodes
|
||||
|
@ -3078,8 +3077,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register a draining node
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
node.Status = structs.NodeStatusDown
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
|
@ -3211,7 +3209,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
|
||||
|
||||
node.Drain = true
|
||||
node.DrainStrategy = mock.DrainNode().DrainStrategy
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
// Create a mock evaluation to deal with drain
|
||||
|
@ -4064,8 +4062,7 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) {
|
|||
func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
// Create a job
|
||||
|
@ -4119,8 +4116,7 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) {
|
|||
|
||||
// Create two nodes, one that is drained and has a successfully finished
|
||||
// alloc and a fresh undrained one
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
node2 := mock.Node()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
|
||||
|
@ -4329,8 +4325,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) {
|
|||
|
||||
// Create two nodes, one that is drained and has a successfully finished
|
||||
// alloc and a fresh undrained one
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
node2 := mock.Node()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
|
||||
|
@ -4401,8 +4396,7 @@ func TestBatchSched_NodeDrain_Complete(t *testing.T) {
|
|||
|
||||
// Create two nodes, one that is drained and has a successfully finished
|
||||
// alloc and a fresh undrained one
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
node2 := mock.Node()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
|
||||
|
@ -4754,8 +4748,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register a draining node
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
// Create an alloc on the draining node
|
||||
|
|
|
@ -885,10 +885,9 @@ func TestReconciler_DrainNode(t *testing.T) {
|
|||
// Build a map of tainted nodes
|
||||
tainted := make(map[string]*structs.Node, 2)
|
||||
for i := 0; i < 2; i++ {
|
||||
n := mock.Node()
|
||||
n := mock.DrainNode()
|
||||
n.ID = allocs[i].NodeID
|
||||
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
n.Drain = true
|
||||
tainted[n.ID] = n
|
||||
}
|
||||
|
||||
|
@ -938,10 +937,9 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
|
|||
// Build a map of tainted nodes
|
||||
tainted := make(map[string]*structs.Node, 2)
|
||||
for i := 0; i < 2; i++ {
|
||||
n := mock.Node()
|
||||
n := mock.DrainNode()
|
||||
n.ID = allocs[i].NodeID
|
||||
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
n.Drain = true
|
||||
tainted[n.ID] = n
|
||||
}
|
||||
|
||||
|
@ -992,10 +990,9 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
|
|||
// Build a map of tainted nodes
|
||||
tainted := make(map[string]*structs.Node, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
n := mock.Node()
|
||||
n := mock.DrainNode()
|
||||
n.ID = allocs[i].NodeID
|
||||
allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
n.Drain = true
|
||||
tainted[n.ID] = n
|
||||
}
|
||||
|
||||
|
@ -2994,10 +2991,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
|
|||
|
||||
// Build a map of tainted nodes that contains the last canary
|
||||
tainted := make(map[string]*structs.Node, 1)
|
||||
n := mock.Node()
|
||||
n := mock.DrainNode()
|
||||
n.ID = allocs[11].NodeID
|
||||
allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
n.Drain = true
|
||||
tainted[n.ID] = n
|
||||
|
||||
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
|
||||
|
@ -3785,7 +3781,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
|
|||
if i == 0 {
|
||||
n.Status = structs.NodeStatusDown
|
||||
} else {
|
||||
n.Drain = true
|
||||
n.DrainStrategy = mock.DrainNode().DrainStrategy
|
||||
allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
}
|
||||
tainted[n.ID] = n
|
||||
|
@ -3870,7 +3866,7 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) {
|
|||
if i == 0 {
|
||||
n.Status = structs.NodeStatusDown
|
||||
} else {
|
||||
n.Drain = true
|
||||
n.DrainStrategy = mock.DrainNode().DrainStrategy
|
||||
allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
|
||||
}
|
||||
tainted[n.ID] = n
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -37,8 +38,8 @@ func TestAllocSet_filterByTainted(t *testing.T) {
|
|||
|
||||
nodes := map[string]*structs.Node{
|
||||
"draining": {
|
||||
ID: "draining",
|
||||
Drain: true,
|
||||
ID: "draining",
|
||||
DrainStrategy: mock.DrainNode().DrainStrategy,
|
||||
},
|
||||
"lost": {
|
||||
ID: "lost",
|
||||
|
|
|
@ -1051,8 +1051,7 @@ func TestSystemSched_NodeDrain_Down(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register a draining node
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
node.Status = structs.NodeStatusDown
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
|
@ -1113,8 +1112,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register a draining node
|
||||
node := mock.Node()
|
||||
node.Drain = true
|
||||
node := mock.DrainNode()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
// Generate a fake job allocated on that node.
|
||||
|
@ -1708,9 +1706,8 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
|
|||
h := NewHarness(t)
|
||||
|
||||
// Register two nodes with two different classes
|
||||
node := mock.Node()
|
||||
node := mock.DrainNode()
|
||||
node.NodeClass = "green"
|
||||
node.Drain = true
|
||||
node.ComputeClass()
|
||||
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
|
||||
|
||||
|
|
|
@ -255,7 +255,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
|
|||
if node.Status != structs.NodeStatusReady {
|
||||
continue
|
||||
}
|
||||
if node.Drain {
|
||||
if node.DrainStrategy != nil {
|
||||
continue
|
||||
}
|
||||
if node.SchedulingEligibility != structs.NodeSchedulingEligible {
|
||||
|
@ -327,7 +327,7 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
|
|||
out[alloc.NodeID] = nil
|
||||
continue
|
||||
}
|
||||
if structs.ShouldDrainNode(node.Status) || node.Drain {
|
||||
if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil {
|
||||
out[alloc.NodeID] = node
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,8 +39,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
|
|||
eligibleNode := mock.Node()
|
||||
eligibleNode.ID = "zip"
|
||||
|
||||
drainNode := mock.Node()
|
||||
drainNode.Drain = true
|
||||
drainNode := mock.DrainNode()
|
||||
|
||||
deadNode := mock.Node()
|
||||
deadNode.Status = structs.NodeStatusDown
|
||||
|
@ -220,8 +219,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
|
|||
func TestDiffSystemAllocs(t *testing.T) {
|
||||
job := mock.SystemJob()
|
||||
|
||||
drainNode := mock.Node()
|
||||
drainNode.Drain = true
|
||||
drainNode := mock.DrainNode()
|
||||
|
||||
deadNode := mock.Node()
|
||||
deadNode.Status = structs.NodeStatusDown
|
||||
|
@ -332,8 +330,7 @@ func TestReadyNodesInDCs(t *testing.T) {
|
|||
node3 := mock.Node()
|
||||
node3.Datacenter = "dc2"
|
||||
node3.Status = structs.NodeStatusDown
|
||||
node4 := mock.Node()
|
||||
node4.Drain = true
|
||||
node4 := mock.DrainNode()
|
||||
|
||||
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
|
||||
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2))
|
||||
|
@ -392,8 +389,7 @@ func TestTaintedNodes(t *testing.T) {
|
|||
node3 := mock.Node()
|
||||
node3.Datacenter = "dc2"
|
||||
node3.Status = structs.NodeStatusDown
|
||||
node4 := mock.Node()
|
||||
node4.Drain = true
|
||||
node4 := mock.DrainNode()
|
||||
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
|
||||
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2))
|
||||
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))
|
||||
|
|
Loading…
Reference in New Issue