Add raft version 2/3 compatibility

This commit is contained in:
Kyle Havlovitz 2017-02-22 12:53:32 -08:00
parent 2c9001a389
commit f9588b8d7f
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
12 changed files with 279 additions and 51 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
"github.com/hashicorp/raft"
) )
const ( const (
@ -412,6 +413,12 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.SessionTTLMinRaw != "" { if a.config.SessionTTLMinRaw != "" {
base.SessionTTLMin = a.config.SessionTTLMin base.SessionTTLMin = a.config.SessionTTLMin
} }
if a.config.Autopilot.RaftProtocolVersion != 0 {
base.RaftConfig.ProtocolVersion = raft.ProtocolVersion(a.config.Autopilot.RaftProtocolVersion)
}
if a.config.Autopilot.DeadServerCleanup != nil {
base.DeadServerCleanup = *a.config.Autopilot.DeadServerCleanup
}
// Format the build string // Format the build string
revision := a.config.Revision revision := a.config.Revision

View File

@ -262,6 +262,16 @@ type Telemetry struct {
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"` CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
} }
// Autopilot is used to configure helpful features for operating Consul servers.
type Autopilot struct {
	// RaftProtocolVersion sets the Raft protocol version to use on this server.
	// A value of zero means "unset": consulConfig and MergeConfig both skip the
	// field when it is zero, leaving the library default in place.
	RaftProtocolVersion int `mapstructure:"raft_protocol"`

	// DeadServerCleanup enables the automatic cleanup of dead servers when new ones
	// are added to the peer list. Defaults to true. A pointer is used so that
	// "unset" (nil) can be distinguished from an explicit false during merging.
	DeadServerCleanup *bool `mapstructure:"dead_server_cleanup"`
}
// Config is the configuration that can be set for an Agent. // Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must // Some of this is configurable as CLI flags, but most must
// be set using a configuration file. // be set using a configuration file.
@ -387,6 +397,9 @@ type Config struct {
// servers. This can be changed on reload. // servers. This can be changed on reload.
SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"` SkipLeaveOnInt *bool `mapstructure:"skip_leave_on_interrupt"`
// Autopilot is used to configure helpful features for operating Consul servers.
Autopilot Autopilot `mapstructure:"autopilot"`
Telemetry Telemetry `mapstructure:"telemetry"` Telemetry Telemetry `mapstructure:"telemetry"`
// Protocol is the Consul protocol version to use. // Protocol is the Consul protocol version to use.
@ -759,6 +772,9 @@ func DefaultConfig() *Config {
CheckReapInterval: 30 * time.Second, CheckReapInterval: 30 * time.Second,
AEInterval: time.Minute, AEInterval: time.Minute,
DisableCoordinates: false, DisableCoordinates: false,
Autopilot: Autopilot{
DeadServerCleanup: Bool(true),
},
// SyncCoordinateRateTarget is set based on the rate that we want // SyncCoordinateRateTarget is set based on the rate that we want
// the server to handle as an aggregate across the entire cluster. // the server to handle as an aggregate across the entire cluster.
@ -1331,6 +1347,12 @@ func MergeConfig(a, b *Config) *Config {
if b.SkipLeaveOnInt != nil { if b.SkipLeaveOnInt != nil {
result.SkipLeaveOnInt = b.SkipLeaveOnInt result.SkipLeaveOnInt = b.SkipLeaveOnInt
} }
if b.Autopilot.RaftProtocolVersion != 0 {
result.Autopilot.RaftProtocolVersion = b.Autopilot.RaftProtocolVersion
}
if b.Autopilot.DeadServerCleanup != nil {
result.Autopilot.DeadServerCleanup = b.Autopilot.DeadServerCleanup
}
if b.Telemetry.DisableHostname == true { if b.Telemetry.DisableHostname == true {
result.Telemetry.DisableHostname = true result.Telemetry.DisableHostname = true
} }

View File

@ -1098,6 +1098,20 @@ func TestDecodeConfig_Performance(t *testing.T) {
} }
} }
// TestDecodeConfig_Autopilot checks that the "autopilot" JSON section is
// decoded into Config.Autopilot with both fields populated.
func TestDecodeConfig_Autopilot(t *testing.T) {
	raw := `{"autopilot": { "raft_protocol": 3, "dead_server_cleanup": true }}`

	c, err := DecodeConfig(bytes.NewReader([]byte(raw)))
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	if got := c.Autopilot.RaftProtocolVersion; got != 3 {
		t.Fatalf("bad: raft_protocol isn't set: %#v", c)
	}
	cleanup := c.Autopilot.DeadServerCleanup
	if cleanup == nil || !*cleanup {
		t.Fatalf("bad: dead_server_cleanup isn't set: %#v", c)
	}
}
func TestDecodeConfig_Services(t *testing.T) { func TestDecodeConfig_Services(t *testing.T) {
input := `{ input := `{
"services": [ "services": [
@ -1619,6 +1633,10 @@ func TestMergeConfig(t *testing.T) {
Server: true, Server: true,
LeaveOnTerm: Bool(true), LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true), SkipLeaveOnInt: Bool(true),
Autopilot: Autopilot{
RaftProtocolVersion: 3,
DeadServerCleanup: Bool(true),
},
EnableDebug: true, EnableDebug: true,
VerifyIncoming: true, VerifyIncoming: true,
VerifyOutgoing: true, VerifyOutgoing: true,

View File

@ -25,13 +25,15 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server // Server is used to return details of a consul server
type Server struct { type Server struct {
Name string Name string
Datacenter string ID string
Port int Datacenter string
Bootstrap bool Port int
Expect int Bootstrap bool
Version int Expect int
Addr net.Addr Version int
RaftVersion int
Addr net.Addr
} }
// Key returns the corresponding Key // Key returns the corresponding Key
@ -84,16 +86,24 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil return false, nil
} }
raft_vsn_str := m.Tags["raft_vsn"]
raft_vsn, err := strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
}
addr := &net.TCPAddr{IP: m.Addr, Port: port} addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{ parts := &Server{
Name: m.Name, Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter, Datacenter: datacenter,
Port: port, Port: port,
Bootstrap: bootstrap, Bootstrap: bootstrap,
Expect: expect, Expect: expect,
Addr: addr, Addr: addr,
Version: vsn, Version: vsn,
RaftVersion: raft_vsn,
} }
return true, parts return true, parts
} }

View File

@ -56,9 +56,11 @@ func TestIsConsulServer(t *testing.T) {
Addr: net.IP([]byte{127, 0, 0, 1}), Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{ Tags: map[string]string{
"role": "consul", "role": "consul",
"id": "asdf",
"dc": "east-aws", "dc": "east-aws",
"port": "10000", "port": "10000",
"vsn": "1", "vsn": "1",
"raft_vsn": "3",
}, },
} }
ok, parts := agent.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
@ -68,12 +70,18 @@ func TestIsConsulServer(t *testing.T) {
if parts.Name != "foo" { if parts.Name != "foo" {
t.Fatalf("bad: %v", parts) t.Fatalf("bad: %v", parts)
} }
if parts.ID != "asdf" {
t.Fatalf("bad: %v", parts.ID)
}
if parts.Bootstrap { if parts.Bootstrap {
t.Fatalf("unexpected bootstrap") t.Fatalf("unexpected bootstrap")
} }
if parts.Expect != 0 { if parts.Expect != 0 {
t.Fatalf("bad: %v", parts.Expect) t.Fatalf("bad: %v", parts.Expect)
} }
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
m.Tags["bootstrap"] = "1" m.Tags["bootstrap"] = "1"
m.Tags["disabled"] = "1" m.Tags["disabled"] = "1"
ok, parts = agent.IsConsulServer(m) ok, parts = agent.IsConsulServer(m)

View File

@ -275,9 +275,9 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd. // place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration RPCHoldTimeout time.Duration
// AutopilotServerCleanup controls whether to remove dead servers when a new // DeadServerCleanup controls whether to remove dead servers when a new
// server is added to the Raft peers // server is added to the Raft peers
AutopilotServerCleanup bool DeadServerCleanup bool
} }
// CheckVersion is used to check if the ProtocolVersion is valid // CheckVersion is used to check if the ProtocolVersion is valid
@ -351,7 +351,7 @@ func DefaultConfig() *Config {
TLSMinVersion: "tls10", TLSMinVersion: "tls10",
AutopilotServerCleanup: true, DeadServerCleanup: true,
} }
// Increase our reap interval to 3 days instead of 24h. // Increase our reap interval to 3 days instead of 24h.
@ -368,7 +368,7 @@ func DefaultConfig() *Config {
// Enable interoperability with unversioned Raft library, and don't // Enable interoperability with unversioned Raft library, and don't
// start using new ID-based features yet. // start using new ID-based features yet.
conf.RaftConfig.ProtocolVersion = 1 conf.RaftConfig.ProtocolVersion = 2
conf.ScaleRaft(DefaultRaftMultiplier) conf.ScaleRaft(DefaultRaftMultiplier)
// Disable shutdown on removal // Disable shutdown on removal

View File

@ -562,21 +562,32 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
} }
// Attempt to add as a peer // Attempt to add as a peer
addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err := addFuture.Error(); err != nil { if err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err return err
} }
if minRaftProtocol >= 2 && parts.RaftVersion >= 3 {
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
} else {
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
}
// Look for dead servers to clean up // Look for dead servers to clean up
if s.config.AutopilotServerCleanup { if s.config.DeadServerCleanup {
for _, member := range s.serfLAN.Members() { for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member) valid, _ := agent.IsConsulServer(member)
if valid && member.Name != m.Name && member.Status == serf.StatusFailed { if valid && member.Name != m.Name && member.Status == serf.StatusFailed {
if err := s.handleDeregisterMember("Removing failed server", member); err != nil { s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", member.Name)
return fmt.Errorf("[ERROR] consul: Couldn't deregister failed server (%s): %v", member.Name, err) go s.serfLAN.RemoveFailedNode(member.Name)
}
s.logger.Printf("[INFO] consul: Removed failed server: %v", member.Name)
} }
} }
} }
@ -597,21 +608,38 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err return err
} }
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil
REMOVE: minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
// Attempt to remove as a peer. if err != nil {
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err return err
} }
_, parts := agent.IsConsulServer(m)
// Pick which remove API to use based on how the server was added.
for _, server := range configFuture.Configuration().Servers {
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
s.logger.Printf("[INFO] consul: removing server via new api, %q %q", server.ID, server.Address)
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err
}
break
} else if server.Address == raft.ServerAddress(addr) {
// If not, use the old remove API
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err
}
break
}
}
return nil return nil
} }

