peering: skip registering duplicate node and check from the peer (#14994)
* peering: skip register duplicate node and check from the peer * Prebuilt the nodes map and checks map to avoid repeated for loop * use key type to struct: node id, service id, and check id
This commit is contained in:
parent
4dd572fdd9
commit
e18434bcb1
|
@ -274,6 +274,7 @@ func TestLeader_PeeringSync_Lifecycle_UnexportWhileDown(t *testing.T) {
|
|||
insertNode := func(i int) {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
ID: types.NodeID(generateUUID()),
|
||||
Node: fmt.Sprintf("node%d", i+1),
|
||||
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||
NodeMeta: map[string]string{
|
||||
|
|
|
@ -334,34 +334,64 @@ func (s *Server) handleUpdateService(
|
|||
|
||||
// Normalize the data into a convenient form for operation.
|
||||
snap := newHealthSnapshot(structsNodes, partition, peerName)
|
||||
storedNodesMap, storedSvcInstMap, storedChecksMap := buildStoredMap(storedInstances)
|
||||
|
||||
for _, nodeSnap := range snap.Nodes {
|
||||
// First register the node
|
||||
req := nodeSnap.Node.ToRegisterRequest()
|
||||
if err := s.Backend.CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register node: %w", err)
|
||||
// First register the node - skip the unchanged ones
|
||||
changed := true
|
||||
if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok {
|
||||
if storedNode.IsSame(nodeSnap.Node) {
|
||||
changed = false
|
||||
}
|
||||
}
|
||||
|
||||
// Then register all services on that node
|
||||
for _, svcSnap := range nodeSnap.Services {
|
||||
req.Service = svcSnap.Service
|
||||
req := nodeSnap.Node.ToRegisterRequest()
|
||||
if changed {
|
||||
if err := s.Backend.CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register service: %w", err)
|
||||
return fmt.Errorf("failed to register node: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Then register all services on that node - skip the unchanged ones
|
||||
for _, svcSnap := range nodeSnap.Services {
|
||||
changed = true
|
||||
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok {
|
||||
if storedSvcInst.IsSame(svcSnap.Service) {
|
||||
changed = false
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
req.Service = svcSnap.Service
|
||||
if err := s.Backend.CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register service: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
req.Service = nil
|
||||
|
||||
// Then register all checks on that node
|
||||
// Then register all checks on that node - skip the unchanged ones
|
||||
var chks structs.HealthChecks
|
||||
for _, svcSnap := range nodeSnap.Services {
|
||||
for _, c := range svcSnap.Checks {
|
||||
chks = append(chks, c)
|
||||
changed := true
|
||||
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok {
|
||||
if chk.IsSame(c) {
|
||||
changed = false
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
chks = append(chks, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
req.Checks = chks
|
||||
if err := s.Backend.CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register check: %w", err)
|
||||
if len(chks) > 0 {
|
||||
req.Checks = chks
|
||||
if err := s.Backend.CatalogRegister(&req); err != nil {
|
||||
return fmt.Errorf("failed to register check: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -589,3 +619,46 @@ func makeReplicationResponse(resp *pbpeerstream.ReplicationMessage_Response) *pb
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
// nodeSvcInstIdentity uniquely identifies an service instance imported from a peering cluster
|
||||
type nodeSvcInstIdentity struct {
|
||||
nodeID string
|
||||
serviceID string
|
||||
}
|
||||
|
||||
// nodeCheckIdentity uniquely identifies a check imported from a peering cluster
|
||||
type nodeCheckIdentity struct {
|
||||
nodeID string
|
||||
serviceID string
|
||||
checkID string
|
||||
}
|
||||
|
||||
func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity {
|
||||
return nodeSvcInstIdentity{
|
||||
nodeID: string(nodeID),
|
||||
serviceID: serviceID,
|
||||
}
|
||||
}
|
||||
|
||||
func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity {
|
||||
return nodeCheckIdentity{
|
||||
serviceID: serviceID,
|
||||
checkID: string(checkID),
|
||||
nodeID: string(nodeID),
|
||||
}
|
||||
}
|
||||
|
||||
func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) {
|
||||
nodesMap := map[types.NodeID]*structs.Node{}
|
||||
svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{}
|
||||
checksMap := map[nodeCheckIdentity]*structs.HealthCheck{}
|
||||
|
||||
for _, csn := range storedInstances {
|
||||
nodesMap[csn.Node.ID] = csn.Node
|
||||
svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service
|
||||
for _, chk := range csn.Checks {
|
||||
checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk
|
||||
}
|
||||
}
|
||||
return nodesMap, svcInstMap, checksMap
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue