Nodes generate Secret ID and used for retrieving allocations and registering

This commit is contained in:
Alex Dadgar 2016-08-15 23:11:57 -07:00
parent dfe0a95b63
commit 895c31f605
6 changed files with 230 additions and 47 deletions

View File

@ -486,33 +486,45 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}
// nodeID restores a persistent unique ID or generates a new one
func (c *Client) nodeID() (string, error) {
// nodeIDs restores the nodes persistent unique ID and SecretID or generates new
// ones
func (c *Client) nodeID() (id string, secret string, err error) {
// Do not persist in dev mode
if c.config.DevMode {
return structs.GenerateUUID(), nil
return structs.GenerateUUID(), structs.GenerateUUID(), nil
}
// Attempt to read existing ID
path := filepath.Join(c.config.StateDir, "client-id")
buf, err := ioutil.ReadFile(path)
idPath := filepath.Join(c.config.StateDir, "client-id")
idBuf, err := ioutil.ReadFile(idPath)
if err != nil && !os.IsNotExist(err) {
return "", err
return "", "", err
}
// Attempt to read existing secret ID
secretPath := filepath.Join(c.config.StateDir, "secret-id")
secretBuf, err := ioutil.ReadFile(secretPath)
if err != nil && !os.IsNotExist(err) {
return "", "", err
}
// Use existing ID if any
if len(buf) != 0 {
return string(buf), nil
if len(idBuf) != 0 && len(secretBuf) != 0 {
return string(idBuf), string(secretBuf), nil
}
// Generate new ID
id := structs.GenerateUUID()
id = structs.GenerateUUID()
secret = structs.GenerateUUID()
// Persist the ID
if err := ioutil.WriteFile(path, []byte(id), 0700); err != nil {
return "", err
// Persist the IDs
if err := ioutil.WriteFile(idPath, []byte(id), 0700); err != nil {
return "", "", err
}
return id, nil
if err := ioutil.WriteFile(secretPath, []byte(secret), 0700); err != nil {
return "", "", err
}
return id, secret, nil
}
// setupNode is used to setup the initial node
@ -523,11 +535,13 @@ func (c *Client) setupNode() error {
c.config.Node = node
}
// Generate an iD for the node
var err error
node.ID, err = c.nodeID()
id, secretID, err := c.nodeID()
if err != nil {
return fmt.Errorf("node ID setup failed: %v", err)
}
node.ID = id
node.SecretID = secretID
if node.Attributes == nil {
node.Attributes = make(map[string]string)
}
@ -827,6 +841,8 @@ func (c *Client) retryRegisterNode() {
for {
if err := c.registerNode(); err == nil {
break
} else {
c.logger.Printf("[ERR] client: %v", err)
}
select {
case <-time.After(c.retryIntv(registerRetryIntv)):
@ -992,8 +1008,10 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
n := c.Node()
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
NodeID: n.ID,
SecretID: n.SecretID,
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: true,
@ -1417,10 +1435,7 @@ func (c *Client) collectHostStats() {
// emitStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitStats(hStats *stats.HostStats) {
nodeID, err := c.nodeID()
if err != nil {
return
}
nodeID := c.Node().ID
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))

View File

@ -352,16 +352,26 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
job := mock.Job()
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job = job
alloc.JobID = job.ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus
// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil {
if err := state.UpsertJob(0, job); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(100, []*structs.Allocation{alloc})
if err := state.UpsertJobSummary(100, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(101, []*structs.Allocation{alloc})
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
@ -391,21 +401,29 @@ func TestClient_WatchAllocs(t *testing.T) {
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.Job = job
alloc1.NodeID = c1.Node().ID
alloc2 := mock.Alloc()
alloc2.NodeID = c1.Node().ID
alloc2.JobID = job.ID
alloc2.Job = job
// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil {
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil {
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -421,7 +439,7 @@ func TestClient_WatchAllocs(t *testing.T) {
})
// Delete one allocation
err = state.DeleteEval(101, nil, []string{alloc1.ID})
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -432,8 +450,7 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(102,
[]*structs.Allocation{alloc2_2})
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -459,6 +476,18 @@ func TestClient_WatchAllocs(t *testing.T) {
})
}
func waitTilNodeReady(client *Client, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
n := client.Node()
if n.Status != structs.NodeStatusReady {
return false, fmt.Errorf("node not registered")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestClient_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
s1, _ := testServer(t, nil)
@ -471,18 +500,27 @@ func TestClient_SaveRestoreState(t *testing.T) {
})
defer c1.Shutdown()
// Wait til the node is ready
waitTilNodeReady(c1, t)
// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
task := alloc1.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Config["args"] = []string{"100"}
state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil {
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
@ -491,7 +529,13 @@ func TestClient_SaveRestoreState(t *testing.T) {
c1.allocLock.RLock()
ar := c1.allocs[alloc1.ID]
c1.allocLock.RUnlock()
return ar != nil && ar.Alloc().ClientStatus == structs.AllocClientStatusRunning, nil
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

View File

@ -9,13 +9,14 @@ import (
func Node() *structs.Node {
node := &structs.Node{
ID: structs.GenerateUUID(),
SecretID: structs.GenerateUUID(),
Datacenter: "dc1",
Name: "foobar",
Attributes: map[string]string{
"kernel.name": "linux",
"arch": "x86",
"version": "0.1.0",
"driver.exec": "1",
"kernel.name": "linux",
"arch": "x86",
"nomad.version": "0.5.0",
"driver.exec": "1",
},
Resources: &structs.Resources{
CPU: 4000,

View File

@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"strings"
"sync"
"time"
@ -57,6 +58,18 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
if args.Node.Name == "" {
return fmt.Errorf("missing node name for client registration")
}
if len(args.Node.Attributes) == 0 {
return fmt.Errorf("missing attributes for client registration")
}
if args.Node.SecretID == "" {
// COMPAT: Remove after 0.6
// Need to check if this node is <0.4.x since SecretID is new in 0.5
if pre, err := nodePreSecretID(args.Node); err != nil {
return err
} else if !pre {
return fmt.Errorf("missing node secret ID for client registration")
}
}
// Default the status if none is given
if args.Node.Status == "" {
@ -135,6 +148,22 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return nil
}
// nodePreSecretID is a helper that returns whether the node is on a version
// that is before SecretIDs were introduced
func nodePreSecretID(node *structs.Node) (bool, error) {
a := node.Attributes
if a == nil {
return false, fmt.Errorf("node doesn't have attributes set")
}
v, ok := a["nomad.version"]
if !ok {
return false, fmt.Errorf("missing Nomad version in attributes")
}
return !strings.HasPrefix(v, "0.5"), nil
}
// updateNodeUpdateResponse assumes the n.srv.peerLock is held for reading.
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
reply.LeaderRPCAddr = n.srv.raft.Leader()
@ -217,7 +246,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
return fmt.Errorf("missing node ID for client status update")
}
if !structs.ValidNodeStatus(args.Status) {
return fmt.Errorf("invalid status for node")
@ -236,6 +265,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return fmt.Errorf("node not found")
}
// XXX: Could use the SecretID here but have to update the heartbeat system
// to track SecretIDs.
// Update the timestamp of when the node status was updated
node.StatusUpdatedAt = time.Now().Unix()
@ -423,8 +455,10 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
}
// Setup the output
reply.Node = out
if out != nil {
// Clear the secret ID
reply.Node = out.Copy()
reply.Node.SecretID = ""
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
@ -432,6 +466,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
if err != nil {
return err
}
reply.Node = nil
reply.Index = index
}
@ -524,11 +559,34 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
if err != nil {
return err
}
allocs, err := snap.AllocsByNode(args.NodeID)
// Look for the node
node, err := snap.NodeByID(args.NodeID)
if err != nil {
return err
}
var allocs []*structs.Allocation
if node != nil {
// COMPAT: Remove in 0.6
// Check if the node should have a SecretID set
if args.SecretID == "" {
if pre, err := nodePreSecretID(node); err != nil {
return err
} else if !pre {
return fmt.Errorf("missing node secret ID for client status update")
}
} else if args.SecretID != node.SecretID {
return fmt.Errorf("node secret ID does not match")
}
var err error
allocs, err = snap.AllocsByNode(args.NodeID)
if err != nil {
return err
}
}
reply.Allocs = make(map[string]uint64)
// Setup the output
if len(allocs) != 0 {

View File

@ -3,6 +3,7 @@ package nomad
import (
"fmt"
"reflect"
"strings"
"testing"
"time"
@ -51,6 +52,53 @@ func TestClientEndpoint_Register(t *testing.T) {
}
}
func TestClientEndpoint_Register_NoSecret(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
node.SecretID = ""
req := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), "secret") {
t.Fatalf("Expecting error regarding missing secret id: %v", err)
}
// Update the node to be pre-0.5
node.Attributes["nomad.version"] = "0.4.1"
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil {
t.Fatalf("Expecting error regarding missing secret id", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected node")
}
if out.CreateIndex != resp.Index {
t.Fatalf("index mis-match")
}
if out.ComputedClass == "" {
t.Fatal("ComputedClass not set")
}
}
func TestClientEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -609,6 +657,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
// Update the status updated at value
node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
node.SecretID = ""
if !reflect.DeepEqual(node, resp2.Node) {
t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
}
@ -822,6 +871,7 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) {
// Lookup the allocs
get := &structs.NodeSpecificRequest{
NodeID: node.ID,
SecretID: node.SecretID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp2 structs.NodeClientAllocsResponse
@ -836,16 +886,24 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Allocs)
}
// Lookup non-existing node
get.NodeID = "foobarbaz"
// Lookup node with bad SecretID
get.SecretID = "foobarbaz"
var resp3 structs.NodeClientAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp3); err != nil {
err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp3)
if err == nil || !strings.Contains(err.Error(), "does not match") {
t.Fatalf("err: %v", err)
}
if resp3.Index != 100 {
// Lookup non-existing node
get.NodeID = structs.GenerateUUID()
var resp4 structs.NodeClientAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp4); err != nil {
t.Fatalf("err: %v", err)
}
if resp4.Index != 100 {
t.Fatalf("Bad index: %d %d", resp3.Index, 100)
}
if len(resp3.Allocs) != 0 {
if len(resp4.Allocs) != 0 {
t.Fatalf("unexpected node %#v", resp3.Allocs)
}
}
@ -886,7 +944,8 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
// Lookup the allocs in a blocking query
req := &structs.NodeSpecificRequest{
NodeID: node.ID,
NodeID: node.ID,
SecretID: node.SecretID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,

View File

@ -210,7 +210,8 @@ type NodeEvaluateRequest struct {
// NodeSpecificRequest is used when we just need to specify a target node
type NodeSpecificRequest struct {
NodeID string
NodeID string
SecretID string
QueryOptions
}
@ -593,6 +594,11 @@ type Node struct {
// approach. Alternatively a UUID may be used.
ID string
// SecretID is an ID that is only known by the Node and the set of Servers.
// It is not accessible via the API and is used to authenticate nodes
// conducting priviledged activities.
SecretID string
// Datacenter for this node
Datacenter string