From f9588b8d7f419899ad17a0759c7e129b1d4753f1 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 22 Feb 2017 12:53:32 -0800 Subject: [PATCH] Add raft version 2/3 compatibility --- command/agent/agent.go | 7 ++ command/agent/config.go | 22 +++++++ command/agent/config_test.go | 18 +++++ consul/agent/server.go | 24 +++++-- consul/agent/server_test.go | 8 +++ consul/config.go | 8 +-- consul/leader.go | 68 +++++++++++++------ consul/leader_test.go | 124 ++++++++++++++++++++++++++++++----- consul/serf.go | 13 +++- consul/server.go | 7 +- consul/server_test.go | 7 ++ consul/util.go | 24 +++++++ 12 files changed, 279 insertions(+), 51 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 4782f43b3..58bbef6cf 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/shirou/gopsutil/host" + "github.com/hashicorp/raft" ) const ( @@ -412,6 +413,12 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.SessionTTLMinRaw != "" { base.SessionTTLMin = a.config.SessionTTLMin } + if a.config.Autopilot.RaftProtocolVersion != 0 { + base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion) + } + if a.config.Autopilot.DeadServerCleanup != nil { + base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup + } // Format the build string revision := a.config.Revision diff --git a/command/agent/config.go b/command/agent/config.go index ac72e023d..1f7793146 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -262,6 +262,16 @@ type Telemetry struct { CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"` } +// Autopilot is used to configure helpful features for operating Consul servers. +type Autopilot struct { + // RaftProtocolVersion sets the Raft protocol version to use on this server. + RaftProtocolVersion int `mapstructure:"raft_protocol"` + + // DeadServerCleanup enables the automatic cleanup of dead servers when new ones + // are added to the peer list. Defaults to true. + DeadServerCleanup *bool `mapstructure:"dead_server_cleanup"` +} + // Config is the configuration that can be set for an Agent. // Some of this is configurable as CLI flags, but most must // be set using a configuration file. @@ -387,6 +397,9 @@ type Config struct { // servers. This can be changed on reload. SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"` + // Autopilot is used to configure helpful features for operating Consul servers. + Autopilot Autopilot `mapstructure:"autopilot"` + Telemetry Telemetry `mapstructure:"telemetry"` // Protocol is the Consul protocol version to use. @@ -759,6 +772,9 @@ func DefaultConfig() *Config { CheckReapInterval: 30 * time.Second, AEInterval: time.Minute, DisableCoordinates: false, + Autopilot: Autopilot{ + DeadServerCleanup: Bool(true), + }, // SyncCoordinateRateTarget is set based on the rate that we want // the server to handle as an aggregate across the entire cluster. @@ -1331,6 +1347,12 @@ func MergeConfig(a, b *Config) *Config { if b.SkipLeaveOnInt != nil { result.SkipLeaveOnInt = b.SkipLeaveOnInt } + if b.Autopilot.RaftProtocolVersion != 0 { + result.Autopilot.RaftProtocolVersion = b.Autopilot.RaftProtocolVersion + } + if b.Autopilot.DeadServerCleanup != nil { + result.Autopilot.DeadServerCleanup = b.Autopilot.DeadServerCleanup + } if b.Telemetry.DisableHostname == true { result.Telemetry.DisableHostname = true } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 4c9addc48..1510c3fdd 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1098,6 +1098,20 @@ func TestDecodeConfig_Performance(t *testing.T) { } } +func TestDecodeConfig_Autopilot(t *testing.T) { + input := `{"autopilot": { "raft_protocol": 3, "dead_server_cleanup": true }}` + config, err := DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + if config.Autopilot.RaftProtocolVersion != 3 { + t.Fatalf("bad: raft_protocol isn't set: %#v", config) + } + if config.Autopilot.DeadServerCleanup == nil || !*config.Autopilot.DeadServerCleanup { + t.Fatalf("bad: dead_server_cleanup isn't set: %#v", config) + } +} + func TestDecodeConfig_Services(t *testing.T) { input := `{ "services": [ @@ -1619,6 +1633,10 @@ func TestMergeConfig(t *testing.T) { Server: true, LeaveOnTerm: Bool(true), SkipLeaveOnInt: Bool(true), + Autopilot: Autopilot{ + RaftProtocolVersion: 3, + DeadServerCleanup: Bool(true), + }, EnableDebug: true, VerifyIncoming: true, VerifyOutgoing: true, diff --git a/consul/agent/server.go b/consul/agent/server.go index 8ba1f0c5d..bd6c6c7cf 100644 --- a/consul/agent/server.go +++ b/consul/agent/server.go @@ -25,13 +25,15 @@ func (k *Key) Equal(x *Key) bool { // Server is used to return details of a consul server type Server struct { - Name string - Datacenter string - Port int - Bootstrap bool - Expect int - Version int - Addr net.Addr + Name string + ID string + Datacenter string + Port int + Bootstrap bool + Expect int + Version int + RaftVersion int + Addr net.Addr } // Key returns the corresponding Key @@ -84,16 +86,24 @@ func IsConsulServer(m serf.Member) (bool, *Server) { return false, nil } + raft_vsn_str := m.Tags["raft_vsn"] + raft_vsn, err := strconv.Atoi(raft_vsn_str) + if err != nil { + return false, nil + } + addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ Name: m.Name, + ID: m.Tags["id"], Datacenter: datacenter, Port: port, Bootstrap: bootstrap, Expect: expect, Addr: addr, Version: vsn, + RaftVersion: raft_vsn, } return true, parts } diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go index 0dda84f85..adf762239 100644 --- a/consul/agent/server_test.go +++ b/consul/agent/server_test.go @@ -56,9 +56,11 @@ func TestIsConsulServer(t *testing.T) { Addr: net.IP([]byte{127, 0, 0, 1}), Tags: map[string]string{ "role": "consul", + "id": "asdf", "dc": "east-aws", "port": "10000", "vsn": "1", + "raft_vsn": "3", }, } ok, parts := agent.IsConsulServer(m) @@ -68,12 +70,18 @@ func TestIsConsulServer(t *testing.T) { if parts.Name != "foo" { t.Fatalf("bad: %v", parts) } + if parts.ID != "asdf" { + t.Fatalf("bad: %v", parts.ID) + } if parts.Bootstrap { t.Fatalf("unexpected bootstrap") } if parts.Expect != 0 { t.Fatalf("bad: %v", parts.Expect) } + if parts.RaftVersion != 3 { + t.Fatalf("bad: %v", parts.RaftVersion) + } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" ok, parts = agent.IsConsulServer(m) diff --git a/consul/config.go b/consul/config.go index e07500f5c..01d80711e 100644 --- a/consul/config.go +++ b/consul/config.go @@ -275,9 +275,9 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration - // AutopilotServerCleanup controls whether to remove dead servers when a new + // DeadServerCleanup controls whether to remove dead servers when a new // server is added to the Raft peers - AutopilotServerCleanup bool + DeadServerCleanup bool } // CheckVersion is used to check if the ProtocolVersion is valid @@ -351,7 +351,7 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", - AutopilotServerCleanup: true, + DeadServerCleanup: true, } // Increase our reap interval to 3 days instead of 24h. @@ -368,7 +368,7 @@ func DefaultConfig() *Config { // Enable interoperability with unversioned Raft library, and don't // start using new ID-based features yet. - conf.RaftConfig.ProtocolVersion = 1 + conf.RaftConfig.ProtocolVersion = 2 conf.ScaleRaft(DefaultRaftMultiplier) // Disable shutdown on removal diff --git a/consul/leader.go b/consul/leader.go index fc0a32bf2..8197a5e0f 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -562,21 +562,32 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { } // Attempt to add as a peer - addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) - if err := addFuture.Error(); err != nil { - s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + if err != nil { return err } + if minRaftProtocol >= 2 && parts.RaftVersion >= 3 { + addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) + if err := addFuture.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + } else { + addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) + if err := addFuture.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + } + // Look for dead servers to clean up - if s.config.AutopilotServerCleanup { + if s.config.DeadServerCleanup { for _, member := range s.serfLAN.Members() { valid, _ := agent.IsConsulServer(member) if valid && member.Name != m.Name && member.Status == serf.StatusFailed { - if err := s.handleDeregisterMember("Removing failed server", member); err != nil { - return fmt.Errorf("[ERROR] consul: Couldn't deregister failed server (%s): %v", member.Name, err) - } - s.logger.Printf("[INFO] consul: Removed failed server: %v", member.Name) + s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name) + go s.serfLAN.RemoveFailedNode(member.Name) } } } @@ -597,21 +608,38 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) return err } - for _, server := range configFuture.Configuration().Servers { - if server.Address == raft.ServerAddress(addr) { - goto REMOVE - } - } - return nil -REMOVE: - // Attempt to remove as a peer. - future := s.raft.RemovePeer(raft.ServerAddress(addr)) - if err := future.Error(); err != nil { - s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", - addr, err) + minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + if err != nil { return err } + + _, parts := agent.IsConsulServer(m) + + // Pick which remove API to use based on how the server was added. + for _, server := range configFuture.Configuration().Servers { + // If we understand the new add/remove APIs and the server was added by ID, use the new remove API + if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) { + s.logger.Printf("[INFO] consul: removing server via new api, %q %q", server.ID, server.Address) + future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0) + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", + addr, err) + return err + } + break + } else if server.Address == raft.ServerAddress(addr) { + // If not, use the old remove API + future := s.raft.RemovePeer(raft.ServerAddress(addr)) + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", + addr, err) + return err + } + break + } + } + return nil } diff --git a/consul/leader_test.go b/consul/leader_test.go index b85acee05..053752122 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -624,15 +624,15 @@ func TestLeader_ReapTombstones(t *testing.T) { } func TestLeader_DeadServerCleanup(t *testing.T) { - dir1, s1 := testServerDCExpect(t, "dc1", 3) + dir1, s1 := testServerDCBootstrap(t, "dc1", true) defer os.RemoveAll(dir1) defer s1.Shutdown() - dir2, s2 := testServerDCExpect(t, "dc1", 3) + dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() - dir3, s3 := testServerDCExpect(t, "dc1", 3) + dir3, s3 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -657,31 +657,32 @@ func TestLeader_DeadServerCleanup(t *testing.T) { }) } - // Kill a non-leader server (s2 or s3, so s4 can still join s1) - var nonLeader *Server - var removedIndex int - for i, s := range servers { - if !s.IsLeader() && i > 0 { - nonLeader = s - removedIndex = i - break - } - } - nonLeader.Shutdown() + // Kill a non-leader server + s2.Shutdown() - time.Sleep(1 * time.Second) + testutil.WaitForResult(func() (bool, error) { + alive := 0 + for _, m := range s1.LANMembers() { + if m.Status == serf.StatusAlive { + alive++ + } + } + return alive == 2, nil + }, func(err error) { + t.Fatalf("should have 2 alive members") + }) // Bring up and join a new server - dir4, s4 := testServerDCExpect(t, "dc1", 3) + dir4, s4 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir4) defer s4.Shutdown() if _, err := s4.JoinLAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } + servers[1] = s4 // Make sure the dead server is removed and we're back to 3 total peers - servers[removedIndex] = s4 for _, s := range servers { testutil.WaitForResult(func() (bool, error) { peers, _ := s.numPeers() @@ -691,3 +692,92 @@ func TestLeader_DeadServerCleanup(t *testing.T) { }) } } + +func TestLeader_RollRaftServer(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = true + c.Datacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + c.Datacenter = "dc1" + c.RaftConfig.ProtocolVersion = 1 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Kill the v1 server + s2.Shutdown() + + for _, s := range []*Server{s1, s3} { + testutil.WaitForResult(func() (bool, error) { + minVer, err := ServerMinRaftProtocol(s.LANMembers()) + return minVer == 2, err + }, func(err error) { + t.Fatalf("minimum protocol version among servers should be 2") + }) + } + + // Replace the dead server with one running raft protocol v3 + dir4, s4 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + c.Datacenter = "dc1" + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + if _, err := s4.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + servers[1] = s4 + + // Make sure the dead server is removed and we're back to 3 total peers + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + addrs := 0 + ids := 0 + future := s.raft.GetConfiguration() + if err := future.Error(); err != nil { + return false, err + } + for _, server := range future.Configuration().Servers { + if string(server.ID) == string(server.Address) { + addrs++ + } else { + ids++ + } + } + return addrs == 2 && ids == 1, nil + }, func(err error) { + t.Fatalf("should see 2 legacy IDs and 1 GUID") + }) + } +} \ No newline at end of file diff --git a/consul/serf.go b/consul/serf.go index 6306bd405..a9175c04f 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -280,11 +280,22 @@ func (s *Server) maybeBootstrap() { // Attempt a live bootstrap! var configuration raft.Configuration var addrs []string + minRaftVersion, err := ServerMinRaftProtocol(members) + if err != nil { + s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err) + } + for _, server := range servers { addr := server.Addr.String() addrs = append(addrs, addr) + var id raft.ServerID + if server.ID != "" && minRaftVersion >= 3 { + id = raft.ServerID(server.ID) + } else { + id = raft.ServerID(addr) + } peer := raft.Server{ - ID: raft.ServerID(addr), + ID: id, Address: raft.ServerAddress(addr), } configuration.Servers = append(configuration.Servers, peer) diff --git a/consul/server.go b/consul/server.go index d58d0d4a4..410910bd5 100644 --- a/consul/server.go +++ b/consul/server.go @@ -379,9 +379,12 @@ func (s *Server) setupRaft() error { // Make sure we set the LogOutput. s.config.RaftConfig.LogOutput = s.config.LogOutput - // Our version of Raft protocol requires the LocalID to match the network + // Versions of the Raft protocol below 3 require the LocalID to match the network // address of the transport. s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr()) + if s.config.RaftConfig.ProtocolVersion >= 3 { + s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID) + } // Build an all in-memory setup for dev mode, otherwise prepare a full // disk-based setup. @@ -479,7 +482,7 @@ func (s *Server) setupRaft() error { configuration := raft.Configuration{ Servers: []raft.Server{ raft.Server{ - ID: raft.ServerID(trans.LocalAddr()), + ID: s.config.RaftConfig.LocalID, Address: trans.LocalAddr(), }, }, diff --git a/consul/server_test.go b/consul/server_test.go index a3805dfff..72bf3bf6f 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/types" ) var nextPort int32 = 15000 @@ -46,6 +48,11 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) { IP: []byte{127, 0, 0, 1}, Port: getPort(), } + nodeID, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + config.NodeID = types.NodeID(nodeID) config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1" config.SerfLANConfig.MemberlistConfig.BindPort = getPort() config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2 diff --git a/consul/util.go b/consul/util.go index b0000a199..0f5f33b21 100644 --- a/consul/util.go +++ b/consul/util.go @@ -91,6 +91,30 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e return (numServers > 0) && (numWhoGrok == numServers), nil } +// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers +func ServerMinRaftProtocol(members []serf.Member) (int, error) { + minVersion := -1 + for _, m := range members { + if m.Tags["role"] != "consul" || m.Status != serf.StatusAlive { + continue + } + + vsn, ok := m.Tags["raft_vsn"] + if !ok { + vsn = "1" + } + raftVsn, err := strconv.Atoi(vsn) + if err != nil { + return -1, err + } + + if minVersion == -1 || raftVsn < minVersion { + minVersion = raftVsn + } + } + return minVersion, nil +} + // Returns if a member is a consul node. Returns a bool, // and the datacenter. func isConsulNode(m serf.Member) (bool, string) {