From e20abcc14faf14c7b7bce9014272124d46c02b54 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 24 Jun 2021 15:02:34 -0500 Subject: [PATCH] connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners (#10486) Fixes #10480 Also fixed a data race in the `connect/proxy` package that was unearthed by the tests changed for this bugfix. --- .changelog/10486.txt | 3 +++ connect/proxy/listener.go | 48 ++++++++++++++++++++++++--------- connect/proxy/proxy.go | 13 ++++++--- connect/proxy/proxy_test.go | 53 ++++++++++++++++++++++++++++++++----- 4 files changed, 95 insertions(+), 22 deletions(-) create mode 100644 .changelog/10486.txt diff --git a/.changelog/10486.txt b/.changelog/10486.txt new file mode 100644 index 000000000..d17325d78 --- /dev/null +++ b/.changelog/10486.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners +``` diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go index ea84493db..546658d69 100644 --- a/connect/proxy/listener.go +++ b/connect/proxy/listener.go @@ -45,7 +45,10 @@ type Listener struct { // `connection refused`. Retry loops and sleeps are unpleasant workarounds and // this is cheap and correct. listeningChan chan struct{} - listener net.Listener + + // listenerLock guards access to the listener field + listenerLock sync.Mutex + listener net.Listener logger hclog.Logger @@ -138,15 +141,17 @@ func (l *Listener) Serve() error { return errors.New("serve called on a closed listener") } - var err error - l.listener, err = l.listenFunc() + listener, err := l.listenFunc() if err != nil { return err } + + l.setListener(listener) + close(l.listeningChan) for { - conn, err := l.listener.Accept() + conn, err := listener.Accept() if err != nil { if atomic.LoadInt32(&l.stopFlag) == 1 { return nil @@ -242,16 +247,23 @@ func (l *Listener) trackConn() func() { // Close terminates the listener and all active connections. func (l *Listener) Close() error { + // Prevent the listener from being started. oldFlag := atomic.SwapInt32(&l.stopFlag, 1) - if oldFlag == 0 { - close(l.stopChan) - // Wait for all conns to close - l.connWG.Wait() - - if l.listener != nil { - l.listener.Close() - } + if oldFlag != 0 { + return nil } + + // Stop the current listener and stop accepting new requests. + if listener := l.getListener(); listener != nil { + listener.Close() + } + + // Stop outstanding requests. + close(l.stopChan) + + // Wait for all conns to close + l.connWG.Wait() + return nil } @@ -264,3 +276,15 @@ func (l *Listener) Wait() { func (l *Listener) BindAddr() string { return l.bindAddr } + +func (l *Listener) setListener(listener net.Listener) { + l.listenerLock.Lock() + l.listener = listener + l.listenerLock.Unlock() +} + +func (l *Listener) getListener() net.Listener { + l.listenerLock.Lock() + defer l.listenerLock.Unlock() + return l.listener +} diff --git a/connect/proxy/proxy.go b/connect/proxy/proxy.go index 22c9ff83d..8eb34c499 100644 --- a/connect/proxy/proxy.go +++ b/connect/proxy/proxy.go @@ -3,10 +3,11 @@ package proxy import ( "crypto/x509" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/connect" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/go-hclog" ) // Proxy implements the built-in connect proxy. @@ -103,8 +104,14 @@ func (p *Proxy) Serve() error { for _, uc := range newCfg.Upstreams { uc.applyDefaults() - if uc.LocalBindPort < 1 || uc.LocalBindSocketPath == "" { - p.logger.Error("upstream has no local_bind_port or local_bind_socket_path. "+ + if uc.LocalBindSocketPath != "" { + p.logger.Error("local_bind_socket_path is not supported with this proxy implementation. "+ + "Can't start upstream.", "upstream", uc.String()) + continue + } + + if uc.LocalBindPort < 1 { + p.logger.Error("upstream has no local_bind_port. "+ "Can't start upstream.", "upstream", uc.String()) continue } diff --git a/connect/proxy/proxy_test.go b/connect/proxy/proxy_test.go index fc9243bf9..46274de9b 100644 --- a/connect/proxy/proxy_test.go +++ b/connect/proxy/proxy_test.go @@ -4,7 +4,11 @@ import ( "context" "crypto/tls" "net" + "path/filepath" + "runtime" + "strconv" "testing" + "time" "github.com/stretchr/testify/require" @@ -23,9 +27,7 @@ func TestProxy_public(t *testing.T) { t.Skip("too slow for testing.Short") } - require := require.New(t) - - ports := freeport.MustTake(1) + ports := freeport.MustTake(2) defer freeport.Return(ports) a := agent.NewTestAgent(t, "") @@ -42,12 +44,31 @@ func TestProxy_public(t *testing.T) { Service: "echo", }, }, nil) - require.NoError(err) + require.NoError(t, err) // Start the backend service that is being proxied testApp := NewTestTCPServer(t) defer testApp.Close() + upstreams := []UpstreamConfig{ + { + DestinationName: "just-a-port", + LocalBindPort: ports[1], + }, + } + + var unixSocket string + if runtime.GOOS != "windows" { + tempDir := testutil.TempDir(t, "consul") + unixSocket = filepath.Join(tempDir, "test.sock") + + upstreams = append(upstreams, UpstreamConfig{ + DestinationName: "just-a-unix-domain-socket", + LocalBindSocketPath: unixSocket, + LocalBindSocketMode: "0600", + }) + } + // Start the proxy p, err := New(client, NewStaticConfigWatcher(&Config{ ProxiedServiceName: "echo", @@ -56,8 +77,9 @@ func TestProxy_public(t *testing.T) { BindPort: ports[0], LocalServiceAddress: testApp.Addr().String(), }, + Upstreams: upstreams, }), testutil.Logger(t)) - require.NoError(err) + require.NoError(t, err) defer p.Close() go p.Serve() @@ -65,7 +87,7 @@ func TestProxy_public(t *testing.T) { // if the proxy supports it. This is so we can verify below that the proxy _doesn't_ // advertise `h2` support as it's only a L4 proxy. svc, err := connect.NewServiceWithConfig("echo", connect.Config{Client: client, ServerNextProtos: []string{"h2"}}) - require.NoError(err) + require.NoError(t, err) // Create a test connection to the proxy. We retry here a few times // since this is dependent on the agent actually starting up and setting @@ -83,8 +105,25 @@ func TestProxy_public(t *testing.T) { // Verify that we did not select h2 via ALPN since the proxy is layer 4 only tlsConn := conn.(*tls.Conn) - require.Equal("", tlsConn.ConnectionState().NegotiatedProtocol) + require.Equal(t, "", tlsConn.ConnectionState().NegotiatedProtocol) // Connection works, test it is the right one TestEchoConn(t, conn, "") + + t.Run("verify port upstream is configured", func(t *testing.T) { + // Verify that it is listening by doing a simple TCP dial. + addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ports[1])) + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + require.NoError(t, err) + _ = conn.Close() + }) + + t.Run("verify unix domain socket upstream will never work", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.SkipNow() + } + + // Ensure the socket was not created + require.NoFileExists(t, unixSocket) + }) }