Address review comments
This commit is contained in:
parent
78cd7a9087
commit
bb8a21c385
|
@ -148,7 +148,7 @@ func TestACL_Special_IDs(t *testing.T) {
|
|||
}
|
||||
|
||||
// The ACL master token should also not call the server, but should give
|
||||
// us a working a token.
|
||||
// us a working agent token.
|
||||
acl, err := a.resolveToken("towel")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -78,9 +78,6 @@ type clientServer interface {
|
|||
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||
// requests to other Consul servers.
|
||||
type Agent struct {
|
||||
// id is an optional log prefix.
|
||||
id string
|
||||
|
||||
// config is the agent configuration.
|
||||
config *Config
|
||||
|
||||
|
@ -217,7 +214,7 @@ func (a *Agent) Start() error {
|
|||
if logOutput == nil {
|
||||
logOutput = os.Stderr
|
||||
}
|
||||
a.logger = log.New(logOutput, a.id, log.LstdFlags)
|
||||
a.logger = log.New(logOutput, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Retrieve or generate the node ID before setting up the rest of the
|
||||
|
@ -334,8 +331,6 @@ func (a *Agent) listenAndServeDNS() error {
|
|||
}
|
||||
|
||||
// wait for servers to be up
|
||||
// todo(fs): not sure whether this is the right approach.
|
||||
// todo(fs): maybe a failing server should trigger an agent shutdown.
|
||||
timeout := time.After(time.Second)
|
||||
for range a.dnsAddrs {
|
||||
select {
|
||||
|
@ -376,14 +371,23 @@ func (a *Agent) listenHTTP(addrs []ProtoAddr) ([]net.Listener, error) {
|
|||
|
||||
case p.Net == "tcp" && p.Proto == "http":
|
||||
l, err = net.Listen("tcp", p.Addr)
|
||||
if err != nil {
|
||||
l = &tcpKeepAliveListener{l.(*net.TCPListener)}
|
||||
}
|
||||
|
||||
case p.Net == "tcp" && p.Proto == "https":
|
||||
var tlscfg *tls.Config
|
||||
tlscfg, err = a.config.IncomingTLSConfig()
|
||||
tlscfg, err = a.config.IncomingHTTPSConfig()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
l, err = tls.Listen("tcp", p.Addr, tlscfg)
|
||||
if err != nil {
|
||||
l = &tcpKeepAliveListener{l.(*net.TCPListener)}
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("%s:%s listener not supported", p.Net, p.Proto)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -398,6 +402,23 @@ func (a *Agent) listenHTTP(addrs []ProtoAddr) ([]net.Listener, error) {
|
|||
return ln, nil
|
||||
}
|
||||
|
||||
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
|
||||
// connections. It's used by NewHttpServer so dead TCP connections
|
||||
// eventually go away.
|
||||
type tcpKeepAliveListener struct {
|
||||
*net.TCPListener
|
||||
}
|
||||
|
||||
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
|
||||
tc, err := ln.AcceptTCP()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tc.SetKeepAlive(true)
|
||||
tc.SetKeepAlivePeriod(30 * time.Second)
|
||||
return tc, nil
|
||||
}
|
||||
|
||||
func (a *Agent) listenSocket(path string, perm FilePermissions) (net.Listener, error) {
|
||||
if _, err := os.Stat(path); !os.IsNotExist(err) {
|
||||
a.logger.Printf("[WARN] agent: Replacing socket %q", path)
|
||||
|
@ -422,6 +443,11 @@ func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
|
|||
// Shutdown is called before the Serve go routine was scheduled then
|
||||
// the Serve go routine never returns. This deadlocks the agent
|
||||
// shutdown for some tests since it will wait forever.
|
||||
//
|
||||
// Since we need to check for an unexported type (*tls.listener)
|
||||
// we cannot just perform a type check since the compiler won't let
|
||||
// us. We might be able to use reflection but the fmt.Sprintf() hack
|
||||
// works just as well.
|
||||
if strings.Contains("*tls.listener", fmt.Sprintf("%T", l)) {
|
||||
srv.proto = "https"
|
||||
}
|
||||
|
@ -1038,11 +1064,8 @@ func (a *Agent) Shutdown() error {
|
|||
// http server is HTTPS if TLSConfig is not nil and NextProtos does not only contain "h2"
|
||||
// the latter seems to be a side effect of HTTP/2 support in go 1.8. TLSConfig != nil is
|
||||
// no longer sufficient to check for an HTTPS server.
|
||||
if srv.proto == "https" {
|
||||
a.logger.Println("[INFO] agent: Stopping HTTPS server", srv.Addr)
|
||||
} else {
|
||||
a.logger.Println("[INFO] agent: Stopping HTTP server", srv.Addr)
|
||||
}
|
||||
a.logger.Printf("[INFO] agent: Stopping %s server %s",
|
||||
strings.ToUpper(srv.proto), srv.Addr)
|
||||
|
||||
// old behavior: just die
|
||||
// srv.Close()
|
||||
|
@ -1050,7 +1073,19 @@ func (a *Agent) Shutdown() error {
|
|||
// graceful shutdown
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
srv.Shutdown(ctx)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
srv.Shutdown(ctx)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
// server down within timeout
|
||||
case <-ctx.Done():
|
||||
a.logger.Printf("[WARN] agent: Timeout stopping %s server %s",
|
||||
strings.ToUpper(srv.proto), srv.Addr)
|
||||
}
|
||||
}
|
||||
a.logger.Println("[INFO] agent: Waiting for endpoints to shut down")
|
||||
a.wgServers.Wait()
|
||||
|
|
|
@ -761,9 +761,9 @@ type Config struct {
|
|||
DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"`
|
||||
}
|
||||
|
||||
// IncomingTLSConfig returns the TLS configuration for TLS
|
||||
// IncomingHTTPSConfig returns the TLS configuration for HTTPS
|
||||
// connections to consul.
|
||||
func (c *Config) IncomingTLSConfig() (*tls.Config, error) {
|
||||
func (c *Config) IncomingHTTPSConfig() (*tls.Config, error) {
|
||||
tc := &tlsutil.Config{
|
||||
VerifyIncoming: c.VerifyIncoming || c.VerifyIncomingHTTPS,
|
||||
VerifyOutgoing: c.VerifyOutgoing,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
@ -147,9 +148,13 @@ func (a *TestAgent) Start() *TestAgent {
|
|||
panic(fmt.Sprintf("Error creating agent: %s", err))
|
||||
}
|
||||
|
||||
agent.id = id
|
||||
agent.LogOutput = a.LogOutput
|
||||
logOutput := a.LogOutput
|
||||
if logOutput == nil {
|
||||
logOutput = os.Stderr
|
||||
}
|
||||
agent.LogOutput = logOutput
|
||||
agent.LogWriter = a.LogWriter
|
||||
agent.logger = log.New(logOutput, id, log.LstdFlags)
|
||||
|
||||
// we need the err var in the next exit condition
|
||||
if err := agent.Start(); err == nil {
|
||||
|
|
|
@ -31,7 +31,7 @@ func TestExecCommand_implements(t *testing.T) {
|
|||
func TestExecCommandRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
cfg := agent.TestConfig()
|
||||
cfg.DisableRemoteExec = agent.Bool(false)
|
||||
cfg.DisableRemoteExec = &agent.BoolFalse
|
||||
a := agent.NewTestAgent(t.Name(), cfg)
|
||||
defer a.Shutdown()
|
||||
|
||||
|
|
Loading…
Reference in New Issue