Register events

This commit is contained in:
Alex Dadgar 2018-05-11 17:26:25 -07:00
parent 17aac1c9de
commit 21c5ed850d
9 changed files with 134 additions and 59 deletions

View file

@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) {
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
if n.Events[0].Message != "Node registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})

View file

@ -40,6 +40,10 @@ const (
// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"
// NodeHeartbeatEventReregistered is the message used when the node becomes
// reregistered by the heartbeat.
NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
)
// Node endpoint is used for client interactions
@ -367,6 +371,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
// Attach an event if we are updating the node status to ready when it
// is down via a heartbeat
if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
args.NodeEvent = structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeHeartbeatEventReregistered)
}
_, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err)

View file

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}
func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Check that we have no client connections
require.Empty(s1.connectedNodes())
// Create the register request but make the node down
node := mock.Node()
node.Status = structs.NodeStatusDown
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
// Update the status
dereg := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusInit,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2))
require.NotZero(resp2.Index)
// Check for heartbeat interval
ttl := resp2.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}
// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.NoError(err)
require.NotNil(out)
require.EqualValues(resp2.Index, out.ModifyIndex)
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message)
}
func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
@ -1222,7 +1273,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
if len(resp2.Node.Events) != 1 {
t.Fatalf("Did not set node events: %#v", resp2.Node)
}
if resp2.Node.Events[0].Message != "Node Registered" {
if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered {
t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
}

View file

@ -3,7 +3,6 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
@ -11,7 +10,6 @@ import (
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) {
t.Parallel()
require := require.New(t)
config := DefaultConfig()
config.DevMode = true
config.LogOutput = testlog.NewWriter(t)
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
logger := log.New(config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
// Set the config to not have the core scheduler
config := DefaultConfig()
logger := testlog.Logger(t)
s := &Server{
config: config,
logger: logger,
}
config.EnabledSchedulers = []string{"batch"}
_, err := NewServer(config, catalog, logger)
err := s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled")
// Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
_, err = NewServer(config, catalog, logger)
err = s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "foo")
}

View file

@ -14,6 +14,16 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventRegistered = "Node registered"
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventReregistered = "Node re-registered"
)
// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
// Retain node events that have already been set on the node
node.Events = exist.Events
// If we are transitioning from down, record the re-registration
if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
appendNodeEvents(index, node, []*structs.NodeEvent{
structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventReregistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
}
node.Drain = exist.Drain // Retain the drain mode
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Cluster",
Timestamp: time.Unix(node.StatusUpdatedAt, 0),
}
nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventRegistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
node.Events = []*structs.NodeEvent{nodeEvent}
node.CreateIndex = index
node.ModifyIndex = index

View file

@ -551,54 +551,45 @@ func TestStateStore_DeploymentsByIDPrefix(t *testing.T) {
}
func TestStateStore_UpsertNode_Node(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
_, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
require.NoError(err)
err = state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
require.NoError(state.UpsertNode(1000, node))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
out2, err := state.NodeBySecretID(ws, node.SecretID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(node, out2) {
t.Fatalf("bad: %#v %#v", node, out2)
}
if !reflect.DeepEqual(node, out) {
t.Fatalf("bad: %#v %#v", node, out)
}
require.NoError(err)
require.EqualValues(node, out)
require.EqualValues(node, out2)
require.Len(out.Events, 1)
require.Equal(NodeRegisterEventRegistered, out.Events[0].Message)
index, err := state.Index("nodes")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
require.NoError(err)
require.EqualValues(1000, index)
require.False(watchFired(ws))
if watchFired(ws) {
t.Fatalf("bad")
}
// Transition the node to down and then up and ensure we get a re-register
// event
down := out.Copy()
down.Status = structs.NodeStatusDown
require.NoError(state.UpsertNode(1001, down))
require.NoError(state.UpsertNode(1002, out))
out, err = state.NodeByID(ws, node.ID)
require.NoError(err)
require.Len(out.Events, 2)
require.Equal(NodeRegisterEventReregistered, out.Events[1].Message)
}
func TestStateStore_DeleteNode_Node(t *testing.T) {
@ -794,7 +785,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require.Equal(1, len(node.Events))
require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
require.Equal("Node Registered", node.Events[0].Message)
require.Equal(NodeRegisterEventRegistered, node.Events[0].Message)
// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
@ -836,7 +827,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
}
require.Equal(1, len(node.Events))
require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
require.Equal("Node Registered", node.Events[0].Message)
require.Equal(NodeRegisterEventRegistered, node.Events[0].Message)
var out *structs.Node
for i := 1; i <= 20; i++ {

View file

@ -293,7 +293,8 @@ type WriteMeta struct {
// NodeRegisterRequest is used for Node.Register endpoint
// to register a node as being a schedulable entity.
type NodeRegisterRequest struct {
Node *Node
Node *Node
NodeEvent *NodeEvent
WriteRequest
}
@ -1258,6 +1259,12 @@ func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent {
return ne
}
// SetTimestamp is used to set the timestamp on the node event
func (ne *NodeEvent) SetTimestamp(ts time.Time) *NodeEvent {
ne.Timestamp = ts
return ne
}
// AddDetail is used to add a detail to the node event
func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent {
if ne.Details == nil {

View file

@ -257,7 +257,7 @@ $ curl \
{
"CreateIndex": 0,
"Details": null,
"Message": "Node Registered",
"Message": "Node registered",
"Subsystem": "Cluster",
"Timestamp": "2018-04-10T23:43:17Z"
}

View file

@ -111,7 +111,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered
Allocated Resources
CPU Memory Disk IOPS
@ -154,7 +154,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered
Allocated Resources
CPU Memory Disk IOPS
@ -219,7 +219,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered
Allocated Resources
CPU Memory Disk IOPS
@ -300,7 +300,7 @@ rkt true true <none> 2018-03-29T17:23:42Z
Node Events
Time Subsystem Message Details
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected driver: docker,
2018-03-29T17:23:42Z Cluster Node Registered <none>
2018-03-29T17:23:42Z Cluster Node registered <none>
Allocated Resources
CPU Memory Disk IOPS