proxycfg-glue: server-local implementation of `TrustBundle` and `TrustBundleList`
This is the OSS portion of enterprise PR 2250. This PR provides server-local implementations of the proxycfg.TrustBundle and proxycfg.TrustBundleList interfaces, based on local blocking queries.
This commit is contained in:
parent
70f29942f4
commit
a280c9a10b
|
@ -4251,6 +4251,8 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
||||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||||
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
|
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
|
||||||
|
sources.TrustBundle = proxycfgglue.ServerTrustBundle(deps)
|
||||||
|
sources.TrustBundleList = proxycfgglue.ServerTrustBundleList(deps)
|
||||||
}
|
}
|
||||||
|
|
||||||
a.fillEnterpriseProxyDataSources(&sources)
|
a.fillEnterpriseProxyDataSources(&sources)
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/configentry"
|
"github.com/hashicorp/consul/agent/configentry"
|
||||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/watch"
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
"github.com/hashicorp/consul/agent/proxycfg"
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -22,6 +23,9 @@ type Store interface {
|
||||||
|
|
||||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||||
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
||||||
|
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
||||||
|
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
|
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
|
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
|
||||||
|
@ -102,18 +106,6 @@ func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig {
|
||||||
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
|
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheTrustBundle satisfies the proxycfg.TrustBundle interface by sourcing
|
|
||||||
// data from the agent cache.
|
|
||||||
func CacheTrustBundle(c *cache.Cache) proxycfg.TrustBundle {
|
|
||||||
return &cacheProxyDataSource[*pbpeering.TrustBundleReadRequest]{c, cachetype.TrustBundleReadName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CacheTrustBundleList satisfies the proxycfg.TrustBundleList interface by sourcing
|
|
||||||
// data from the agent cache.
|
|
||||||
func CacheTrustBundleList(c *cache.Cache) proxycfg.TrustBundleList {
|
|
||||||
return &cacheProxyDataSource[*pbpeering.TrustBundleListByServiceRequest]{c, cachetype.TrustBundleListName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
|
// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
|
||||||
// interface by sourcing data from the agent cache.
|
// interface by sourcing data from the agent cache.
|
||||||
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
|
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CacheTrustBundle satisfies the proxycfg.TrustBundle interface by sourcing
|
||||||
|
// data from the agent cache.
|
||||||
|
func CacheTrustBundle(c *cache.Cache) proxycfg.TrustBundle {
|
||||||
|
return &cacheProxyDataSource[*pbpeering.TrustBundleReadRequest]{c, cachetype.TrustBundleReadName}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerTrustBundle satisfies the proxycfg.TrustBundle interface by sourcing
|
||||||
|
// data from a blocking query against the server's state store.
|
||||||
|
func ServerTrustBundle(deps ServerDataSourceDeps) proxycfg.TrustBundle {
|
||||||
|
return &serverTrustBundle{deps}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverTrustBundle struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverTrustBundle) Notify(ctx context.Context, req *pbpeering.TrustBundleReadRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
// TODO(peering): ACL check.
|
||||||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||||||
|
func(ws memdb.WatchSet, store Store) (uint64, *pbpeering.TrustBundleReadResponse, error) {
|
||||||
|
index, bundle, err := store.PeeringTrustBundleRead(ws, state.Query{
|
||||||
|
Value: req.Name,
|
||||||
|
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
return index, &pbpeering.TrustBundleReadResponse{
|
||||||
|
Index: index,
|
||||||
|
Bundle: bundle,
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
dispatchBlockingQueryUpdate[*pbpeering.TrustBundleReadResponse](ch),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CacheTrustBundleList satisfies the proxycfg.TrustBundleList interface by sourcing
|
||||||
|
// data from the agent cache.
|
||||||
|
func CacheTrustBundleList(c *cache.Cache) proxycfg.TrustBundleList {
|
||||||
|
return &cacheProxyDataSource[*pbpeering.TrustBundleListByServiceRequest]{c, cachetype.TrustBundleListName}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerTrustBundleList satisfies the proxycfg.TrustBundle interface by
|
||||||
|
// sourcing data from a blocking query against the server's state store.
|
||||||
|
func ServerTrustBundleList(deps ServerDataSourceDeps) proxycfg.TrustBundleList {
|
||||||
|
return &serverTrustBundleList{deps}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverTrustBundleList struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverTrustBundleList) Notify(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
entMeta := acl.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
|
||||||
|
|
||||||
|
// TODO(peering): ACL check.
|
||||||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||||||
|
func(ws memdb.WatchSet, store Store) (uint64, *pbpeering.TrustBundleListByServiceResponse, error) {
|
||||||
|
var (
|
||||||
|
index uint64
|
||||||
|
bundles []*pbpeering.PeeringTrustBundle
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
switch {
|
||||||
|
case req.ServiceName != "":
|
||||||
|
index, bundles, err = store.TrustBundleListByService(ws, req.ServiceName, s.deps.Datacenter, entMeta)
|
||||||
|
case req.Kind == string(structs.ServiceKindMeshGateway):
|
||||||
|
index, bundles, err = store.PeeringTrustBundleList(ws, entMeta)
|
||||||
|
case req.Kind != "":
|
||||||
|
err = errors.New("kind must be mesh-gateway if set")
|
||||||
|
default:
|
||||||
|
err = errors.New("one of service or kind is required")
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return index, &pbpeering.TrustBundleListByServiceResponse{
|
||||||
|
Index: index,
|
||||||
|
Bundles: bundles,
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
dispatchBlockingQueryUpdate[*pbpeering.TrustBundleListByServiceResponse](ch),
|
||||||
|
)
|
||||||
|
}
|
|
@ -0,0 +1,152 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerTrustBundle(t *testing.T) {
|
||||||
|
const (
|
||||||
|
index uint64 = 123
|
||||||
|
peerName = "peer1"
|
||||||
|
)
|
||||||
|
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
|
||||||
|
PeerName: peerName,
|
||||||
|
TrustDomain: "before.com",
|
||||||
|
}))
|
||||||
|
|
||||||
|
dataSource := ServerTrustBundle(ServerDataSourceDeps{
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
err := dataSource.Notify(context.Background(), &pbpeering.TrustBundleReadRequest{
|
||||||
|
Name: peerName,
|
||||||
|
}, "", eventCh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||||
|
result := getEventResult[*pbpeering.TrustBundleReadResponse](t, eventCh)
|
||||||
|
require.Equal(t, "before.com", result.Bundle.TrustDomain)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "update trust bundle", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(index+1, &pbpeering.PeeringTrustBundle{
|
||||||
|
PeerName: peerName,
|
||||||
|
TrustDomain: "after.com",
|
||||||
|
}))
|
||||||
|
|
||||||
|
result := getEventResult[*pbpeering.TrustBundleReadResponse](t, eventCh)
|
||||||
|
require.Equal(t, "after.com", result.Bundle.TrustDomain)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerTrustBundleList(t *testing.T) {
|
||||||
|
const index uint64 = 123
|
||||||
|
|
||||||
|
t.Run("list by service", func(t *testing.T) {
|
||||||
|
const (
|
||||||
|
serviceName = "web"
|
||||||
|
us = "default"
|
||||||
|
them = "peer2"
|
||||||
|
)
|
||||||
|
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
require.NoError(t, store.CASetConfig(index, &structs.CAConfiguration{ClusterID: "cluster-id"}))
|
||||||
|
|
||||||
|
testutil.RunStep(t, "export service to peer", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.PeeringWrite(index, &pbpeering.Peering{
|
||||||
|
ID: testUUID(t),
|
||||||
|
Name: them,
|
||||||
|
State: pbpeering.PeeringState_ACTIVE,
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
|
||||||
|
PeerName: them,
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index, &structs.ExportedServicesConfigEntry{
|
||||||
|
Name: us,
|
||||||
|
Services: []structs.ExportedService{
|
||||||
|
{
|
||||||
|
Name: serviceName,
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: them},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
dataSource := ServerTrustBundleList(ServerDataSourceDeps{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
err := dataSource.Notify(context.Background(), &pbpeering.TrustBundleListByServiceRequest{
|
||||||
|
ServiceName: serviceName,
|
||||||
|
Partition: us,
|
||||||
|
}, "", eventCh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||||
|
result := getEventResult[*pbpeering.TrustBundleListByServiceResponse](t, eventCh)
|
||||||
|
require.Len(t, result.Bundles, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "unexport the service", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index+1, &structs.ExportedServicesConfigEntry{
|
||||||
|
Name: us,
|
||||||
|
Services: []structs.ExportedService{},
|
||||||
|
}))
|
||||||
|
|
||||||
|
result := getEventResult[*pbpeering.TrustBundleListByServiceResponse](t, eventCh)
|
||||||
|
require.Len(t, result.Bundles, 0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("list for mesh gateway", func(t *testing.T) {
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
require.NoError(t, store.CASetConfig(index, &structs.CAConfiguration{ClusterID: "cluster-id"}))
|
||||||
|
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
|
||||||
|
PeerName: "peer1",
|
||||||
|
}))
|
||||||
|
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
|
||||||
|
PeerName: "peer2",
|
||||||
|
}))
|
||||||
|
|
||||||
|
dataSource := ServerTrustBundleList(ServerDataSourceDeps{
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
err := dataSource.Notify(context.Background(), &pbpeering.TrustBundleListByServiceRequest{
|
||||||
|
Kind: string(structs.ServiceKindMeshGateway),
|
||||||
|
Partition: "default",
|
||||||
|
}, "", eventCh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
result := getEventResult[*pbpeering.TrustBundleListByServiceResponse](t, eventCh)
|
||||||
|
require.Len(t, result.Bundles, 2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testUUID(t *testing.T) string {
|
||||||
|
v, err := lib.GenerateUUID(nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return v
|
||||||
|
}
|
Loading…
Reference in New Issue