diff --git a/agent/agent.go b/agent/agent.go index 4b78b69a9..751e62d4b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4251,6 +4251,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps) + sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps) sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache)) sources.TrustBundle = proxycfgglue.ServerTrustBundle(deps) sources.TrustBundleList = proxycfgglue.ServerTrustBundleList(deps) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 622cccd35..849d0820c 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1990,7 +1990,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } } psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} - if err := freeServiceVirtualIP(tx, psn, nil); err != nil { + if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) } if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { @@ -2008,6 +2008,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st // is removed. func freeServiceVirtualIP( tx WriteTxn, + idx uint64, psn structs.PeeredServiceName, excludeGateway *structs.ServiceName, ) error { @@ -2059,6 +2060,10 @@ func freeServiceVirtualIP( return fmt.Errorf("failed updating freed virtual IP table: %v", err) } + if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil { + return err + } + return nil } @@ -3497,7 +3502,7 @@ func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.T } if len(nodes) == 0 { psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: sn} - if err := freeServiceVirtualIP(tx, psn, &gatewayName); err != nil { + if err := freeServiceVirtualIP(tx, idx, psn, &gatewayName); err != nil { return err } } diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 7c3311f36..1e254f406 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -28,6 +28,7 @@ type Store interface { PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) + VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error) } // CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from @@ -90,12 +91,6 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate { return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName} } -// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface -// by sourcing data from the agent cache. -func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams { - return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName} -} - // CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by // sourcing data from the agent cache. func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery { diff --git a/agent/proxycfg-glue/peered_upstreams.go b/agent/proxycfg-glue/peered_upstreams.go new file mode 100644 index 000000000..4d3e85f81 --- /dev/null +++ b/agent/proxycfg-glue/peered_upstreams.go @@ -0,0 +1,55 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul/watch" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" +) + +// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface +// by sourcing data from the agent cache. +func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams { + return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName} +} + +// ServerPeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface by +// sourcing data from a blocking query against the server's state store. +func ServerPeeredUpstreams(deps ServerDataSourceDeps) proxycfg.PeeredUpstreams { + return &serverPeeredUpstreams{deps} +} + +type serverPeeredUpstreams struct { + deps ServerDataSourceDeps +} + +func (s *serverPeeredUpstreams) Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + // TODO(peering): ACL filtering. + return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore, + func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedPeeredServiceList, error) { + index, vips, err := store.VirtualIPsForAllImportedServices(ws, req.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + + result := make([]structs.PeeredServiceName, 0, len(vips)) + for _, vip := range vips { + result = append(result, vip.Service) + } + + return index, &structs.IndexedPeeredServiceList{ + Services: result, + QueryMeta: structs.QueryMeta{ + Index: index, + Backend: structs.QueryBackendBlocking, + }, + }, nil + }, + dispatchBlockingQueryUpdate[*structs.IndexedPeeredServiceList](ch), + ) +} diff --git a/agent/proxycfg-glue/peered_upstreams_test.go b/agent/proxycfg-glue/peered_upstreams_test.go new file mode 100644 index 000000000..c2faa44da --- /dev/null +++ b/agent/proxycfg-glue/peered_upstreams_test.go @@ -0,0 +1,88 @@ +package proxycfgglue + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestServerPeeredUpstreams(t *testing.T) { + const ( + index uint64 = 123 + nodeName = "node-1" + ) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := state.NewStateStore(nil) + enableVirtualIPs(t, store) + + registerService := func(t *testing.T, index uint64, peerName, serviceName string) { + require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{Service: serviceName, ID: serviceName}, + PeerName: peerName, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + })) + + require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{ + Service: fmt.Sprintf("%s-proxy", serviceName), + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: serviceName, + }, + }, + PeerName: peerName, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + })) + } + + registerService(t, index, "peer-1", "web") + + eventCh := make(chan proxycfg.UpdateEvent) + dataSource := ServerPeeredUpstreams(ServerDataSourceDeps{ + GetStore: func() Store { return store }, + }) + require.NoError(t, dataSource.Notify(ctx, &structs.PartitionSpecificRequest{EnterpriseMeta: *acl.DefaultEnterpriseMeta()}, "", eventCh)) + + testutil.RunStep(t, "initial state", func(t *testing.T) { + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 1) + require.Equal(t, "peer-1", result.Services[0].Peer) + require.Equal(t, "web", result.Services[0].ServiceName.Name) + }) + + testutil.RunStep(t, "register another service", func(t *testing.T) { + registerService(t, index+1, "peer-2", "db") + + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 2) + }) + + testutil.RunStep(t, "deregister service", func(t *testing.T) { + require.NoError(t, store.DeleteService(index+2, nodeName, "web", acl.DefaultEnterpriseMeta(), "peer-1")) + + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 1) + }) +} + +func enableVirtualIPs(t *testing.T, store *state.Store) { + t.Helper() + + require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{ + Key: structs.SystemMetadataVirtualIPsEnabled, + Value: "true", + })) +}