Merge pull request #10146 from hashicorp/dnephin/connect-proxy-test-deadlock

connect/proxy: fix a few problems with tests
This commit is contained in:
Daniel Nephin 2021-04-28 17:53:43 -04:00 committed by GitHub
commit faf264b293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 21 deletions

View File

@ -14,8 +14,6 @@ import (
) )
func TestUpstreamResolverFuncFromClient(t *testing.T) { func TestUpstreamResolverFuncFromClient(t *testing.T) {
t.Parallel()
tests := []struct { tests := []struct {
name string name string
cfg UpstreamConfig cfg UpstreamConfig

View File

@ -7,9 +7,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil/retry"
) )
// Assert io.Closer implementation // Assert io.Closer implementation
@ -65,8 +66,6 @@ func testConnPipelineSetup(t *testing.T) (net.Conn, net.Conn, *Conn, func()) {
} }
func TestConn(t *testing.T) { func TestConn(t *testing.T) {
t.Parallel()
src, dst, c, stop := testConnPipelineSetup(t) src, dst, c, stop := testConnPipelineSetup(t)
defer stop() defer stop()
@ -124,8 +123,6 @@ func TestConn(t *testing.T) {
} }
func TestConnSrcClosing(t *testing.T) { func TestConnSrcClosing(t *testing.T) {
t.Parallel()
src, dst, c, stop := testConnPipelineSetup(t) src, dst, c, stop := testConnPipelineSetup(t)
defer stop() defer stop()
@ -164,8 +161,6 @@ func TestConnSrcClosing(t *testing.T) {
} }
func TestConnDstClosing(t *testing.T) { func TestConnDstClosing(t *testing.T) {
t.Parallel()
src, dst, c, stop := testConnPipelineSetup(t) src, dst, c, stop := testConnPipelineSetup(t)
defer stop() defer stop()

View File

@ -10,10 +10,11 @@ import (
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect" "github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/go-hclog"
) )
const ( const (
@ -44,6 +45,7 @@ type Listener struct {
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and // `connection refused`. Retry loops and sleeps are unpleasant workarounds and
// this is cheap and correct. // this is cheap and correct.
listeningChan chan struct{} listeningChan chan struct{}
listener net.Listener
logger hclog.Logger logger hclog.Logger
@ -136,14 +138,15 @@ func (l *Listener) Serve() error {
return errors.New("serve called on a closed listener") return errors.New("serve called on a closed listener")
} }
listen, err := l.listenFunc() var err error
l.listener, err = l.listenFunc()
if err != nil { if err != nil {
return err return err
} }
close(l.listeningChan) close(l.listeningChan)
for { for {
conn, err := listen.Accept() conn, err := l.listener.Accept()
if err != nil { if err != nil {
if atomic.LoadInt32(&l.stopFlag) == 1 { if atomic.LoadInt32(&l.stopFlag) == 1 {
return nil return nil
@ -151,6 +154,7 @@ func (l *Listener) Serve() error {
return err return err
} }
l.connWG.Add(1)
go l.handleConn(conn) go l.handleConn(conn)
} }
} }
@ -158,6 +162,8 @@ func (l *Listener) Serve() error {
// handleConn is the internal connection handler goroutine. // handleConn is the internal connection handler goroutine.
func (l *Listener) handleConn(src net.Conn) { func (l *Listener) handleConn(src net.Conn) {
defer src.Close() defer src.Close()
// Make sure Listener.Close waits for this conn to be cleaned up.
defer l.connWG.Done()
dst, err := l.dialFunc() dst, err := l.dialFunc()
if err != nil { if err != nil {
@ -169,11 +175,6 @@ func (l *Listener) handleConn(src net.Conn) {
// it closes. // it closes.
defer l.trackConn()() defer l.trackConn()()
// Make sure Close() waits for this conn to be cleaned up. Note defer is
// before conn.Close() so runs after defer conn.Close().
l.connWG.Add(1)
defer l.connWG.Done()
// Note no need to defer dst.Close() since conn handles that for us. // Note no need to defer dst.Close() since conn handles that for us.
conn := NewConn(src, dst) conn := NewConn(src, dst)
defer conn.Close() defer conn.Close()
@ -246,6 +247,10 @@ func (l *Listener) Close() error {
close(l.stopChan) close(l.stopChan)
// Wait for all conns to close // Wait for all conns to close
l.connWG.Wait() l.connWG.Wait()
if l.listener != nil {
l.listener.Close()
}
} }
return nil return nil
} }

View File

@ -26,6 +26,7 @@ func testSetupMetrics(t *testing.T) *metrics.InmemSink {
s := metrics.NewInmemSink(10*time.Second, 300*time.Second) s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
cfg := metrics.DefaultConfig("consul.proxy.test") cfg := metrics.DefaultConfig("consul.proxy.test")
cfg.EnableHostname = false cfg.EnableHostname = false
cfg.EnableRuntimeMetrics = false
metrics.NewGlobal(cfg, s) metrics.NewGlobal(cfg, s)
return s return s
} }
@ -45,6 +46,7 @@ func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
currentInterval.RLock() currentInterval.RLock()
if len(currentInterval.Gauges) > 0 { if len(currentInterval.Gauges) > 0 {
got = currentInterval.Gauges[name].Value got = currentInterval.Gauges[name].Value
currentInterval.RUnlock()
break break
} }
currentInterval.RUnlock() currentInterval.RUnlock()
@ -132,8 +134,9 @@ func TestPublicListener(t *testing.T) {
// Run proxy // Run proxy
go func() { go func() {
err := l.Serve() if err := l.Serve(); err != nil {
require.NoError(t, err) t.Errorf("failed to listen: %v", err.Error())
}
}() }()
defer l.Close() defer l.Close()
l.Wait() l.Wait()
@ -200,8 +203,9 @@ func TestUpstreamListener(t *testing.T) {
// Run proxy // Run proxy
go func() { go func() {
err := l.Serve() if err := l.Serve(); err != nil {
require.NoError(t, err) t.Errorf("failed to listen: %v", err.Error())
}
}() }()
defer l.Close() defer l.Close()
l.Wait() l.Wait()