Merge pull request #2832 from hashicorp/node-id-integrity

Adds node ID integrity checking to the catalog and the LAN and WAN clusters.
Commit df406332f0 by James Phillips, 2017-03-27 09:01:36 -07:00, committed by GitHub
12 changed files with 271 additions and 94 deletions
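The test-config changes below all seed agents with a freshly generated node ID. A minimal standalone sketch of that generation, using the same hashicorp/go-uuid helper and the types.NodeID wrapper the diffs rely on (the program itself is illustrative, not part of the change):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/types"
	uuid "github.com/hashicorp/go-uuid"
)

func main() {
	// GenerateUUID returns a random (version 4) UUID string, e.g.
	// "96430788-246f-4379-94ce-257f7429e340".
	raw, err := uuid.GenerateUUID()
	if err != nil {
		panic(err)
	}

	// Consul stores node IDs as the string type types.NodeID.
	nodeID := types.NodeID(raw)
	fmt.Println(nodeID)
}
```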

command/agent/agent_test.go

@@ -48,12 +48,18 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, numPortsPerIndex))
conf := DefaultConfig()
+nodeID, err := uuid.GenerateUUID()
+if err != nil {
+panic(err)
+}
conf.Version = version.Version
conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true
conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx)
+conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1"
conf.Ports.DNS = basePortNumber + idx + portOffsetDNS
conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP
@@ -314,6 +320,7 @@ func TestAgent_ReconnectConfigSettings(t *testing.T) {
func TestAgent_NodeID(t *testing.T) {
c := nextConfig()
+c.NodeID = ""
dir, agent := makeAgent(t, c)
defer os.RemoveAll(dir)
defer agent.Shutdown()

command/agent_test.go

@@ -15,7 +15,9 @@ import (
"github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/logger"
+"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/version"
+"github.com/hashicorp/go-uuid"
"github.com/mitchellh/cli"
)
@@ -112,9 +114,15 @@ func nextConfig() *agent.Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := agent.DefaultConfig()
+nodeID, err := uuid.GenerateUUID()
+if err != nil {
+panic(err)
+}
conf.Bootstrap = true
conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx)
+conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1"
conf.Server = true

consul/catalog_endpoint.go

@@ -90,11 +90,13 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
}
}
-_, err = c.srv.raftApply(structs.RegisterRequestType, args)
+resp, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil {
return err
}
+if respErr, ok := resp.(error); ok {
+return respErr
+}
return nil
}
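A note on why the raftApply change above matters: with hashicorp/raft, a value returned by the FSM's Apply comes back through ApplyFuture.Response() rather than through the replication error, so an FSM-level rejection (such as the state store's new node ID check) stays invisible unless the response is unwrapped. A hedged sketch of the general pattern, assuming an initialized *raft.Raft:

```go
package sketch

import (
	"time"

	"github.com/hashicorp/raft"
)

// apply commits buf through Raft and surfaces any error the FSM returned.
// Sketch only; r is assumed to be an initialized *raft.Raft.
func apply(r *raft.Raft, buf []byte) error {
	future := r.Apply(buf, 30*time.Second)
	if err := future.Error(); err != nil {
		// Failed to commit or replicate the entry.
		return err
	}
	if respErr, ok := future.Response().(error); ok {
		// The FSM returned an error value for the applied entry,
		// e.g. the state store's "aliases existing node" check.
		return respErr
	}
	return nil
}
```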

consul/catalog_endpoint_test.go

@@ -32,6 +32,7 @@ func TestCatalog_Register(t *testing.T) {
Port: 8000,
},
Check: &structs.HealthCheck{
+CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
}
@@ -61,6 +62,7 @@ func TestCatalog_Register_NodeID(t *testing.T) {
Port: 8000,
},
Check: &structs.HealthCheck{
+CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
}

consul/client.go

@@ -156,7 +156,11 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
-conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
+conf.Merge = &lanMergeDelegate{
+dc: c.config.Datacenter,
+nodeID: c.config.NodeID,
+nodeName: c.config.NodeName,
+}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}

consul/leader_test.go

@@ -848,87 +848,6 @@ func TestLeader_RollRaftServer(t *testing.T) {
}
}
-func TestLeader_ChangeServerAddress(t *testing.T) {
-conf := func(c *Config) {
-c.Bootstrap = false
-c.BootstrapExpect = 3
-c.Datacenter = "dc1"
-c.RaftConfig.ProtocolVersion = 3
-}
-dir1, s1 := testServerWithConfig(t, conf)
-defer os.RemoveAll(dir1)
-defer s1.Shutdown()
-dir2, s2 := testServerWithConfig(t, conf)
-defer os.RemoveAll(dir2)
-defer s2.Shutdown()
-dir3, s3 := testServerWithConfig(t, conf)
-defer os.RemoveAll(dir3)
-defer s3.Shutdown()
-servers := []*Server{s1, s2, s3}
-// Try to join
-addr := fmt.Sprintf("127.0.0.1:%d",
-s1.config.SerfLANConfig.MemberlistConfig.BindPort)
-if _, err := s2.JoinLAN([]string{addr}); err != nil {
-t.Fatalf("err: %v", err)
-}
-if _, err := s3.JoinLAN([]string{addr}); err != nil {
-t.Fatalf("err: %v", err)
-}
-for _, s := range servers {
-if err := testutil.WaitForResult(func() (bool, error) {
-peers, _ := s.numPeers()
-return peers == 3, nil
-}); err != nil {
-t.Fatal("should have 3 peers")
-}
-}
-// Shut down a server, freeing up its address/port
-s3.Shutdown()
-if err := testutil.WaitForResult(func() (bool, error) {
-alive := 0
-for _, m := range s1.LANMembers() {
-if m.Status == serf.StatusAlive {
-alive++
-}
-}
-return alive == 2, nil
-}); err != nil {
-t.Fatal("should have 2 alive members")
-}
-// Bring up a new server with s3's address that will get a different ID
-dir4, s4 := testServerWithConfig(t, func(c *Config) {
-c.Bootstrap = false
-c.BootstrapExpect = 3
-c.Datacenter = "dc1"
-c.RaftConfig.ProtocolVersion = 3
-c.NodeID = s3.config.NodeID
-})
-defer os.RemoveAll(dir4)
-defer s4.Shutdown()
-if _, err := s4.JoinLAN([]string{addr}); err != nil {
-t.Fatalf("err: %v", err)
-}
-servers[2] = s4
-// Make sure the dead server is removed and we're back to 3 total peers
-for _, s := range servers {
-if err := testutil.WaitForResult(func() (bool, error) {
-peers, _ := s.numPeers()
-return peers == 3, nil
-}); err != nil {
-t.Fatal("should have 3 members")
-}
-}
-}
func TestLeader_ChangeServerID(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false

consul/merge.go

@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/hashicorp/consul/consul/agent"
+"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)
@@ -11,11 +12,33 @@ import (
// ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match.
type lanMergeDelegate struct {
-dc string
+dc       string
+nodeID   types.NodeID
+nodeName string
}
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
+nodeMap := make(map[types.NodeID]string)
for _, m := range members {
+if rawID, ok := m.Tags["id"]; ok && rawID != "" {
+nodeID := types.NodeID(rawID)
+// See if there's another node that conflicts with us.
+if (nodeID == md.nodeID) && (m.Name != md.nodeName) {
+return fmt.Errorf("Member '%s' has conflicting node ID '%s' with this agent's ID",
+m.Name, nodeID)
+}
+// See if there are any two nodes that conflict with each
+// other. This lets us only do joins into a hygienic
+// cluster now that node IDs are critical for operation.
+if other, ok := nodeMap[nodeID]; ok {
+return fmt.Errorf("Member '%s' has conflicting node ID '%s' with member '%s'",
+m.Name, nodeID, other)
+}
+nodeMap[nodeID] = m.Name
+}
ok, dc := isConsulNode(*m)
if ok {
if dc != md.dc {
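For context on the hook used above: Serf's Config.Merge field accepts any implementation of the serf.MergeDelegate interface, and a non-nil error from NotifyMerge aborts the join or merge before the members are admitted. A minimal self-contained sketch of the same duplicate-ID veto; the type name and tag handling here are illustrative, not Consul's actual code:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

// rejectDuplicateIDs is a toy serf.MergeDelegate that vetoes any merge
// containing two members that advertise the same "id" tag.
type rejectDuplicateIDs struct{}

func (d *rejectDuplicateIDs) NotifyMerge(members []*serf.Member) error {
	seen := make(map[string]string)
	for _, m := range members {
		id := m.Tags["id"]
		if id == "" {
			continue // tolerate members that predate node IDs
		}
		if other, ok := seen[id]; ok {
			return fmt.Errorf("members %q and %q share node ID %q",
				m.Name, other, id)
		}
		seen[id] = m.Name
	}
	return nil // a nil return lets the merge proceed
}

func main() {
	conf := serf.DefaultConfig()
	conf.Merge = &rejectDuplicateIDs{} // same wiring as lanMergeDelegate above
	fmt.Println(conf.Merge != nil)
}
```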

consul/merge_test.go (new file, 160 lines)

@@ -0,0 +1,160 @@
package consul

import (
"strings"
"testing"

"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)

func makeNode(dc, name, id string, server bool) *serf.Member {
var role string
if server {
role = "consul"
} else {
role = "node"
}
return &serf.Member{
Name: name,
Tags: map[string]string{
"role": role,
"dc": dc,
"id": id,
"port": "8300",
"build": "0.7.5",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2",
},
}
}

func TestMerge_LAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Client in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "wrong datacenter",
},
// Server in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
true),
},
expect: "wrong datacenter",
},
// Node ID conflict with delegate's ID.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"ee954a2f-80de-4b34-8780-97b942a50a99",
true),
},
expect: "with this agent's ID",
},
// Cluster with existing conflicting node IDs.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
},
expect: "with member",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}
delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}

func TestMerge_WAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Not a server
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "not a server",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc3",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}
delegate := &wanMergeDelegate{}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}

consul/server.go

@@ -396,7 +396,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
-conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter}
+conf.Merge = &lanMergeDelegate{
+dc: s.config.Datacenter,
+nodeID: s.config.NodeID,
+nodeName: s.config.NodeName,
+}
}
// Until Consul supports this fully, we disable automatic resolution.

consul/state/state_store.go

@@ -165,22 +165,44 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
-// Check for an existing node
-existing, err := tx.First("nodes", "id", node.Node)
-if err != nil {
-return fmt.Errorf("node name lookup failed: %s", err)
-}
+// See if there's an existing node with this UUID, and make sure the
+// name is the same.
+var n *structs.Node
+if node.ID != "" {
+existing, err := tx.First("nodes", "uuid", string(node.ID))
+if err != nil {
+return fmt.Errorf("node lookup failed: %s", err)
+}
+if existing != nil {
+n = existing.(*structs.Node)
+if n.Node != node.Node {
+return fmt.Errorf("node ID %q for node %q aliases existing node %q",
+node.ID, node.Node, n.Node)
+}
+}
+}
-// Get the indexes
-if existing != nil {
-node.CreateIndex = existing.(*structs.Node).CreateIndex
+// Check for an existing node by name to support nodes with no IDs.
+if n == nil {
+existing, err := tx.First("nodes", "id", node.Node)
+if err != nil {
+return fmt.Errorf("node name lookup failed: %s", err)
+}
+if existing != nil {
+n = existing.(*structs.Node)
+}
+}
+// Get the indexes.
+if n != nil {
+node.CreateIndex = n.CreateIndex
node.ModifyIndex = idx
} else {
node.CreateIndex = idx
node.ModifyIndex = idx
}
-// Insert the node and update the index
+// Insert the node and update the index.
if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
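The rewritten lookup above consults the same memdb table through two different indexes: a "uuid" index for the ID integrity check, then the "id" (name) index as a fallback for nodes registered without IDs (the index definitions live in the state store schema, which this diff does not show). A self-contained go-memdb sketch of that multi-index pattern, with an illustrative schema rather than Consul's actual one:

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type node struct {
	Name string
	UUID string
}

func main() {
	// Two unique indexes over one table: "id" (the mandatory default
	// index, here on Name) and a secondary "uuid" index.
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"nodes": {
				Name: "nodes",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "Name"},
					},
					"uuid": {
						Name:    "uuid",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "UUID"},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	if err := txn.Insert("nodes", &node{Name: "node1", UUID: "cda916bc-a357-4a19-b886-59419fcee50c"}); err != nil {
		panic(err)
	}
	txn.Commit()

	// The same record is reachable through either index.
	read := db.Txn(false)
	byUUID, _ := read.First("nodes", "uuid", "cda916bc-a357-4a19-b886-59419fcee50c")
	byName, _ := read.First("nodes", "id", "node1")
	fmt.Println(byUUID.(*node).Name, byName.(*node).UUID)
}
```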

consul/state/state_store_test.go

@@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"sort"
+"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
@@ -419,6 +420,23 @@ func TestStateStore_EnsureNode(t *testing.T) {
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
+// Add an ID to the node
+in.ID = types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c")
+if err := s.EnsureNode(4, in); err != nil {
+t.Fatalf("err: %v", err)
+}
+// Now try to add another node with the same ID
+in = &structs.Node{
+Node: "nope",
+ID: types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c"),
+Address: "1.2.3.4",
+}
+err = s.EnsureNode(5, in)
+if err == nil || !strings.Contains(err.Error(), "aliases existing node") {
+t.Fatalf("err: %v", err)
+}
}
func TestStateStore_GetNodes(t *testing.T) {

testutil/server.go

@@ -25,6 +25,7 @@ import (
"strings"
"github.com/hashicorp/go-cleanhttp"
+"github.com/hashicorp/go-uuid"
"github.com/pkg/errors"
)
@@ -55,6 +56,7 @@
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"node_name"`
+NodeID string `json:"node_id"`
NodeMeta map[string]string `json:"node_meta,omitempty"`
Performance *TestPerformanceConfig `json:"performance,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"`
@@ -83,8 +85,14 @@ type ServerConfigCallback func(c *TestServerConfig)
// defaultServerConfig returns a new TestServerConfig struct
// with all of the listen ports incremented by one.
func defaultServerConfig() *TestServerConfig {
+nodeID, err := uuid.GenerateUUID()
+if err != nil {
+panic(err)
+}
return &TestServerConfig{
NodeName: fmt.Sprintf("node%d", randomPort()),
+NodeID: nodeID,
DisableCheckpoint: true,
DisableCheckpoint: true,
Performance: &TestPerformanceConfig{
RaftMultiplier: 1,
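A hedged usage note on the change above: because defaultServerConfig now randomizes NodeID, a test that needs a stable ID can override it through the existing ServerConfigCallback hook. The sketch below applies such a callback by hand rather than through a constructor, since the constructor signatures in this package have varied across Consul versions; the function name and ID value are illustrative:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/testutil"
)

// pinNodeID is a ServerConfigCallback-shaped function that overrides the
// randomly generated node ID with a fixed, valid UUID (example value).
func pinNodeID(c *testutil.TestServerConfig) {
	c.NodeID = "ee954a2f-80de-4b34-8780-97b942a50a99"
}

func main() {
	// Normally the callback is applied to the generated default config
	// by the package's callback-taking constructor; shown standalone
	// here to keep the sketch version-independent.
	conf := &testutil.TestServerConfig{NodeName: "node0"}
	pinNodeID(conf)
	fmt.Println(conf.NodeID)
}
```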