Merge pull request #9152 from hashicorp/dnephin/streaming-enable-connect

use streaming backend for connect service health

commit 579015dde1

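This change moves the proxy config manager's connect service-health watches off the agent cache type and onto the health.Client notifier, which can be served by the streaming subscribe backend; it also re-enables the connect topic's snapshot handler in the state store and drops the now-unneeded CacheNameConnect. Below is a minimal sketch of the subscribe-and-react pattern the new Health notifier implies; the package, function, service names, and correlation ID are illustrative and not part of this commit.

package health_sketch // illustrative only; not part of this commit

import (
	"context"
	"log"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/rpcclient/health"
	"github.com/hashicorp/consul/agent/structs"
)

// watchConnectHealth subscribes to connect health for one service and logs
// every update until ctx is cancelled. health.Client.Notify pushes a
// cache.UpdateEvent onto the channel whenever the result set changes.
func watchConnectHealth(ctx context.Context, hc *health.Client, service string) error {
	updateCh := make(chan cache.UpdateEvent, 10)
	req := structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: service,
		// Connect health now flows through the same cache name / backend as
		// regular service health, selected only by this flag.
		Connect: true,
	}
	if err := hc.Notify(ctx, req, "connect-health:"+service, updateCh); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event := <-updateCh:
			if event.Err != nil {
				continue // errors are delivered as events and retried upstream
			}
			nodes, ok := event.Result.(*structs.IndexedCheckServiceNodes)
			if !ok {
				continue
			}
			log.Printf("%s: %d instances", event.CorrelationID, len(nodes.Nodes))
		}
	}
}
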
@@ -377,8 +377,6 @@ func New(bd BaseDeps) (*Agent, error) {
 		Cache:     bd.Cache,
 		NetRPC:    &a,
 		CacheName: cacheName,
-		// Temporarily until streaming supports all connect events
-		CacheNameConnect: cachetype.HealthServicesName,
 	}

 	a.serviceManager = NewServiceManager(&a)

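With CacheNameConnect gone, the health RPC client the agent assembles needs only a single cache name. A minimal sketch of that wiring, assuming the same pieces shown in the hunk above; the helper name newHealthClient is hypothetical, and in the agent this happens inline in New(), with the resulting client passed to proxycfg in the Start() hunk below.

package health_sketch // illustrative only; not part of this commit

import (
	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/rpcclient/health"
)

// newHealthClient mirrors the struct literal in the hunk above: one CacheName
// now serves both regular and connect service-health queries.
func newHealthClient(c *cache.Cache, rpc health.NetRPC, cacheName string) *health.Client {
	return &health.Client{
		Cache:     c,
		NetRPC:    rpc,
		CacheName: cacheName,
	}
}
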
@@ -540,6 +538,7 @@ func (a *Agent) Start(ctx context.Context) error {
 	// Start the proxy config manager.
 	a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
 		Cache:  a.cache,
+		Health: a.rpcClientHealth,
 		Logger: a.logger.Named(logging.ProxyConfig),
 		State:  a.State,
 		Source: &structs.QuerySource{

@@ -203,8 +203,6 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
 func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers {
 	return stream.SnapshotHandlers{
 		topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
-		// The connect topic is temporarily disabled until the correct events are
-		// created for terminating gateway changes.
-		//topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
+		topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
 	}
 }

@@ -4,11 +4,12 @@ import (
 	"errors"
 	"sync"

+	"github.com/hashicorp/go-hclog"
+
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/local"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/tlsutil"
-	"github.com/hashicorp/go-hclog"
 )

 var (

@@ -58,6 +59,8 @@ type ManagerConfig struct {
 	// Cache is the agent's cache instance that can be used to retrieve, store and
 	// monitor state for the proxies.
 	Cache *cache.Cache
+	// Health provides service health updates on a notification channel.
+	Health Health
 	// state is the agent's local state to be watched for new proxy registrations.
 	State *local.State
 	// source describes the current agent's identity, it's used directly for

@@ -195,6 +198,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
 	// Set the necessary dependencies
 	state.logger = m.Logger.With("service_id", sid.String())
 	state.cache = m.Cache
+	state.health = m.Health
 	state.source = m.Source
 	state.dnsConfig = m.DNSConfig
 	state.intentionDefaultAllow = m.IntentionDefaultAllow

@@ -14,6 +14,7 @@ import (
 	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/consul/discoverychain"
 	"github.com/hashicorp/consul/agent/local"
+	"github.com/hashicorp/consul/agent/rpcclient/health"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/agent/token"
 	"github.com/hashicorp/consul/sdk/testutil"

@@ -342,7 +343,13 @@ func testManager_BasicLifecycle(
 	state.TriggerSyncChanges = func() {}

 	// Create manager
-	m, err := NewManager(ManagerConfig{c, state, source, DNSConfig{}, logger, nil, false})
+	m, err := NewManager(ManagerConfig{
+		Cache:  c,
+		Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
+		State:  state,
+		Source: source,
+		Logger: logger,
+	})
 	require.NoError(err)

 	// And run it

@@ -9,13 +9,14 @@ import (
 	"strings"
 	"time"

+	"github.com/hashicorp/go-hclog"
+	"github.com/mitchellh/copystructure"
+	"github.com/mitchellh/mapstructure"
+
 	"github.com/hashicorp/consul/agent/cache"
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/logging"
-	"github.com/hashicorp/go-hclog"
-	"github.com/mitchellh/copystructure"
-	"github.com/mitchellh/mapstructure"
 )

 type CacheNotifier interface {

@@ -23,6 +24,10 @@ type CacheNotifier interface {
 		correlationID string, ch chan<- cache.UpdateEvent) error
 }

+type Health interface {
+	Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error
+}
+
 const (
 	coalesceTimeout = 200 * time.Millisecond
 	rootsWatchID    = "roots"

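Any value with this single Notify method satisfies the new Health interface. In production it is the agent's *health.Client (its Notify method is added at the end of this diff); tests can drop in a stub instead. A minimal sketch of such a stub, not taken from this commit (the type name staticHealth is hypothetical):

package proxycfg_sketch // illustrative only; not part of this commit

import (
	"context"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/structs"
)

// staticHealth replays a fixed set of events to every subscriber, which is
// enough to drive code that only reads cache.UpdateEvent values off the channel.
type staticHealth struct {
	events []cache.UpdateEvent
}

func (s staticHealth) Notify(ctx context.Context, req structs.ServiceSpecificRequest,
	correlationID string, ch chan<- cache.UpdateEvent) error {
	for _, e := range s.events {
		e.CorrelationID = correlationID
		select {
		case ch <- e:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
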
@@ -54,6 +59,7 @@ type state struct {
 	logger                hclog.Logger
 	source                *structs.QuerySource
 	cache                 CacheNotifier
+	health                Health
 	dnsConfig             DNSConfig
 	serverSNIFn           ServerSNIFunc
 	intentionDefaultAllow bool

@@ -155,6 +161,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
 		taggedAddresses: taggedAddresses,
 		proxyCfg:        proxyCfg,
 		token:           token,

 		// 10 is fairly arbitrary here but allow for the 3 mandatory and a
 		// reasonable number of upstream watches to all deliver their initial
 		// messages in parallel without blocking the cache.Notify loops. It's not a

@@ -225,7 +232,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
 	var finalMeta structs.EnterpriseMeta
 	finalMeta.Merge(entMeta)

-	return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
+	return s.health.Notify(ctx, structs.ServiceSpecificRequest{
 		Datacenter: dc,
 		QueryOptions: structs.QueryOptions{
 			Token: s.token,

@@ -443,7 +450,7 @@ func (s *state) initWatchesMeshGateway() error {
 		return err
 	}

-	err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
+	err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{
 		Datacenter:   s.source.Datacenter,
 		QueryOptions: structs.QueryOptions{Token: s.token},
 		ServiceName:  structs.ConsulServiceName,

@@ -969,7 +976,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
 		// Watch the health endpoint to discover endpoints for the service
 		if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
 			ctx, cancel := context.WithCancel(s.ctx)
-			err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
+			err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
 				Datacenter:   s.source.Datacenter,
 				QueryOptions: structs.QueryOptions{Token: s.token},
 				ServiceName:  svc.Service.Name,

@@ -1267,7 +1274,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho

 	if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
 		ctx, cancel := context.WithCancel(s.ctx)
-		err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
+		err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
 			Datacenter:   s.source.Datacenter,
 			QueryOptions: structs.QueryOptions{Token: s.token},
 			ServiceName:  svc.Name,

@@ -6,12 +6,14 @@ import (
 	"sync"
 	"testing"

+	"github.com/stretchr/testify/require"
+
 	"github.com/hashicorp/consul/agent/cache"
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/consul/discoverychain"
+	"github.com/hashicorp/consul/agent/rpcclient/health"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/sdk/testutil"
-	"github.com/stretchr/testify/require"
 )

 func TestStateChanged(t *testing.T) {

@@ -143,6 +145,10 @@ func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Reque
 	return nil
 }

+func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) {
+	panic("Get: not implemented")
+}
+
 func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest {
 	cn.lock.RLock()
 	req, ok := cn.notifiers[correlationId]

@@ -1521,6 +1527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 			// setup a new testing cache notifier
 			cn := newTestCacheNotifier()
 			state.cache = cn
+			state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}

 			// setup the local datacenter information
 			state.source = &structs.QuerySource{

@@ -12,8 +12,6 @@ type Client struct {
 	Cache CacheGetter
 	// CacheName to use for service health.
 	CacheName string
-	// CacheNameConnect is the name of the cache to use for connect service health.
-	CacheNameConnect string
 }

 type NetRPC interface {

@@ -22,6 +20,7 @@ type NetRPC interface {

 type CacheGetter interface {
 	Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
+	Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
 }

 func (c *Client) ServiceNodes(

@@ -54,12 +53,7 @@ func (c *Client) getServiceNodes(
 		return out, cache.ResultMeta{}, err
 	}

-	cacheName := c.CacheName
-	if req.Connect {
-		cacheName = c.CacheNameConnect
-	}
-
-	raw, md, err := c.Cache.Get(ctx, cacheName, &req)
+	raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
 	if err != nil {
 		return out, md, err
 	}

@@ -71,3 +65,12 @@ func (c *Client) getServiceNodes(

 	return *value, md, nil
 }
+
+func (c *Client) Notify(
+	ctx context.Context,
+	req structs.ServiceSpecificRequest,
+	correlationID string,
+	ch chan<- cache.UpdateEvent,
+) error {
+	return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
+}

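Client.Notify wraps the cache's Notify with the client's single CacheName, which is what lets proxycfg swap s.cache.Notify for s.health.Notify in the hunks above. A small sketch of that call-site migration under assumed variable names; the correlation ID string is illustrative:

package health_sketch // illustrative only; not part of this commit

import (
	"context"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/rpcclient/health"
	"github.com/hashicorp/consul/agent/structs"
)

func beforeAndAfter(ctx context.Context, c *cache.Cache, hc *health.Client,
	req structs.ServiceSpecificRequest, ch chan<- cache.UpdateEvent) {
	// Before: callers addressed the health-services cache type directly.
	_ = c.Notify(ctx, cachetype.HealthServicesName, &req, "upstream:web", ch)

	// After: callers depend only on the narrow Health notifier; the cache name
	// (and the streaming backend behind it) is the client's concern.
	_ = hc.Notify(ctx, req, "upstream:web", ch)
}
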