package consul import ( "fmt" "net" "strconv" "sync" "time" "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-version" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) const ( newLeaderEvent = "consul:new-leader" barrierWriteTimeout = 2 * time.Minute ) var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes func (s *Server) monitorLeadership() { // We use the notify channel we configured Raft with, NOT Raft's // leaderCh, which is only notified best-effort. Doing this ensures // that we get all notifications in order, which is required for // cleanup and to ensure we never run multiple leader loops. raftNotifyCh := s.raftNotifyCh var weAreLeaderCh chan struct{} var leaderLoop sync.WaitGroup for { select { case isLeader := <-raftNotifyCh: switch { case isLeader: if weAreLeaderCh != nil { s.logger.Printf("[ERR] consul: attempted to start the leader loop while running") continue } weAreLeaderCh = make(chan struct{}) leaderLoop.Add(1) go func(ch chan struct{}) { defer leaderLoop.Done() s.leaderLoop(ch) }(weAreLeaderCh) s.logger.Printf("[INFO] consul: cluster leadership acquired") default: if weAreLeaderCh == nil { s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running") continue } s.logger.Printf("[DEBUG] consul: shutting down leader loop") close(weAreLeaderCh) leaderLoop.Wait() weAreLeaderCh = nil s.logger.Printf("[INFO] consul: cluster leadership lost") } case <-s.shutdownCh: return } } } // leaderLoop runs as long as we are the leader to run various // maintenance activities func (s *Server) leaderLoop(stopCh chan struct{}) { // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) for name, segment := range s.LANSegments() { if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil { s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err) } } // Reconcile channel is only used once initial reconcile // has succeeded var reconcileCh chan serf.Member establishedLeader := false reassert := func() error { if !establishedLeader { return fmt.Errorf("leadership has not been established") } if err := s.revokeLeadership(); err != nil { return err } if err := s.establishLeadership(); err != nil { return err } return nil } RECONCILE: // Setup a reconciliation timer reconcileCh = nil interval := time.After(s.config.ReconcileInterval) // Apply a raft barrier to ensure our FSM is caught up start := time.Now() barrier := s.raft.Barrier(barrierWriteTimeout) if err := barrier.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err) goto WAIT } metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start) metrics.MeasureSince([]string{"leader", "barrier"}, start) // Check if we need to handle initial leadership actions if !establishedLeader { if err := s.establishLeadership(); err != nil { s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err) goto WAIT } establishedLeader = true defer func() { if err := s.revokeLeadership(); err != nil { s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err) } }() } // Reconcile any missing data if err := s.reconcile(); err != nil { s.logger.Printf("[ERR] consul: failed to reconcile: %v", err) goto WAIT } // Initial reconcile worked, now we can process the channel // updates reconcileCh = s.reconcileCh WAIT: // Poll the stop channel to give it priority so we don't waste time // trying to perform the other operations if we have been asked to shut // down. select { case <-stopCh: return default: } // Periodically reconcile as long as we are the leader, // or when Serf events arrive for { select { case <-stopCh: return case <-s.shutdownCh: return case <-interval: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) case errCh := <-s.reassertLeaderCh: errCh <- reassert() } } } // establishLeadership is invoked once we become leader and are able // to invoke an initial barrier. The barrier is used to ensure any // previously inflight transactions have been committed and that our // state is up-to-date. func (s *Server) establishLeadership() error { // This will create the anonymous token and master token (if that is // configured). if err := s.initializeACL(); err != nil { return err } // Hint the tombstone expiration timer. When we freshly establish leadership // we become the authoritative timer, and so we need to start the clock // on any pending GC events. s.tombstoneGC.SetEnabled(true) lastIndex := s.raft.LastIndex() s.tombstoneGC.Hint(lastIndex) // Setup the session timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node along, effectively this means all the timers are renewed at the // time of failover. The TTL contract is that the session will not be expired // before the TTL, so expiring it later is allowable. // // This MUST be done after the initial barrier to ensure the latest Sessions // are available to be initialized. Otherwise initialization may use stale // data. if err := s.initializeSessionTimers(); err != nil { return err } s.getOrCreateAutopilotConfig() s.autopilot.Start() s.setConsistentReadReady() return nil } // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { // Disable the tombstone GC, since it is only useful as a leader s.tombstoneGC.SetEnabled(false) // Clear the session timers on either shutdown or step down, since we // are no longer responsible for session expirations. if err := s.clearAllSessionTimers(); err != nil { return err } s.resetConsistentReadReady() s.autopilot.Stop() return nil } // initializeACL is used to setup the ACLs if we are the leader // and need to do this. func (s *Server) initializeACL() error { // Bail if not configured or we are not authoritative. authDC := s.config.ACLDatacenter if len(authDC) == 0 || authDC != s.config.Datacenter { return nil } // Purge the cache, since it could've changed while we were not the // leader. s.aclAuthCache.Purge() // Create anonymous token if missing. state := s.fsm.State() _, acl, err := state.ACLGet(nil, anonymousToken) if err != nil { return fmt.Errorf("failed to get anonymous token: %v", err) } if acl == nil { req := structs.ACLRequest{ Datacenter: authDC, Op: structs.ACLSet, ACL: structs.ACL{ ID: anonymousToken, Name: "Anonymous Token", Type: structs.ACLTypeClient, }, } _, err := s.raftApply(structs.ACLRequestType, &req) if err != nil { return fmt.Errorf("failed to create anonymous token: %v", err) } } // Check for configured master token. if master := s.config.ACLMasterToken; len(master) > 0 { _, acl, err = state.ACLGet(nil, master) if err != nil { return fmt.Errorf("failed to get master token: %v", err) } if acl == nil { req := structs.ACLRequest{ Datacenter: authDC, Op: structs.ACLSet, ACL: structs.ACL{ ID: master, Name: "Master Token", Type: structs.ACLTypeManagement, }, } _, err := s.raftApply(structs.ACLRequestType, &req) if err != nil { return fmt.Errorf("failed to create master token: %v", err) } s.logger.Printf("[INFO] consul: Created ACL master token from configuration") } } // Check to see if we need to initialize the ACL bootstrap info. This // needs a Consul version check since it introduces a new Raft operation // that'll produce an error on older servers, and it also makes a piece // of state in the state store that will cause problems with older // servers consuming snapshots, so we have to wait to create it. var minVersion = version.Must(version.NewVersion("0.9.1")) if ServersMeetMinimumVersion(s.LANMembers(), minVersion) { bs, err := state.ACLGetBootstrap() if err != nil { return fmt.Errorf("failed looking for ACL bootstrap info: %v", err) } if bs == nil { req := structs.ACLRequest{ Datacenter: authDC, Op: structs.ACLBootstrapInit, } resp, err := s.raftApply(structs.ACLRequestType, &req) if err != nil { return fmt.Errorf("failed to initialize ACL bootstrap: %v", err) } switch v := resp.(type) { case error: return fmt.Errorf("failed to initialize ACL bootstrap: %v", v) case bool: if v { s.logger.Printf("[INFO] consul: ACL bootstrap enabled") } else { s.logger.Printf("[INFO] consul: ACL bootstrap disabled, existing management tokens found") } default: return fmt.Errorf("unexpected response trying to initialize ACL bootstrap: %T", v) } } } else { s.logger.Printf("[WARN] consul: Can't initialize ACL bootstrap until all servers are >= %s", minVersion.String()) } return nil } // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config { state := s.fsm.State() _, config, err := state.AutopilotConfig() if err != nil { s.logger.Printf("[ERR] autopilot: failed to get config: %v", err) return nil } if config != nil { return config } if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) { s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String()) return nil } config = s.config.AutopilotConfig req := structs.AutopilotSetConfigRequest{Config: *config} if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil { s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err) return nil } return config } // reconcileReaped is used to reconcile nodes that have failed and been reaped // from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered. // We generate a "reap" event to cause the node to be cleaned up. func (s *Server) reconcileReaped(known map[string]struct{}) error { state := s.fsm.State() _, checks, err := state.ChecksInState(nil, api.HealthAny) if err != nil { return err } for _, check := range checks { // Ignore any non serf checks if check.CheckID != structs.SerfCheckID { continue } // Check if this node is "known" by serf if _, ok := known[check.Node]; ok { continue } // Get the node services, look for ConsulServiceID _, services, err := state.NodeServices(nil, check.Node) if err != nil { return err } serverPort := 0 serverAddr := "" serverID := "" CHECKS: for _, service := range services.Services { if service.ID == structs.ConsulServiceID { _, node, err := state.GetNode(check.Node) if err != nil { s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err) continue CHECKS } serverAddr = node.Address serverPort = service.Port lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort)) svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr)) if svr != nil { serverID = svr.ID } break } } // Create a fake member member := serf.Member{ Name: check.Node, Tags: map[string]string{ "dc": s.config.Datacenter, "role": "node", }, } // Create the appropriate tags if this was a server node if serverPort > 0 { member.Tags["role"] = "consul" member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10) member.Tags["id"] = serverID member.Addr = net.ParseIP(serverAddr) } // Attempt to reap this member if err := s.handleReapMember(member); err != nil { return err } } return nil } // reconcileMember is used to do an async reconcile of a single // serf member func (s *Server) reconcileMember(member serf.Member) error { // Check if this is a member we should handle if !s.shouldHandleMember(member) { s.logger.Printf("[WARN] consul: skipping reconcile of node %v", member) return nil } defer metrics.MeasureSince([]string{"consul", "leader", "reconcileMember"}, time.Now()) defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now()) var err error switch member.Status { case serf.StatusAlive: err = s.handleAliveMember(member) case serf.StatusFailed: err = s.handleFailedMember(member) case serf.StatusLeft: err = s.handleLeftMember(member) case StatusReap: err = s.handleReapMember(member) } if err != nil { s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v", member, err) // Permission denied should not bubble up if acl.IsErrPermissionDenied(err) { return nil } } return nil } // shouldHandleMember checks if this is a Consul pool member func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } if valid, parts := metadata.IsConsulServer(member); valid && parts.Segment == "" && parts.Datacenter == s.config.Datacenter { return true } return false } // handleAliveMember is used to ensure the node // is registered, with a passing health check. func (s *Server) handleAliveMember(member serf.Member) error { // Register consul service if a server var service *structs.NodeService if valid, parts := metadata.IsConsulServer(member); valid { service = &structs.NodeService{ ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: parts.Port, } // Attempt to join the consul server if err := s.joinConsulServer(member, parts); err != nil { return err } } // Check if the node exists state := s.fsm.State() _, node, err := state.GetNode(member.Name) if err != nil { return err } if node != nil && node.Address == member.Addr.String() { // Check if the associated service is available if service != nil { match := false _, services, err := state.NodeServices(nil, member.Name) if err != nil { return err } if services != nil { for id := range services.Services { if id == service.ID { match = true } } } if !match { goto AFTER_CHECK } } // Check if the serfCheck is in the passing state _, checks, err := state.NodeChecks(nil, member.Name) if err != nil { return err } for _, check := range checks { if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing { return nil } } } AFTER_CHECK: s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name) // Register with the catalog. req := structs.RegisterRequest{ Datacenter: s.config.Datacenter, Node: member.Name, ID: types.NodeID(member.Tags["id"]), Address: member.Addr.String(), Service: service, Check: &structs.HealthCheck{ Node: member.Name, CheckID: structs.SerfCheckID, Name: structs.SerfCheckName, Status: api.HealthPassing, Output: structs.SerfCheckAliveOutput, }, // If there's existing information about the node, do not // clobber it. SkipNodeUpdate: true, } _, err = s.raftApply(structs.RegisterRequestType, &req) return err } // handleFailedMember is used to mark the node's status // as being critical, along with all checks as unknown. func (s *Server) handleFailedMember(member serf.Member) error { // Check if the node exists state := s.fsm.State() _, node, err := state.GetNode(member.Name) if err != nil { return err } if node != nil && node.Address == member.Addr.String() { // Check if the serfCheck is in the critical state _, checks, err := state.NodeChecks(nil, member.Name) if err != nil { return err } for _, check := range checks { if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical { return nil } } } s.logger.Printf("[INFO] consul: member '%s' failed, marking health critical", member.Name) // Register with the catalog req := structs.RegisterRequest{ Datacenter: s.config.Datacenter, Node: member.Name, ID: types.NodeID(member.Tags["id"]), Address: member.Addr.String(), Check: &structs.HealthCheck{ Node: member.Name, CheckID: structs.SerfCheckID, Name: structs.SerfCheckName, Status: api.HealthCritical, Output: structs.SerfCheckFailedOutput, }, // If there's existing information about the node, do not // clobber it. SkipNodeUpdate: true, } _, err = s.raftApply(structs.RegisterRequestType, &req) return err } // handleLeftMember is used to handle members that gracefully // left. They are deregistered if necessary. func (s *Server) handleLeftMember(member serf.Member) error { return s.handleDeregisterMember("left", member) } // handleReapMember is used to handle members that have been // reaped after a prolonged failure. They are deregistered. func (s *Server) handleReapMember(member serf.Member) error { return s.handleDeregisterMember("reaped", member) } // handleDeregisterMember is used to deregister a member of a given reason func (s *Server) handleDeregisterMember(reason string, member serf.Member) error { // Do not deregister ourself. This can only happen if the current leader // is leaving. Instead, we should allow a follower to take-over and // deregister us later. if member.Name == s.config.NodeName { s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName) return nil } // Remove from Raft peers if this was a server if valid, parts := metadata.IsConsulServer(member); valid { if err := s.removeConsulServer(member, parts.Port); err != nil { return err } } // Check if the node does not exist state := s.fsm.State() _, node, err := state.GetNode(member.Name) if err != nil { return err } if node == nil { return nil } // Deregister the node s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason) req := structs.DeregisterRequest{ Datacenter: s.config.Datacenter, Node: member.Name, } _, err = s.raftApply(structs.DeregisterRequestType, &req) return err } // joinConsulServer is used to try to join another consul server func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error { // Check for possibility of multiple bootstrap nodes if parts.Bootstrap { members := s.serfLAN.Members() for _, member := range members { valid, p := metadata.IsConsulServer(member) if valid && member.Name != m.Name && p.Bootstrap { s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name) return nil } } } // Processing ourselves could result in trying to remove ourselves to // fix up our address, which would make us step down. This is only // safe to attempt if there are multiple servers available. configFuture := s.raft.GetConfiguration() if err := configFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) return err } if m.Name == s.config.NodeName { if l := len(configFuture.Configuration().Servers); l < 3 { s.logger.Printf("[DEBUG] consul: Skipping self join check for %q since the cluster is too small", m.Name) return nil } } // See if it's already in the configuration. It's harmless to re-add it // but we want to avoid doing that if possible to prevent useless Raft // log entries. If the address is the same but the ID changed, remove the // old server before adding the new one. addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String() minRaftProtocol, err := s.autopilot.MinRaftProtocol() if err != nil { return err } for _, server := range configFuture.Configuration().Servers { // No-op if the raft version is too low if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) { return nil } // If the address or ID matches an existing server, see if we need to remove the old one first if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) { // Exit with no-op if this is being called on an existing server if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) { return nil } future := s.raft.RemoveServer(server.ID, 0, 0) if server.Address == raft.ServerAddress(addr) { if err := future.Error(); err != nil { return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err) } s.logger.Printf("[INFO] consul: removed server with duplicate address: %s", server.Address) } else { if err := future.Error(); err != nil { return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err) } s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID) } } } // Attempt to add as a peer switch { case minRaftProtocol >= 3: addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) if err := addFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) return err } case minRaftProtocol == 2 && parts.RaftVersion >= 3: addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) if err := addFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) return err } default: addFuture := s.raft.AddPeer(raft.ServerAddress(addr)) if err := addFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) return err } } // Trigger a check to remove dead servers s.autopilot.RemoveDeadServers() return nil } // removeConsulServer is used to try to remove a consul server that has left func (s *Server) removeConsulServer(m serf.Member, port int) error { addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String() // See if it's already in the configuration. It's harmless to re-remove it // but we want to avoid doing that if possible to prevent useless Raft // log entries. configFuture := s.raft.GetConfiguration() if err := configFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) return err } minRaftProtocol, err := s.autopilot.MinRaftProtocol() if err != nil { return err } _, parts := metadata.IsConsulServer(m) // Pick which remove API to use based on how the server was added. for _, server := range configFuture.Configuration().Servers { // If we understand the new add/remove APIs and the server was added by ID, use the new remove API if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) { s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID) future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0) if err := future.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", server.ID, err) return err } break } else if server.Address == raft.ServerAddress(addr) { // If not, use the old remove API s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address) future := s.raft.RemovePeer(raft.ServerAddress(addr)) if err := future.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", addr, err) return err } break } } return nil } // reapTombstones is invoked by the current leader to manage garbage // collection of tombstones. When a key is deleted, we trigger a tombstone // GC clock. Once the expiration is reached, this routine is invoked // to clear all tombstones before this index. This must be replicated // through Raft to ensure consistency. We do this outside the leader loop // to avoid blocking. func (s *Server) reapTombstones(index uint64) { defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now()) defer metrics.MeasureSince([]string{"leader", "reapTombstones"}, time.Now()) req := structs.TombstoneRequest{ Datacenter: s.config.Datacenter, Op: structs.TombstoneReap, ReapIndex: index, } _, err := s.raftApply(structs.TombstoneRequestType, &req) if err != nil { s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v", index, err) } }