package state import ( "errors" "fmt" "sort" "strings" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib/maps" "github.com/hashicorp/consul/proto/pbpeering" ) const ( tablePeering = "peering" tablePeeringTrustBundles = "peering-trust-bundles" ) func peeringTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: tablePeering, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, AllowMissing: false, Unique: true, Indexer: indexerSingle[string, *pbpeering.Peering]{ readIndex: indexFromUUIDString, writeIndex: indexIDFromPeering, }, }, indexName: { Name: indexName, AllowMissing: false, Unique: true, Indexer: indexerSingleWithPrefix[Query, *pbpeering.Peering, any]{ readIndex: indexPeeringFromQuery, writeIndex: indexFromPeering, prefixIndex: prefixIndexFromQueryNoNamespace, }, }, indexDeleted: { Name: indexDeleted, AllowMissing: false, Unique: false, Indexer: indexerSingle[BoolQuery, *pbpeering.Peering]{ readIndex: indexDeletedFromBoolQuery, writeIndex: indexDeletedFromPeering, }, }, }, } } func peeringTrustBundlesTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: tablePeeringTrustBundles, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, AllowMissing: false, Unique: true, Indexer: indexerSingleWithPrefix[Query, *pbpeering.PeeringTrustBundle, any]{ readIndex: indexPeeringFromQuery, // same as peering table since we'll use the query.Value writeIndex: indexFromPeeringTrustBundle, prefixIndex: prefixIndexFromQueryNoNamespace, }, }, }, } } func indexIDFromPeering(p *pbpeering.Peering) ([]byte, error) { if p.ID == "" { return nil, errMissingValueForIndex } uuid, err := uuidStringToBytes(p.ID) if err != nil { return nil, err } var b indexBuilder b.Raw(uuid) return b.Bytes(), nil } func indexDeletedFromPeering(p *pbpeering.Peering) ([]byte, error) { var b indexBuilder b.Bool(!p.IsActive()) return b.Bytes(), nil } func (s *Store) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() peering, err := peeringReadByIDTxn(tx, ws, id) if err != nil { return 0, nil, fmt.Errorf("failed to read peering by id: %w", err) } if peering == nil { // Return the tables index so caller can watch it for changes if the peering doesn't exist return maxIndexWatchTxn(tx, ws, tablePeering), nil, nil } return peering.ModifyIndex, peering, nil } func peeringReadByIDTxn(tx ReadTxn, ws memdb.WatchSet, id string) (*pbpeering.Peering, error) { watchCh, peeringRaw, err := tx.FirstWatch(tablePeering, indexID, id) if err != nil { return nil, fmt.Errorf("failed peering lookup: %w", err) } ws.Add(watchCh) peering, ok := peeringRaw.(*pbpeering.Peering) if peeringRaw != nil && !ok { return nil, fmt.Errorf("invalid type %T", peering) } return peering, nil } func (s *Store) PeeringRead(ws memdb.WatchSet, q Query) (uint64, *pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() return peeringReadTxn(tx, ws, q) } func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.Peering, error) { watchCh, peeringRaw, err := tx.FirstWatch(tablePeering, indexName, q) if err != nil { return 0, nil, fmt.Errorf("failed peering lookup: %w", err) } peering, ok := peeringRaw.(*pbpeering.Peering) if peeringRaw != nil && !ok { return 0, nil, fmt.Errorf("invalid type %T", peering) } ws.Add(watchCh) if peering == nil { // Return the tables index so caller can watch it for changes if the peering doesn't exist return maxIndexWatchTxn(tx, ws, partitionedIndexEntryName(tablePeering, q.PartitionOrDefault())), nil, nil } return peering.ModifyIndex, peering, nil } func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() return s.peeringListTxn(ws, tx, entMeta) } func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { var ( iter memdb.ResultIterator err error idx uint64 ) if entMeta.PartitionOrDefault() == structs.WildcardSpecifier { iter, err = tx.Get(tablePeering, indexID) idx = maxIndexWatchTxn(tx, ws, tablePeering) } else { iter, err = tx.Get(tablePeering, indexName+"_prefix", entMeta) idx = maxIndexWatchTxn(tx, ws, partitionedIndexEntryName(tablePeering, entMeta.PartitionOrDefault())) } if err != nil { return 0, nil, fmt.Errorf("failed peering lookup: %v", err) } var result []*pbpeering.Peering for entry := iter.Next(); entry != nil; entry = iter.Next() { result = append(result, entry.(*pbpeering.Peering)) } return idx, result, nil } func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error { tx := s.db.WriteTxn(idx) defer tx.Abort() // Check that the ID and Name are set. if p.ID == "" { return errors.New("Missing Peering ID") } if p.Name == "" { return errors.New("Missing Peering Name") } // ensure the name is unique (cannot conflict with another peering with a different ID) _, existing, err := peeringReadTxn(tx, nil, Query{ Value: p.Name, EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition), }) if err != nil { return err } if existing != nil { if p.ID != existing.ID { return fmt.Errorf("A peering already exists with the name %q and a different ID %q", p.Name, existing.ID) } // Prevent modifications to Peering marked for deletion if !existing.IsActive() { return fmt.Errorf("cannot write to peering that is marked for deletion") } p.CreateIndex = existing.CreateIndex p.ModifyIndex = idx } else { idMatch, err := peeringReadByIDTxn(tx, nil, p.ID) if err != nil { return err } if idMatch != nil { return fmt.Errorf("A peering already exists with the ID %q and a different name %q", p.Name, existing.ID) } if !p.IsActive() { return fmt.Errorf("cannot create a new peering marked for deletion") } if p.State == 0 { p.State = pbpeering.PeeringState_PENDING } p.CreateIndex = idx p.ModifyIndex = idx } if err := tx.Insert(tablePeering, p); err != nil { return fmt.Errorf("failed inserting peering: %w", err) } if err := updatePeeringTableIndexes(tx, idx, p.PartitionOrDefault()); err != nil { return err } return tx.Commit() } func (s *Store) PeeringDelete(idx uint64, q Query) error { tx := s.db.WriteTxn(idx) defer tx.Abort() existing, err := tx.First(tablePeering, indexName, q) if err != nil { return fmt.Errorf("failed peering lookup: %v", err) } if existing == nil { return nil } if existing.(*pbpeering.Peering).IsActive() { return fmt.Errorf("cannot delete a peering without first marking for deletion") } if err := tx.Delete(tablePeering, existing); err != nil { return fmt.Errorf("failed deleting peering: %v", err) } if err := updatePeeringTableIndexes(tx, idx, q.PartitionOrDefault()); err != nil { return err } return tx.Commit() } func (s *Store) PeeringTerminateByID(idx uint64, id string) error { tx := s.db.WriteTxn(idx) defer tx.Abort() existing, err := peeringReadByIDTxn(tx, nil, id) if err != nil { return fmt.Errorf("failed to read peering %q: %w", id, err) } if existing == nil { return nil } c := proto.Clone(existing) clone, ok := c.(*pbpeering.Peering) if !ok { return fmt.Errorf("invalid type %T, expected *pbpeering.Peering", existing) } clone.State = pbpeering.PeeringState_TERMINATED clone.ModifyIndex = idx if err := tx.Insert(tablePeering, clone); err != nil { return fmt.Errorf("failed inserting peering: %w", err) } if err := updatePeeringTableIndexes(tx, idx, clone.PartitionOrDefault()); err != nil { return err } return tx.Commit() } // ExportedServicesForPeer returns the list of typical and proxy services // exported to a peer. // // TODO(peering): What to do about terminating gateways? Sometimes terminating // gateways are the appropriate destination to dial for an upstream mesh // service. However, that information is handled by observing the terminating // gateway's config entry, which we wouldn't want to replicate. How would // client peers know to route through terminating gateways when they're not // dialing through a remote mesh gateway? func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc string) (uint64, *structs.ExportedServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() peering, err := peeringReadByIDTxn(tx, ws, peerID) if err != nil { return 0, nil, fmt.Errorf("failed to read peering: %w", err) } if peering == nil { return 0, &structs.ExportedServiceList{}, nil } return s.exportedServicesForPeerTxn(ws, tx, peering, dc) } func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() maxIdx, peerings, err := s.peeringListTxn(ws, tx, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed to list peerings: %w", err) } out := make(map[string]structs.ServiceList) for _, peering := range peerings { idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, "") if err != nil { return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) } if idx > maxIdx { maxIdx = idx } m := list.ListAllDiscoveryChains() if len(m) > 0 { out[peering.Name] = maps.SliceOfKeys(m) } } return maxIdx, out, nil } // exportedServicesForPeerTxn will find all services that are exported to a // specific peering, and optionally include information about discovery chain // reachable targets for these exported services if the "dc" parameter is // specified. func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering, dc string) (uint64, *structs.ExportedServiceList, error) { maxIdx := peering.ModifyIndex entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) idx, raw, err := configEntryTxn(tx, ws, structs.ExportedServices, entMeta.PartitionOrDefault(), entMeta) if err != nil { return 0, nil, fmt.Errorf("failed to fetch exported-services config entry: %w", err) } if idx > maxIdx { maxIdx = idx } if raw == nil { return maxIdx, &structs.ExportedServiceList{}, nil } conf, ok := raw.(*structs.ExportedServicesConfigEntry) if !ok { return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw) } var ( normalSet = make(map[structs.ServiceName]struct{}) discoSet = make(map[structs.ServiceName]struct{}) ) // TODO(peering): filter the disco chain portion of the results to only be // things reachable over the mesh to avoid replicating some clutter. // // At least one of the following should be true for a name for it to // replicate: // // - are a discovery chain by definition (service-router, service-splitter, service-resolver) // - have an explicit sidecar kind=connect-proxy // - use connect native mode for _, svc := range conf.Services { svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) sawPeer := false for _, consumer := range svc.Consumers { name := structs.NewServiceName(svc.Name, &svcMeta) if _, ok := normalSet[name]; ok { // Service was covered by a wildcard that was already accounted for continue } if consumer.PeerName != peering.Name { continue } sawPeer = true if svc.Name != structs.WildcardSpecifier { normalSet[name] = struct{}{} } } // If the target peer is a consumer, and all services in the namespace are exported, query those service names. if sawPeer && svc.Name == structs.WildcardSpecifier { idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta) if err != nil { return 0, nil, fmt.Errorf("failed to get typical service names: %w", err) } if idx > maxIdx { maxIdx = idx } for _, s := range typicalServices { normalSet[s.Service] = struct{}{} } // list all config entries of kind service-resolver, service-router, service-splitter? idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, svcMeta) if err != nil { return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err) } if idx > maxIdx { maxIdx = idx } for _, sn := range discoChains { discoSet[sn] = struct{}{} } } } normal := maps.SliceOfKeys(normalSet) disco := maps.SliceOfKeys(discoSet) chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo) populateChainInfo := func(svc structs.ServiceName) error { if _, ok := chainInfo[svc]; ok { return nil // already processed } var info structs.ExportedDiscoveryChainInfo idx, protocol, err := protocolForService(tx, ws, svc) if err != nil { return fmt.Errorf("failed to get protocol for service %q: %w", svc, err) } if idx > maxIdx { maxIdx = idx } info.Protocol = protocol if dc != "" && !structs.IsProtocolHTTPLike(protocol) { // We only need to populate the targets for replication purposes for L4 protocols, which // do not ultimately get intercepted by the mesh gateways. idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) if err != nil { return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err) } if idx > maxIdx { maxIdx = idx } sort.Slice(targets, func(i, j int) bool { return targets[i].ID < targets[j].ID }) info.TCPTargets = targets } chainInfo[svc] = info return nil } for _, svc := range normal { if err := populateChainInfo(svc); err != nil { return 0, nil, err } } for _, svc := range disco { if err := populateChainInfo(svc); err != nil { return 0, nil, err } } structs.ServiceList(normal).Sort() list := &structs.ExportedServiceList{ Services: normal, DiscoChains: chainInfo, } return maxIdx, list, nil } // PeeringsForService returns the list of peerings that are associated with the service name provided in the query. // This is used to configure connect proxies for a given service. The result is generated by querying for exported // service config entries and filtering for those that match the given service. // // TODO(peering): this implementation does all of the work on read to materialize this list of peerings, we should explore // writing to a separate index that has service peerings prepared ahead of time should this become a performance bottleneck. func (s *Store) PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() return peeringsForServiceTxn(tx, ws, serviceName, entMeta) } func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { // Return the idx of the config entry so the caller can watch for changes. maxIdx, peerNames, err := peersForServiceTxn(tx, ws, serviceName, &entMeta) if err != nil { return 0, nil, fmt.Errorf("failed to read peers for service name %q: %w", serviceName, err) } var peerings []*pbpeering.Peering // Lookup and return the peering corresponding to each name. for _, name := range peerNames { readQuery := Query{ Value: name, EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()), } idx, peering, err := peeringReadTxn(tx, ws, readQuery) if err != nil { return 0, nil, fmt.Errorf("failed to read peering: %w", err) } if idx > maxIdx { maxIdx = idx } if peering == nil || !peering.IsActive() { continue } peerings = append(peerings, peering) } return maxIdx, peerings, nil } // TrustBundleListByService returns the trust bundles for all peers that the // given service is exported to, via a discovery chain target. func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { tx := s.db.ReadTxn() defer tx.Abort() realSvc := structs.NewServiceName(service, &entMeta) maxIdx, chainNames, err := s.discoveryChainSourcesTxn(tx, ws, dc, realSvc) if err != nil { return 0, nil, fmt.Errorf("failed to list all discovery chains referring to %q: %w", realSvc, err) } peerNames := make(map[string]struct{}) for _, chainSvc := range chainNames { idx, peers, err := peeringsForServiceTxn(tx, ws, chainSvc.Name, chainSvc.EnterpriseMeta) if err != nil { return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", chainSvc, err) } if idx > maxIdx { maxIdx = idx } for _, peer := range peers { peerNames[peer.Name] = struct{}{} } } peerNamesSlice := maps.SliceOfKeys(peerNames) sort.Strings(peerNamesSlice) var resp []*pbpeering.PeeringTrustBundle for _, peerName := range peerNamesSlice { pq := Query{ Value: strings.ToLower(peerName), EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()), } idx, trustBundle, err := peeringTrustBundleReadTxn(tx, ws, pq) if err != nil { return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peerName, err) } if idx > maxIdx { maxIdx = idx } if trustBundle != nil { resp = append(resp, trustBundle) } } return maxIdx, resp, nil } // PeeringTrustBundleList returns the peering trust bundles for all peers. func (s *Store) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { tx := s.db.ReadTxn() defer tx.Abort() return peeringTrustBundleListTxn(tx, ws, entMeta) } func peeringTrustBundleListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { iter, err := tx.Get(tablePeeringTrustBundles, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed peering trust bundle lookup: %w", err) } idx := maxIndexWatchTxn(tx, ws, partitionedIndexEntryName(tablePeeringTrustBundles, entMeta.PartitionOrDefault())) var result []*pbpeering.PeeringTrustBundle for entry := iter.Next(); entry != nil; entry = iter.Next() { result = append(result, entry.(*pbpeering.PeeringTrustBundle)) } return idx, result, nil } // PeeringTrustBundleRead returns the peering trust bundle for the peer name given as the query value. func (s *Store) PeeringTrustBundleRead(ws memdb.WatchSet, q Query) (uint64, *pbpeering.PeeringTrustBundle, error) { tx := s.db.ReadTxn() defer tx.Abort() return peeringTrustBundleReadTxn(tx, ws, q) } func peeringTrustBundleReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.PeeringTrustBundle, error) { watchCh, ptbRaw, err := tx.FirstWatch(tablePeeringTrustBundles, indexID, q) if err != nil { return 0, nil, fmt.Errorf("failed peering trust bundle lookup: %w", err) } ptb, ok := ptbRaw.(*pbpeering.PeeringTrustBundle) if ptbRaw != nil && !ok { return 0, nil, fmt.Errorf("invalid type %T", ptb) } ws.Add(watchCh) if ptb == nil { // Return the tables index so caller can watch it for changes if the trust bundle doesn't exist return maxIndexWatchTxn(tx, ws, partitionedIndexEntryName(tablePeeringTrustBundles, q.PartitionOrDefault())), nil, nil } return ptb.ModifyIndex, ptb, nil } // PeeringTrustBundleWrite writes ptb to the state store. If there is an existing trust bundle with the given peer name, // it will be overwritten. func (s *Store) PeeringTrustBundleWrite(idx uint64, ptb *pbpeering.PeeringTrustBundle) error { tx := s.db.WriteTxn(idx) defer tx.Abort() q := Query{ Value: ptb.PeerName, EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(ptb.Partition), } existingRaw, err := tx.First(tablePeeringTrustBundles, indexID, q) if err != nil { return fmt.Errorf("failed peering trust bundle lookup: %w", err) } existing, ok := existingRaw.(*pbpeering.PeeringTrustBundle) if existingRaw != nil && !ok { return fmt.Errorf("invalid type %T", existingRaw) } if existing != nil { ptb.CreateIndex = existing.CreateIndex } else { ptb.CreateIndex = idx } ptb.ModifyIndex = idx if err := tx.Insert(tablePeeringTrustBundles, ptb); err != nil { return fmt.Errorf("failed inserting peering trust bundle: %w", err) } if err := updatePeeringTrustBundlesTableIndexes(tx, idx, ptb.PartitionOrDefault()); err != nil { return err } return tx.Commit() } func (s *Store) PeeringTrustBundleDelete(idx uint64, q Query) error { tx := s.db.WriteTxn(idx) defer tx.Abort() existing, err := tx.First(tablePeeringTrustBundles, indexID, q) if err != nil { return fmt.Errorf("failed peering trust bundle lookup: %v", err) } if existing == nil { return nil } if err := tx.Delete(tablePeeringTrustBundles, existing); err != nil { return fmt.Errorf("failed deleting peering trust bundle: %v", err) } if err := updatePeeringTrustBundlesTableIndexes(tx, idx, q.PartitionOrDefault()); err != nil { return err } return tx.Commit() } func (s *Snapshot) Peerings() (memdb.ResultIterator, error) { return s.tx.Get(tablePeering, indexName) } func (s *Snapshot) PeeringTrustBundles() (memdb.ResultIterator, error) { return s.tx.Get(tablePeeringTrustBundles, indexID) } func (r *Restore) Peering(p *pbpeering.Peering) error { if err := r.tx.Insert(tablePeering, p); err != nil { return fmt.Errorf("failed restoring peering: %w", err) } if err := updatePeeringTableIndexes(r.tx, p.ModifyIndex, p.PartitionOrDefault()); err != nil { return err } return nil } func (r *Restore) PeeringTrustBundle(ptb *pbpeering.PeeringTrustBundle) error { if err := r.tx.Insert(tablePeeringTrustBundles, ptb); err != nil { return fmt.Errorf("failed restoring peering trust bundle: %w", err) } if err := updatePeeringTrustBundlesTableIndexes(r.tx, ptb.ModifyIndex, ptb.PartitionOrDefault()); err != nil { return err } return nil } // peersForServiceTxn returns the names of all peers that a service is exported to. func peersForServiceTxn( tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, ) (uint64, []string, error) { // Exported service config entries are scoped to partitions so they are in the default namespace. partitionMeta := structs.DefaultEnterpriseMetaInPartition(entMeta.PartitionOrDefault()) idx, rawEntry, err := configEntryTxn(tx, ws, structs.ExportedServices, partitionMeta.PartitionOrDefault(), partitionMeta) if err != nil { return 0, nil, err } if rawEntry == nil { return idx, nil, err } entry, ok := rawEntry.(*structs.ExportedServicesConfigEntry) if !ok { return 0, nil, fmt.Errorf("unexpected type %T for pbpeering.Peering index", rawEntry) } var ( wildcardNamespaceIdx = -1 wildcardServiceIdx = -1 exactMatchIdx = -1 ) // Ensure the metadata is defaulted since we make assertions against potentially empty values below. // In OSS this is a no-op. if entMeta == nil { entMeta = acl.DefaultEnterpriseMeta() } entMeta.Normalize() // Services can be exported via wildcards or by their exact name: // Namespace: *, Service: * // Namespace: Exact, Service: * // Namespace: Exact, Service: Exact for i, service := range entry.Services { switch { case service.Namespace == structs.WildcardSpecifier: wildcardNamespaceIdx = i case service.Name == structs.WildcardSpecifier && acl.EqualNamespaces(service.Namespace, entMeta.NamespaceOrDefault()): wildcardServiceIdx = i case service.Name == serviceName && acl.EqualNamespaces(service.Namespace, entMeta.NamespaceOrDefault()): exactMatchIdx = i } } var results []string // Prefer the exact match over the wildcard match. This matches how we handle intention precedence. var targetIdx int switch { case exactMatchIdx >= 0: targetIdx = exactMatchIdx case wildcardServiceIdx >= 0: targetIdx = wildcardServiceIdx case wildcardNamespaceIdx >= 0: targetIdx = wildcardNamespaceIdx default: return idx, results, nil } for _, c := range entry.Services[targetIdx].Consumers { if c.PeerName != "" { results = append(results, c.PeerName) } } return idx, results, nil } func (s *Store) PeeringListDeleted(ws memdb.WatchSet) (uint64, []*pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() return peeringListDeletedTxn(tx, ws) } func peeringListDeletedTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*pbpeering.Peering, error) { iter, err := tx.Get(tablePeering, indexDeleted, BoolQuery{Value: true}) if err != nil { return 0, nil, fmt.Errorf("failed peering lookup: %v", err) } // Instead of watching iter.WatchCh() we only need to watch the index entry for the peering table // This is sufficient to pick up any changes to peerings. idx := maxIndexWatchTxn(tx, ws, tablePeering) var result []*pbpeering.Peering for t := iter.Next(); t != nil; t = iter.Next() { result = append(result, t.(*pbpeering.Peering)) } return idx, result, nil }