agent: move http/dns endpoints into agent

Move the HTTP and DNS endpoints into the agent and control
their lifespan via the agent.

This removes the requirement to manage HTTP and DNS servers
indpendent of the agent since the agent is mostly useless
without an endpoint and the endpoints without the agent.
This commit is contained in:
Frank Schroeder 2017-05-19 11:53:41 +02:00
parent b2670b2d59
commit 8d9f5b9a64
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
17 changed files with 543 additions and 590 deletions

View File

@ -1,6 +1,7 @@
package agent package agent
import ( import (
"context"
"crypto/sha512" "crypto/sha512"
"encoding/json" "encoding/json"
"errors" "errors"
@ -9,6 +10,7 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -141,27 +143,62 @@ type Agent struct {
// agent methods use this, so use with care and never override // agent methods use this, so use with care and never override
// outside of a unit test. // outside of a unit test.
endpoints map[string]string endpoints map[string]string
// dnsAddr is the address the DNS server binds to
dnsAddr net.Addr
// dnsServer provides the DNS API
dnsServers []*DNSServer
// httpAddrs are the addresses per protocol the HTTP server binds to
httpAddrs map[string][]net.Addr
// httpServers provides the HTTP API on various endpoints
httpServers []*HTTPServer
// wgServers is the wait group for all HTTP and DNS servers
wgServers sync.WaitGroup
} }
// Create is used to create a new Agent. Returns // Create is used to create a new Agent. Returns
// the agent or potentially an error. // the agent or potentially an error.
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
// Ensure we have a log sink a, err := NewAgent(c, logOutput, logWriter, reloadCh)
if err != nil {
return nil, err
}
if err := a.Start(); err != nil {
return nil, err
}
return a, nil
}
func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
if logOutput == nil { if logOutput == nil {
logOutput = os.Stderr logOutput = os.Stderr
} }
if c.Datacenter == "" {
// Validate the config
if config.Datacenter == "" {
return nil, fmt.Errorf("Must configure a Datacenter") return nil, fmt.Errorf("Must configure a Datacenter")
} }
if config.DataDir == "" && !config.DevMode { if c.DataDir == "" && !c.DevMode {
return nil, fmt.Errorf("Must configure a DataDir") return nil, fmt.Errorf("Must configure a DataDir")
} }
dnsAddr, err := c.ClientListener(c.Addresses.DNS, c.Ports.DNS)
if err != nil {
return nil, fmt.Errorf("Invalid DNS bind address: %s", err)
}
httpAddrs, err := c.HTTPAddrs()
if err != nil {
return nil, fmt.Errorf("Invalid HTTP bind address: %s", err)
}
acls, err := newACLManager(c)
if err != nil {
return nil, err
}
agent := &Agent{ a := &Agent{
config: config, config: c,
logger: log.New(logOutput, "", log.LstdFlags), acls: acls,
logOutput: logOutput, logOutput: logOutput,
logWriter: logWriter, logWriter: logWriter,
checkReapAfter: make(map[types.CheckID]time.Duration), checkReapAfter: make(map[types.CheckID]time.Duration),
@ -175,79 +212,200 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
reloadCh: reloadCh, reloadCh: reloadCh,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
endpoints: make(map[string]string), endpoints: make(map[string]string),
dnsAddr: dnsAddr,
httpAddrs: httpAddrs,
} }
if err := agent.resolveTmplAddrs(); err != nil { if err := a.resolveTmplAddrs(); err != nil {
return nil, err return nil, err
} }
return a, nil
}
// Initialize the ACL manager. func (a *Agent) Start() error {
acls, err := newACLManager(config) c := a.config
if err != nil {
return nil, err a.logger = log.New(a.logOutput, "", log.LstdFlags)
}
agent.acls = acls
// Retrieve or generate the node ID before setting up the rest of the // Retrieve or generate the node ID before setting up the rest of the
// agent, which depends on it. // agent, which depends on it.
if err := agent.setupNodeID(config); err != nil { if err := a.setupNodeID(c); err != nil {
return nil, fmt.Errorf("Failed to setup node ID: %v", err) return fmt.Errorf("Failed to setup node ID: %v", err)
} }
// Initialize the local state. // Initialize the local state.
agent.state.Init(config, agent.logger) a.state.Init(c, a.logger)
// Setup either the client or the server. // Setup either the client or the server.
if config.Server { if c.Server {
err = agent.setupServer() server, err := a.makeServer()
agent.state.SetIface(agent.delegate) if err != nil {
return err
}
a.delegate = server
a.state.SetIface(server)
// Automatically register the "consul" service on server nodes // Automatically register the "consul" service on server nodes
consulService := structs.NodeService{ consulService := structs.NodeService{
Service: consul.ConsulServiceName, Service: consul.ConsulServiceName,
ID: consul.ConsulServiceID, ID: consul.ConsulServiceID,
Port: agent.config.Ports.Server, Port: c.Ports.Server,
Tags: []string{}, Tags: []string{},
} }
agent.state.AddService(&consulService, agent.config.GetTokenForAgent()) a.state.AddService(&consulService, c.GetTokenForAgent())
} else { } else {
err = agent.setupClient() client, err := a.makeClient()
agent.state.SetIface(agent.delegate)
}
if err != nil { if err != nil {
return nil, err return err
}
a.delegate = client
a.state.SetIface(client)
} }
// Load checks/services/metadata. // Load checks/services/metadata.
if err := agent.loadServices(config); err != nil { if err := a.loadServices(c); err != nil {
return nil, err return err
} }
if err := agent.loadChecks(config); err != nil { if err := a.loadChecks(c); err != nil {
return nil, err return err
} }
if err := agent.loadMetadata(config); err != nil { if err := a.loadMetadata(c); err != nil {
return nil, err return err
} }
// Start watching for critical services to deregister, based on their // Start watching for critical services to deregister, based on their
// checks. // checks.
go agent.reapServices() go a.reapServices()
// Start handling events. // Start handling events.
go agent.handleEvents() go a.handleEvents()
// Start sending network coordinate to the server. // Start sending network coordinate to the server.
if !config.DisableCoordinates { if !c.DisableCoordinates {
go agent.sendCoordinate() go a.sendCoordinate()
} }
// Write out the PID file if necessary. // Write out the PID file if necessary.
err = agent.storePid() if err := a.storePid(); err != nil {
if err != nil { return err
return nil, err
} }
return agent, nil // start dns server
if c.Ports.DNS > 0 {
srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors)
if err != nil {
return fmt.Errorf("error starting DNS server: %s", err)
}
a.dnsServers = []*DNSServer{srv}
}
// start HTTP servers
return a.startHTTP(a.httpAddrs)
}
func (a *Agent) startHTTP(httpAddrs map[string][]net.Addr) error {
// ln contains the list of pending listeners until the
// actual server is created and the listeners are used.
var ln []net.Listener
// cleanup the listeners on error. ln should be empty on success.
defer func() {
for _, l := range ln {
l.Close()
}
}()
// bind to the listeners for all addresses and protocols
// before we start the servers so that we can fail early
// if we can't bind to one of the addresses.
for proto, addrs := range httpAddrs {
for _, addr := range addrs {
switch addr.(type) {
case *net.UnixAddr:
switch proto {
case "http":
if _, err := os.Stat(addr.String()); !os.IsNotExist(err) {
a.logger.Printf("[WARN] agent: Replacing socket %q", addr.String())
}
l, err := ListenUnix(addr.String(), a.config.UnixSockets)
if err != nil {
return err
}
ln = append(ln, l)
default:
return fmt.Errorf("invalid protocol: %q", proto)
}
case *net.TCPAddr:
switch proto {
case "http":
l, err := ListenTCP(addr.String())
if err != nil {
return err
}
ln = append(ln, l)
case "https":
tlscfg, err := a.config.IncomingTLSConfig()
if err != nil {
return fmt.Errorf("invalid TLS configuration: %s", err)
}
l, err := ListenTLS(addr.String(), tlscfg)
if err != nil {
return err
}
ln = append(ln, l)
default:
return fmt.Errorf("invalid protocol: %q", proto)
}
default:
return fmt.Errorf("invalid address type: %T", addr)
}
}
}
// https://github.com/golang/go/issues/20239
//
// In go1.8.1 there is a race between Serve and Shutdown. If
// Shutdown is called before the Serve go routine was scheduled then
// the Serve go routine never returns. This deadlocks the agent
// shutdown for some tests since it will wait forever.
//
// We solve this with another WaitGroup which checks that the Serve
// go routine was called and after that it should be safe to call
// Shutdown on that server.
var up sync.WaitGroup
for _, l := range ln {
l := l // capture loop var
// create a server per listener instead of a single
// server with multiple listeners to take advantage
// of the Addr field for logging. Since the server
// does not keep state and they all share the same
// agent there is no overhead.
addr := l.Addr().String()
srv := NewHTTPServer(addr, a)
a.httpServers = append(a.httpServers, srv)
up.Add(1)
a.wgServers.Add(1)
go func() {
defer a.wgServers.Done()
up.Done()
a.logger.Printf("[INFO] agent: Starting HTTP server on %s", addr)
if err := srv.Serve(l); err != nil && err != http.ErrServerClosed {
a.logger.Print(err)
}
}()
}
up.Wait()
ln = nil
return nil
} }
// consulConfig is used to return a consul configuration // consulConfig is used to return a consul configuration
@ -612,38 +770,36 @@ func (a *Agent) resolveTmplAddrs() error {
return nil return nil
} }
// setupServer is used to initialize the Consul server // makeServer creates a new consul server.
func (a *Agent) setupServer() error { func (a *Agent) makeServer() (*consul.Server, error) {
config, err := a.consulConfig() config, err := a.consulConfig()
if err != nil { if err != nil {
return err return nil, err
} }
if err := a.setupKeyrings(config); err != nil { if err := a.setupKeyrings(config); err != nil {
return fmt.Errorf("Failed to configure keyring: %v", err) return nil, fmt.Errorf("Failed to configure keyring: %v", err)
} }
server, err := consul.NewServer(config) server, err := consul.NewServer(config)
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err) return nil, fmt.Errorf("Failed to start Consul server: %v", err)
} }
a.delegate = server return server, nil
return nil
} }
// setupClient is used to initialize the Consul client // makeClient creates a new consul client.
func (a *Agent) setupClient() error { func (a *Agent) makeClient() (*consul.Client, error) {
config, err := a.consulConfig() config, err := a.consulConfig()
if err != nil { if err != nil {
return err return nil, err
} }
if err := a.setupKeyrings(config); err != nil { if err := a.setupKeyrings(config); err != nil {
return fmt.Errorf("Failed to configure keyring: %v", err) return nil, fmt.Errorf("Failed to configure keyring: %v", err)
} }
client, err := consul.NewClient(config) client, err := consul.NewClient(config)
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err) return nil, fmt.Errorf("Failed to start Consul client: %v", err)
} }
a.delegate = client return client, nil
return nil
} }
// makeRandomID will generate a random UUID for a node. // makeRandomID will generate a random UUID for a node.
@ -830,6 +986,27 @@ func (a *Agent) Shutdown() error {
if a.shutdown { if a.shutdown {
return nil return nil
} }
a.logger.Println("[INFO] agent: Requesting shutdown")
// Stop all API endpoints
a.logger.Println("[INFO] agent: Stopping DNS endpoints")
for _, srv := range a.dnsServers {
srv.Shutdown()
}
for _, srv := range a.httpServers {
a.logger.Println("[INFO] agent: Stopping HTTP endpoint", srv.Addr)
// old behavior: just die
// srv.Close()
// graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
srv.Shutdown(ctx)
}
a.logger.Println("[INFO] agent: Waiting for endpoints to shut down")
a.wgServers.Wait()
a.logger.Print("[INFO] agent: Endpoints down")
// Stop all the checks // Stop all the checks
a.checkLock.Lock() a.checkLock.Lock()
@ -849,8 +1026,8 @@ func (a *Agent) Shutdown() error {
chk.Stop() chk.Stop()
} }
a.logger.Println("[INFO] agent: requesting shutdown")
err := a.delegate.Shutdown() err := a.delegate.Shutdown()
a.logger.Print("[INFO] agent: delegate down")
pidErr := a.deletePid() pidErr := a.deletePid()
if pidErr != nil { if pidErr != nil {

View File

@ -223,7 +223,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
// only warn because the write did succeed and anti-entropy will sync later. // only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() { func (s *HTTPServer) syncChanges() {
if err := s.agent.state.syncChanges(); err != nil { if err := s.agent.state.syncChanges(); err != nil {
s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
} }
} }
@ -654,7 +654,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
handler := &httpLogHandler{ handler := &httpLogHandler{
filter: filter, filter: filter,
logCh: make(chan string, 512), logCh: make(chan string, 512),
logger: s.logger, logger: s.agent.logger,
} }
s.agent.logWriter.RegisterHandler(handler) s.agent.logWriter.RegisterHandler(handler)
defer s.agent.logWriter.DeregisterHandler(handler) defer s.agent.logWriter.DeregisterHandler(handler)

