open-nomad/nomad/serf.go

package nomad

import (
	"strings"
	"sync/atomic"
	"time"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

const (
	// StatusReap is used to update the status of a node if we
	// are handling an EventMemberReap
	StatusReap = serf.MemberStatus(-1)
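	// (-1 works as a sentinel here because serf's own member statuses
	// are all non-negative)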

	// maxPeerRetries limits how many invalidate attempts are made
	maxPeerRetries = 6

	// peerRetryBase is a baseline retry time
	peerRetryBase = 1 * time.Second
)

// serfEventHandler is used to handle events from the serf cluster
func (s *Server) serfEventHandler() {
	for {
		select {
		case e := <-s.eventCh:
			switch e.EventType() {
			case serf.EventMemberJoin:
				s.nodeJoin(e.(serf.MemberEvent))
				s.localMemberEvent(e.(serf.MemberEvent))
			case serf.EventMemberLeave, serf.EventMemberFailed:
				s.nodeFailed(e.(serf.MemberEvent))
				s.localMemberEvent(e.(serf.MemberEvent))
			case serf.EventMemberReap:
				s.localMemberEvent(e.(serf.MemberEvent))
			case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
			default:
				s.logger.Warn("unhandled serf event", "event", log.Fmt("%#v", e))
			}

		case <-s.shutdownCh:
			return
		}
	}
}

// nodeJoin is used to handle join events on the serf cluster
func (s *Server) nodeJoin(me serf.MemberEvent) {
	for _, m := range me.Members {
		ok, parts := isNomadServer(m)
		if !ok {
			s.logger.Warn("non-server in gossip pool", "member", m.Name)
			continue
		}
		s.logger.Info("adding server", "server", parts)

		// Check if this server is known
		found := false
		s.peerLock.Lock()
		existing := s.peers[parts.Region]
		for idx, e := range existing {
			if e.Name == parts.Name {
				existing[idx] = parts
				found = true
				break
			}
		}

		// Add to the list if not known
		if !found {
			s.peers[parts.Region] = append(existing, parts)
		}

		// Check if a local peer
		if parts.Region == s.config.Region {
			s.localPeers[raft.ServerAddress(parts.Addr.String())] = parts
		}
		s.peerLock.Unlock()
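
		// BootstrapExpect is accessed with sync/atomic throughout this
		// file; maybeBootstrap clears it with StoreInt32 once bootstrapping
		// completes or is ruled out, so a plain read here could race.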
		// If we're still expecting to bootstrap, we may need to handle this
		if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 {
			s.maybeBootstrap()
		}
	}
}

// maybeBootstrap is used to handle bootstrapping when a new server joins
func (s *Server) maybeBootstrap() {
	// Bootstrap can only be done if there are no committed logs, remove our
	// expectations of bootstrapping. This is slightly cheaper than the full
	// check that BootstrapCluster will do, so this is a good pre-filter.
	var index uint64
	var err error
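	// raftStore is the durable on-disk log store used in normal operation,
	// while raftInmem backs in-memory servers (e.g. agents in dev mode).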
	if s.raftStore != nil {
		index, err = s.raftStore.LastIndex()
	} else if s.raftInmem != nil {
		index, err = s.raftInmem.LastIndex()
	} else {
		panic("neither raftInmem nor raftStore is initialized")
	}
	if err != nil {
		s.logger.Error("failed to read last raft index", "error", err)
		return
	}

	// A non-zero last index means this server has already participated in
	// a cluster with committed logs, so clear the expect count and never
	// bootstrap from here.
	if index != 0 {
		atomic.StoreInt32(&s.config.BootstrapExpect, 0)
		return
	}

	// Scan for all the known servers
	members := s.serf.Members()
	var servers []serverParts
	for _, member := range members {
		valid, p := isNomadServer(member)
		if !valid {
			continue
		}
		if p.Region != s.config.Region {
			continue
		}
		if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
			s.logger.Error("peer has a conflicting expect value. All nodes should expect the same number", "member", member)
			return
		}
		if p.Bootstrap {
			s.logger.Error("peer has bootstrap mode enabled, expect disabled", "member", member)
			return
		}
		servers = append(servers, *p)
	}

	// Skip if we haven't met the minimum expect count
	if len(servers) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
		return
	}

	// Query each of the servers and make sure they report no Raft peers.
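	// AllowStale lets each contacted server answer from its own state
	// instead of forwarding to a leader, which does not exist yet at this
	// point in bootstrapping.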
	req := &structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			AllowStale: true,
		},
	}
	for _, server := range servers {
		var peers []string

		// Retry with exponential backoff to get peer status from this server
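		// (the delay doubles on each failure: 1s, 2s, 4s, ... up to 32s
		// after the sixth and final attempt)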
		for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
			if err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
				"Status.Peers", req, &peers); err != nil {
				nextRetry := (1 << attempt) * peerRetryBase
				s.logger.Error("failed to confirm peer status", "peer", server.Name, "error", err, "retry", nextRetry)
				time.Sleep(nextRetry)
			} else {
				break
			}
		}

		// Found a node with some Raft peers, stop bootstrap since there's
		// evidence of an existing cluster. We should get folded in by the
		// existing servers if that's the case, so it's cleaner to sit as a
		// candidate with no peers so we don't cause spurious elections.
		// It's OK that this is racy, because even with an initial bootstrap
		// as long as one peer runs bootstrap things will work, and if we
		// have multiple peers bootstrap in the same way, that's OK. We
		// just don't want a server added much later to do a live bootstrap
		// and interfere with the cluster. This isn't required for Raft's
		// correctness because no server in the existing cluster will vote
		// for this server, but it makes things much more stable.
		if len(peers) > 0 {
			s.logger.Info("disabling bootstrap mode because existing Raft peers are being reported by peer",
				"peer_name", server.Name, "peer_address", server.Addr)
			atomic.StoreInt32(&s.config.BootstrapExpect, 0)
			return
		}
	}

	// No Raft peers reported anywhere, so attempt a live bootstrap!
	var configuration raft.Configuration
	var addrs []string
	minRaftVersion, err := s.autopilot.MinRaftProtocol()
	if err != nil {
		s.logger.Error("failed to read server raft versions", "error", err)
	}
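
	// Raft protocol v3 introduced stable server IDs; when every peer
	// speaks at least v3 we key the configuration by server ID, otherwise
	// we fall back to the network address, which is how pre-v3 servers
	// are identified.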
	for _, server := range servers {
		addr := server.Addr.String()
		addrs = append(addrs, addr)
		var id raft.ServerID
		if minRaftVersion >= 3 {
			id = raft.ServerID(server.ID)
		} else {
			id = raft.ServerID(addr)
		}
		peer := raft.Server{
			ID:      id,
			Address: raft.ServerAddress(addr),
		}
		configuration.Servers = append(configuration.Servers, peer)
	}

	s.logger.Info("found expected number of peers, attempting to bootstrap cluster...",
		"peers", strings.Join(addrs, ","))
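	// BootstrapCluster installs the configuration assembled above as the
	// initial raft membership; the returned future blocks until the
	// attempt has completed.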
	future := s.raft.BootstrapCluster(configuration)
	if err := future.Error(); err != nil {
		s.logger.Error("failed to bootstrap cluster", "error", err)
	}

	// Bootstrapping complete, or failed for some reason; either way,
	// don't enter this path again
	atomic.StoreInt32(&s.config.BootstrapExpect, 0)
}

// nodeFailed is used to handle fail events on the serf cluster
func (s *Server) nodeFailed(me serf.MemberEvent) {
	for _, m := range me.Members {
		ok, parts := isNomadServer(m)
		if !ok {
			continue
		}
		s.logger.Info("removing server", "server", parts)

		// Remove the server if known
		s.peerLock.Lock()
		existing := s.peers[parts.Region]
		n := len(existing)
		for i := 0; i < n; i++ {
			if existing[i].Name == parts.Name {
				existing[i], existing[n-1] = existing[n-1], nil
				existing = existing[:n-1]
				n--
				break
			}
		}
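
		// The swap above moves the last element into the removed slot and
		// nils out the tail entry so the dropped pointer isn't retained by
		// the backing array after the slice is truncated.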
		// Trim the list if there are no known servers in the region
		if n == 0 {
			delete(s.peers, parts.Region)
		} else {
			s.peers[parts.Region] = existing
		}

		// Check if local peer
		if parts.Region == s.config.Region {
			delete(s.localPeers, raft.ServerAddress(parts.Addr.String()))
		}
		s.peerLock.Unlock()
	}
}

// localMemberEvent is used to reconcile Serf events with the
// consistent store if we are the current leader.
func (s *Server) localMemberEvent(me serf.MemberEvent) {
	// Do nothing if we are not the leader
	if !s.IsLeader() {
		return
	}

	// Check if this is a reap event
	isReap := me.EventType() == serf.EventMemberReap

	// Queue the members for reconciliation
	for _, m := range me.Members {
		// Change the status if this is a reap event
		if isReap {
			m.Status = StatusReap
		}
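
		// The send is deliberately non-blocking: if reconcileCh is full
		// the member is dropped rather than stalling the serf event
		// handler; reconciliation is best-effort, and missed members are
		// picked up by the leader's periodic reconcile pass.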
		select {
		case s.reconcileCh <- m:
		default:
		}
	}
}