Merge pull request #9461 from hashicorp/dnephin/xds-server

xds: enable race detector and some small cleanup
This commit is contained in:
Daniel Nephin 2021-01-07 18:29:18 -05:00 committed by GitHub
commit 9bc74ca399
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 40 additions and 66 deletions

View File

@ -237,7 +237,7 @@ jobs:
command: | command: |
mkdir -p $TEST_RESULTS_DIR /tmp/jsonfile mkdir -p $TEST_RESULTS_DIR /tmp/jsonfile
pkgs="$(go list ./... | \ pkgs="$(go list ./... | \
grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/xds|/routine-leak-checker)?$' | \ grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/routine-leak-checker)?$' | \
grep -E -v '^github.com/hashicorp/consul/command/')" grep -E -v '^github.com/hashicorp/consul/command/')"
gotestsum \ gotestsum \
--jsonfile /tmp/jsonfile/go-test-race.log \ --jsonfile /tmp/jsonfile/go-test-race.log \

View File

@ -650,22 +650,22 @@ func (a *Agent) listenAndServeGRPC() error {
} }
xdsServer := &xds.Server{ xdsServer := &xds.Server{
Logger: a.logger, Logger: a.logger.Named(logging.Envoy),
CfgMgr: a.proxyConfig, CfgMgr: a.proxyConfig,
ResolveToken: a.resolveToken, ResolveToken: a.resolveToken,
CheckFetcher: a, CheckFetcher: a,
CfgFetcher: a, CfgFetcher: a,
AuthCheckFrequency: xds.DefaultAuthCheckFrequency,
} }
xdsServer.Initialize()
var err error tlsConfig := a.tlsConfigurator
if a.config.HTTPSPort > 0 { // gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is // then gRPC should not use TLS.
// enabled then gRPC will require HTTPS as well. if a.config.HTTPSPort <= 0 {
a.grpcServer, err = xdsServer.GRPCServer(a.tlsConfigurator) tlsConfig = nil
} else {
a.grpcServer, err = xdsServer.GRPCServer(nil)
} }
var err error
a.grpcServer, err = xdsServer.GRPCServer(tlsConfig)
if err != nil { if err != nil {
return err return err
} }

View File

