proxycfg-glue: server-local implementation of PeeredUpstreams

This is the OSS portion of enterprise PR 2352.

It adds a server-local implementation of the proxycfg.PeeredUpstreams interface
based on a blocking query against the server's state store.

It also fixes an omission in the Virtual IP freeing logic where we were never
updating the max index (and therefore blocking queries against
VirtualIPsForAllImportedServices would not return on service deletion).
This commit is contained in:
Daniel Upton 2022-07-21 13:38:28 +01:00 committed by Dan Upton
parent 1baf4d13d6
commit e3bff8fb39
5 changed files with 152 additions and 8 deletions

View file

@ -4251,6 +4251,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
sources.TrustBundle = proxycfgglue.ServerTrustBundle(deps)
sources.TrustBundleList = proxycfgglue.ServerTrustBundleList(deps)

View file

@ -1990,7 +1990,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
}
}
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name}
if err := freeServiceVirtualIP(tx, psn, nil); err != nil {
if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
}
if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil {
@ -2008,6 +2008,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
// is removed.
func freeServiceVirtualIP(
tx WriteTxn,
idx uint64,
psn structs.PeeredServiceName,
excludeGateway *structs.ServiceName,
) error {
@ -2059,6 +2060,10 @@ func freeServiceVirtualIP(
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return err
}
return nil
}
@ -3497,7 +3502,7 @@ func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.T
}
if len(nodes) == 0 {
psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: sn}
if err := freeServiceVirtualIP(tx, psn, &gatewayName); err != nil {
if err := freeServiceVirtualIP(tx, idx, psn, &gatewayName); err != nil {
return err
}
}

View file

@ -28,6 +28,7 @@ type Store interface {
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
}
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
@ -90,12 +91,6 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
}
// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface
// by sourcing data from the agent cache.
func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams {
return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName}
}
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
// sourcing data from the agent cache.
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {

View file

@ -0,0 +1,55 @@
package proxycfgglue
import (
"context"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface
// by sourcing data from the agent cache.
func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams {
return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName}
}
// ServerPeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface by
// sourcing data from a blocking query against the server's state store.
func ServerPeeredUpstreams(deps ServerDataSourceDeps) proxycfg.PeeredUpstreams {
return &serverPeeredUpstreams{deps}
}
type serverPeeredUpstreams struct {
deps ServerDataSourceDeps
}
func (s *serverPeeredUpstreams) Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
// TODO(peering): ACL filtering.
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedPeeredServiceList, error) {
index, vips, err := store.VirtualIPsForAllImportedServices(ws, req.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
result := make([]structs.PeeredServiceName, 0, len(vips))
for _, vip := range vips {
result = append(result, vip.Service)
}
return index, &structs.IndexedPeeredServiceList{
Services: result,
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendBlocking,
},
}, nil
},
dispatchBlockingQueryUpdate[*structs.IndexedPeeredServiceList](ch),
)
}

View file

@ -0,0 +1,88 @@
package proxycfgglue
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestServerPeeredUpstreams(t *testing.T) {
const (
index uint64 = 123
nodeName = "node-1"
)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store := state.NewStateStore(nil)
enableVirtualIPs(t, store)
registerService := func(t *testing.T, index uint64, peerName, serviceName string) {
require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{Service: serviceName, ID: serviceName},
PeerName: peerName,
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}))
require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
Service: fmt.Sprintf("%s-proxy", serviceName),
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: serviceName,
},
},
PeerName: peerName,
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}))
}
registerService(t, index, "peer-1", "web")
eventCh := make(chan proxycfg.UpdateEvent)
dataSource := ServerPeeredUpstreams(ServerDataSourceDeps{
GetStore: func() Store { return store },
})
require.NoError(t, dataSource.Notify(ctx, &structs.PartitionSpecificRequest{EnterpriseMeta: *acl.DefaultEnterpriseMeta()}, "", eventCh))
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh)
require.Len(t, result.Services, 1)
require.Equal(t, "peer-1", result.Services[0].Peer)
require.Equal(t, "web", result.Services[0].ServiceName.Name)
})
testutil.RunStep(t, "register another service", func(t *testing.T) {
registerService(t, index+1, "peer-2", "db")
result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh)
require.Len(t, result.Services, 2)
})
testutil.RunStep(t, "deregister service", func(t *testing.T) {
require.NoError(t, store.DeleteService(index+2, nodeName, "web", acl.DefaultEnterpriseMeta(), "peer-1"))
result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh)
require.Len(t, result.Services, 1)
})
}
func enableVirtualIPs(t *testing.T, store *state.Store) {
t.Helper()
require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{
Key: structs.SystemMetadataVirtualIPsEnabled,
Value: "true",
}))
}