connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners (#10486)

Fixes #10480

Also fixed a data race in the `connect/proxy` package that was unearthed by the tests changed for this bugfix.
This commit is contained in:
R.B. Boyer 2021-06-24 15:02:34 -05:00 committed by GitHub
parent 811aa82f15
commit e20abcc14f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 22 deletions

3
.changelog/10486.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners
```

View File

@ -45,6 +45,9 @@ type Listener struct {
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and // `connection refused`. Retry loops and sleeps are unpleasant workarounds and
// this is cheap and correct. // this is cheap and correct.
listeningChan chan struct{} listeningChan chan struct{}
// listenerLock guards access to the listener field
listenerLock sync.Mutex
listener net.Listener listener net.Listener
logger hclog.Logger logger hclog.Logger
@ -138,15 +141,17 @@ func (l *Listener) Serve() error {
return errors.New("serve called on a closed listener") return errors.New("serve called on a closed listener")
} }
var err error listener, err := l.listenFunc()
l.listener, err = l.listenFunc()
if err != nil { if err != nil {
return err return err
} }
l.setListener(listener)
close(l.listeningChan) close(l.listeningChan)
for { for {
conn, err := l.listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
if atomic.LoadInt32(&l.stopFlag) == 1 { if atomic.LoadInt32(&l.stopFlag) == 1 {
return nil return nil
@ -242,16 +247,23 @@ func (l *Listener) trackConn() func() {
// Close terminates the listener and all active connections. // Close terminates the listener and all active connections.
func (l *Listener) Close() error { func (l *Listener) Close() error {
// Prevent the listener from being started.
oldFlag := atomic.SwapInt32(&l.stopFlag, 1) oldFlag := atomic.SwapInt32(&l.stopFlag, 1)
if oldFlag == 0 { if oldFlag != 0 {
return nil
}
// Stop the current listener and stop accepting new requests.
if listener := l.getListener(); listener != nil {
listener.Close()
}
// Stop outstanding requests.
close(l.stopChan) close(l.stopChan)
// Wait for all conns to close // Wait for all conns to close
l.connWG.Wait() l.connWG.Wait()
if l.listener != nil {
l.listener.Close()
}
}
return nil return nil
} }
@ -264,3 +276,15 @@ func (l *Listener) Wait() {
func (l *Listener) BindAddr() string { func (l *Listener) BindAddr() string {
return l.bindAddr return l.bindAddr
} }
func (l *Listener) setListener(listener net.Listener) {
l.listenerLock.Lock()
l.listener = listener
l.listenerLock.Unlock()
}
func (l *Listener) getListener() net.Listener {
l.listenerLock.Lock()
defer l.listenerLock.Unlock()
return l.listener
}

View File

@ -3,10 +3,11 @@ package proxy
import ( import (
"crypto/x509" "crypto/x509"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect" "github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-hclog"
) )
// Proxy implements the built-in connect proxy. // Proxy implements the built-in connect proxy.
@ -103,8 +104,14 @@ func (p *Proxy) Serve() error {
for _, uc := range newCfg.Upstreams { for _, uc := range newCfg.Upstreams {
uc.applyDefaults() uc.applyDefaults()
if uc.LocalBindPort < 1 || uc.LocalBindSocketPath == "" { if uc.LocalBindSocketPath != "" {
p.logger.Error("upstream has no local_bind_port or local_bind_socket_path. "+ p.logger.Error("local_bind_socket_path is not supported with this proxy implementation. "+
"Can't start upstream.", "upstream", uc.String())
continue
}
if uc.LocalBindPort < 1 {
p.logger.Error("upstream has no local_bind_port. "+
"Can't start upstream.", "upstream", uc.String()) "Can't start upstream.", "upstream", uc.String())
continue continue
} }

View File

@ -4,7 +4,11 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"net" "net"
"path/filepath"
"runtime"
"strconv"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -23,9 +27,7 @@ func TestProxy_public(t *testing.T) {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
require := require.New(t) ports := freeport.MustTake(2)
ports := freeport.MustTake(1)
defer freeport.Return(ports) defer freeport.Return(ports)
a := agent.NewTestAgent(t, "") a := agent.NewTestAgent(t, "")
@ -42,12 +44,31 @@ func TestProxy_public(t *testing.T) {
Service: "echo", Service: "echo",
}, },
}, nil) }, nil)
require.NoError(err) require.NoError(t, err)
// Start the backend service that is being proxied // Start the backend service that is being proxied
testApp := NewTestTCPServer(t) testApp := NewTestTCPServer(t)
defer testApp.Close() defer testApp.Close()
upstreams := []UpstreamConfig{
{
DestinationName: "just-a-port",
LocalBindPort: ports[1],
},
}
var unixSocket string
if runtime.GOOS != "windows" {
tempDir := testutil.TempDir(t, "consul")
unixSocket = filepath.Join(tempDir, "test.sock")
upstreams = append(upstreams, UpstreamConfig{
DestinationName: "just-a-unix-domain-socket",
LocalBindSocketPath: unixSocket,
LocalBindSocketMode: "0600",
})
}
// Start the proxy // Start the proxy
p, err := New(client, NewStaticConfigWatcher(&Config{ p, err := New(client, NewStaticConfigWatcher(&Config{
ProxiedServiceName: "echo", ProxiedServiceName: "echo",
@ -56,8 +77,9 @@ func TestProxy_public(t *testing.T) {
BindPort: ports[0], BindPort: ports[0],
LocalServiceAddress: testApp.Addr().String(), LocalServiceAddress: testApp.Addr().String(),
}, },
Upstreams: upstreams,
}), testutil.Logger(t)) }), testutil.Logger(t))
require.NoError(err) require.NoError(t, err)
defer p.Close() defer p.Close()
go p.Serve() go p.Serve()
@ -65,7 +87,7 @@ func TestProxy_public(t *testing.T) {
// if the proxy supports it. This is so we can verify below that the proxy _doesn't_ // if the proxy supports it. This is so we can verify below that the proxy _doesn't_
// advertise `h2` support as it's only a L4 proxy. // advertise `h2` support as it's only a L4 proxy.
svc, err := connect.NewServiceWithConfig("echo", connect.Config{Client: client, ServerNextProtos: []string{"h2"}}) svc, err := connect.NewServiceWithConfig("echo", connect.Config{Client: client, ServerNextProtos: []string{"h2"}})
require.NoError(err) require.NoError(t, err)
// Create a test connection to the proxy. We retry here a few times // Create a test connection to the proxy. We retry here a few times
// since this is dependent on the agent actually starting up and setting // since this is dependent on the agent actually starting up and setting
@ -83,8 +105,25 @@ func TestProxy_public(t *testing.T) {
// Verify that we did not select h2 via ALPN since the proxy is layer 4 only // Verify that we did not select h2 via ALPN since the proxy is layer 4 only
tlsConn := conn.(*tls.Conn) tlsConn := conn.(*tls.Conn)
require.Equal("", tlsConn.ConnectionState().NegotiatedProtocol) require.Equal(t, "", tlsConn.ConnectionState().NegotiatedProtocol)
// Connection works, test it is the right one // Connection works, test it is the right one
TestEchoConn(t, conn, "") TestEchoConn(t, conn, "")
t.Run("verify port upstream is configured", func(t *testing.T) {
// Verify that it is listening by doing a simple TCP dial.
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ports[1]))
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
require.NoError(t, err)
_ = conn.Close()
})
t.Run("verify unix domain socket upstream will never work", func(t *testing.T) {
if runtime.GOOS == "windows" {
t.SkipNow()
}
// Ensure the socket was not created
require.NoFileExists(t, unixSocket)
})
} }