View File

@ -624,15 +624,15 @@ func TestLeader_ReapTombstones(t *testing.T) {
} }
func TestLeader_DeadServerCleanup(t *testing.T) { func TestLeader_DeadServerCleanup(t *testing.T) {
dir1, s1 := testServerDCExpect(t, "dc1", 3) dir1, s1 := testServerDCBootstrap(t, "dc1", true)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
dir2, s2 := testServerDCExpect(t, "dc1", 3) dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
dir3, s3 := testServerDCExpect(t, "dc1", 3) dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3) defer os.RemoveAll(dir3)
defer s3.Shutdown() defer s3.Shutdown()
@ -657,31 +657,32 @@ func TestLeader_DeadServerCleanup(t *testing.T) {
}) })
} }
// Kill a non-leader server (s2 or s3, so s4 can still join s1) // Kill a non-leader server
var nonLeader *Server s2.Shutdown()
var removedIndex int
for i, s := range servers {
if !s.IsLeader() && i > 0 {
nonLeader = s
removedIndex = i
break
}
}
nonLeader.Shutdown()
time.Sleep(1 * time.Second) testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}, func(err error) {
t.Fatalf("should have 2 alive members")
})
// Bring up and join a new server // Bring up and join a new server
dir4, s4 := testServerDCExpect(t, "dc1", 3) dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4) defer os.RemoveAll(dir4)
defer s4.Shutdown() defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil { if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers // Make sure the dead server is removed and we're back to 3 total peers
servers[removedIndex] = s4
for _, s := range servers { for _, s := range servers {
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers() peers, _ := s.numPeers()
@ -691,3 +692,92 @@ func TestLeader_DeadServerCleanup(t *testing.T) {
}) })
} }
} }
// TestLeader_RollRaftServer simulates a rolling Raft protocol upgrade: a
// three-server cluster is formed with one server pinned to protocol version 1,
// that server is killed, and a replacement running protocol version 3 joins.
// The test then verifies the resulting Raft configuration contains two
// address-based server IDs (legacy join path) and one GUID-based ID (the v3
// server added by ID).
func TestLeader_RollRaftServer(t *testing.T) {
	// s1 bootstraps the cluster using the default Raft protocol version.
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = true
		c.Datacenter = "dc1"
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	// s2 runs the legacy Raft protocol version 1; it is the server that will
	// be replaced below.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false
		c.Datacenter = "dc1"
		c.RaftConfig.ProtocolVersion = 1
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	servers := []*Server{s1, s2, s3}

	// Try to join
	addr := fmt.Sprintf("127.0.0.1:%d",
		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
	if _, err := s2.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, err := s3.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Wait until every server sees a fully-formed three-peer cluster.
	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			return peers == 3, nil
		}, func(err error) {
			t.Fatalf("should have 3 peers")
		})
	}

	// Kill the v1 server
	s2.Shutdown()

	// With s2 gone, the minimum Raft protocol among the live servers should
	// rise to 2, which is what unlocks the ID-based add path for s4 below.
	for _, s := range []*Server{s1, s3} {
		testutil.WaitForResult(func() (bool, error) {
			minVer, err := ServerMinRaftProtocol(s.LANMembers())
			return minVer == 2, err
		}, func(err error) {
			t.Fatalf("minimum protocol version among servers should be 2")
		})
	}

	// Replace the dead server with one running raft protocol v3
	dir4, s4 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false
		c.Datacenter = "dc1"
		c.RaftConfig.ProtocolVersion = 3
	})
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()
	if _, err := s4.JoinLAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}
	servers[1] = s4

	// Make sure the dead server is removed and we're back to 3 total peers.
	// Legacy servers have ID == Address; the v3 server was added with a
	// distinct (GUID) ID, so we expect a 2/1 split.
	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			addrs := 0
			ids := 0
			future := s.raft.GetConfiguration()
			if err := future.Error(); err != nil {
				return false, err
			}
			for _, server := range future.Configuration().Servers {
				if string(server.ID) == string(server.Address) {
					addrs++
				} else {
					ids++
				}
			}
			return addrs == 2 && ids == 1, nil
		}, func(err error) {
			t.Fatalf("should see 2 legacy IDs and 1 GUID")
		})
	}
}

