Issue 3452 (#3500)
* Make sure that id and address are set in member created during reaping of catalog nodes that have been removed from serf * Get address from node table in the state store rather than from service address * Fix incorrect lookup by checkname instead of node name * Make sure that serverlookup is called with the right address format, added unit test. * Address code review comments * Tweaks style stuff.
This commit is contained in:
parent
55fb1fcfac
commit
783e24be64
|
@ -329,9 +329,8 @@ func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
|
|||
}
|
||||
|
||||
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
||||
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
|
||||
// in a critical state that does not correspond to a known Serf member. We generate
|
||||
// a "reap" event to cause the node to be cleaned up.
|
||||
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
||||
// We generate a "reap" event to cause the node to be cleaned up.
|
||||
func (s *Server) reconcileReaped(known map[string]struct{}) error {
|
||||
state := s.fsm.State()
|
||||
_, checks, err := state.ChecksInState(nil, api.HealthAny)
|
||||
|
@ -349,6 +348,35 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
|
|||
continue
|
||||
}
|
||||
|
||||
// Get the node services, look for ConsulServiceID
|
||||
_, services, err := state.NodeServices(nil, check.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serverPort := 0
|
||||
serverAddr := ""
|
||||
serverID := ""
|
||||
|
||||
CHECKS:
|
||||
for _, service := range services.Services {
|
||||
if service.ID == structs.ConsulServiceID {
|
||||
_, node, err := state.GetNode(check.Node)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err)
|
||||
continue CHECKS
|
||||
}
|
||||
|
||||
serverAddr = node.Address
|
||||
serverPort = service.Port
|
||||
lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
|
||||
svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
|
||||
if svr != nil {
|
||||
serverID = svr.ID
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Create a fake member
|
||||
member := serf.Member{
|
||||
Name: check.Node,
|
||||
|
@ -358,23 +386,12 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
|
|||
},
|
||||
}
|
||||
|
||||
// Get the node services, look for ConsulServiceID
|
||||
_, services, err := state.NodeServices(nil, check.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serverPort := 0
|
||||
for _, service := range services.Services {
|
||||
if service.ID == structs.ConsulServiceID {
|
||||
serverPort = service.Port
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Create the appropriate tags if this was a server node
|
||||
if serverPort > 0 {
|
||||
member.Tags["role"] = "consul"
|
||||
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
|
||||
member.Tags["id"] = serverID
|
||||
member.Addr = net.ParseIP(serverAddr)
|
||||
}
|
||||
|
||||
// Attempt to reap this member
|
||||
|
|
|
@ -250,6 +250,79 @@ func TestLeader_ReapMember(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLeader_ReapServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "allow"
|
||||
c.ACLEnforceVersion8 = true
|
||||
c.Bootstrap = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "allow"
|
||||
c.ACLEnforceVersion8 = true
|
||||
c.Bootstrap = false
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "allow"
|
||||
c.ACLEnforceVersion8 = true
|
||||
c.Bootstrap = false
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s1, s2)
|
||||
joinLAN(t, s1, s3)
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
state := s1.fsm.State()
|
||||
|
||||
// s3 should be registered
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, node, err := state.GetNode(s3.config.NodeName)
|
||||
if err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
if node == nil {
|
||||
r.Fatal("client not registered")
|
||||
}
|
||||
})
|
||||
|
||||
// call reconcileReaped with a map that does not contain s3
|
||||
knownMembers := make(map[string]struct{})
|
||||
knownMembers[s1.config.NodeName] = struct{}{}
|
||||
knownMembers[s2.config.NodeName] = struct{}{}
|
||||
|
||||
err := s1.reconcileReaped(knownMembers)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error :%v", err)
|
||||
}
|
||||
// s3 should be deregistered
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, node, err := state.GetNode(s3.config.NodeName)
|
||||
if err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
if node != nil {
|
||||
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestLeader_Reconcile_ReapMember(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
|
|
Loading…
Reference in a new issue