health: create health.Client in Agent.New

This commit is contained in:
Daniel Nephin 2021-04-20 18:14:46 -04:00
parent 0ea49c3e65
commit 3cda0a7cc4
4 changed files with 34 additions and 12 deletions

View File

@ -50,6 +50,7 @@ import (
"github.com/hashicorp/consul/lib/file" "github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
@ -373,15 +374,21 @@ func New(bd BaseDeps) (*Agent, error) {
cache: bd.Cache, cache: bd.Cache,
} }
cacheName := cachetype.HealthServicesName // TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
if bd.RuntimeConfig.UseStreamingBackend { conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
cacheName = cachetype.StreamingHealthServicesName if err != nil {
return nil, err
} }
a.rpcClientHealth = &health.Client{ a.rpcClientHealth = &health.Client{
Cache: bd.Cache, Cache: bd.Cache,
NetRPC: &a, NetRPC: &a,
CacheName: cacheName, CacheName: cachetype.HealthServicesName,
CacheNameNotStreaming: cachetype.HealthServicesName, ViewStore: bd.ViewStore,
MaterializerDeps: health.MaterializerDeps{
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
Logger: bd.Logger.Named("rpcclient.health"),
},
} }
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)
@ -533,6 +540,8 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy) return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy)
} }
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager. // Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache, Cache: a.cache,

View File

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"google.golang.org/grpc"
"gopkg.in/square/go-jose.v2/jwt" "gopkg.in/square/go-jose.v2/jwt"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil), Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store), Tokens: new(token.Store),
TLSConfigurator: tlsConf, TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
}, },
RuntimeConfig: &config.RuntimeConfig{ RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{ HTTPAddrs: []net.Addr{
@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
} }
} }
type fakeGRPCConnPool struct{}
func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
return nil, nil
}
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil), Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store), Tokens: new(token.Store),
TLSConfigurator: tlsConf, TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
}, },
RuntimeConfig: &config.RuntimeConfig{ RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{ HTTPAddrs: []net.Addr{

View File

@ -1,12 +1,13 @@
package consul package consul
import ( import (
"github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
) )
type Deps struct { type Deps struct {
@ -15,5 +16,9 @@ type Deps struct {
Tokens *token.Store Tokens *token.Store
Router *router.Router Router *router.Router
ConnPool *pool.ConnPool ConnPool *pool.ConnPool
GRPCConnPool *grpc.ClientConnPool GRPCConnPool GRPCClientConner
}
type GRPCClientConner interface {
ClientConn(datacenter string) (*grpc.ClientConn, error)
} }

View File

@ -5,14 +5,13 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/rpc/subscribe" "github.com/hashicorp/consul/agent/rpc/subscribe"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
type subscribeBackend struct { type subscribeBackend struct {
srv *Server srv *Server
connPool *agentgrpc.ClientConnPool connPool GRPCClientConner
} }
// TODO: refactor Resolve methods to an ACLBackend that can be used by all // TODO: refactor Resolve methods to an ACLBackend that can be used by all