diff --git a/.changelog/16570.txt b/.changelog/16570.txt new file mode 100644 index 000000000..ad07cda81 --- /dev/null +++ b/.changelog/16570.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: Fixes a bug that can lead to peering service deletes impacting the state of local services +``` diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 39a40d9ca..077fbde79 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1186,7 +1186,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, unique := make(map[structs.ServiceName]struct{}) for service := services.Next(); service != nil; service = services.Next() { svc := service.(*structs.ServiceNode) - unique[svc.CompoundServiceName()] = struct{}{} + unique[svc.CompoundServiceName().ServiceName] = struct{}{} } results := make(structs.ServiceList, 0, len(unique)) @@ -1920,17 +1920,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("failed updating service-kind indexes: %w", err) } // Update the node indexes as the service information is included in node catalog queries. - if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil { + if err := catalogUpdateNodesIndexes(tx, idx, entMeta, svc.PeerName); err != nil { return fmt.Errorf("failed updating nodes indexes: %w", err) } - if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil { + if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, svc.PeerName); err != nil { return fmt.Errorf("failed updating node indexes: %w", err) } - name := svc.CompoundServiceName() + psn := svc.CompoundServiceName() if err := cleanupMeshTopology(tx, idx, svc); err != nil { - return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err) + return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", psn.String(), err) } q := Query{ @@ -1957,12 +1957,14 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil { return err } - psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { - return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) + return fmt.Errorf("failed to clean up virtual IP for %q: %v", psn.String(), err) } - if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { - return fmt.Errorf("failed to persist service name: %v", err) + + if svc.PeerName == "" { + if err := cleanupKindServiceName(tx, idx, psn.ServiceName, svc.ServiceKind); err != nil { + return fmt.Errorf("failed to persist service name: %v", err) + } } } } else { @@ -1990,7 +1992,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if svc.PeerName == "" { sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta} if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil { - return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) + return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", psn.String(), err) } } @@ -2695,7 +2697,7 @@ func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, // De-dup services to lookup names := make(map[structs.ServiceName]struct{}) for _, n := range nodes { - names[n.CompoundServiceName()] = struct{}{} + names[n.CompoundServiceName().ServiceName] = struct{}{} } var results structs.CheckServiceNodes @@ -3657,7 +3659,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } - existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port) + existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName().ServiceName, service.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) } @@ -4611,7 +4613,10 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS // cleanupMeshTopology removes a service from the mesh topology table // This is only safe to call when there are no more known instances of this proxy func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) error { - // TODO(peering): make this peering aware? + if service.PeerName != "" { + return nil + } + if service.ServiceKind != structs.ServiceKindConnectProxy { return nil } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index cef5ba0a0..b5976bbd2 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2577,20 +2577,49 @@ func TestStateStore_DeleteService(t *testing.T) { testRegisterService(t, s, 2, "node1", "service1") testRegisterCheck(t, s, 3, "node1", "service1", "check1", api.HealthPassing) - // Delete the service. - ws := memdb.NewWatchSet() - _, _, err := s.NodeServices(ws, "node1", nil, "") + // register a node with a service on a cluster peer. + testRegisterNodeOpts(t, s, 4, "node1", func(n *structs.Node) error { + n.PeerName = "cluster-01" + return nil + }) + testRegisterServiceOpts(t, s, 5, "node1", "service1", func(service *structs.NodeService) { + service.PeerName = "cluster-01" + }) + + wsPeer := memdb.NewWatchSet() + _, ns, err := s.NodeServices(wsPeer, "node1", nil, "cluster-01") + require.Len(t, ns.Services, 1) require.NoError(t, err) - if err := s.DeleteService(4, "node1", "service1", nil, ""); err != nil { - t.Fatalf("err: %s", err) + + ws := memdb.NewWatchSet() + _, ns, err = s.NodeServices(ws, "node1", nil, "") + require.Len(t, ns.Services, 1) + require.NoError(t, err) + + { + // Delete the peered service. + err = s.DeleteService(6, "node1", "service1", nil, "cluster-01") + require.NoError(t, err) + require.True(t, watchFired(wsPeer)) + _, kindServiceNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindTypical) + require.NoError(t, err) + require.Len(t, kindServiceNames, 1) + require.Equal(t, "service1", kindServiceNames[0].Service.Name) } - if !watchFired(ws) { - t.Fatalf("bad") + + { + // Delete the service. + err = s.DeleteService(6, "node1", "service1", nil, "") + require.NoError(t, err) + require.True(t, watchFired(ws)) + _, kindServiceNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindTypical) + require.NoError(t, err) + require.Len(t, kindServiceNames, 0) } // Service doesn't exist. ws = memdb.NewWatchSet() - _, ns, err := s.NodeServices(ws, "node1", nil, "") + _, ns, err = s.NodeServices(ws, "node1", nil, "") if err != nil || ns == nil || len(ns.Services) != 0 { t.Fatalf("bad: %#v (err: %#v)", ns, err) } @@ -2605,15 +2634,15 @@ func TestStateStore_DeleteService(t *testing.T) { } // Index tables were updated. - assert.Equal(t, uint64(4), catalogChecksMaxIndex(tx, nil, "")) - assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(6), catalogChecksMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(6), catalogServicesMaxIndex(tx, nil, "")) // Deleting a nonexistent service should be idempotent and not return an // error, nor fire a watch. - if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil { + if err := s.DeleteService(6, "node1", "service1", nil, ""); err != nil { t.Fatalf("err: %s", err) } - assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(6), catalogServicesMaxIndex(tx, nil, "")) if watchFired(ws) { t.Fatalf("bad") } diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 5e3d7ce9c..9db30e5b2 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -126,11 +126,11 @@ func updateUsage(tx WriteTxn, changes Changes) error { // changed, in order to compare it with the finished memdb state. // Make sure to account for the fact that services can change their names. if serviceNameChanged(change) { - serviceNameChanges[svc.CompoundServiceName()] += 1 + serviceNameChanges[svc.CompoundServiceName().ServiceName] += 1 before := change.Before.(*structs.ServiceNode) - serviceNameChanges[before.CompoundServiceName()] -= 1 + serviceNameChanges[before.CompoundServiceName().ServiceName] -= 1 } else { - serviceNameChanges[svc.CompoundServiceName()] += delta + serviceNameChanges[svc.CompoundServiceName().ServiceName] += delta } case "kvs": diff --git a/agent/structs/structs.go b/agent/structs/structs.go index ae23899c9..9d752a383 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1153,7 +1153,7 @@ func (sn *ServiceNode) CompoundServiceID() ServiceID { } } -func (sn *ServiceNode) CompoundServiceName() ServiceName { +func (sn *ServiceNode) CompoundServiceName() PeeredServiceName { name := sn.ServiceName if name == "" { name = sn.ServiceID @@ -1163,10 +1163,14 @@ func (sn *ServiceNode) CompoundServiceName() ServiceName { entMeta := sn.EnterpriseMeta entMeta.Normalize() - return ServiceName{ - Name: name, - EnterpriseMeta: entMeta, + return PeeredServiceName{ + ServiceName: ServiceName{ + Name: name, + EnterpriseMeta: entMeta, + }, + Peer: sn.PeerName, } + } // Weights represent the weight used by DNS for a given status