2015-06-03 10:58:00 +00:00
|
|
|
package nomad
|
|
|
|
|
2016-06-16 19:00:15 +00:00
|
|
|
import (
|
2017-02-03 00:07:15 +00:00
|
|
|
"strings"
|
2016-06-16 19:00:15 +00:00
|
|
|
"sync/atomic"
|
|
|
|
|
2017-02-03 00:07:15 +00:00
|
|
|
"github.com/hashicorp/raft"
|
2016-06-16 19:00:15 +00:00
|
|
|
"github.com/hashicorp/serf/serf"
|
|
|
|
)
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 10:42:56 +00:00
|
|
|
const (
|
|
|
|
// StatusReap is used to update the status of a node if we
|
|
|
|
// are handling a EventMemberReap
|
|
|
|
StatusReap = serf.MemberStatus(-1)
|
|
|
|
)
|
|
|
|
|
2015-06-03 10:58:00 +00:00
|
|
|
// serfEventHandler is used to handle events from the serf cluster
|
|
|
|
func (s *Server) serfEventHandler() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case e := <-s.eventCh:
|
|
|
|
switch e.EventType() {
|
|
|
|
case serf.EventMemberJoin:
|
|
|
|
s.nodeJoin(e.(serf.MemberEvent))
|
2015-06-04 10:42:56 +00:00
|
|
|
s.localMemberEvent(e.(serf.MemberEvent))
|
2015-06-03 10:58:00 +00:00
|
|
|
case serf.EventMemberLeave, serf.EventMemberFailed:
|
|
|
|
s.nodeFailed(e.(serf.MemberEvent))
|
2015-06-04 10:42:56 +00:00
|
|
|
s.localMemberEvent(e.(serf.MemberEvent))
|
2015-06-03 10:58:00 +00:00
|
|
|
case serf.EventMemberUpdate, serf.EventMemberReap,
|
|
|
|
serf.EventUser, serf.EventQuery: // Ignore
|
|
|
|
default:
|
|
|
|
s.logger.Printf("[WARN] nomad: unhandled serf event: %#v", e)
|
|
|
|
}
|
|
|
|
|
|
|
|
case <-s.shutdownCh:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// nodeJoin is used to handle join events on the serf cluster
|
|
|
|
func (s *Server) nodeJoin(me serf.MemberEvent) {
|
2015-06-04 10:33:12 +00:00
|
|
|
for _, m := range me.Members {
|
|
|
|
ok, parts := isNomadServer(m)
|
|
|
|
if !ok {
|
|
|
|
s.logger.Printf("[WARN] nomad: non-server in gossip pool: %s", m.Name)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
s.logger.Printf("[INFO] nomad: adding server %s", parts)
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 10:33:12 +00:00
|
|
|
// Check if this server is known
|
|
|
|
found := false
|
|
|
|
s.peerLock.Lock()
|
|
|
|
existing := s.peers[parts.Region]
|
|
|
|
for idx, e := range existing {
|
|
|
|
if e.Name == parts.Name {
|
|
|
|
existing[idx] = parts
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 10:33:12 +00:00
|
|
|
// Add ot the list if not known
|
|
|
|
if !found {
|
|
|
|
s.peers[parts.Region] = append(existing, parts)
|
|
|
|
}
|
2015-06-07 18:37:59 +00:00
|
|
|
|
|
|
|
// Check if a local peer
|
|
|
|
if parts.Region == s.config.Region {
|
2017-02-03 00:07:15 +00:00
|
|
|
s.localPeers[raft.ServerAddress(parts.Addr.String())] = parts
|
2015-06-07 18:37:59 +00:00
|
|
|
}
|
2015-06-04 10:33:12 +00:00
|
|
|
s.peerLock.Unlock()
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 10:33:12 +00:00
|
|
|
// If we still expecting to bootstrap, may need to handle this
|
2016-06-16 19:00:15 +00:00
|
|
|
if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 {
|
2015-06-04 10:33:12 +00:00
|
|
|
s.maybeBootstrap()
|
|
|
|
}
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
}
|
|
|
|
|
2016-06-16 00:23:02 +00:00
|
|
|
// maybeBootsrap is used to handle bootstrapping when a new server joins
|
2015-06-03 10:58:00 +00:00
|
|
|
func (s *Server) maybeBootstrap() {
|
2017-02-03 00:07:15 +00:00
|
|
|
// Bootstrap can only be done if there are no committed logs, remove our
|
|
|
|
// expectations of bootstrapping. This is slightly cheaper than the full
|
|
|
|
// check that BootstrapCluster will do, so this is a good pre-filter.
|
2015-06-05 22:11:16 +00:00
|
|
|
var index uint64
|
|
|
|
var err error
|
|
|
|
if s.raftStore != nil {
|
|
|
|
index, err = s.raftStore.LastIndex()
|
|
|
|
} else if s.raftInmem != nil {
|
|
|
|
index, err = s.raftInmem.LastIndex()
|
|
|
|
} else {
|
|
|
|
panic("neither raftInmem or raftStore is initialized")
|
|
|
|
}
|
2015-06-04 11:11:35 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: failed to read last raft index: %v", err)
|
|
|
|
return
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:11:35 +00:00
|
|
|
// Bootstrap can only be done if there are no committed logs,
|
|
|
|
// remove our expectations of bootstrapping
|
|
|
|
if index != 0 {
|
2016-06-16 19:00:15 +00:00
|
|
|
atomic.StoreInt32(&s.config.BootstrapExpect, 0)
|
2015-06-04 11:11:35 +00:00
|
|
|
return
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:11:35 +00:00
|
|
|
// Scan for all the known servers
|
|
|
|
members := s.serf.Members()
|
|
|
|
addrs := make([]string, 0)
|
|
|
|
for _, member := range members {
|
|
|
|
valid, p := isNomadServer(member)
|
|
|
|
if !valid {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if p.Region != s.config.Region {
|
|
|
|
continue
|
|
|
|
}
|
2016-06-16 19:00:15 +00:00
|
|
|
if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
|
2015-06-04 11:11:35 +00:00
|
|
|
s.logger.Printf("[ERR] nomad: peer %v has a conflicting expect value. All nodes should expect the same number.", member)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if p.Bootstrap {
|
|
|
|
s.logger.Printf("[ERR] nomad: peer %v has bootstrap mode. Expect disabled.", member)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
addrs = append(addrs, p.Addr.String())
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:11:35 +00:00
|
|
|
// Skip if we haven't met the minimum expect count
|
2016-06-16 19:00:15 +00:00
|
|
|
if len(addrs) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
|
2015-06-04 11:11:35 +00:00
|
|
|
return
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:11:35 +00:00
|
|
|
// Update the peer set
|
2017-02-03 00:07:15 +00:00
|
|
|
// Attempt a live bootstrap!
|
|
|
|
var configuration raft.Configuration
|
|
|
|
for _, addr := range addrs {
|
|
|
|
// TODO (alexdadgar) - This will need to be updated once we support
|
|
|
|
// node IDs.
|
|
|
|
server := raft.Server{
|
|
|
|
ID: raft.ServerID(addr),
|
|
|
|
Address: raft.ServerAddress(addr),
|
|
|
|
}
|
|
|
|
configuration.Servers = append(configuration.Servers, server)
|
|
|
|
}
|
|
|
|
s.logger.Printf("[INFO] nomad: Found expected number of peers (%s), attempting to bootstrap cluster...",
|
|
|
|
strings.Join(addrs, ","))
|
|
|
|
future := s.raft.BootstrapCluster(configuration)
|
|
|
|
if err := future.Error(); err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: Failed to bootstrap cluster: %v", err)
|
2015-06-04 11:11:35 +00:00
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2016-06-16 19:00:15 +00:00
|
|
|
// Bootstrapping complete, don't enter this again
|
|
|
|
atomic.StoreInt32(&s.config.BootstrapExpect, 0)
|
2015-06-03 10:58:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// nodeFailed is used to handle fail events on the serf cluster
|
|
|
|
func (s *Server) nodeFailed(me serf.MemberEvent) {
|
2015-06-04 11:02:39 +00:00
|
|
|
for _, m := range me.Members {
|
|
|
|
ok, parts := isNomadServer(m)
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
s.logger.Printf("[INFO] nomad: removing server %s", parts)
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:02:39 +00:00
|
|
|
// Remove the server if known
|
|
|
|
s.peerLock.Lock()
|
|
|
|
existing := s.peers[parts.Region]
|
|
|
|
n := len(existing)
|
|
|
|
for i := 0; i < n; i++ {
|
|
|
|
if existing[i].Name == parts.Name {
|
|
|
|
existing[i], existing[n-1] = existing[n-1], nil
|
|
|
|
existing = existing[:n-1]
|
|
|
|
n--
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
|
2015-06-04 11:02:39 +00:00
|
|
|
// Trim the list there are no known servers in a region
|
|
|
|
if n == 0 {
|
|
|
|
delete(s.peers, parts.Region)
|
|
|
|
} else {
|
|
|
|
s.peers[parts.Region] = existing
|
|
|
|
}
|
2015-06-07 18:37:59 +00:00
|
|
|
|
|
|
|
// Check if local peer
|
|
|
|
if parts.Region == s.config.Region {
|
2017-02-03 00:07:15 +00:00
|
|
|
delete(s.localPeers, raft.ServerAddress(parts.Addr.String()))
|
2015-06-07 18:37:59 +00:00
|
|
|
}
|
2015-06-04 11:02:39 +00:00
|
|
|
s.peerLock.Unlock()
|
|
|
|
}
|
2015-06-03 10:58:00 +00:00
|
|
|
}
|
2015-06-04 10:42:56 +00:00
|
|
|
|
|
|
|
// localMemberEvent is used to reconcile Serf events with the
|
|
|
|
// consistent store if we are the current leader.
|
|
|
|
func (s *Server) localMemberEvent(me serf.MemberEvent) {
|
|
|
|
// Do nothing if we are not the leader
|
|
|
|
if !s.IsLeader() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if this is a reap event
|
|
|
|
isReap := me.EventType() == serf.EventMemberReap
|
|
|
|
|
|
|
|
// Queue the members for reconciliation
|
|
|
|
for _, m := range me.Members {
|
|
|
|
// Change the status if this is a reap event
|
|
|
|
if isReap {
|
|
|
|
m.Status = StatusReap
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case s.reconcileCh <- m:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|