Fix racy connect network tests that always fail in Docker due to listen races

This commit is contained in:
Paul Banks 2018-04-20 22:26:00 +01:00 committed by Mitchell Hashimoto
parent 5abd43a567
commit 93ff59a132
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 69 additions and 22 deletions

View File

@ -25,6 +25,15 @@ type Listener struct {
stopFlag int32
stopChan chan struct{}
// listeningChan is closed when listener is opened successfully. It's really
// only for use in tests where we need to coordinate wait for the Serve
// goroutine to be running before we proceed trying to connect. On my laptop
// this always works out anyway but on constrained VMs and especially docker
// containers (e.g. in CI) we often see the Dial routine win the race and get
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and
// this is cheap and correct.
listeningChan chan struct{}
logger *log.Logger
}
@ -41,8 +50,9 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
return net.DialTimeout("tcp", cfg.LocalServiceAddress,
time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond)
},
stopChan: make(chan struct{}),
logger: logger,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger,
}
}
@ -64,17 +74,27 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
defer cancel()
return svc.Dial(ctx, cfg.resolver)
},
stopChan: make(chan struct{}),
logger: logger,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger,
}
}
// Serve runs the listener until it is stopped.
// Serve runs the listener until it is stopped. It is an error to call Serve
// more than once for any given Listener instance.
func (l *Listener) Serve() error {
// Ensure we mark state closed if we fail before Close is called externally.
defer l.Close()
if atomic.LoadInt32(&l.stopFlag) != 0 {
return errors.New("serve called on a closed listener")
}
listen, err := l.listenFunc()
if err != nil {
return err
}
close(l.listeningChan)
for {
conn, err := listen.Accept()
@ -113,3 +133,8 @@ func (l *Listener) handleConn(src net.Conn) {
func (l *Listener) Close() error {
return nil
}
// Wait for the listener to be ready to accept connections.
func (l *Listener) Wait() {
<-l.listeningChan
}

View File

@ -24,7 +24,7 @@ func TestPublicListener(t *testing.T) {
}
testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress)
require.Nil(t, err)
require.NoError(t, err)
defer testApp.Close()
svc := connect.TestService(t, "db", ca)
@ -34,9 +34,10 @@ func TestPublicListener(t *testing.T) {
// Run proxy
go func() {
err := l.Serve()
require.Nil(t, err)
require.NoError(t, err)
}()
defer l.Close()
l.Wait()
// Proxy and backend are running, play the part of a TLS client using same
// cert for now.
@ -44,7 +45,7 @@ func TestPublicListener(t *testing.T) {
Addr: addrs[0],
CertURI: agConnect.TestSpiffeIDService(t, "db"),
})
require.Nilf(t, err, "unexpected err: %s", err)
require.NoError(t, err)
TestEchoConn(t, conn, "")
}
@ -56,9 +57,10 @@ func TestUpstreamListener(t *testing.T) {
testSvr := connect.NewTestServer(t, "db", ca)
go func() {
err := testSvr.Serve()
require.Nil(t, err)
require.NoError(t, err)
}()
defer testSvr.Close()
<-testSvr.Listening
cfg := UpstreamConfig{
DestinationType: "service",
@ -79,13 +81,14 @@ func TestUpstreamListener(t *testing.T) {
// Run proxy
go func() {
err := l.Serve()
require.Nil(t, err)
require.NoError(t, err)
}()
defer l.Close()
l.Wait()
// Proxy and fake remote service are running, play the part of the app
// connecting to a remote connect service over TCP.
conn, err := net.Dial("tcp", cfg.LocalBindAddress)
require.Nilf(t, err, "unexpected err: %s", err)
require.NoError(t, err)
TestEchoConn(t, conn, "")
}

View File

@ -73,9 +73,10 @@ func TestService_Dial(t *testing.T) {
if tt.accept {
go func() {
err := testSvr.Serve()
require.Nil(err)
require.NoError(err)
}()
defer testSvr.Close()
<-testSvr.Listening
}
// Always expect to be connecting to a "DB"
@ -95,10 +96,10 @@ func TestService_Dial(t *testing.T) {
testTimer.Stop()
if tt.wantErr == "" {
require.Nil(err)
require.NoError(err)
require.IsType(&tls.Conn{}, conn)
} else {
require.NotNil(err)
require.Error(err)
require.Contains(err.Error(), tt.wantErr)
}
@ -117,7 +118,6 @@ func TestService_ServerTLSConfig(t *testing.T) {
}
func TestService_HTTPClient(t *testing.T) {
require := require.New(t)
ca := connect.TestCA(t, nil)
s := TestService(t, "web", ca)
@ -129,8 +129,9 @@ func TestService_HTTPClient(t *testing.T) {
err := testSvr.ServeHTTPS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello, I am Backend"))
}))
require.Nil(t, err)
require.NoError(t, err)
}()
<-testSvr.Listening
// TODO(banks): this will talk http2 on both client and server. I hit some
// compatibility issues when testing though need to make sure that the http

View File

@ -105,6 +105,8 @@ type TestServer struct {
// Addr is the listen address. It is set to a random free port on `localhost`
// by default.
Addr string
// Listening is closed when the listener is run.
Listening chan struct{}
l net.Listener
stopFlag int32
@ -116,11 +118,12 @@ type TestServer struct {
func NewTestServer(t testing.T, service string, ca *structs.CARoot) *TestServer {
ports := freeport.GetT(t, 1)
return &TestServer{
Service: service,
CA: ca,
stopChan: make(chan struct{}),
TLSCfg: TestTLSConfig(t, service, ca),
Addr: fmt.Sprintf("localhost:%d", ports[0]),
Service: service,
CA: ca,
stopChan: make(chan struct{}),
TLSCfg: TestTLSConfig(t, service, ca),
Addr: fmt.Sprintf("127.0.0.1:%d", ports[0]),
Listening: make(chan struct{}),
}
}
@ -132,6 +135,7 @@ func (s *TestServer) Serve() error {
if err != nil {
return err
}
close(s.Listening)
s.l = l
log.Printf("test connect service listening on %s", s.Addr)
@ -172,7 +176,21 @@ func (s *TestServer) ServeHTTPS(h http.Handler) error {
Handler: h,
}
log.Printf("starting test connect HTTPS server on %s", s.Addr)
return srv.ListenAndServeTLS("", "")
// Use our own listener so we can signal when it's ready.
l, err := net.Listen("tcp", s.Addr)
if err != nil {
return err
}
close(s.Listening)
s.l = l
log.Printf("test connect service listening on %s", s.Addr)
err = srv.ServeTLS(l, "", "")
if atomic.LoadInt32(&s.stopFlag) == 1 {
return nil
}
return err
}
// Close stops a TestServer