View File

@ -280,11 +280,22 @@ func (s *Server) maybeBootstrap() {
// Attempt a live bootstrap! // Attempt a live bootstrap!
var configuration raft.Configuration var configuration raft.Configuration
var addrs []string var addrs []string
minRaftVersion, err := ServerMinRaftProtocol(members)
if err != nil {
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
}
for _, server := range servers { for _, server := range servers {
addr := server.Addr.String() addr := server.Addr.String()
addrs = append(addrs, addr) addrs = append(addrs, addr)
var id raft.ServerID
if server.ID != "" && minRaftVersion >= 3 {
id = raft.ServerID(server.ID)
} else {
id = raft.ServerID(addr)
}
peer := raft.Server{ peer := raft.Server{
ID: raft.ServerID(addr), ID: id,
Address: raft.ServerAddress(addr), Address: raft.ServerAddress(addr),
} }
configuration.Servers = append(configuration.Servers, peer) configuration.Servers = append(configuration.Servers, peer)

View File

@ -379,9 +379,12 @@ func (s *Server) setupRaft() error {
// Make sure we set the LogOutput. // Make sure we set the LogOutput.
s.config.RaftConfig.LogOutput = s.config.LogOutput s.config.RaftConfig.LogOutput = s.config.LogOutput
// Our version of Raft protocol requires the LocalID to match the network // Versions of the Raft protocol below 3 require the LocalID to match the network
// address of the transport. // address of the transport.
s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr()) s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
if s.config.RaftConfig.ProtocolVersion >= 3 {
s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
}
// Build an all in-memory setup for dev mode, otherwise prepare a full // Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup. // disk-based setup.
@ -479,7 +482,7 @@ func (s *Server) setupRaft() error {
configuration := raft.Configuration{ configuration := raft.Configuration{
Servers: []raft.Server{ Servers: []raft.Server{
raft.Server{ raft.Server{
ID: raft.ServerID(trans.LocalAddr()), ID: s.config.RaftConfig.LocalID,
Address: trans.LocalAddr(), Address: trans.LocalAddr(),
}, },
}, },

View File

@ -12,6 +12,8 @@ import (
"time" "time"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/types"
) )
var nextPort int32 = 15000 var nextPort int32 = 15000
@ -46,6 +48,11 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
IP: []byte{127, 0, 0, 1}, IP: []byte{127, 0, 0, 1},
Port: getPort(), Port: getPort(),
} }
nodeID, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}
config.NodeID = types.NodeID(nodeID)
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1" config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfLANConfig.MemberlistConfig.BindPort = getPort() config.SerfLANConfig.MemberlistConfig.BindPort = getPort()
config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2 config.SerfLANConfig.MemberlistConfig.SuspicionMult = 2

View File

@ -91,6 +91,30 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
return (numServers > 0) && (numWhoGrok == numServers), nil return (numServers > 0) && (numWhoGrok == numServers), nil
} }
// ServerMinRaftProtocol returns the lowest supported Raft protocol among alive servers.
// Servers with no "raft_vsn" tag are counted as version 1; if no alive consul
// server is present the result is -1 with a nil error.
func ServerMinRaftProtocol(members []serf.Member) (int, error) {
	lowest := -1
	for _, member := range members {
		// Only alive consul servers participate in the calculation.
		if member.Status != serf.StatusAlive || member.Tags["role"] != "consul" {
			continue
		}

		tag, ok := member.Tags["raft_vsn"]
		if !ok {
			// Older servers predate the tag and speak protocol 1.
			tag = "1"
		}
		version, err := strconv.Atoi(tag)
		if err != nil {
			return -1, err
		}

		if lowest < 0 || version < lowest {
			lowest = version
		}
	}
	return lowest, nil
}
// Returns if a member is a consul node. Returns a bool, // Returns if a member is a consul node. Returns a bool,
// and the datacenter. // and the datacenter.
func isConsulNode(m serf.Member) (bool, string) { func isConsulNode(m serf.Member) (bool, string) {