Added support for v2 raft APIs and -raft-protocol option

Kyle Havlovitz 2017-11-21 16:29:11 -08:00
parent 71b612ede5
commit 55d6fd1ba3
17 changed files with 391 additions and 39 deletions

@@ -32,6 +32,9 @@ type RaftServer struct {
// it's a non-voting server, which will be added in a future release of
// Nomad.
Voter bool
// RaftProtocol is the version of the Raft protocol spoken by this server.
RaftProtocol string
}
// RaftConfiguration is returned when querying for the current Raft configuration.

@@ -22,6 +22,7 @@ import (
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/raft"
)
const (
@@ -141,6 +142,9 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
}
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
if agentConfig.Server.NumSchedulers != 0 {
conf.NumSchedulers = agentConfig.Server.NumSchedulers
}

@@ -83,6 +83,7 @@ func (c *Command) readConfig() *Config {
flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "")
flags.StringVar(&cmdConfig.Server.EncryptKey, "encrypt", "", "gossip encryption key")
flags.IntVar(&cmdConfig.Server.RaftProtocol, "raft-protocol", 0, "")
// Client-only options
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
@@ -870,6 +871,10 @@ Server Options:
Address of an agent to join at start time. Can be specified
multiple times.
-raft-protocol=<num>
The Raft protocol version to use. Used for enabling certain Autopilot
features. Defaults to 2.
-retry-join=<address>
Address of an agent to join at start time with retries enabled.
Can be specified multiple times.

@@ -256,6 +256,9 @@ type ServerConfig struct {
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `mapstructure:"protocol_version"`
// RaftProtocol is the Raft protocol version to speak. This must be from [1-3].
RaftProtocol int `mapstructure:"raft_protocol"`
// NumSchedulers is the number of scheduler threads that are run.
// This can be as many as one per core, or zero to disable this server
// from doing any scheduling work.
@@ -976,6 +979,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.ProtocolVersion != 0 {
result.ProtocolVersion = b.ProtocolVersion
}
if b.RaftProtocol != 0 {
result.RaftProtocol = b.RaftProtocol
}
if b.NumSchedulers != 0 {
result.NumSchedulers = b.NumSchedulers
}
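
Taken together, the flag registration, config merge, and convertServerConfig hunks above thread the new option from the command line down to the Raft library. A minimal standalone sketch of that flow, with simplified struct and variable names rather than the literal Nomad code:

package main

import (
	"flag"
	"fmt"

	"github.com/hashicorp/raft"
)

// serverConfig stands in for the agent's ServerConfig in this sketch.
type serverConfig struct {
	RaftProtocol int
}

func main() {
	srv := &serverConfig{}
	// 0 means "not set", mirroring the flag default registered above.
	flag.IntVar(&srv.RaftProtocol, "raft-protocol", 0, "Raft protocol version (1-3)")
	flag.Parse()

	raftConf := raft.DefaultConfig()
	raftConf.ProtocolVersion = 2 // Nomad's DefaultConfig pins this to 2 (see the DefaultConfig hunk further down)
	if srv.RaftProtocol != 0 {
		raftConf.ProtocolVersion = raft.ProtocolVersion(srv.RaftProtocol)
	}
	fmt.Println("using raft protocol version:", raftConf.ProtocolVersion)
}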

@@ -234,6 +234,7 @@ func TestConfig_Merge(t *testing.T) {
BootstrapExpect: 2,
DataDir: "/tmp/data2",
ProtocolVersion: 2,
RaftProtocol: 2,
NumSchedulers: 2,
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",

@@ -79,14 +79,14 @@ func (c *OperatorRaftListCommand) Run(args []string) int {
}
// Format it as a nice table.
result := []string{"Node|ID|Address|State|Voter"}
result := []string{"Node|ID|Address|State|Voter|RaftProtocol"}
for _, s := range reply.Servers {
state := "follower"
if s.Leader {
state = "leader"
}
result = append(result, fmt.Sprintf("%s|%s|%s|%s|%v",
s.Node, s.ID, s.Address, state, s.Voter))
result = append(result, fmt.Sprintf("%s|%s|%s|%s|%v|%s",
s.Node, s.ID, s.Address, state, s.Voter, s.RaftProtocol))
}
c.Ui.Output(columnize.SimpleFormat(result))

@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/scheduler"
@@ -98,6 +99,9 @@ type Config struct {
// Node name is the name we use to advertise. Defaults to hostname.
NodeName string
// NodeID is the uuid of this server.
NodeID string
// Region is the region this Nomad server belongs to.
Region string
@@ -283,6 +287,7 @@ func DefaultConfig() *Config {
AuthoritativeRegion: DefaultRegion,
Datacenter: DefaultDC,
NodeName: hostname,
NodeID: uuid.Generate(),
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
@@ -339,9 +344,9 @@ func DefaultConfig() *Config {
// Disable shutdown on removal
c.RaftConfig.ShutdownOnRemove = false
// Enable interoperability with unversioned Raft library, and don't
// Enable interoperability with raft protocol version 1, and don't
// start using new ID-based features yet.
c.RaftConfig.ProtocolVersion = 1
c.RaftConfig.ProtocolVersion = 2
return c
}

@@ -744,8 +744,8 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
}
// Check for possibility of multiple bootstrap nodes
members := s.serf.Members()
if parts.Bootstrap {
members := s.serf.Members()
for _, member := range members {
valid, p := isNomadServer(member)
if valid && member.Name != m.Name && p.Bootstrap {
@@ -755,12 +755,10 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
}
}
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to get raft configuration: %v", err)
@@ -772,14 +770,64 @@ func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
}
}
// Attempt to add as a peer
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
minRaftProtocol, err := MinRaftProtocol(s.config.Datacenter, members)
if err != nil {
return err
} else if err == nil {
s.logger.Printf("[INFO] nomad: added raft peer: %v", parts)
}
for _, server := range configFuture.Configuration().Servers {
// No-op if the raft version is too low
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
return nil
}
// If the address or ID matches an existing server, see if we need to remove the old one first
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
// Exit with no-op if this is being called on an existing server
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
}
future := s.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
s.logger.Printf("[INFO] nomad: removed server with duplicate address: %s", server.Address)
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
s.logger.Printf("[INFO] nomad: removed server with duplicate ID: %s", server.ID)
}
}
}
// Attempt to add as a peer
switch {
case minRaftProtocol >= 3:
// todo(kyhavlov): change this to AddNonVoter when adding autopilot
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
return err
}
case minRaftProtocol == 2 && parts.RaftVersion >= 3:
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
return err
}
default:
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
return err
}
}
return nil
}
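
The switch above is the core of the new add path: which Raft API a joining server gets depends on the lowest Raft protocol version alive in the cluster and on the joiner's own advertised version. A standalone sketch of that decision, with an illustrative package and helper name that are not part of the commit:

package raftpeersketch

// addAPIFor mirrors the switch in addRaftPeer above and reports which
// hashicorp/raft call ends up being used for a joining server.
func addAPIFor(minRaftProtocol, peerRaftVersion int) string {
	switch {
	case minRaftProtocol >= 3:
		// Every live server speaks protocol 3+: add the peer by its stable ID.
		return "AddVoter(id, addr)"
	case minRaftProtocol == 2 && peerRaftVersion >= 3:
		// Mixed cluster, but the joining server already advertises an ID.
		return "AddVoter(id, addr)"
	default:
		// Otherwise fall back to the old address-based API.
		return "AddPeer(addr)"
	}
}
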
@@ -797,21 +845,37 @@ func (s *Server) removeRaftPeer(m serf.Member, parts *serverParts) error {
s.logger.Printf("[ERR] nomad: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil
REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to remove raft peer '%v': %v",
parts, err)
minRaftProtocol, err := MinRaftProtocol(s.config.Datacenter, s.serf.Members())
if err != nil {
return err
}
// Pick which remove API to use based on how the server was added.
for _, server := range configFuture.Configuration().Servers {
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
s.logger.Printf("[INFO] nomad: removing server by ID: %q", server.ID)
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to remove raft peer '%v': %v",
server.ID, err)
return err
}
break
} else if server.Address == raft.ServerAddress(addr) {
// If not, use the old remove API
s.logger.Printf("[INFO] nomad: removing server by address: %q", server.Address)
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to remove raft peer '%v': %v",
addr, err)
return err
}
break
}
}
return nil
}
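
The remove path above makes the mirror-image decision. Continuing the same sketch package:

// removeAPIFor mirrors removeRaftPeer above: a server tracked by a stable ID
// is removed with the new RemoveServer API, everything else by address.
func removeAPIFor(minRaftProtocol int, matchedByID bool) string {
	if minRaftProtocol >= 2 && matchedByID {
		return "RemoveServer(id)"
	}
	return "RemovePeer(addr)"
}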

@@ -811,3 +811,94 @@ func TestLeader_DiffACLTokens(t *testing.T) {
// P2 is un-modified - ignore. P3 modified, P4 new.
assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update)
}
func TestLeader_UpgradeRaftVersion(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 1
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
testJoin(t, s1, s2, s3)
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill the v1 server
if err := s2.Leave(); err != nil {
t.Fatal(err)
}
for _, s := range []*Server{s1, s3} {
minVer, err := MinRaftProtocol("dc1", s.Members())
if err != nil {
t.Fatal(err)
}
if got, want := minVer, 2; got != want {
t.Fatalf("got min raft version %d want %d", got, want)
}
}
// Replace the dead server with one running raft protocol v3
s4 := testServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
})
defer s4.Shutdown()
testJoin(t, s1, s4)
servers[1] = s4
// Make sure we're back to 3 total peers with the new one added via ID
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 2; got != want {
return false, fmt.Errorf("got %d server addresses want %d", got, want)
}
if got, want := ids, 1; got != want {
return false, fmt.Errorf("got %d server ids want %d", got, want)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}
}

@@ -51,8 +51,10 @@ func (op *Operator) RaftGetConfiguration(args *structs.GenericRequest, reply *st
reply.Index = future.Index()
for _, server := range future.Configuration().Servers {
node := "(unknown)"
raftProtocolVersion := "unknown"
if member, ok := serverMap[server.Address]; ok {
node = member.Name
raftProtocolVersion = member.Tags["raft_vsn"]
}
entry := &structs.RaftServer{
@@ -61,6 +63,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.GenericRequest, reply *st
Address: server.Address,
Leader: server.Address == leader,
Voter: server.Suffrage == raft.Voter,
RaftProtocol: raftProtocolVersion,
}
reply.Servers = append(reply.Servers, entry)
}

@@ -44,11 +44,12 @@ func TestOperator_RaftGetConfiguration(t *testing.T) {
expected := structs.RaftConfigurationResponse{
Servers: []*structs.RaftServer{
{
ID: me.ID,
Node: fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
Address: me.Address,
Leader: true,
Voter: true,
ID: me.ID,
Node: fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
Address: me.Address,
Leader: true,
Voter: true,
RaftProtocol: fmt.Sprintf("%d", s1.config.RaftConfig.ProtocolVersion),
},
},
Index: future.Index(),
@@ -107,11 +108,12 @@ func TestOperator_RaftGetConfiguration_ACL(t *testing.T) {
expected := structs.RaftConfigurationResponse{
Servers: []*structs.RaftServer{
{
ID: me.ID,
Node: fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
Address: me.Address,
Leader: true,
Voter: true,
ID: me.ID,
Node: fmt.Sprintf("%v.%v", s1.config.NodeName, s1.config.Region),
Address: me.Address,
Leader: true,
Voter: true,
RaftProtocol: fmt.Sprintf("%d", s1.config.RaftConfig.ProtocolVersion),
},
},
Index: future.Index(),

@@ -184,11 +184,22 @@ func (s *Server) maybeBootstrap() {
// Attempt a live bootstrap!
var configuration raft.Configuration
var addrs []string
minRaftVersion, err := MinRaftProtocol(s.config.Datacenter, members)
if err != nil {
s.logger.Printf("[ERR] consul: Failed to read server raft versions: %v", err)
}
for _, server := range servers {
addr := server.Addr.String()
addrs = append(addrs, addr)
var id raft.ServerID
if minRaftVersion >= 3 {
id = raft.ServerID(server.ID)
} else {
id = raft.ServerID(addr)
}
peer := raft.Server{
ID: raft.ServerID(addr),
ID: id,
Address: raft.ServerAddress(addr),
}
configuration.Servers = append(configuration.Servers, peer)

@@ -835,6 +835,9 @@ func (s *Server) setupRaft() error {
// Our version of Raft protocol requires the LocalID to match the network
// address of the transport.
s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
if s.config.RaftConfig.ProtocolVersion >= 3 {
s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
}
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
@@ -937,7 +940,7 @@ func (s *Server) setupRaft() error {
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(trans.LocalAddr()),
ID: s.config.RaftConfig.LocalID,
Address: trans.LocalAddr(),
},
},
@@ -972,6 +975,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port)
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
conf.Tags["bootstrap"] = "1"

@@ -26,6 +26,9 @@ type RaftServer struct {
// it's a non-voting server, which will be added in a future release of
// Nomad.
Voter bool
// RaftProtocol is the version of the Raft protocol spoken by this server.
RaftProtocol string
}
// RaftConfigurationResponse is returned when querying for the current Raft

@@ -36,6 +36,7 @@ func RuntimeStats() map[string]string {
// serverParts is used to return the parts of a server role
type serverParts struct {
Name string
ID string
Region string
Datacenter string
Port int
@@ -44,6 +45,7 @@ type serverParts struct {
MajorVersion int
MinorVersion int
Build version.Version
RaftVersion int
Addr net.Addr
Status serf.MemberStatus
}
@@ -60,6 +62,7 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return false, nil
}
id := m.Tags["id"]
region := m.Tags["region"]
datacenter := m.Tags["dc"]
_, bootstrap := m.Tags["bootstrap"]
@@ -100,9 +103,19 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
minorVersion = 0
}
raft_vsn := 0
raft_vsn_str, ok := m.Tags["raft_vsn"]
if ok {
raft_vsn, err = strconv.Atoi(raft_vsn_str)
if err != nil {
return false, nil
}
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &serverParts{
Name: m.Name,
ID: id,
Region: region,
Datacenter: datacenter,
Port: port,
@@ -112,6 +125,7 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
MajorVersion: majorVersion,
MinorVersion: minorVersion,
Build: *build_version,
RaftVersion: raft_vsn,
Status: m.Status,
}
return true, parts
@@ -131,6 +145,36 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
// in the given datacenter.
func MinRaftProtocol(datacenter string, members []serf.Member) (int, error) {
minVersion := -1
for _, m := range members {
if m.Tags["role"] != "nomad" || m.Tags["dc"] != datacenter || m.Status != serf.StatusAlive {
continue
}
vsn, ok := m.Tags["raft_vsn"]
if !ok {
vsn = "1"
}
raftVsn, err := strconv.Atoi(vsn)
if err != nil {
return -1, err
}
if minVersion == -1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == -1 {
return minVersion, fmt.Errorf("no servers found")
}
return minVersion, nil
}
// shuffleStrings randomly shuffles the list of strings
func shuffleStrings(list []string) {
for i := range list {

@@ -1,6 +1,7 @@
package nomad
import (
"errors"
"net"
"reflect"
"testing"
@@ -151,6 +152,105 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
}
func TestMinRaftProtocol(t *testing.T) {
t.Parallel()
makeMember := func(version, datacenter string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "nomad",
"region": "aws",
"dc": datacenter,
"port": "10000",
"vsn": "1",
"raft_vsn": version,
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
datacenter string
expected int
err error
}{
// No servers, error
{
members: []serf.Member{},
expected: -1,
err: errors.New("no servers found"),
},
// One server
{
members: []serf.Member{
makeMember("1", "dc1"),
},
datacenter: "dc1",
expected: 1,
},
// One server, bad version formatting
{
members: []serf.Member{
makeMember("asdf", "dc1"),
},
datacenter: "dc1",
expected: -1,
err: errors.New(`strconv.Atoi: parsing "asdf": invalid syntax`),
},
// One server, wrong datacenter
{
members: []serf.Member{
makeMember("1", "dc1"),
},
datacenter: "dc2",
expected: -1,
err: errors.New("no servers found"),
},
// Multiple servers, different versions
{
members: []serf.Member{
makeMember("1", "dc1"),
makeMember("2", "dc1"),
},
datacenter: "dc1",
expected: 1,
},
// Multiple servers, same version
{
members: []serf.Member{
makeMember("2", "dc1"),
makeMember("2", "dc1"),
},
datacenter: "dc1",
expected: 2,
},
// Multiple servers, multiple datacenters
{
members: []serf.Member{
makeMember("3", "dc1"),
makeMember("2", "dc1"),
makeMember("1", "dc2"),
},
datacenter: "dc1",
expected: 2,
},
}
for _, tc := range cases {
result, err := MinRaftProtocol(tc.datacenter, tc.members)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.expected, tc)
}
if tc.err != nil {
if err == nil || tc.err.Error() != err.Error() {
t.Fatalf("bad: %v, %v, %v", err, tc.err, tc)
}
}
}
}
func TestShuffleStrings(t *testing.T) {
t.Parallel()
// Generate input

@@ -112,6 +112,11 @@ server {
required as the agent internally knows the latest version, but may be useful
in some upgrade scenarios.
- `raft_protocol` `(int: 2)` - Specifies the Raft protocol version to use when
communicating with other Nomad servers. This affects available Autopilot
features and is typically not required as the agent internally knows the
latest version, but may be useful in some upgrade scenarios.
- `rejoin_after_leave` `(bool: false)` - Specifies if Nomad will ignore a
previous leave and attempt to rejoin the cluster when starting. By default,
Nomad treats leave as a permanent intent and does not attempt to join the