diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go index 51ab761ca..c8e70ac31 100644 --- a/connect/proxy/listener.go +++ b/connect/proxy/listener.go @@ -25,6 +25,15 @@ type Listener struct { stopFlag int32 stopChan chan struct{} + // listeningChan is closed when listener is opened successfully. It's really + // only for use in tests where we need to coordinate wait for the Serve + // goroutine to be running before we proceed trying to connect. On my laptop + // this always works out anyway but on constrained VMs and especially docker + // containers (e.g. in CI) we often see the Dial routine win the race and get + // `connection refused`. Retry loops and sleeps are unpleasant workarounds and + // this is cheap and correct. + listeningChan chan struct{} + logger *log.Logger } @@ -41,8 +50,9 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig, return net.DialTimeout("tcp", cfg.LocalServiceAddress, time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond) }, - stopChan: make(chan struct{}), - logger: logger, + stopChan: make(chan struct{}), + listeningChan: make(chan struct{}), + logger: logger, } } @@ -64,17 +74,27 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig, defer cancel() return svc.Dial(ctx, cfg.resolver) }, - stopChan: make(chan struct{}), - logger: logger, + stopChan: make(chan struct{}), + listeningChan: make(chan struct{}), + logger: logger, } } -// Serve runs the listener until it is stopped. +// Serve runs the listener until it is stopped. It is an error to call Serve +// more than once for any given Listener instance. func (l *Listener) Serve() error { + // Ensure we mark state closed if we fail before Close is called externally. + defer l.Close() + + if atomic.LoadInt32(&l.stopFlag) != 0 { + return errors.New("serve called on a closed listener") + } + listen, err := l.listenFunc() if err != nil { return err } + close(l.listeningChan) for { conn, err := listen.Accept() @@ -113,3 +133,8 @@ func (l *Listener) handleConn(src net.Conn) { func (l *Listener) Close() error { return nil } + +// Wait for the listener to be ready to accept connections. +func (l *Listener) Wait() { + <-l.listeningChan +} diff --git a/connect/proxy/listener_test.go b/connect/proxy/listener_test.go index ce41c81e5..8354fbe58 100644 --- a/connect/proxy/listener_test.go +++ b/connect/proxy/listener_test.go @@ -24,7 +24,7 @@ func TestPublicListener(t *testing.T) { } testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress) - require.Nil(t, err) + require.NoError(t, err) defer testApp.Close() svc := connect.TestService(t, "db", ca) @@ -34,9 +34,10 @@ func TestPublicListener(t *testing.T) { // Run proxy go func() { err := l.Serve() - require.Nil(t, err) + require.NoError(t, err) }() defer l.Close() + l.Wait() // Proxy and backend are running, play the part of a TLS client using same // cert for now. @@ -44,7 +45,7 @@ func TestPublicListener(t *testing.T) { Addr: addrs[0], CertURI: agConnect.TestSpiffeIDService(t, "db"), }) - require.Nilf(t, err, "unexpected err: %s", err) + require.NoError(t, err) TestEchoConn(t, conn, "") } @@ -56,9 +57,10 @@ func TestUpstreamListener(t *testing.T) { testSvr := connect.NewTestServer(t, "db", ca) go func() { err := testSvr.Serve() - require.Nil(t, err) + require.NoError(t, err) }() defer testSvr.Close() + <-testSvr.Listening cfg := UpstreamConfig{ DestinationType: "service", @@ -79,13 +81,14 @@ func TestUpstreamListener(t *testing.T) { // Run proxy go func() { err := l.Serve() - require.Nil(t, err) + require.NoError(t, err) }() defer l.Close() + l.Wait() // Proxy and fake remote service are running, play the part of the app // connecting to a remote connect service over TCP. conn, err := net.Dial("tcp", cfg.LocalBindAddress) - require.Nilf(t, err, "unexpected err: %s", err) + require.NoError(t, err) TestEchoConn(t, conn, "") } diff --git a/connect/service_test.go b/connect/service_test.go index 7bc4c97f2..20433d1f5 100644 --- a/connect/service_test.go +++ b/connect/service_test.go @@ -73,9 +73,10 @@ func TestService_Dial(t *testing.T) { if tt.accept { go func() { err := testSvr.Serve() - require.Nil(err) + require.NoError(err) }() defer testSvr.Close() + <-testSvr.Listening } // Always expect to be connecting to a "DB" @@ -95,10 +96,10 @@ func TestService_Dial(t *testing.T) { testTimer.Stop() if tt.wantErr == "" { - require.Nil(err) + require.NoError(err) require.IsType(&tls.Conn{}, conn) } else { - require.NotNil(err) + require.Error(err) require.Contains(err.Error(), tt.wantErr) } @@ -117,7 +118,6 @@ func TestService_ServerTLSConfig(t *testing.T) { } func TestService_HTTPClient(t *testing.T) { - require := require.New(t) ca := connect.TestCA(t, nil) s := TestService(t, "web", ca) @@ -129,8 +129,9 @@ func TestService_HTTPClient(t *testing.T) { err := testSvr.ServeHTTPS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hello, I am Backend")) })) - require.Nil(t, err) + require.NoError(t, err) }() + <-testSvr.Listening // TODO(banks): this will talk http2 on both client and server. I hit some // compatibility issues when testing though need to make sure that the http diff --git a/connect/testing.go b/connect/testing.go index 9f6e4f781..f9a6a4850 100644 --- a/connect/testing.go +++ b/connect/testing.go @@ -105,6 +105,8 @@ type TestServer struct { // Addr is the listen address. It is set to a random free port on `localhost` // by default. Addr string + // Listening is closed when the listener is run. + Listening chan struct{} l net.Listener stopFlag int32 @@ -116,11 +118,12 @@ type TestServer struct { func NewTestServer(t testing.T, service string, ca *structs.CARoot) *TestServer { ports := freeport.GetT(t, 1) return &TestServer{ - Service: service, - CA: ca, - stopChan: make(chan struct{}), - TLSCfg: TestTLSConfig(t, service, ca), - Addr: fmt.Sprintf("localhost:%d", ports[0]), + Service: service, + CA: ca, + stopChan: make(chan struct{}), + TLSCfg: TestTLSConfig(t, service, ca), + Addr: fmt.Sprintf("127.0.0.1:%d", ports[0]), + Listening: make(chan struct{}), } } @@ -132,6 +135,7 @@ func (s *TestServer) Serve() error { if err != nil { return err } + close(s.Listening) s.l = l log.Printf("test connect service listening on %s", s.Addr) @@ -172,7 +176,21 @@ func (s *TestServer) ServeHTTPS(h http.Handler) error { Handler: h, } log.Printf("starting test connect HTTPS server on %s", s.Addr) - return srv.ListenAndServeTLS("", "") + + // Use our own listener so we can signal when it's ready. + l, err := net.Listen("tcp", s.Addr) + if err != nil { + return err + } + close(s.Listening) + s.l = l + log.Printf("test connect service listening on %s", s.Addr) + + err = srv.ServeTLS(l, "", "") + if atomic.LoadInt32(&s.stopFlag) == 1 { + return nil + } + return err } // Close stops a TestServer