open-consul/consul/serf.go

253 lines
6.2 KiB
Go
Raw Normal View History

2013-12-07 01:18:09 +00:00
package consul
2013-12-07 00:54:33 +00:00
2013-12-09 23:29:44 +00:00
import (
"net"
"strings"
"github.com/hashicorp/serf/serf"
2013-12-09 23:29:44 +00:00
)
2014-03-20 19:51:49 +00:00
// StatusReap is the member status substituted when an EventMemberReap is
// being handled. It is defined here as serf.MemberStatus(-1).
const StatusReap serf.MemberStatus = -1
2013-12-07 00:54:33 +00:00
// lanEventHandler is used to handle events from the lan Serf cluster
func (s *Server) lanEventHandler() {
for {
select {
case e := <-s.eventChLAN:
2013-12-09 23:29:44 +00:00
switch e.EventType() {
case serf.EventMemberJoin:
s.nodeJoin(e.(serf.MemberEvent), false)
s.localMemberEvent(e.(serf.MemberEvent))
2013-12-09 23:29:44 +00:00
case serf.EventMemberLeave:
2014-01-09 23:49:09 +00:00
fallthrough
2013-12-09 23:29:44 +00:00
case serf.EventMemberFailed:
s.nodeFailed(e.(serf.MemberEvent), false)
s.localMemberEvent(e.(serf.MemberEvent))
2014-03-20 19:51:49 +00:00
case serf.EventMemberReap:
2014-01-09 23:49:09 +00:00
s.localMemberEvent(e.(serf.MemberEvent))
2013-12-09 23:29:44 +00:00
case serf.EventUser:
s.localEvent(e.(serf.UserEvent))
2014-03-20 19:51:49 +00:00
case serf.EventMemberUpdate: // Ignore
2014-03-12 19:46:14 +00:00
case serf.EventQuery: // Ignore
2013-12-09 23:29:44 +00:00
default:
2014-01-10 19:06:11 +00:00
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
2013-12-09 23:29:44 +00:00
}
2013-12-07 00:54:33 +00:00
case <-s.shutdownCh:
return
}
}
}
// wanEventHandler is used to handle events from the wan Serf cluster
func (s *Server) wanEventHandler() {
for {
select {
case e := <-s.eventChWAN:
2013-12-09 23:29:44 +00:00
switch e.EventType() {
case serf.EventMemberJoin:
s.nodeJoin(e.(serf.MemberEvent), true)
2013-12-09 23:29:44 +00:00
case serf.EventMemberLeave:
2013-12-12 00:24:34 +00:00
fallthrough
2013-12-09 23:29:44 +00:00
case serf.EventMemberFailed:
s.nodeFailed(e.(serf.MemberEvent), true)
2014-03-12 19:46:14 +00:00
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
2013-12-09 23:29:44 +00:00
case serf.EventUser:
2014-03-12 19:46:14 +00:00
case serf.EventQuery: // Ignore
2013-12-09 23:29:44 +00:00
default:
2014-01-10 19:06:11 +00:00
s.logger.Printf("[WARN] consul: unhandled WAN Serf Event: %#v", e)
2013-12-09 23:29:44 +00:00
}
2013-12-07 00:54:33 +00:00
case <-s.shutdownCh:
return
}
}
}
2013-12-09 23:29:44 +00:00
2014-01-09 23:49:09 +00:00
// localMemberEvent is used to reconcile Serf events with the strongly
// consistent store if we are the current leader
func (s *Server) localMemberEvent(me serf.MemberEvent) {
// Do nothing if we are not the leader
if !s.IsLeader() {
return
}
2014-03-20 19:51:49 +00:00
// Check if this is a reap event
isReap := me.EventType() == serf.EventMemberReap
// Queue the members for reconciliation
2014-01-09 23:49:09 +00:00
for _, m := range me.Members {
2014-03-20 19:51:49 +00:00
// Change the status if this is a reap event
if isReap {
m.Status = StatusReap
}
select {
case s.reconcileCh <- m:
default:
}
}
2013-12-09 23:29:44 +00:00
}
// localEvent handles a user event received on the local Serf. Events whose
// name lacks the "consul:" prefix are ignored. A newLeaderEvent is logged
// and, when configured, triggers the config.ServerUp callback; any other
// consul event is logged as unhandled.
func (s *Server) localEvent(event serf.UserEvent) {
	// Anything outside the consul namespace is not ours to handle.
	if !strings.HasPrefix(event.Name, "consul:") {
		return
	}

	if event.Name != newLeaderEvent {
		s.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
		return
	}

	s.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)

	// Notify the optional callback.
	if notify := s.config.ServerUp; notify != nil {
		notify()
	}
}
// nodeJoin is used to handle join events on the both serf clusters
func (s *Server) nodeJoin(me serf.MemberEvent, wan bool) {
2013-12-12 00:24:34 +00:00
for _, m := range me.Members {
2014-01-20 23:39:07 +00:00
ok, parts := isConsulServer(m)
2013-12-12 00:24:34 +00:00
if !ok {
if wan {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name)
}
2013-12-12 00:24:34 +00:00
continue
}
s.logger.Printf("[INFO] consul: adding server %s", parts)
2013-12-12 00:24:34 +00:00
// Check if this server is known
found := false
s.remoteLock.Lock()
2014-01-20 23:39:07 +00:00
existing := s.remoteConsuls[parts.Datacenter]
for idx, e := range existing {
if e.Name == parts.Name {
existing[idx] = parts
2013-12-12 00:24:34 +00:00
found = true
break
}
}
2013-12-09 23:29:44 +00:00
2013-12-12 00:24:34 +00:00
// Add ot the list if not known
if !found {
s.remoteConsuls[parts.Datacenter] = append(existing, parts)
2013-12-12 00:24:34 +00:00
}
s.remoteLock.Unlock()
// Add to the local list as well
2014-06-18 23:15:28 +00:00
if !wan && parts.Datacenter == s.config.Datacenter {
s.localLock.Lock()
s.localConsuls[parts.Addr.String()] = parts
s.localLock.Unlock()
}
2014-06-18 23:15:28 +00:00
// If we still expecting to bootstrap, may need to handle this
if s.config.BootstrapExpect != 0 {
2014-06-18 23:15:28 +00:00
s.maybeBootstrap()
}
2013-12-12 00:24:34 +00:00
}
2013-12-09 23:29:44 +00:00
}
2014-06-18 23:15:28 +00:00
// maybeBootsrap is used to handle bootstrapping when a new consul server joins
func (s *Server) maybeBootstrap() {
index, err := s.raftStore.LastIndex()
if err != nil {
s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
return
}
// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
s.config.BootstrapExpect = 0
2014-06-18 23:15:28 +00:00
return
}
// Scan for all the known servers
members := s.serfLAN.Members()
addrs := make([]net.Addr, 0)
for _, member := range members {
valid, p := isConsulServer(member)
if !valid {
continue
}
if p.Datacenter != s.config.Datacenter {
s.logger.Printf("[ERR] consul: Member %v has a conflicting datacenter, ignoring", member)
continue
}
if p.Expect != 0 && p.Expect != s.config.BootstrapExpect {
2014-06-18 23:15:28 +00:00
s.logger.Printf("[ERR] consul: Member %v has a conflicting expect value. All nodes should expect the same number.", member)
return
}
if p.Bootstrap {
s.logger.Printf("[ERR] consul: Member %v has bootstrap mode. Expect disabled.", member)
return
}
addrs = append(addrs, &net.TCPAddr{IP: member.Addr, Port: p.Port})
}
// Skip if we haven't met the minimum expect count
if len(addrs) < s.config.BootstrapExpect {
2014-06-18 23:15:28 +00:00
return
}
// Update the peer set
s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
if err := s.raft.SetPeers(addrs).Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
}
// Bootstrapping comlete, don't enter this again
s.config.BootstrapExpect = 0
2014-06-18 23:15:28 +00:00
}
// nodeFailed is used to handle fail events on both the serf clustes
func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) {
2013-12-12 00:24:34 +00:00
for _, m := range me.Members {
2014-01-20 23:39:07 +00:00
ok, parts := isConsulServer(m)
2013-12-12 00:24:34 +00:00
if !ok {
continue
}
s.logger.Printf("[INFO] consul: removing server %s", parts)
2013-12-12 00:24:34 +00:00
// Remove the server if known
s.remoteLock.Lock()
2014-01-20 23:39:07 +00:00
existing := s.remoteConsuls[parts.Datacenter]
2013-12-12 00:24:34 +00:00
n := len(existing)
for i := 0; i < n; i++ {
if existing[i].Name == parts.Name {
2013-12-12 00:24:34 +00:00
existing[i], existing[n-1] = existing[n-1], nil
existing = existing[:n-1]
n--
break
}
}
2013-12-09 23:29:44 +00:00
2013-12-12 00:24:34 +00:00
// Trim the list if all known consuls are dead
if n == 0 {
2014-01-20 23:39:07 +00:00
delete(s.remoteConsuls, parts.Datacenter)
2013-12-12 00:24:34 +00:00
} else {
2014-01-20 23:39:07 +00:00
s.remoteConsuls[parts.Datacenter] = existing
2013-12-12 00:24:34 +00:00
}
s.remoteLock.Unlock()
// Remove from the local list as well
if !wan {
s.localLock.Lock()
delete(s.localConsuls, parts.Addr.String())
s.localLock.Unlock()
}
2013-12-12 00:24:34 +00:00
}
2013-12-09 23:29:44 +00:00
}