From 8be5a76b198758c8dce5d5bca0b67dade8c6e0ae Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Wed, 28 Apr 2021 13:46:53 -0400
Subject: [PATCH 1/2] connect/proxy: remove t.Parallel from tests

These tests run in under 10ms, so t.Parallel provides no value and makes
debugging failures more difficult.
---
 connect/proxy/config_test.go | 2 --
 connect/proxy/conn_test.go   | 9 ++-------
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/connect/proxy/config_test.go b/connect/proxy/config_test.go
index ef9955765..ba2974d5d 100644
--- a/connect/proxy/config_test.go
+++ b/connect/proxy/config_test.go
@@ -14,8 +14,6 @@ import (
 )
 
 func TestUpstreamResolverFuncFromClient(t *testing.T) {
-	t.Parallel()
-
 	tests := []struct {
 		name string
 		cfg  UpstreamConfig
diff --git a/connect/proxy/conn_test.go b/connect/proxy/conn_test.go
index 92e92f5cb..2ca29fab5 100644
--- a/connect/proxy/conn_test.go
+++ b/connect/proxy/conn_test.go
@@ -7,9 +7,10 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/sdk/testutil/retry"
 )
 
 // Assert io.Closer implementation
@@ -65,8 +66,6 @@ func testConnPipelineSetup(t *testing.T) (net.Conn, net.Conn, *Conn, func()) {
 }
 
 func TestConn(t *testing.T) {
-	t.Parallel()
-
 	src, dst, c, stop := testConnPipelineSetup(t)
 	defer stop()
 
@@ -124,8 +123,6 @@ func TestConn(t *testing.T) {
 }
 
 func TestConnSrcClosing(t *testing.T) {
-	t.Parallel()
-
 	src, dst, c, stop := testConnPipelineSetup(t)
 	defer stop()
 
@@ -164,8 +161,6 @@ func TestConnSrcClosing(t *testing.T) {
 }
 
 func TestConnDstClosing(t *testing.T) {
-	t.Parallel()
-
 	src, dst, c, stop := testConnPipelineSetup(t)
 	defer stop()
 

From 03aa6106ffa2da5e894fc938989231c36165f776 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Wed, 28 Apr 2021 14:27:55 -0400
Subject: [PATCH 2/2] connect/proxy: fix a number of problems with Listener

We noticed that TestUpstreamListener would deadlock sometimes when run
with the race detector. While debugging this issue I found and fixed the
following problems.

1. The net.Listener was not being closed properly when Listener.Stop was
   called. This caused the Listener.Serve goroutine to run forever. Fixed
   by storing a reference to the net.Listener and closing it properly
   when Listener.Stop is called.
2. Call connWG.Add in the correct place. WaitGroup.Add must be called
   before starting a goroutine, not from inside the goroutine.
3. Set the metrics config EnableRuntimeMetrics to `false` so that we
   don't start a background goroutine in each test for no reason. There
   is no way to shut down this goroutine, and it was an added distraction
   while debugging these timeouts.
4. Two tests were calling require.NoError from a goroutine.
   require.NoError calls t.FailNow, which MUST be called from the main
   test goroutine. Instead use t.Errorf, which can be called from other
   goroutines and will still fail the test.
5. `assertCurrentGaugeValue` was breaking out of a for loop, which would
   cause the `RWMutex.RUnlock` to be missed. Fixed by calling unlock
   before `break`.

The core issue of a deadlock was fixed by
https://github.com/armon/go-metrics/pull/124.
---
 connect/proxy/listener.go      | 21 +++++++++++++--------
 connect/proxy/listener_test.go | 12 ++++++++----
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go
index 56e19297e..ea84493db 100644
--- a/connect/proxy/listener.go
+++ b/connect/proxy/listener.go
@@ -10,10 +10,11 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
+	"github.com/hashicorp/go-hclog"
+
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/connect"
 	"github.com/hashicorp/consul/ipaddr"
-	"github.com/hashicorp/go-hclog"
 )
 
 const (
@@ -44,6 +45,7 @@ type Listener struct {
 	// `connection refused`. Retry loops and sleeps are unpleasant workarounds and
 	// this is cheap and correct.
 	listeningChan chan struct{}
+	listener      net.Listener
 
 	logger hclog.Logger
 
@@ -136,14 +138,15 @@ func (l *Listener) Serve() error {
 		return errors.New("serve called on a closed listener")
 	}
 
-	listen, err := l.listenFunc()
+	var err error
+	l.listener, err = l.listenFunc()
 	if err != nil {
 		return err
 	}
 	close(l.listeningChan)
 
 	for {
-		conn, err := listen.Accept()
+		conn, err := l.listener.Accept()
 		if err != nil {
 			if atomic.LoadInt32(&l.stopFlag) == 1 {
 				return nil
@@ -151,6 +154,7 @@ func (l *Listener) Serve() error {
 			return err
 		}
 
+		l.connWG.Add(1)
 		go l.handleConn(conn)
 	}
 }
@@ -158,6 +162,8 @@ func (l *Listener) Serve() error {
 // handleConn is the internal connection handler goroutine.
 func (l *Listener) handleConn(src net.Conn) {
 	defer src.Close()
+	// Make sure Listener.Close waits for this conn to be cleaned up.
+	defer l.connWG.Done()
 
 	dst, err := l.dialFunc()
 	if err != nil {
@@ -169,11 +175,6 @@ func (l *Listener) handleConn(src net.Conn) {
 	// it closes.
 	defer l.trackConn()()
 
-	// Make sure Close() waits for this conn to be cleaned up. Note defer is
-	// before conn.Close() so runs after defer conn.Close().
-	l.connWG.Add(1)
-	defer l.connWG.Done()
-
 	// Note no need to defer dst.Close() since conn handles that for us.
 	conn := NewConn(src, dst)
 	defer conn.Close()
@@ -246,6 +247,10 @@ func (l *Listener) Close() error {
 		close(l.stopChan)
 		// Wait for all conns to close
 		l.connWG.Wait()
+
+		if l.listener != nil {
+			l.listener.Close()
+		}
 	}
 	return nil
 }
diff --git a/connect/proxy/listener_test.go b/connect/proxy/listener_test.go
index fe842c4f6..83cb39987 100644
--- a/connect/proxy/listener_test.go
+++ b/connect/proxy/listener_test.go
@@ -26,6 +26,7 @@ func testSetupMetrics(t *testing.T) *metrics.InmemSink {
 	s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
 	cfg := metrics.DefaultConfig("consul.proxy.test")
 	cfg.EnableHostname = false
+	cfg.EnableRuntimeMetrics = false
 	metrics.NewGlobal(cfg, s)
 	return s
 }
@@ -45,6 +46,7 @@ func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
 		currentInterval.RLock()
 		if len(currentInterval.Gauges) > 0 {
 			got = currentInterval.Gauges[name].Value
+			currentInterval.RUnlock()
 			break
 		}
 		currentInterval.RUnlock()
@@ -132,8 +134,9 @@ func TestPublicListener(t *testing.T) {
 
 	// Run proxy
 	go func() {
-		err := l.Serve()
-		require.NoError(t, err)
+		if err := l.Serve(); err != nil {
+			t.Errorf("failed to listen: %v", err.Error())
+		}
 	}()
 	defer l.Close()
 	l.Wait()
@@ -200,8 +203,9 @@ func TestUpstreamListener(t *testing.T) {
 
 	// Run proxy
 	go func() {
-		err := l.Serve()
-		require.NoError(t, err)
+		if err := l.Serve(); err != nil {
+			t.Errorf("failed to listen: %v", err.Error())
+		}
 	}()
 	defer l.Close()
 	l.Wait()
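
Note (not part of the patch): below is a minimal, self-contained sketch of the two
concurrency rules the commit message above relies on. WaitGroup.Add must run before
the goroutine it accounts for is started, and a failure inside a goroutine must be
reported with t.Errorf rather than require.NoError, because require.NoError calls
t.FailNow, which may only run on the main test goroutine. The serve and
TestServeSketch names are illustrative only, not part of the Consul code.

package example

import (
	"errors"
	"net"
	"sync"
	"testing"
)

// serve accepts connections until lis is closed. The WaitGroup counter is
// incremented on this goroutine, before the handler goroutine is started,
// so a concurrent wg.Wait cannot miss a handler that is about to run.
func serve(wg *sync.WaitGroup, lis net.Listener, handle func(net.Conn)) error {
	for {
		conn, err := lis.Accept()
		if err != nil {
			return err
		}
		wg.Add(1) // must happen before `go`, not inside the new goroutine
		go func() {
			defer wg.Done()
			handle(conn)
		}()
	}
}

func TestServeSketch(t *testing.T) {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}

	var wg sync.WaitGroup
	handled := make(chan struct{})
	go func() {
		// t.Errorf is safe from any goroutine; require.NoError would call
		// t.FailNow, which must only run on the main test goroutine.
		err := serve(&wg, lis, func(c net.Conn) {
			c.Close()
			close(handled)
		})
		if err != nil && !errors.Is(err, net.ErrClosed) {
			t.Errorf("serve exited with unexpected error: %v", err)
		}
	}()

	conn, err := net.Dial("tcp", lis.Addr().String())
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	conn.Close()

	<-handled   // the accepted conn was handled, so its wg.Add already ran
	lis.Close() // unblocks Accept; serve returns net.ErrClosed
	wg.Wait()   // every handler goroutine has finished
}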