@ -10,12 +10,13 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
) )
func TestClustersFromSnapshot(t *testing.T) { func TestClustersFromSnapshot(t *testing.T) {
@ -665,10 +666,7 @@ func TestClustersFromSnapshot(t *testing.T) {
} }
// Need server just for logger dependency // Need server just for logger dependency
logger := testutil.Logger(t) s := Server{Logger: testutil.Logger(t)}
s := Server{
Logger: logger,
}
cInfo := connectionInfo{ cInfo := connectionInfo{
Token: "my-token", Token: "my-token",

View File

@ -12,11 +12,12 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
) )
func Test_makeLoadAssignment(t *testing.T) { func Test_makeLoadAssignment(t *testing.T) {
@ -579,10 +580,7 @@ func Test_endpointsFromSnapshot(t *testing.T) {
} }
// Need server just for logger dependency // Need server just for logger dependency
logger := testutil.Logger(t) s := Server{Logger: testutil.Logger(t)}
s := Server{
Logger: logger,
}
cInfo := connectionInfo{ cInfo := connectionInfo{
Token: "my-token", Token: "my-token",

View File

@ -9,12 +9,13 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/envoyproxy/go-control-plane/pkg/wellknown"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
) )
func TestListenersFromSnapshot(t *testing.T) { func TestListenersFromSnapshot(t *testing.T) {
@ -508,10 +509,7 @@ func TestListenersFromSnapshot(t *testing.T) {
} }
// Need server just for logger dependency // Need server just for logger dependency
logger := testutil.Logger(t) s := Server{Logger: testutil.Logger(t)}
s := Server{
Logger: logger,
}
cInfo := connectionInfo{ cInfo := connectionInfo{
Token: "my-token", Token: "my-token",

View File

@ -9,14 +9,15 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
) )
func TestRoutesFromSnapshot(t *testing.T) { func TestRoutesFromSnapshot(t *testing.T) {
@ -256,10 +257,7 @@ func TestRoutesFromSnapshot(t *testing.T) {
tt.setup(snap) tt.setup(snap)
} }
logger := testutil.Logger(t) s := Server{Logger: testutil.Logger(t)}
s := Server{
Logger: logger,
}
cInfo := connectionInfo{ cInfo := connectionInfo{
Token: "my-token", Token: "my-token",
ProxyFeatures: sf, ProxyFeatures: sf,

View File

@ -11,17 +11,17 @@ import (
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
) )
// ADSStream is a shorter way of referring to this thing... // ADSStream is a shorter way of referring to this thing...
@ -125,14 +125,6 @@ type Server struct {
CfgFetcher ConfigFetcher CfgFetcher ConfigFetcher
} }
// Initialize will finish configuring the Server for first use.
func (s *Server) Initialize() {
if s.AuthCheckFrequency == 0 {
s.AuthCheckFrequency = DefaultAuthCheckFrequency
}
s.Logger = s.Logger.Named(logging.Envoy)
}
// StreamAggregatedResources implements // StreamAggregatedResources implements
// envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is // envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is
// the only xDS API we directly support for now. // the only xDS API we directly support for now.

View File

@ -89,7 +89,6 @@ func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID structs.Service
} }
func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) { func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t) mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all // Allow all
@ -99,11 +98,10 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
defer envoy.Close() defer envoy.Close()
s := Server{ s := Server{
Logger: logger, Logger: testutil.Logger(t),
CfgMgr: mgr, CfgMgr: mgr,
ResolveToken: aclResolve, ResolveToken: aclResolve,
} }
s.Initialize()
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -430,7 +428,6 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t) mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
if !tt.defaultDeny { if !tt.defaultDeny {
@ -452,11 +449,10 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
defer envoy.Close() defer envoy.Close()
s := Server{ s := Server{
Logger: logger, Logger: testutil.Logger(t),
CfgMgr: mgr, CfgMgr: mgr,
ResolveToken: aclResolve, ResolveToken: aclResolve,
} }
s.Initialize()
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
@ -513,7 +509,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
var validToken atomic.Value var validToken atomic.Value
validToken.Store(token) validToken.Store(token)
logger := testutil.Logger(t)
mgr := newTestManager(t) mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) { if token := validToken.Load(); token == nil || id != token.(string) {
@ -526,12 +521,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
defer envoy.Close() defer envoy.Close()
s := Server{ s := Server{
Logger: logger, Logger: testutil.Logger(t),
CfgMgr: mgr, CfgMgr: mgr,
ResolveToken: aclResolve, ResolveToken: aclResolve,
AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
} }
s.Initialize()
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
@ -608,7 +602,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
var validToken atomic.Value var validToken atomic.Value
validToken.Store(token) validToken.Store(token)
logger := testutil.Logger(t)
mgr := newTestManager(t) mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) { if token := validToken.Load(); token == nil || id != token.(string) {
@ -621,12 +614,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
defer envoy.Close() defer envoy.Close()
s := Server{ s := Server{
Logger: logger, Logger: testutil.Logger(t),
CfgMgr: mgr, CfgMgr: mgr,
ResolveToken: aclResolve, ResolveToken: aclResolve,
AuthCheckFrequency: 100 * time.Millisecond, // Make this short. AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
} }
s.Initialize()
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
@ -698,7 +690,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
} }
func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) { func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t) mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all // Allow all
@ -708,11 +699,10 @@ func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
defer envoy.Close() defer envoy.Close()
s := Server{ s := Server{
Logger: logger, Logger: testutil.Logger(t),
CfgMgr: mgr, CfgMgr: mgr,
ResolveToken: aclResolve, ResolveToken: aclResolve,
} }
s.Initialize()
sid := structs.NewServiceID("ingress-gateway", nil) sid := structs.NewServiceID("ingress-gateway", nil)

View File

@ -189,7 +189,7 @@ func (e *TestEnvoy) Close() error {
// unblock the recv chan to simulate recv error when client disconnects // unblock the recv chan to simulate recv error when client disconnects
if e.stream != nil && e.stream.recvCh != nil { if e.stream != nil && e.stream.recvCh != nil {
close(e.stream.recvCh) close(e.stream.recvCh)
e.stream.recvCh = nil e.stream = nil
} }
if e.cancel != nil { if e.cancel != nil {
e.cancel() e.cancel()