Simplify Raft peer adds using only reconciliation
This commit is contained in:
parent
786755ebcd
commit
2030b2288d
|
@ -2,7 +2,9 @@ package consul
|
|||
|
||||
import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -38,7 +40,15 @@ func (s *Server) monitorLeadership() {
|
|||
// leaderLoop runs as long as we are the leader to run various
|
||||
// maintence activities
|
||||
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
||||
// Reconcile channel is only used once initial reconcile
|
||||
// has succeeded
|
||||
var reconcileCh chan serf.Member
|
||||
|
||||
RECONCILE:
|
||||
// Setup a reconciliation timer
|
||||
reconcileCh = nil
|
||||
interval := time.After(s.config.ReconcileInterval)
|
||||
|
||||
// Apply a raft barrier to ensure our FSM is caught up
|
||||
barrier := s.raft.Barrier(0)
|
||||
if err := barrier.Error(); err != nil {
|
||||
|
@ -52,15 +62,24 @@ RECONCILE:
|
|||
goto WAIT
|
||||
}
|
||||
|
||||
// Initial reconcile worked, now we can process the channel
|
||||
// updates
|
||||
reconcileCh = s.reconcileCh
|
||||
|
||||
WAIT:
|
||||
// Periodically reconcile as long as we are the leader
|
||||
// Periodically reconcile as long as we are the leader,
|
||||
// or when Serf events arrive
|
||||
for {
|
||||
select {
|
||||
case <-time.After(s.config.ReconcileInterval):
|
||||
goto RECONCILE
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
case <-interval:
|
||||
goto RECONCILE
|
||||
case member := <-reconcileCh:
|
||||
s.reconcileMember(member)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,6 +146,11 @@ func (s *Server) handleAliveMember(member serf.Member) error {
|
|||
Service: "consul",
|
||||
Port: port,
|
||||
}
|
||||
|
||||
// Attempt to join the consul server
|
||||
if err := s.joinConsulServer(member, port); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the node exists
|
||||
|
@ -220,6 +244,17 @@ func (s *Server) handleLeftMember(member serf.Member) error {
|
|||
}
|
||||
s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name)
|
||||
|
||||
// Remove from Raft peers if this was a server
|
||||
if valid, _, port := isConsulServer(member); valid {
|
||||
peer := &net.TCPAddr{IP: member.Addr, Port: port}
|
||||
future := s.raft.RemovePeer(peer)
|
||||
if err := future.Error(); err != nil && err != raft.UnknownPeer {
|
||||
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
||||
peer, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Deregister the node
|
||||
req := structs.DeregisterRequest{
|
||||
Datacenter: s.config.Datacenter,
|
||||
|
@ -228,3 +263,20 @@ func (s *Server) handleLeftMember(member serf.Member) error {
|
|||
var out struct{}
|
||||
return s.endpoints.Catalog.Deregister(&req, &out)
|
||||
}
|
||||
|
||||
// joinConsulServer is used to try to join another consul server
|
||||
func (s *Server) joinConsulServer(m serf.Member, port int) error {
|
||||
// Do not join ourself
|
||||
if m.Name == s.config.NodeName {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Attempt to add as a peer
|
||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
||||
future := s.raft.AddPeer(addr)
|
||||
if err := future.Error(); err != nil && err != raft.KnownPeer {
|
||||
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -188,3 +188,72 @@ func TestLeader_Reconcile(t *testing.T) {
|
|||
t.Fatalf("client not registered")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeader_LeftServer(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
servers := []*Server{s1, s2, s3}
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait until we have 3 peers
|
||||
start := time.Now()
|
||||
CHECK1:
|
||||
for _, s := range servers {
|
||||
peers, _ := s.raftPeers.Peers()
|
||||
if len(peers) != 3 {
|
||||
if time.Now().Sub(start) >= 2*time.Second {
|
||||
t.Fatalf("should have 3 peers")
|
||||
} else {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
goto CHECK1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Kill any server
|
||||
servers[0].Shutdown()
|
||||
|
||||
// Wait for failure detection
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Force remove the non-leader (transition to left state)
|
||||
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for intent propagation
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Wait until we have 2 peers
|
||||
start = time.Now()
|
||||
CHECK2:
|
||||
for _, s := range servers[1:] {
|
||||
peers, _ := s.raftPeers.Peers()
|
||||
if len(peers) != 2 {
|
||||
if time.Now().Sub(start) >= 2*time.Second {
|
||||
t.Fatalf("should have 2 peers")
|
||||
} else {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
goto CHECK2
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// lanEventHandler is used to handle events from the lan Serf cluster
|
||||
|
@ -14,7 +12,6 @@ func (s *Server) lanEventHandler() {
|
|||
case e := <-s.eventChLAN:
|
||||
switch e.EventType() {
|
||||
case serf.EventMemberJoin:
|
||||
s.localJoin(e.(serf.MemberEvent))
|
||||
fallthrough
|
||||
case serf.EventMemberLeave:
|
||||
fallthrough
|
||||
|
@ -62,26 +59,12 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) {
|
|||
return
|
||||
}
|
||||
|
||||
// Dispatch an async handler for each member
|
||||
// Queue the members for reconciliation
|
||||
for _, m := range me.Members {
|
||||
go s.reconcileMember(m)
|
||||
select {
|
||||
case s.reconcileCh <- m:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// localJoin is used to handle join events on the lan serf cluster
|
||||
func (s *Server) localJoin(me serf.MemberEvent) {
|
||||
// Check for consul members
|
||||
for _, m := range me.Members {
|
||||
ok, dc, port := isConsulServer(m)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if dc != s.config.Datacenter {
|
||||
s.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
|
||||
m.Name, dc)
|
||||
continue
|
||||
}
|
||||
go s.joinConsulServer(m, port)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,60 +130,3 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
|
|||
s.remoteLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// joinConsulServer is used to try to join another consul server
|
||||
func (s *Server) joinConsulServer(m serf.Member, port int) {
|
||||
if m.Name == s.config.NodeName {
|
||||
return
|
||||
}
|
||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
||||
var future raft.Future
|
||||
|
||||
CHECK:
|
||||
// Get the Raft peers
|
||||
peers, err := s.raftPeers.Peers()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to get raft peers: %v", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Bail if this node is already a peer
|
||||
for _, p := range peers {
|
||||
if p.String() == addr.String() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Bail if the node is not alive
|
||||
if memberStatus(s.serfLAN.Members(), m.Name) != serf.StatusAlive {
|
||||
return
|
||||
}
|
||||
|
||||
// Attempt to add as a peer
|
||||
future = s.raft.AddPeer(addr)
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
WAIT:
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
default:
|
||||
goto CHECK
|
||||
}
|
||||
}
|
||||
|
||||
// memberStatus scans a list of members for a matching one,
|
||||
// returning the status or StatusNone
|
||||
func memberStatus(members []serf.Member, name string) serf.MemberStatus {
|
||||
for _, m := range members {
|
||||
if m.Name == name {
|
||||
return m.Status
|
||||
}
|
||||
}
|
||||
return serf.StatusNone
|
||||
}
|
||||
|
|
|
@ -53,6 +53,11 @@ type Server struct {
|
|||
raftStore *raft.MDBStore
|
||||
raftTransport *raft.NetworkTransport
|
||||
|
||||
// reconcileCh is used to pass events from the serf handler
|
||||
// into the leader manager, so that the strong state can be
|
||||
// updated
|
||||
reconcileCh chan serf.Member
|
||||
|
||||
// remoteConsuls is used to track the known consuls in
|
||||
// remote data centers. Used to do DC forwarding.
|
||||
remoteConsuls map[string][]net.Addr
|
||||
|
@ -110,6 +115,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
remoteConsuls: make(map[string][]net.Addr),
|
||||
rpcClients: make(map[net.Conn]struct{}),
|
||||
rpcServer: rpc.NewServer(),
|
||||
|
@ -202,7 +208,7 @@ func (s *Server) setupRaft() error {
|
|||
}
|
||||
|
||||
// Create a transport layer
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 500*time.Millisecond, s.config.LogOutput)
|
||||
s.raftTransport = trans
|
||||
|
||||
// Setup the peer store
|
||||
|
|
|
@ -64,7 +64,7 @@ func testServerDCBootstrap(t *testing.T, dc string, bootstrap bool) (string, *Se
|
|||
config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
|
||||
config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
|
||||
|
||||
config.ReconcileInterval = 50 * time.Millisecond
|
||||
config.ReconcileInterval = 100 * time.Millisecond
|
||||
|
||||
server, err := NewServer(config)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue