Service http checks data source for agentless proxies (#14924)
Adds another datasource for proxycfg.HTTPChecks, for use on server agents. Typically these checks are performed by local client agents and there is no equivalent of this in agentless (where servers configure consul-dataplane proxies). Hence, the data source is mostly a no-op on servers but in the case where the service is present within the local state, it delegates to the cache data source.
This commit is contained in:
parent
4cf0bf4865
commit
474d9cfcdc
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
server: fixes the error trying to source proxy configuration for http checks, in case of proxies using consul-dataplane.
|
||||||
|
```
|
|
@ -674,6 +674,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
Source: &structs.QuerySource{
|
Source: &structs.QuerySource{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: a.config.Datacenter,
|
||||||
Segment: a.config.SegmentName,
|
Segment: a.config.SegmentName,
|
||||||
|
Node: a.config.NodeName,
|
||||||
NodePartition: a.config.PartitionOrEmpty(),
|
NodePartition: a.config.PartitionOrEmpty(),
|
||||||
},
|
},
|
||||||
DNSConfig: proxycfg.DNSConfig{
|
DNSConfig: proxycfg.DNSConfig{
|
||||||
|
@ -4402,6 +4403,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
||||||
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
|
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
|
||||||
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
||||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
|
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
|
||||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||||
sources.IntentionUpstreamsDestination = proxycfgglue.ServerIntentionUpstreamsDestination(deps)
|
sources.IntentionUpstreamsDestination = proxycfgglue.ServerIntentionUpstreamsDestination(deps)
|
||||||
|
|
|
@ -103,6 +103,7 @@ func (c *ServiceHTTPChecks) Fetch(opts cache.FetchOptions, req cache.Request) (c
|
||||||
// directly to any Consul servers.
|
// directly to any Consul servers.
|
||||||
type ServiceHTTPChecksRequest struct {
|
type ServiceHTTPChecksRequest struct {
|
||||||
ServiceID string
|
ServiceID string
|
||||||
|
NodeName string
|
||||||
MinQueryIndex uint64
|
MinQueryIndex uint64
|
||||||
MaxQueryTime time.Duration
|
MaxQueryTime time.Duration
|
||||||
acl.EnterpriseMeta
|
acl.EnterpriseMeta
|
||||||
|
|
|
@ -55,6 +55,8 @@ type Store interface {
|
||||||
//
|
//
|
||||||
// Note: there isn't a server-local equivalent of this data source because
|
// Note: there isn't a server-local equivalent of this data source because
|
||||||
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
|
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
|
||||||
|
// If SDS is not supported on consul-dataplane, data is sourced from the server agent cache
|
||||||
|
// even for "agentless" proxies.
|
||||||
func CacheCARoots(c *cache.Cache) proxycfg.CARoots {
|
func CacheCARoots(c *cache.Cache) proxycfg.CARoots {
|
||||||
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName}
|
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName}
|
||||||
}
|
}
|
||||||
|
@ -74,20 +76,13 @@ func CacheServiceGateways(c *cache.Cache) proxycfg.GatewayServices {
|
||||||
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.ServiceGatewaysName}
|
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.ServiceGatewaysName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
|
|
||||||
// data from the agent cache.
|
|
||||||
//
|
|
||||||
// Note: there isn't a server-local equivalent of this data source because only
|
|
||||||
// services registered to the local agent can be health checked by it.
|
|
||||||
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
|
|
||||||
return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
|
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
|
||||||
// sourcing data from the agent cache.
|
// sourcing data from the agent cache.
|
||||||
//
|
//
|
||||||
// Note: there isn't a server-local equivalent of this data source because
|
// Note: there isn't a server-local equivalent of this data source because
|
||||||
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
|
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
|
||||||
|
// If SDS is not supported on consul-dataplane, data is sourced from the server agent cache
|
||||||
|
// even for "agentless" proxies.
|
||||||
func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
|
func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
|
||||||
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
|
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/local"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
|
||||||
|
// data from the agent cache.
|
||||||
|
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
|
||||||
|
return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerHTTPChecks satisifies the proxycfg.HTTPChecks interface.
|
||||||
|
// It sources data from the server agent cache if the service exists in the local state, else
|
||||||
|
// it is a no-op since the checks can only be performed by local agents.
|
||||||
|
func ServerHTTPChecks(deps ServerDataSourceDeps, nodeName string, cacheSource proxycfg.HTTPChecks, localState *local.State) proxycfg.HTTPChecks {
|
||||||
|
return serverHTTPChecks{deps, nodeName, cacheSource, localState}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverHTTPChecks struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
nodeName string
|
||||||
|
cacheSource proxycfg.HTTPChecks
|
||||||
|
localState *local.State
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c serverHTTPChecks) Notify(ctx context.Context, req *cachetype.ServiceHTTPChecksRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
// if the service exists in the current server agent local state, delegate to the cache data source
|
||||||
|
if req.NodeName == c.nodeName && c.localState.ServiceExists(structs.ServiceID{ID: req.ServiceID, EnterpriseMeta: req.EnterpriseMeta}) {
|
||||||
|
return c.cacheSource.Notify(ctx, req, correlationID, ch)
|
||||||
|
}
|
||||||
|
l := c.deps.Logger.With("service_id", req.ServiceID)
|
||||||
|
l.Debug("service-http-checks: no-op")
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/local"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerHTTPChecks(t *testing.T) {
|
||||||
|
var (
|
||||||
|
ctx = context.Background()
|
||||||
|
svcID = "web-sidecar-proxy-1"
|
||||||
|
correlationID = "correlation-id"
|
||||||
|
ch = make(chan<- proxycfg.UpdateEvent)
|
||||||
|
cacheResult = errors.New("KABOOM")
|
||||||
|
nodeName = "server-1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
serviceInLocalState bool
|
||||||
|
req *cachetype.ServiceHTTPChecksRequest
|
||||||
|
expectedResult error
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
serviceID := structs.NewServiceID(svcID, nil)
|
||||||
|
localState := testLocalState(t)
|
||||||
|
mockCacheSource := newMockServiceHTTPChecks(t)
|
||||||
|
if tc.serviceInLocalState {
|
||||||
|
require.NoError(t, localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, ""))
|
||||||
|
}
|
||||||
|
if tc.req.NodeName == nodeName && tc.serviceInLocalState {
|
||||||
|
mockCacheSource.On("Notify", ctx, tc.req, correlationID, ch).Return(cacheResult)
|
||||||
|
} else {
|
||||||
|
mockCacheSource.AssertNotCalled(t, "Notify")
|
||||||
|
}
|
||||||
|
|
||||||
|
dataSource := ServerHTTPChecks(ServerDataSourceDeps{Logger: hclog.NewNullLogger()}, nodeName, mockCacheSource, localState)
|
||||||
|
err := dataSource.Notify(ctx, tc.req, correlationID, ch)
|
||||||
|
require.Equal(t, tc.expectedResult, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testcases := []testCase{
|
||||||
|
{
|
||||||
|
name: "delegate to cache source if service in local state of the server node",
|
||||||
|
serviceInLocalState: true,
|
||||||
|
req: &cachetype.ServiceHTTPChecksRequest{ServiceID: svcID, NodeName: nodeName},
|
||||||
|
expectedResult: cacheResult,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no-op if service not in local state of server node",
|
||||||
|
serviceInLocalState: false,
|
||||||
|
req: &cachetype.ServiceHTTPChecksRequest{ServiceID: svcID, NodeName: nodeName},
|
||||||
|
expectedResult: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no-op if service with same ID in local state but belongs to different node",
|
||||||
|
serviceInLocalState: true,
|
||||||
|
req: &cachetype.ServiceHTTPChecksRequest{ServiceID: svcID, NodeName: "server-2"},
|
||||||
|
expectedResult: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testcases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockServiceHTTPChecks(t *testing.T) *mockServiceHTTPChecks {
|
||||||
|
mock := &mockServiceHTTPChecks{}
|
||||||
|
mock.Mock.Test(t)
|
||||||
|
|
||||||
|
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||||
|
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockServiceHTTPChecks struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockServiceHTTPChecks) Notify(ctx context.Context, req *cachetype.ServiceHTTPChecksRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
return m.Called(ctx, req, correlationID, ch).Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testLocalState(t *testing.T) *local.State {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
l := local.NewState(local.Config{}, hclog.NewNullLogger(), &token.Store{})
|
||||||
|
l.TriggerSyncChanges = func() {}
|
||||||
|
return l
|
||||||
|
}
|
|
@ -96,6 +96,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
||||||
// Watch for service check updates
|
// Watch for service check updates
|
||||||
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{
|
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{
|
||||||
ServiceID: s.proxyCfg.DestinationServiceID,
|
ServiceID: s.proxyCfg.DestinationServiceID,
|
||||||
|
NodeName: s.source.Node,
|
||||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||||
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -40,6 +40,7 @@ func TestAgent_ServiceHTTPChecksNotification(t *testing.T) {
|
||||||
// Watch for service check updates
|
// Watch for service check updates
|
||||||
err := a.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
|
err := a.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
|
||||||
ServiceID: service.ID,
|
ServiceID: service.ID,
|
||||||
|
NodeName: a.Config.NodeName,
|
||||||
}, "service-checks:"+service.ID, ch)
|
}, "service-checks:"+service.ID, ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to set cache notification: %v", err)
|
t.Fatalf("failed to set cache notification: %v", err)
|
||||||
|
|
Loading…
Reference in New Issue