View File

@ -43,7 +43,6 @@ func makeReadOnlyAgentACL(t *testing.T, srv *HTTPServer) string {
func TestAgent_Services(t *testing.T) { func TestAgent_Services(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
srv1 := &structs.NodeService{ srv1 := &structs.NodeService{
@ -71,7 +70,6 @@ func TestAgent_Services(t *testing.T) {
func TestAgent_Services_ACLFilter(t *testing.T) { func TestAgent_Services_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -102,7 +100,6 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
func TestAgent_Checks(t *testing.T) { func TestAgent_Checks(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk1 := &structs.HealthCheck{ chk1 := &structs.HealthCheck{
@ -130,7 +127,6 @@ func TestAgent_Checks(t *testing.T) {
func TestAgent_Checks_ACLFilter(t *testing.T) { func TestAgent_Checks_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk1 := &structs.HealthCheck{ chk1 := &structs.HealthCheck{
@ -174,7 +170,6 @@ func TestAgent_Self(t *testing.T) {
conf.Meta = meta conf.Meta = meta
}) })
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/self", nil) req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
@ -216,7 +211,6 @@ func TestAgent_Self(t *testing.T) {
func TestAgent_Self_ACLDeny(t *testing.T) { func TestAgent_Self_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -285,7 +279,10 @@ func TestAgent_Reload(t *testing.T) {
}() }()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
if got, want := len(cmd.httpServers), 1; got != want { if cmd == nil || cmd.agent == nil {
r.Fatal("waiting for agent")
}
if got, want := len(cmd.agent.httpServers), 1; got != want {
r.Fatalf("got %d servers want %d", got, want) r.Fatalf("got %d servers want %d", got, want)
} }
}) })
@ -299,7 +296,7 @@ func TestAgent_Reload(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
srv := cmd.httpServers[0] srv := cmd.agent.httpServers[0]
req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil) req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil)
if _, err := srv.AgentReload(nil, req); err != nil { if _, err := srv.AgentReload(nil, req); err != nil {
t.Fatalf("Err: %v", err) t.Fatalf("Err: %v", err)
@ -313,7 +310,6 @@ func TestAgent_Reload(t *testing.T) {
func TestAgent_Reload_ACLDeny(t *testing.T) { func TestAgent_Reload_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -340,7 +336,6 @@ func TestAgent_Reload_ACLDeny(t *testing.T) {
func TestAgent_Members(t *testing.T) { func TestAgent_Members(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/members", nil) req, _ := http.NewRequest("GET", "/v1/agent/members", nil)
@ -361,7 +356,6 @@ func TestAgent_Members(t *testing.T) {
func TestAgent_Members_WAN(t *testing.T) { func TestAgent_Members_WAN(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil) req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil)
@ -382,7 +376,6 @@ func TestAgent_Members_WAN(t *testing.T) {
func TestAgent_Members_ACLFilter(t *testing.T) { func TestAgent_Members_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -413,7 +406,6 @@ func TestAgent_Members_ACLFilter(t *testing.T) {
func TestAgent_Join(t *testing.T) { func TestAgent_Join(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig()) dir2, a2 := makeAgent(t, nextConfig())
@ -444,7 +436,6 @@ func TestAgent_Join(t *testing.T) {
func TestAgent_Join_WAN(t *testing.T) { func TestAgent_Join_WAN(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig()) dir2, a2 := makeAgent(t, nextConfig())
@ -475,7 +466,6 @@ func TestAgent_Join_WAN(t *testing.T) {
func TestAgent_Join_ACLDeny(t *testing.T) { func TestAgent_Join_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig()) dir2, a2 := makeAgent(t, nextConfig())
@ -510,7 +500,6 @@ func TestAgent_Join_ACLDeny(t *testing.T) {
func TestAgent_Leave(t *testing.T) { func TestAgent_Leave(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) { dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) {
@ -518,7 +507,7 @@ func TestAgent_Leave(t *testing.T) {
c.Bootstrap = false c.Bootstrap = false
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown() defer srv2.agent.Shutdown()
// Join first // Join first
addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan) addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan)
@ -547,7 +536,6 @@ func TestAgent_Leave(t *testing.T) {
func TestAgent_Leave_ACLDeny(t *testing.T) { func TestAgent_Leave_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -578,7 +566,6 @@ func TestAgent_Leave_ACLDeny(t *testing.T) {
func TestAgent_ForceLeave(t *testing.T) { func TestAgent_ForceLeave(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig()) dir2, a2 := makeAgent(t, nextConfig())
@ -615,7 +602,6 @@ func TestAgent_ForceLeave(t *testing.T) {
func TestAgent_ForceLeave_ACLDeny(t *testing.T) { func TestAgent_ForceLeave_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -644,7 +630,6 @@ func TestAgent_ForceLeave_ACLDeny(t *testing.T) {
func TestAgent_RegisterCheck(t *testing.T) { func TestAgent_RegisterCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register node // Register node
@ -686,7 +671,6 @@ func TestAgent_RegisterCheck(t *testing.T) {
func TestAgent_RegisterCheck_Passing(t *testing.T) { func TestAgent_RegisterCheck_Passing(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register node // Register node
@ -723,7 +707,6 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
func TestAgent_RegisterCheck_BadStatus(t *testing.T) { func TestAgent_RegisterCheck_BadStatus(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register node // Register node
@ -745,7 +728,6 @@ func TestAgent_RegisterCheck_BadStatus(t *testing.T) {
func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
args := &CheckDefinition{ args := &CheckDefinition{
@ -771,7 +753,6 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
func TestAgent_DeregisterCheck(t *testing.T) { func TestAgent_DeregisterCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -798,7 +779,6 @@ func TestAgent_DeregisterCheck(t *testing.T) {
func TestAgent_DeregisterCheckACLDeny(t *testing.T) { func TestAgent_DeregisterCheckACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -824,7 +804,6 @@ func TestAgent_DeregisterCheckACLDeny(t *testing.T) {
func TestAgent_PassCheck(t *testing.T) { func TestAgent_PassCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -852,7 +831,6 @@ func TestAgent_PassCheck(t *testing.T) {
func TestAgent_PassCheck_ACLDeny(t *testing.T) { func TestAgent_PassCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -879,7 +857,6 @@ func TestAgent_PassCheck_ACLDeny(t *testing.T) {
func TestAgent_WarnCheck(t *testing.T) { func TestAgent_WarnCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -907,7 +884,6 @@ func TestAgent_WarnCheck(t *testing.T) {
func TestAgent_WarnCheck_ACLDeny(t *testing.T) { func TestAgent_WarnCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -934,7 +910,6 @@ func TestAgent_WarnCheck_ACLDeny(t *testing.T) {
func TestAgent_FailCheck(t *testing.T) { func TestAgent_FailCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -962,7 +937,6 @@ func TestAgent_FailCheck(t *testing.T) {
func TestAgent_FailCheck_ACLDeny(t *testing.T) { func TestAgent_FailCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -989,7 +963,6 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) {
func TestAgent_UpdateCheck(t *testing.T) { func TestAgent_UpdateCheck(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -1089,7 +1062,6 @@ func TestAgent_UpdateCheck(t *testing.T) {
func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { func TestAgent_UpdateCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -1118,7 +1090,6 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) {
func TestAgent_RegisterService(t *testing.T) { func TestAgent_RegisterService(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
args := &ServiceDefinition{ args := &ServiceDefinition{
@ -1171,7 +1142,6 @@ func TestAgent_RegisterService(t *testing.T) {
func TestAgent_RegisterService_ACLDeny(t *testing.T) { func TestAgent_RegisterService_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
args := &ServiceDefinition{ args := &ServiceDefinition{
@ -1209,7 +1179,6 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) {
func TestAgent_RegisterService_InvalidAddress(t *testing.T) { func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
for _, addr := range []string{"0.0.0.0", "::", "[::]"} { for _, addr := range []string{"0.0.0.0", "::", "[::]"} {
@ -1238,7 +1207,6 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
func TestAgent_DeregisterService(t *testing.T) { func TestAgent_DeregisterService(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
service := &structs.NodeService{ service := &structs.NodeService{
@ -1271,7 +1239,6 @@ func TestAgent_DeregisterService(t *testing.T) {
func TestAgent_DeregisterService_ACLDeny(t *testing.T) { func TestAgent_DeregisterService_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
service := &structs.NodeService{ service := &structs.NodeService{
@ -1300,7 +1267,6 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) {
func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("not PUT", func(t *testing.T) { t.Run("not PUT", func(t *testing.T) {
@ -1351,7 +1317,6 @@ func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) {
func TestAgent_ServiceMaintenance_Enable(t *testing.T) { func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register the service // Register the service
@ -1394,7 +1359,6 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
func TestAgent_ServiceMaintenance_Disable(t *testing.T) { func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register the service // Register the service
@ -1431,7 +1395,6 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Register the service. // Register the service.
@ -1461,7 +1424,6 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) {
func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { func TestAgent_NodeMaintenance_BadRequest(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Fails on non-PUT // Fails on non-PUT
@ -1488,7 +1450,6 @@ func TestAgent_NodeMaintenance_BadRequest(t *testing.T) {
func TestAgent_NodeMaintenance_Enable(t *testing.T) { func TestAgent_NodeMaintenance_Enable(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Force the node into maintenance mode // Force the node into maintenance mode
@ -1521,7 +1482,6 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) {
func TestAgent_NodeMaintenance_Disable(t *testing.T) { func TestAgent_NodeMaintenance_Disable(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Force the node into maintenance mode // Force the node into maintenance mode
@ -1546,7 +1506,6 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) {
func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
@ -1567,7 +1526,6 @@ func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) {
func TestAgent_RegisterCheck_Service(t *testing.T) { func TestAgent_RegisterCheck_Service(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
args := &ServiceDefinition{ args := &ServiceDefinition{
@ -1616,7 +1574,6 @@ func TestAgent_Monitor(t *testing.T) {
dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Try passing an invalid log level // Try passing an invalid log level
@ -1678,7 +1635,6 @@ func (r *closableRecorder) CloseNotify() <-chan bool {
func TestAgent_Monitor_ACLDeny(t *testing.T) { func TestAgent_Monitor_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Try without a token. // Try without a token.

View File

@ -17,7 +17,6 @@ import (
func TestCatalogRegister(t *testing.T) { func TestCatalogRegister(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -53,7 +52,6 @@ func TestCatalogRegister(t *testing.T) {
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -81,7 +79,6 @@ func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
func TestCatalogDeregister(t *testing.T) { func TestCatalogDeregister(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -103,7 +100,6 @@ func TestCatalogDeregister(t *testing.T) {
func TestCatalogDatacenters(t *testing.T) { func TestCatalogDatacenters(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -122,7 +118,6 @@ func TestCatalogDatacenters(t *testing.T) {
func TestCatalogNodes(t *testing.T) { func TestCatalogNodes(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -158,7 +153,6 @@ func TestCatalogNodes(t *testing.T) {
func TestCatalogNodes_MetaFilter(t *testing.T) { func TestCatalogNodes_MetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -206,7 +200,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown() defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -217,7 +210,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -302,7 +294,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
func TestCatalogNodes_Blocking(t *testing.T) { func TestCatalogNodes_Blocking(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -358,7 +349,6 @@ func TestCatalogNodes_Blocking(t *testing.T) {
func TestCatalogNodes_DistanceSort(t *testing.T) { func TestCatalogNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -445,7 +435,6 @@ func TestCatalogNodes_DistanceSort(t *testing.T) {
func TestCatalogServices(t *testing.T) { func TestCatalogServices(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -483,7 +472,6 @@ func TestCatalogServices(t *testing.T) {
func TestCatalogServices_NodeMetaFilter(t *testing.T) { func TestCatalogServices_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -527,7 +515,6 @@ func TestCatalogServices_NodeMetaFilter(t *testing.T) {
func TestCatalogServiceNodes(t *testing.T) { func TestCatalogServiceNodes(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -583,7 +570,6 @@ func TestCatalogServiceNodes(t *testing.T) {
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) { func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -646,7 +632,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown() defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -657,7 +642,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -734,7 +718,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
func TestCatalogServiceNodes_DistanceSort(t *testing.T) { func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -824,7 +807,6 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
func TestCatalogNodeServices(t *testing.T) { func TestCatalogNodeServices(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -867,7 +849,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown() defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -878,7 +859,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -50,8 +50,6 @@ type Command struct {
logFilter *logutils.LevelFilter logFilter *logutils.LevelFilter
logOutput io.Writer logOutput io.Writer
agent *Agent agent *Agent
httpServers []*HTTPServer
dnsServer *DNSServer
} }
// readConfig is responsible for setup of our configuration using // readConfig is responsible for setup of our configuration using
@ -455,70 +453,6 @@ func (c *Command) readConfig() *Config {
return config return config
} }
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error {
c.UI.Output("Starting Consul agent...")
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
}
c.agent = agent
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Error starting http servers: %s", err))
return err
}
c.httpServers = servers
}
if config.Ports.DNS > 0 {
dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Invalid DNS bind address: %s", err))
return err
}
server, err := NewDNSServer(agent, &config.DNSConfig, logOutput,
config.Domain, dnsAddr.String(), config.DNSRecursors)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Error starting dns server: %s", err))
return err
}
c.dnsServer = server
}
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
return nil
}
// checkpointResults is used to handler periodic results from our update checker // checkpointResults is used to handler periodic results from our update checker
func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) { func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
if err != nil { if err != nil {
@ -806,16 +740,39 @@ func (c *Command) Run(args []string) int {
} }
// Create the agent // Create the agent
if err := c.setupAgent(config, logOutput, logWriter); err != nil { c.UI.Output("Starting Consul agent...")
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return 1 return 1
} }
c.agent = agent
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
defer c.agent.Shutdown() defer c.agent.Shutdown()
if c.dnsServer != nil {
defer c.dnsServer.Shutdown()
}
for _, server := range c.httpServers {
defer server.Shutdown()
}
// Join startup nodes if specified // Join startup nodes if specified
if err := c.startupJoin(config); err != nil { if err := c.startupJoin(config); err != nil {
@ -831,7 +788,6 @@ func (c *Command) Run(args []string) int {
// Get the new client http listener addr // Get the new client http listener addr
var httpAddr net.Addr var httpAddr net.Addr
var err error
if config.Ports.HTTP != -1 { if config.Ports.HTTP != -1 {
httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
} else if config.Ports.HTTPS != -1 { } else if config.Ports.HTTPS != -1 {

View File

@ -1,6 +1,7 @@
package agent package agent
import ( import (
"crypto/tls"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -760,6 +761,47 @@ type Config struct {
DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"` DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"`
} }
// IncomingTLSConfig returns the TLS configuration for TLS
// connections to consul.
func (c *Config) IncomingTLSConfig() (*tls.Config, error) {
tc := &tlsutil.Config{
VerifyIncoming: c.VerifyIncoming || c.VerifyIncomingHTTPS,
VerifyOutgoing: c.VerifyOutgoing,
CAFile: c.CAFile,
CAPath: c.CAPath,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
NodeName: c.NodeName,
ServerName: c.ServerName,
TLSMinVersion: c.TLSMinVersion,
CipherSuites: c.TLSCipherSuites,
PreferServerCipherSuites: c.TLSPreferServerCipherSuites,
}
return tc.IncomingTLSConfig()
}
// HTTPAddrs returns the bind addresses for the HTTP server and
// the application protocol which should be served, e.g. 'http'
// or 'https'.
func (c *Config) HTTPAddrs() (map[string][]net.Addr, error) {
m := map[string][]net.Addr{}
if c.Ports.HTTP > 0 {
a, err := c.ClientListener(c.Addresses.HTTP, c.Ports.HTTP)
if err != nil {
return nil, err
}
m["http"] = []net.Addr{a}
}
if c.Ports.HTTPS > 0 {
a, err := c.ClientListener(c.Addresses.HTTPS, c.Ports.HTTPS)
if err != nil {
return nil, err
}
m["https"] = []net.Addr{a}
}
return m, nil
}
// Bool is used to initialize bool pointers in struct literals. // Bool is used to initialize bool pointers in struct literals.
func Bool(b bool) *bool { func Bool(b bool) *bool {
return &b return &b
@ -914,13 +956,10 @@ func (c *Config) EncryptBytes() ([]byte, error) {
// ClientListener is used to format a listener for a // ClientListener is used to format a listener for a
// port on a ClientAddr // port on a ClientAddr
func (c *Config) ClientListener(override string, port int) (net.Addr, error) { func (c *Config) ClientListener(override string, port int) (net.Addr, error) {
var addr string addr := c.ClientAddr
if override != "" { if override != "" {
addr = override addr = override
} else {
addr = c.ClientAddr
} }
if path := socketPath(addr); path != "" { if path := socketPath(addr); path != "" {
return &net.UnixAddr{Name: path, Net: "unix"}, nil return &net.UnixAddr{Name: path, Net: "unix"}, nil
} }

View File

@ -15,7 +15,6 @@ import (
func TestCoordinate_Datacenters(t *testing.T) { func TestCoordinate_Datacenters(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -39,7 +38,6 @@ func TestCoordinate_Datacenters(t *testing.T) {
func TestCoordinate_Nodes(t *testing.T) { func TestCoordinate_Nodes(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

View File

@ -34,35 +34,25 @@ func makeDNSServer(t *testing.T) (string, *DNSServer) {
return makeDNSServerConfig(t, nil, nil) return makeDNSServerConfig(t, nil, nil)
} }
func makeDNSServerConfig( func makeDNSServerConfig(t *testing.T, agentFn func(c *Config), dnsFn func(*DNSConfig)) (string, *DNSServer) {
t *testing.T,
agentFn func(c *Config),
dnsFn func(*DNSConfig)) (string, *DNSServer) {
// Create the configs and apply the functions // Create the configs and apply the functions
agentConf := nextConfig() c := nextConfig()
if agentFn != nil { if agentFn != nil {
agentFn(agentConf) agentFn(c)
} }
dnsConf := &DefaultConfig().DNSConfig c.DNSConfig = DefaultConfig().DNSConfig
if dnsFn != nil { if dnsFn != nil {
dnsFn(dnsConf) dnsFn(&c.DNSConfig)
} }
// Add in the recursor if any // Add in the recursor if any
if r := agentConf.DNSRecursor; r != "" { if r := c.DNSRecursor; r != "" {
agentConf.DNSRecursors = append(agentConf.DNSRecursors, r) c.DNSRecursors = append(c.DNSRecursors, r)
} }
// Start the server // Start the server
addr, _ := agentConf.ClientListener(agentConf.Addresses.DNS, agentConf.Ports.DNS) dir, agent := makeAgent(t, c)
dir, agent := makeAgent(t, agentConf) return dir, agent.dnsServers[0]
server, err := NewDNSServer(agent, dnsConf, agent.logOutput,
agentConf.Domain, addr.String(), agentConf.DNSRecursors)
if err != nil {
t.Fatalf("err: %v", err)
}
return dir, server
} }
// makeRecursor creates a generic DNS server which always returns // makeRecursor creates a generic DNS server which always returns
@ -1283,7 +1273,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}, nil) }, nil)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown() defer srv1.agent.Shutdown()
dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
@ -1291,7 +1281,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}, nil) }, nil)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -2421,6 +2411,9 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) {
} }
// Only 1 is passing, so we should only get 1 answer // Only 1 is passing, so we should only get 1 answer
for _, a := range in.Answer {
fmt.Println(question, a)
}
if len(in.Answer) != 1 { if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in) t.Fatalf("Bad: %#v", in)
} }
@ -3364,14 +3357,14 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) {
c.TranslateWanAddrs = true c.TranslateWanAddrs = true
}, nil) }, nil)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown() defer srv1.agent.Shutdown()
dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
c.TranslateWanAddrs = true c.TranslateWanAddrs = true
}, nil) }, nil)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -178,7 +178,6 @@ func TestEventList_Filter(t *testing.T) {
func TestEventList_ACLFilter(t *testing.T) { func TestEventList_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t) dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Fire an event. // Fire an event.

View File

@ -98,7 +98,6 @@ func TestHealthChecksInState_NodeMetaFilter(t *testing.T) {
func TestHealthChecksInState_DistanceSort(t *testing.T) { func TestHealthChecksInState_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -175,7 +174,6 @@ func TestHealthChecksInState_DistanceSort(t *testing.T) {
func TestHealthNodeChecks(t *testing.T) { func TestHealthNodeChecks(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -212,7 +210,6 @@ func TestHealthNodeChecks(t *testing.T) {
func TestHealthServiceChecks(t *testing.T) { func TestHealthServiceChecks(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -266,7 +263,6 @@ func TestHealthServiceChecks(t *testing.T) {
func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -321,7 +317,6 @@ func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
func TestHealthServiceChecks_DistanceSort(t *testing.T) { func TestHealthServiceChecks_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -403,7 +398,6 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) {
func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -472,7 +466,6 @@ func TestHealthServiceNodes(t *testing.T) {
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -527,7 +520,6 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
func TestHealthServiceNodes_DistanceSort(t *testing.T) { func TestHealthServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -609,7 +601,6 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
func TestHealthServiceNodes_PassingFilter(t *testing.T) { func TestHealthServiceNodes_PassingFilter(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -656,7 +647,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown() defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -667,7 +657,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = "" c.ACLDatacenter = ""
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown() defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -1,175 +1,39 @@
package agent package agent
import ( import (
"crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"net"
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
"net/url" "net/url"
"os"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
) )
// HTTPServer is used to wrap an Agent and expose various API's // HTTPServer provides an HTTP api for an agent.
// in a RESTful manner
type HTTPServer struct { type HTTPServer struct {
*http.Server
agent *Agent agent *Agent
mux *http.ServeMux
listener net.Listener
logger *log.Logger
addr string
} }
// NewHTTPServers starts new HTTP servers to provide an interface to func NewHTTPServer(addr string, a *Agent) *HTTPServer {
// the agent. s := &HTTPServer{&http.Server{Addr: addr}, a}
func NewHTTPServers(agent *Agent) ([]*HTTPServer, error) { s.Server.Handler = s.handler(s.agent.config.EnableDebug)
config := agent.config return s
logOutput := agent.logOutput }
var servers []*HTTPServer // handler is used to attach our handlers to the mux
func (s *HTTPServer) handler(enableDebug bool) http.Handler {
if config.Ports.HTTPS > 0 {
httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
if err != nil {
return nil, err
}
tlsConf := &tlsutil.Config{
VerifyIncoming: config.VerifyIncoming || config.VerifyIncomingHTTPS,
VerifyOutgoing: config.VerifyOutgoing,
CAFile: config.CAFile,
CAPath: config.CAPath,
CertFile: config.CertFile,
KeyFile: config.KeyFile,
NodeName: config.NodeName,
ServerName: config.ServerName,
TLSMinVersion: config.TLSMinVersion,
CipherSuites: config.TLSCipherSuites,
PreferServerCipherSuites: config.TLSPreferServerCipherSuites,
}
tlsConfig, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
ln, err := net.Listen(httpAddr.Network(), httpAddr.String())
if err != nil {
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
// Create the mux
mux := http.NewServeMux() mux := http.NewServeMux()
// Create the server // handleFuncMetrics takes the given pattern and handler and wraps to produce
srv := &HTTPServer{ // metrics based on the pattern and request.
agent: agent, handleFuncMetrics := func(pattern string, handler http.HandlerFunc) {
mux: mux,
listener: list,
logger: log.New(logOutput, "", log.LstdFlags),
addr: httpAddr.String(),
}
srv.registerHandlers(config.EnableDebug)
// Start the server
go http.Serve(list, mux)
servers = append(servers, srv)
}
if config.Ports.HTTP > 0 {
httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err)
}
// Error if we are trying to bind a domain socket to an existing path
path := socketPath(config.Addresses.HTTP)
if path != "" {
if _, err := os.Stat(path); !os.IsNotExist(err) {
agent.logger.Printf("[WARN] agent: Replacing socket %q", path)
}
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("error removing socket file: %s", err)
}
}
ln, err := net.Listen(httpAddr.Network(), httpAddr.String())
if err != nil {
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
var list net.Listener
if path != "" {
// Set up ownership/permission bits on the socket file
if err := setFilePermissions(path, config.UnixSockets); err != nil {
return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err)
}
list = ln
} else {
list = tcpKeepAliveListener{ln.(*net.TCPListener)}
}
// Create the mux
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: log.New(logOutput, "", log.LstdFlags),
addr: httpAddr.String(),
}
srv.registerHandlers(config.EnableDebug)
// Start the server
go http.Serve(list, mux)
servers = append(servers, srv)
}
return servers, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by NewHttpServer so
// dead TCP connections eventually go away.
type tcpKeepAliveListener struct {
*net.TCPListener
}
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}
// Shutdown is used to shutdown the HTTP server
func (s *HTTPServer) Shutdown() {
if s != nil {
s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr)
s.listener.Close()
}
}
// handleFuncMetrics takes the given pattern and handler and wraps to produce
// metrics based on the pattern and request.
func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.ResponseWriter, *http.Request)) {
// Get the parts of the pattern. We omit any initial empty for the // Get the parts of the pattern. We omit any initial empty for the
// leading slash, and put an underscore as a "thing" placeholder if we // leading slash, and put an underscore as a "thing" placeholder if we
// see a trailing slash, which means the part after is parsed. This lets // see a trailing slash, which means the part after is parsed. This lets
@ -179,9 +43,8 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons
if part == "" { if part == "" {
if i == 0 { if i == 0 {
continue continue
} else {
part = "_"
} }
part = "_"
} }
parts = append(parts, part) parts = append(parts, part)
} }
@ -191,110 +54,108 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons
wrapper := func(resp http.ResponseWriter, req *http.Request) { wrapper := func(resp http.ResponseWriter, req *http.Request) {
start := time.Now() start := time.Now()
handler(resp, req) handler(resp, req)
key := append([]string{"consul", "http", req.Method}, parts...) key := append([]string{"consul", "http", req.Method}, parts...)
metrics.MeasureSince(key, start) metrics.MeasureSince(key, start)
} }
s.mux.HandleFunc(pattern, wrapper) mux.HandleFunc(pattern, wrapper)
} }
// registerHandlers is used to attach our handlers to the mux mux.HandleFunc("/", s.Index)
func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/", s.Index)
// API V1. // API V1.
if s.agent.config.ACLDatacenter != "" { if s.agent.config.ACLDatacenter != "" {
s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate))
s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate))
s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy)) handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy))
s.handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet)) handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet))
s.handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone)) handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone))
s.handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList)) handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList))
s.handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
} else { } else {
s.handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled)) handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled))
} }
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave))
s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))
s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass))
s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn))
s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail))
s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate))
s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance))
s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices))
s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
if !s.agent.config.DisableCoordinates { if !s.agent.config.DisableCoordinates {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
} else { } else {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
} }
s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes))
s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices))
s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint))
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy))
s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew))
s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet))
s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode))
s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList))
s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot))
s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) handleFuncMetrics("/v1/txn", s.wrap(s.Txn))
// Debug endpoints. // Debug endpoints.
if enableDebug { if enableDebug {
s.handleFuncMetrics("/debug/pprof/", pprof.Index) handleFuncMetrics("/debug/pprof/", pprof.Index)
s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline)
s.handleFuncMetrics("/debug/pprof/profile", pprof.Profile) handleFuncMetrics("/debug/pprof/profile", pprof.Profile)
s.handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol) handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol)
} }
// Use the custom UI dir if provided. // Use the custom UI dir if provided.
if s.agent.config.UIDir != "" { if s.agent.config.UIDir != "" {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir)))) mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir))))
} else if s.agent.config.EnableUI { } else if s.agent.config.EnableUI {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS())))
} }
return mux
} }
// wrap is used to wrap functions to make them more convenient // wrap is used to wrap functions to make them more convenient
@ -306,7 +167,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
// Obfuscate any tokens from appearing in the logs // Obfuscate any tokens from appearing in the logs
formVals, err := url.ParseQuery(req.URL.RawQuery) formVals, err := url.ParseQuery(req.URL.RawQuery)
if err != nil { if err != nil {
s.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr) s.agent.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr)
resp.WriteHeader(http.StatusInternalServerError) // 500 resp.WriteHeader(http.StatusInternalServerError) // 500
return return
} }
@ -322,7 +183,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
} }
handleErr := func(err error) { handleErr := func(err error) {
s.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr) s.agent.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr)
code := http.StatusInternalServerError // 500 code := http.StatusInternalServerError // 500
errMsg := err.Error() errMsg := err.Error()
if strings.Contains(errMsg, "Permission denied") || strings.Contains(errMsg, "ACL not found") { if strings.Contains(errMsg, "Permission denied") || strings.Contains(errMsg, "ACL not found") {
@ -344,7 +205,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
// Invoke the handler // Invoke the handler
start := time.Now() start := time.Now()
defer func() { defer func() {
s.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr) s.agent.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr)
}() }()
obj, err := handler(resp, req) obj, err := handler(resp, req)
if err != nil { if err != nil {

View File

@ -51,26 +51,13 @@ func makeHTTPServerWithACLs(t *testing.T) (string, *HTTPServer) {
} }
func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) {
configTry := 0
RECONF:
configTry++
conf := nextConfig() conf := nextConfig()
if cb != nil { if cb != nil {
cb(conf) cb(conf)
} }
dir, agent := makeAgentLog(t, conf, l, logWriter) dir, agent := makeAgentLog(t, conf, l, logWriter)
servers, err := NewHTTPServers(agent) return dir, agent.httpServers[0]
if err != nil {
if configTry < 3 {
goto RECONF
}
t.Fatalf("err: %v", err)
}
if len(servers) == 0 {
t.Fatalf(fmt.Sprintf("Failed to make HTTP server"))
}
return dir, servers[0]
} }
func TestHTTPServer_UnixSocket(t *testing.T) { func TestHTTPServer_UnixSocket(t *testing.T) {
@ -91,7 +78,6 @@ func TestHTTPServer_UnixSocket(t *testing.T) {
c.UnixSockets.Perms = "0777" c.UnixSockets.Perms = "0777"
}) })
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Ensure the socket was created // Ensure the socket was created
@ -158,12 +144,9 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
dir, agent := makeAgent(t, conf) dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer agent.Shutdown()
// Try to start the server with the same path anyways. defer agent.Shutdown()
if _, err := NewHTTPServers(agent); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the file was replaced by the socket // Ensure the file was replaced by the socket
fi, err = os.Stat(socket) fi, err = os.Stat(socket)
if err != nil { if err != nil {
@ -238,7 +221,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) {
{ {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -260,7 +242,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
srv.agent.config.TranslateWanAddrs = true srv.agent.config.TranslateWanAddrs = true
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -286,7 +267,6 @@ func TestHTTPAPIResponseHeaders(t *testing.T) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -313,7 +293,6 @@ func TestContentTypeIsJSON(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -336,12 +315,11 @@ func TestContentTypeIsJSON(t *testing.T) {
func TestHTTP_wrap_obfuscateLog(t *testing.T) { func TestHTTP_wrap_obfuscateLog(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Attach a custom logger so we can inspect it // Attach a custom logger so we can inspect it
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
srv.logger = log.New(buf, "", log.LstdFlags) srv.agent.logger = log.New(buf, "", log.LstdFlags)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/some/url?token=secret1&token=secret2", nil) req, _ := http.NewRequest("GET", "/some/url?token=secret1&token=secret2", nil)
@ -367,7 +345,6 @@ func TestPrettyPrintBare(t *testing.T) {
func testPrettyPrint(pretty string, t *testing.T) { func testPrettyPrint(pretty string, t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
r := &structs.DirEntry{Key: "key"} r := &structs.DirEntry{Key: "key"}
@ -396,7 +373,6 @@ func testPrettyPrint(pretty string, t *testing.T) {
func TestParseSource(t *testing.T) { func TestParseSource(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Default is agent's DC and no node (since the user didn't care, then // Default is agent's DC and no node (since the user didn't care, then
@ -578,7 +554,7 @@ func TestEnableWebUI(t *testing.T) {
req, _ := http.NewRequest("GET", "/ui/", nil) req, _ := http.NewRequest("GET", "/ui/", nil)
// Perform the request // Perform the request
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
s.mux.ServeHTTP(resp, req) s.Handler.ServeHTTP(resp, req)
// Check the result // Check the result
if resp.Code != 200 { if resp.Code != 200 {
@ -626,7 +602,6 @@ func httpTest(t *testing.T, f func(srv *HTTPServer)) {
func httpTestWithConfig(t *testing.T, f func(srv *HTTPServer), cb func(c *Config)) { func httpTestWithConfig(t *testing.T, f func(srv *HTTPServer), cb func(c *Config)) {
dir, srv := makeHTTPServerWithConfig(t, cb) dir, srv := makeHTTPServerWithConfig(t, cb)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
f(srv) f(srv)

View File

@ -16,7 +16,6 @@ import (
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -78,7 +77,6 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
func TestKVSEndpoint_Recurse(t *testing.T) { func TestKVSEndpoint_Recurse(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -157,7 +155,6 @@ func TestKVSEndpoint_Recurse(t *testing.T) {
func TestKVSEndpoint_DELETE_CAS(t *testing.T) { func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -226,7 +223,6 @@ func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
func TestKVSEndpoint_CAS(t *testing.T) { func TestKVSEndpoint_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -305,7 +301,6 @@ func TestKVSEndpoint_CAS(t *testing.T) {
func TestKVSEndpoint_ListKeys(t *testing.T) { func TestKVSEndpoint_ListKeys(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

61
command/agent/listen.go Normal file
View File

@ -0,0 +1,61 @@
package agent
import (
"crypto/tls"
"fmt"
"net"
"os"
"time"
)
func ListenTCP(addr string) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
l = tcpKeepAliveListener{l.(*net.TCPListener)}
return l, nil
}
func ListenTLS(addr string, cfg *tls.Config) (net.Listener, error) {
l, err := ListenTCP(addr)
if err != nil {
return nil, err
}
return tls.NewListener(l, cfg), nil
}
func ListenUnix(addr string, perm FilePermissions) (net.Listener, error) {
// todo(fs): move this somewhere else
// if _, err := os.Stat(addr); !os.IsNotExist(err) {
// s.agent.logger.Printf("[WARN] agent: Replacing socket %q", addr)
// }
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("error removing socket file: %s", err)
}
l, err := net.Listen("unix", addr)
if err != nil {
return nil, err
}
if err := setFilePermissions(addr, perm); err != nil {
return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err)
}
return l, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by NewHttpServer so
// dead TCP connections eventually go away.
type tcpKeepAliveListener struct {
*net.TCPListener
}
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}

View File

@ -10,7 +10,6 @@ import (
func TestStatusLeader(t *testing.T) { func TestStatusLeader(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -28,7 +27,6 @@ func TestStatusLeader(t *testing.T) {
func TestStatusPeers(t *testing.T) { func TestStatusPeers(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
obj, err := srv.StatusPeers(nil, nil) obj, err := srv.StatusPeers(nil, nil)

View File

@ -29,7 +29,6 @@ func TestUiIndex(t *testing.T) {
c.UIDir = uiDir c.UIDir = uiDir
}) })
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Create file // Create file
@ -41,7 +40,7 @@ func TestUiIndex(t *testing.T) {
// Register node // Register node
req, _ := http.NewRequest("GET", "/ui/my-file", nil) req, _ := http.NewRequest("GET", "/ui/my-file", nil)
req.URL.Scheme = "http" req.URL.Scheme = "http"
req.URL.Host = srv.listener.Addr().String() req.URL.Host = srv.Addr
// Make the request // Make the request
client := cleanhttp.DefaultClient() client := cleanhttp.DefaultClient()
@ -66,7 +65,6 @@ func TestUiIndex(t *testing.T) {
func TestUiNodes(t *testing.T) { func TestUiNodes(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -106,7 +104,6 @@ func TestUiNodes(t *testing.T) {
func TestUiNodeInfo(t *testing.T) { func TestUiNodeInfo(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

View File

@ -20,8 +20,6 @@ import (
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
var offset uint64
func init() { func init() {
// Seed the random number generator // Seed the random number generator
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
@ -29,25 +27,23 @@ func init() {
version.Version = "0.8.0" version.Version = "0.8.0"
} }
type agentWrapper struct { type server struct {
dir string
config *agent.Config
agent *agent.Agent agent *agent.Agent
http *agent.HTTPServer config *agent.Config
httpAddr string httpAddr string
dir string
} }
func (a *agentWrapper) Shutdown() { func (a *server) Shutdown() {
a.agent.Shutdown() a.agent.Shutdown()
a.http.Shutdown()
os.RemoveAll(a.dir) os.RemoveAll(a.dir)
} }
func testAgent(t *testing.T) *agentWrapper { func testAgent(t *testing.T) *server {
return testAgentWithConfig(t, nil) return testAgentWithConfig(t, nil)
} }
func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) { func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) {
agent := testAgentWithConfig(t, func(c *agent.Config) {}) agent := testAgentWithConfig(t, func(c *agent.Config) {})
client, err := api.NewClient(&api.Config{Address: agent.httpAddr}) client, err := api.NewClient(&api.Config{Address: agent.httpAddr})
if err != nil { if err != nil {
@ -56,71 +52,54 @@ func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) {
return agent, client return agent, client
} }
func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server {
return testAgentWithConfigReload(t, cb, nil) return testAgentWithConfigReload(t, cb, nil)
} }
func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *agentWrapper { func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server {
lw := logger.NewLogWriter(512)
conf := nextConfig() conf := nextConfig()
if cb != nil { if cb != nil {
cb(conf) cb(conf)
} }
dir := testutil.TempDir(t, "agent") conf.DataDir = testutil.TempDir(t, "agent")
conf.DataDir = dir a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh)
a, err := agent.Create(conf, lw, nil, reloadCh)
if err != nil { if err != nil {
os.RemoveAll(dir) os.RemoveAll(conf.DataDir)
t.Fatalf(fmt.Sprintf("err: %v", err)) t.Fatalf("err: %v", err)
} }
conf.Addresses.HTTP = "127.0.0.1" conf.Addresses.HTTP = "127.0.0.1"
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) addr := fmt.Sprintf("%s:%d", conf.Addresses.HTTP, conf.Ports.HTTP)
http, err := agent.NewHTTPServers(a) return &server{agent: a, config: conf, httpAddr: addr, dir: conf.DataDir}
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
}
if http == nil || len(http) == 0 {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("Could not create HTTP server to listen on: %s", httpAddr))
}
return &agentWrapper{
dir: dir,
config: conf,
agent: a,
http: http[0],
httpAddr: httpAddr,
}
} }
func nextConfig() *agent.Config { var nextPort uint64 = 10000
idx := int(atomic.AddUint64(&offset, 1))
conf := agent.DefaultConfig()
func nextConfig() *agent.Config {
nodeID, err := uuid.GenerateUUID() nodeID, err := uuid.GenerateUUID()
if err != nil { if err != nil {
panic(err) panic(err)
} }
port := int(atomic.AddUint64(&nextPort, 10))
conf := agent.DefaultConfig()
conf.Bootstrap = true conf.Bootstrap = true
conf.Datacenter = "dc1" conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx) conf.NodeName = fmt.Sprintf("Node %d", port)
conf.NodeID = types.NodeID(nodeID) conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1" conf.BindAddr = "127.0.0.1"
conf.Server = true conf.Server = true
conf.Version = version.Version conf.Version = version.Version
conf.Ports = agent.PortConfig{
conf.Ports.HTTP = 10000 + 10*idx DNS: port + 1,
conf.Ports.HTTPS = 10401 + 10*idx HTTP: port + 2,
conf.Ports.SerfLan = 10201 + 10*idx HTTPS: port + 3,
conf.Ports.SerfWan = 10202 + 10*idx SerfLan: port + 4,
conf.Ports.Server = 10300 + 10*idx SerfWan: port + 5,
Server: port + 6,
}
cons := consul.DefaultConfig() cons := consul.DefaultConfig()
conf.ConsulConfig = cons conf.ConsulConfig = cons