From 8d9f5b9a6482ad0f32e89b5c91a198fab2e7212e Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Fri, 19 May 2017 11:53:41 +0200 Subject: [PATCH] agent: move http/dns endpoints into agent Move the HTTP and DNS endpoints into the agent and control their lifespan via the agent. This removes the requirement to manage HTTP and DNS servers indpendent of the agent since the agent is mostly useless without an endpoint and the endpoints without the agent. --- command/agent/agent.go | 293 +++++++++++++---- command/agent/agent_endpoint.go | 4 +- command/agent/agent_endpoint_test.go | 56 +--- command/agent/catalog_endpoint_test.go | 20 -- command/agent/command.go | 104 ++---- command/agent/config.go | 47 ++- command/agent/coordinate_endpoint_test.go | 2 - command/agent/dns_test.go | 39 +-- command/agent/event_endpoint_test.go | 1 - command/agent/health_endpoint_test.go | 11 - command/agent/http.go | 371 +++++++--------------- command/agent/http_test.go | 35 +- command/agent/kvs_endpoint_test.go | 5 - command/agent/listen.go | 61 ++++ command/agent/status_endpoint_test.go | 2 - command/agent/ui_endpoint_test.go | 5 +- command/util_test.go | 77 ++--- 17 files changed, 543 insertions(+), 590 deletions(-) create mode 100644 command/agent/listen.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 827ccd2c4..1c492bca5 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1,6 +1,7 @@ package agent import ( + "context" "crypto/sha512" "encoding/json" "errors" @@ -9,6 +10,7 @@ import ( "io/ioutil" "log" "net" + "net/http" "os" "path/filepath" "reflect" @@ -141,27 +143,62 @@ type Agent struct { // agent methods use this, so use with care and never override // outside of a unit test. endpoints map[string]string + + // dnsAddr is the address the DNS server binds to + dnsAddr net.Addr + + // dnsServer provides the DNS API + dnsServers []*DNSServer + + // httpAddrs are the addresses per protocol the HTTP server binds to + httpAddrs map[string][]net.Addr + + // httpServers provides the HTTP API on various endpoints + httpServers []*HTTPServer + + // wgServers is the wait group for all HTTP and DNS servers + wgServers sync.WaitGroup } // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { - // Ensure we have a log sink +func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { + a, err := NewAgent(c, logOutput, logWriter, reloadCh) + if err != nil { + return nil, err + } + if err := a.Start(); err != nil { + return nil, err + } + return a, nil +} + +func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { if logOutput == nil { logOutput = os.Stderr } - - // Validate the config - if config.Datacenter == "" { + if c.Datacenter == "" { return nil, fmt.Errorf("Must configure a Datacenter") } - if config.DataDir == "" && !config.DevMode { + if c.DataDir == "" && !c.DevMode { return nil, fmt.Errorf("Must configure a DataDir") } + dnsAddr, err := c.ClientListener(c.Addresses.DNS, c.Ports.DNS) + if err != nil { + return nil, fmt.Errorf("Invalid DNS bind address: %s", err) + } + httpAddrs, err := c.HTTPAddrs() + if err != nil { + return nil, fmt.Errorf("Invalid HTTP bind address: %s", err) + } + acls, err := newACLManager(c) + if err != nil { + return nil, err + } - agent := &Agent{ - config: config, - logger: log.New(logOutput, "", log.LstdFlags), + a := &Agent{ + config: c, + acls: acls, logOutput: logOutput, logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), @@ -175,79 +212,200 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re reloadCh: reloadCh, shutdownCh: make(chan struct{}), endpoints: make(map[string]string), + dnsAddr: dnsAddr, + httpAddrs: httpAddrs, } - if err := agent.resolveTmplAddrs(); err != nil { + if err := a.resolveTmplAddrs(); err != nil { return nil, err } + return a, nil +} - // Initialize the ACL manager. - acls, err := newACLManager(config) - if err != nil { - return nil, err - } - agent.acls = acls +func (a *Agent) Start() error { + c := a.config + + a.logger = log.New(a.logOutput, "", log.LstdFlags) // Retrieve or generate the node ID before setting up the rest of the // agent, which depends on it. - if err := agent.setupNodeID(config); err != nil { - return nil, fmt.Errorf("Failed to setup node ID: %v", err) + if err := a.setupNodeID(c); err != nil { + return fmt.Errorf("Failed to setup node ID: %v", err) } // Initialize the local state. - agent.state.Init(config, agent.logger) + a.state.Init(c, a.logger) // Setup either the client or the server. - if config.Server { - err = agent.setupServer() - agent.state.SetIface(agent.delegate) + if c.Server { + server, err := a.makeServer() + if err != nil { + return err + } + + a.delegate = server + a.state.SetIface(server) // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ Service: consul.ConsulServiceName, ID: consul.ConsulServiceID, - Port: agent.config.Ports.Server, + Port: c.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService, agent.config.GetTokenForAgent()) + a.state.AddService(&consulService, c.GetTokenForAgent()) } else { - err = agent.setupClient() - agent.state.SetIface(agent.delegate) - } - if err != nil { - return nil, err + client, err := a.makeClient() + if err != nil { + return err + } + + a.delegate = client + a.state.SetIface(client) } // Load checks/services/metadata. - if err := agent.loadServices(config); err != nil { - return nil, err + if err := a.loadServices(c); err != nil { + return err } - if err := agent.loadChecks(config); err != nil { - return nil, err + if err := a.loadChecks(c); err != nil { + return err } - if err := agent.loadMetadata(config); err != nil { - return nil, err + if err := a.loadMetadata(c); err != nil { + return err } // Start watching for critical services to deregister, based on their // checks. - go agent.reapServices() + go a.reapServices() // Start handling events. - go agent.handleEvents() + go a.handleEvents() // Start sending network coordinate to the server. - if !config.DisableCoordinates { - go agent.sendCoordinate() + if !c.DisableCoordinates { + go a.sendCoordinate() } // Write out the PID file if necessary. - err = agent.storePid() - if err != nil { - return nil, err + if err := a.storePid(); err != nil { + return err } - return agent, nil + // start dns server + if c.Ports.DNS > 0 { + srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors) + if err != nil { + return fmt.Errorf("error starting DNS server: %s", err) + } + a.dnsServers = []*DNSServer{srv} + } + + // start HTTP servers + return a.startHTTP(a.httpAddrs) +} + +func (a *Agent) startHTTP(httpAddrs map[string][]net.Addr) error { + // ln contains the list of pending listeners until the + // actual server is created and the listeners are used. + var ln []net.Listener + + // cleanup the listeners on error. ln should be empty on success. + defer func() { + for _, l := range ln { + l.Close() + } + }() + + // bind to the listeners for all addresses and protocols + // before we start the servers so that we can fail early + // if we can't bind to one of the addresses. + for proto, addrs := range httpAddrs { + for _, addr := range addrs { + switch addr.(type) { + case *net.UnixAddr: + switch proto { + case "http": + if _, err := os.Stat(addr.String()); !os.IsNotExist(err) { + a.logger.Printf("[WARN] agent: Replacing socket %q", addr.String()) + } + l, err := ListenUnix(addr.String(), a.config.UnixSockets) + if err != nil { + return err + } + ln = append(ln, l) + + default: + return fmt.Errorf("invalid protocol: %q", proto) + } + + case *net.TCPAddr: + switch proto { + case "http": + l, err := ListenTCP(addr.String()) + if err != nil { + return err + } + ln = append(ln, l) + + case "https": + tlscfg, err := a.config.IncomingTLSConfig() + if err != nil { + return fmt.Errorf("invalid TLS configuration: %s", err) + } + l, err := ListenTLS(addr.String(), tlscfg) + if err != nil { + return err + } + ln = append(ln, l) + + default: + return fmt.Errorf("invalid protocol: %q", proto) + } + + default: + return fmt.Errorf("invalid address type: %T", addr) + } + } + } + + // https://github.com/golang/go/issues/20239 + // + // In go1.8.1 there is a race between Serve and Shutdown. If + // Shutdown is called before the Serve go routine was scheduled then + // the Serve go routine never returns. This deadlocks the agent + // shutdown for some tests since it will wait forever. + // + // We solve this with another WaitGroup which checks that the Serve + // go routine was called and after that it should be safe to call + // Shutdown on that server. + var up sync.WaitGroup + for _, l := range ln { + l := l // capture loop var + + // create a server per listener instead of a single + // server with multiple listeners to take advantage + // of the Addr field for logging. Since the server + // does not keep state and they all share the same + // agent there is no overhead. + addr := l.Addr().String() + srv := NewHTTPServer(addr, a) + a.httpServers = append(a.httpServers, srv) + + up.Add(1) + a.wgServers.Add(1) + go func() { + defer a.wgServers.Done() + up.Done() + a.logger.Printf("[INFO] agent: Starting HTTP server on %s", addr) + if err := srv.Serve(l); err != nil && err != http.ErrServerClosed { + a.logger.Print(err) + } + }() + } + up.Wait() + ln = nil + return nil } // consulConfig is used to return a consul configuration @@ -612,38 +770,36 @@ func (a *Agent) resolveTmplAddrs() error { return nil } -// setupServer is used to initialize the Consul server -func (a *Agent) setupServer() error { +// makeServer creates a new consul server. +func (a *Agent) makeServer() (*consul.Server, error) { config, err := a.consulConfig() if err != nil { - return err + return nil, err } if err := a.setupKeyrings(config); err != nil { - return fmt.Errorf("Failed to configure keyring: %v", err) + return nil, fmt.Errorf("Failed to configure keyring: %v", err) } server, err := consul.NewServer(config) if err != nil { - return fmt.Errorf("Failed to start Consul server: %v", err) + return nil, fmt.Errorf("Failed to start Consul server: %v", err) } - a.delegate = server - return nil + return server, nil } -// setupClient is used to initialize the Consul client -func (a *Agent) setupClient() error { +// makeClient creates a new consul client. +func (a *Agent) makeClient() (*consul.Client, error) { config, err := a.consulConfig() if err != nil { - return err + return nil, err } if err := a.setupKeyrings(config); err != nil { - return fmt.Errorf("Failed to configure keyring: %v", err) + return nil, fmt.Errorf("Failed to configure keyring: %v", err) } client, err := consul.NewClient(config) if err != nil { - return fmt.Errorf("Failed to start Consul client: %v", err) + return nil, fmt.Errorf("Failed to start Consul client: %v", err) } - a.delegate = client - return nil + return client, nil } // makeRandomID will generate a random UUID for a node. @@ -830,6 +986,27 @@ func (a *Agent) Shutdown() error { if a.shutdown { return nil } + a.logger.Println("[INFO] agent: Requesting shutdown") + + // Stop all API endpoints + a.logger.Println("[INFO] agent: Stopping DNS endpoints") + for _, srv := range a.dnsServers { + srv.Shutdown() + } + for _, srv := range a.httpServers { + a.logger.Println("[INFO] agent: Stopping HTTP endpoint", srv.Addr) + + // old behavior: just die + // srv.Close() + + // graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + srv.Shutdown(ctx) + } + a.logger.Println("[INFO] agent: Waiting for endpoints to shut down") + a.wgServers.Wait() + a.logger.Print("[INFO] agent: Endpoints down") // Stop all the checks a.checkLock.Lock() @@ -849,8 +1026,8 @@ func (a *Agent) Shutdown() error { chk.Stop() } - a.logger.Println("[INFO] agent: requesting shutdown") err := a.delegate.Shutdown() + a.logger.Print("[INFO] agent: delegate down") pidErr := a.deletePid() if pidErr != nil { diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 628ad403d..f67805a31 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -223,7 +223,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request // only warn because the write did succeed and anti-entropy will sync later. func (s *HTTPServer) syncChanges() { if err := s.agent.state.syncChanges(); err != nil { - s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) + s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } @@ -654,7 +654,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( handler := &httpLogHandler{ filter: filter, logCh: make(chan string, 512), - logger: s.logger, + logger: s.agent.logger, } s.agent.logWriter.RegisterHandler(handler) defer s.agent.logWriter.DeregisterHandler(handler) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index a876c617d..825c55fc8 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -43,7 +43,6 @@ func makeReadOnlyAgentACL(t *testing.T, srv *HTTPServer) string { func TestAgent_Services(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() srv1 := &structs.NodeService{ @@ -71,7 +70,6 @@ func TestAgent_Services(t *testing.T) { func TestAgent_Services_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -102,7 +100,6 @@ func TestAgent_Services_ACLFilter(t *testing.T) { func TestAgent_Checks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk1 := &structs.HealthCheck{ @@ -130,7 +127,6 @@ func TestAgent_Checks(t *testing.T) { func TestAgent_Checks_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk1 := &structs.HealthCheck{ @@ -174,7 +170,6 @@ func TestAgent_Self(t *testing.T) { conf.Meta = meta }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/self", nil) @@ -216,7 +211,6 @@ func TestAgent_Self(t *testing.T) { func TestAgent_Self_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -285,7 +279,10 @@ func TestAgent_Reload(t *testing.T) { }() retry.Run(t, func(r *retry.R) { - if got, want := len(cmd.httpServers), 1; got != want { + if cmd == nil || cmd.agent == nil { + r.Fatal("waiting for agent") + } + if got, want := len(cmd.agent.httpServers), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } }) @@ -299,7 +296,7 @@ func TestAgent_Reload(t *testing.T) { t.Fatalf("err: %v", err) } - srv := cmd.httpServers[0] + srv := cmd.agent.httpServers[0] req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil) if _, err := srv.AgentReload(nil, req); err != nil { t.Fatalf("Err: %v", err) @@ -313,7 +310,6 @@ func TestAgent_Reload(t *testing.T) { func TestAgent_Reload_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -340,7 +336,6 @@ func TestAgent_Reload_ACLDeny(t *testing.T) { func TestAgent_Members(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/members", nil) @@ -361,7 +356,6 @@ func TestAgent_Members(t *testing.T) { func TestAgent_Members_WAN(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil) @@ -382,7 +376,6 @@ func TestAgent_Members_WAN(t *testing.T) { func TestAgent_Members_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -413,7 +406,6 @@ func TestAgent_Members_ACLFilter(t *testing.T) { func TestAgent_Join(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -444,7 +436,6 @@ func TestAgent_Join(t *testing.T) { func TestAgent_Join_WAN(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -475,7 +466,6 @@ func TestAgent_Join_WAN(t *testing.T) { func TestAgent_Join_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -510,7 +500,6 @@ func TestAgent_Join_ACLDeny(t *testing.T) { func TestAgent_Leave(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) { @@ -518,7 +507,7 @@ func TestAgent_Leave(t *testing.T) { c.Bootstrap = false }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() // Join first addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan) @@ -547,7 +536,6 @@ func TestAgent_Leave(t *testing.T) { func TestAgent_Leave_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -578,7 +566,6 @@ func TestAgent_Leave_ACLDeny(t *testing.T) { func TestAgent_ForceLeave(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -615,7 +602,6 @@ func TestAgent_ForceLeave(t *testing.T) { func TestAgent_ForceLeave_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -644,7 +630,6 @@ func TestAgent_ForceLeave_ACLDeny(t *testing.T) { func TestAgent_RegisterCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -686,7 +671,6 @@ func TestAgent_RegisterCheck(t *testing.T) { func TestAgent_RegisterCheck_Passing(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -723,7 +707,6 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { func TestAgent_RegisterCheck_BadStatus(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -745,7 +728,6 @@ func TestAgent_RegisterCheck_BadStatus(t *testing.T) { func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &CheckDefinition{ @@ -771,7 +753,6 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { func TestAgent_DeregisterCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -798,7 +779,6 @@ func TestAgent_DeregisterCheck(t *testing.T) { func TestAgent_DeregisterCheckACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -824,7 +804,6 @@ func TestAgent_DeregisterCheckACLDeny(t *testing.T) { func TestAgent_PassCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -852,7 +831,6 @@ func TestAgent_PassCheck(t *testing.T) { func TestAgent_PassCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -879,7 +857,6 @@ func TestAgent_PassCheck_ACLDeny(t *testing.T) { func TestAgent_WarnCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -907,7 +884,6 @@ func TestAgent_WarnCheck(t *testing.T) { func TestAgent_WarnCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -934,7 +910,6 @@ func TestAgent_WarnCheck_ACLDeny(t *testing.T) { func TestAgent_FailCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -962,7 +937,6 @@ func TestAgent_FailCheck(t *testing.T) { func TestAgent_FailCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -989,7 +963,6 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) { func TestAgent_UpdateCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -1089,7 +1062,6 @@ func TestAgent_UpdateCheck(t *testing.T) { func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -1118,7 +1090,6 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { func TestAgent_RegisterService(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1171,7 +1142,6 @@ func TestAgent_RegisterService(t *testing.T) { func TestAgent_RegisterService_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1209,7 +1179,6 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) { func TestAgent_RegisterService_InvalidAddress(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() for _, addr := range []string{"0.0.0.0", "::", "[::]"} { @@ -1238,7 +1207,6 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) { func TestAgent_DeregisterService(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() service := &structs.NodeService{ @@ -1271,7 +1239,6 @@ func TestAgent_DeregisterService(t *testing.T) { func TestAgent_DeregisterService_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() service := &structs.NodeService{ @@ -1300,7 +1267,6 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) { func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("not PUT", func(t *testing.T) { @@ -1351,7 +1317,6 @@ func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { func TestAgent_ServiceMaintenance_Enable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service @@ -1394,7 +1359,6 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) { func TestAgent_ServiceMaintenance_Disable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service @@ -1431,7 +1395,6 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) { func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service. @@ -1461,7 +1424,6 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Fails on non-PUT @@ -1488,7 +1450,6 @@ func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { func TestAgent_NodeMaintenance_Enable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Force the node into maintenance mode @@ -1521,7 +1482,6 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) { func TestAgent_NodeMaintenance_Disable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Force the node into maintenance mode @@ -1546,7 +1506,6 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) { func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -1567,7 +1526,6 @@ func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { func TestAgent_RegisterCheck_Service(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1616,7 +1574,6 @@ func TestAgent_Monitor(t *testing.T) { dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Try passing an invalid log level @@ -1678,7 +1635,6 @@ func (r *closableRecorder) CloseNotify() <-chan bool { func TestAgent_Monitor_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Try without a token. diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index d0bc340d5..41556b8f9 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -17,7 +17,6 @@ import ( func TestCatalogRegister(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -53,7 +52,6 @@ func TestCatalogRegister(t *testing.T) { func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -81,7 +79,6 @@ func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { func TestCatalogDeregister(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -103,7 +100,6 @@ func TestCatalogDeregister(t *testing.T) { func TestCatalogDatacenters(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() retry.Run(t, func(r *retry.R) { @@ -122,7 +118,6 @@ func TestCatalogDatacenters(t *testing.T) { func TestCatalogNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -158,7 +153,6 @@ func TestCatalogNodes(t *testing.T) { func TestCatalogNodes_MetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -206,7 +200,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -217,7 +210,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -302,7 +294,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { func TestCatalogNodes_Blocking(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -358,7 +349,6 @@ func TestCatalogNodes_Blocking(t *testing.T) { func TestCatalogNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -445,7 +435,6 @@ func TestCatalogNodes_DistanceSort(t *testing.T) { func TestCatalogServices(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -483,7 +472,6 @@ func TestCatalogServices(t *testing.T) { func TestCatalogServices_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -527,7 +515,6 @@ func TestCatalogServices_NodeMetaFilter(t *testing.T) { func TestCatalogServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -583,7 +570,6 @@ func TestCatalogServiceNodes(t *testing.T) { func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -646,7 +632,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -657,7 +642,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -734,7 +718,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { func TestCatalogServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -824,7 +807,6 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) { func TestCatalogNodeServices(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -867,7 +849,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -878,7 +859,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/command.go b/command/agent/command.go index 4bc415e2d..fa7ff547e 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -50,8 +50,6 @@ type Command struct { logFilter *logutils.LevelFilter logOutput io.Writer agent *Agent - httpServers []*HTTPServer - dnsServer *DNSServer } // readConfig is responsible for setup of our configuration using @@ -455,70 +453,6 @@ func (c *Command) readConfig() *Config { return config } -// setupAgent is used to start the agent and various interfaces -func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { - c.UI.Output("Starting Consul agent...") - agent, err := Create(config, logOutput, logWriter, c.configReloadCh) - if err != nil { - c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) - return err - } - c.agent = agent - - if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { - servers, err := NewHTTPServers(agent) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Error starting http servers: %s", err)) - return err - } - c.httpServers = servers - } - - if config.Ports.DNS > 0 { - dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Invalid DNS bind address: %s", err)) - return err - } - - server, err := NewDNSServer(agent, &config.DNSConfig, logOutput, - config.Domain, dnsAddr.String(), config.DNSRecursors) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Error starting dns server: %s", err)) - return err - } - c.dnsServer = server - } - - // Setup update checking - if !config.DisableUpdateCheck { - version := config.Version - if config.VersionPrerelease != "" { - version += fmt.Sprintf("-%s", config.VersionPrerelease) - } - updateParams := &checkpoint.CheckParams{ - Product: "consul", - Version: version, - } - if !config.DisableAnonymousSignature { - updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") - } - - // Schedule a periodic check with expected interval of 24 hours - checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) - - // Do an immediate check within the next 30 seconds - go func() { - time.Sleep(lib.RandomStagger(30 * time.Second)) - c.checkpointResults(checkpoint.Check(updateParams)) - }() - } - return nil -} - // checkpointResults is used to handler periodic results from our update checker func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) { if err != nil { @@ -806,16 +740,39 @@ func (c *Command) Run(args []string) int { } // Create the agent - if err := c.setupAgent(config, logOutput, logWriter); err != nil { + c.UI.Output("Starting Consul agent...") + agent, err := Create(config, logOutput, logWriter, c.configReloadCh) + if err != nil { + c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) return 1 } + c.agent = agent + + // Setup update checking + if !config.DisableUpdateCheck { + version := config.Version + if config.VersionPrerelease != "" { + version += fmt.Sprintf("-%s", config.VersionPrerelease) + } + updateParams := &checkpoint.CheckParams{ + Product: "consul", + Version: version, + } + if !config.DisableAnonymousSignature { + updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") + } + + // Schedule a periodic check with expected interval of 24 hours + checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) + + // Do an immediate check within the next 30 seconds + go func() { + time.Sleep(lib.RandomStagger(30 * time.Second)) + c.checkpointResults(checkpoint.Check(updateParams)) + }() + } + defer c.agent.Shutdown() - if c.dnsServer != nil { - defer c.dnsServer.Shutdown() - } - for _, server := range c.httpServers { - defer server.Shutdown() - } // Join startup nodes if specified if err := c.startupJoin(config); err != nil { @@ -831,7 +788,6 @@ func (c *Command) Run(args []string) int { // Get the new client http listener addr var httpAddr net.Addr - var err error if config.Ports.HTTP != -1 { httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) } else if config.Ports.HTTPS != -1 { diff --git a/command/agent/config.go b/command/agent/config.go index c49af33f0..07e67e142 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -1,6 +1,7 @@ package agent import ( + "crypto/tls" "encoding/base64" "encoding/json" "fmt" @@ -760,6 +761,47 @@ type Config struct { DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"` } +// IncomingTLSConfig returns the TLS configuration for TLS +// connections to consul. +func (c *Config) IncomingTLSConfig() (*tls.Config, error) { + tc := &tlsutil.Config{ + VerifyIncoming: c.VerifyIncoming || c.VerifyIncomingHTTPS, + VerifyOutgoing: c.VerifyOutgoing, + CAFile: c.CAFile, + CAPath: c.CAPath, + CertFile: c.CertFile, + KeyFile: c.KeyFile, + NodeName: c.NodeName, + ServerName: c.ServerName, + TLSMinVersion: c.TLSMinVersion, + CipherSuites: c.TLSCipherSuites, + PreferServerCipherSuites: c.TLSPreferServerCipherSuites, + } + return tc.IncomingTLSConfig() +} + +// HTTPAddrs returns the bind addresses for the HTTP server and +// the application protocol which should be served, e.g. 'http' +// or 'https'. +func (c *Config) HTTPAddrs() (map[string][]net.Addr, error) { + m := map[string][]net.Addr{} + if c.Ports.HTTP > 0 { + a, err := c.ClientListener(c.Addresses.HTTP, c.Ports.HTTP) + if err != nil { + return nil, err + } + m["http"] = []net.Addr{a} + } + if c.Ports.HTTPS > 0 { + a, err := c.ClientListener(c.Addresses.HTTPS, c.Ports.HTTPS) + if err != nil { + return nil, err + } + m["https"] = []net.Addr{a} + } + return m, nil +} + // Bool is used to initialize bool pointers in struct literals. func Bool(b bool) *bool { return &b @@ -914,13 +956,10 @@ func (c *Config) EncryptBytes() ([]byte, error) { // ClientListener is used to format a listener for a // port on a ClientAddr func (c *Config) ClientListener(override string, port int) (net.Addr, error) { - var addr string + addr := c.ClientAddr if override != "" { addr = override - } else { - addr = c.ClientAddr } - if path := socketPath(addr); path != "" { return &net.UnixAddr{Name: path, Net: "unix"}, nil } diff --git a/command/agent/coordinate_endpoint_test.go b/command/agent/coordinate_endpoint_test.go index b24c1a1fc..64fafce66 100644 --- a/command/agent/coordinate_endpoint_test.go +++ b/command/agent/coordinate_endpoint_test.go @@ -15,7 +15,6 @@ import ( func TestCoordinate_Datacenters(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -39,7 +38,6 @@ func TestCoordinate_Datacenters(t *testing.T) { func TestCoordinate_Nodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/agent/dns_test.go b/command/agent/dns_test.go index 9e23586c8..21941fb12 100644 --- a/command/agent/dns_test.go +++ b/command/agent/dns_test.go @@ -34,35 +34,25 @@ func makeDNSServer(t *testing.T) (string, *DNSServer) { return makeDNSServerConfig(t, nil, nil) } -func makeDNSServerConfig( - t *testing.T, - agentFn func(c *Config), - dnsFn func(*DNSConfig)) (string, *DNSServer) { +func makeDNSServerConfig(t *testing.T, agentFn func(c *Config), dnsFn func(*DNSConfig)) (string, *DNSServer) { // Create the configs and apply the functions - agentConf := nextConfig() + c := nextConfig() if agentFn != nil { - agentFn(agentConf) + agentFn(c) } - dnsConf := &DefaultConfig().DNSConfig + c.DNSConfig = DefaultConfig().DNSConfig if dnsFn != nil { - dnsFn(dnsConf) + dnsFn(&c.DNSConfig) } // Add in the recursor if any - if r := agentConf.DNSRecursor; r != "" { - agentConf.DNSRecursors = append(agentConf.DNSRecursors, r) + if r := c.DNSRecursor; r != "" { + c.DNSRecursors = append(c.DNSRecursors, r) } // Start the server - addr, _ := agentConf.ClientListener(agentConf.Addresses.DNS, agentConf.Ports.DNS) - dir, agent := makeAgent(t, agentConf) - server, err := NewDNSServer(agent, dnsConf, agent.logOutput, - agentConf.Domain, addr.String(), agentConf.DNSRecursors) - if err != nil { - t.Fatalf("err: %v", err) - } - - return dir, server + dir, agent := makeAgent(t, c) + return dir, agent.dnsServers[0] } // makeRecursor creates a generic DNS server which always returns @@ -1283,7 +1273,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) { c.ACLDatacenter = "" }, nil) defer os.RemoveAll(dir1) - defer srv1.Shutdown() + defer srv1.agent.Shutdown() dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { c.Datacenter = "dc2" @@ -1291,7 +1281,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) { c.ACLDatacenter = "" }, nil) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -2421,6 +2411,9 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) { } // Only 1 is passing, so we should only get 1 answer + for _, a := range in.Answer { + fmt.Println(question, a) + } if len(in.Answer) != 1 { t.Fatalf("Bad: %#v", in) } @@ -3364,14 +3357,14 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) { c.TranslateWanAddrs = true }, nil) defer os.RemoveAll(dir1) - defer srv1.Shutdown() + defer srv1.agent.Shutdown() dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { c.Datacenter = "dc2" c.TranslateWanAddrs = true }, nil) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index ee8a7484d..c04e9c6a3 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -178,7 +178,6 @@ func TestEventList_Filter(t *testing.T) { func TestEventList_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Fire an event. diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 13f165d65..e364b249b 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -98,7 +98,6 @@ func TestHealthChecksInState_NodeMetaFilter(t *testing.T) { func TestHealthChecksInState_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -175,7 +174,6 @@ func TestHealthChecksInState_DistanceSort(t *testing.T) { func TestHealthNodeChecks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -212,7 +210,6 @@ func TestHealthNodeChecks(t *testing.T) { func TestHealthServiceChecks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -266,7 +263,6 @@ func TestHealthServiceChecks(t *testing.T) { func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -321,7 +317,6 @@ func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { func TestHealthServiceChecks_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -403,7 +398,6 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) { func TestHealthServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -472,7 +466,6 @@ func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -527,7 +520,6 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { func TestHealthServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -609,7 +601,6 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) { func TestHealthServiceNodes_PassingFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -656,7 +647,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -667,7 +657,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/http.go b/command/agent/http.go index e81bf7b7b..91d98bead 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1,300 +1,161 @@ package agent import ( - "crypto/tls" "encoding/json" "fmt" - "log" - "net" "net/http" "net/http/pprof" "net/url" - "os" "strconv" "strings" "time" "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/tlsutil" "github.com/mitchellh/mapstructure" ) -// HTTPServer is used to wrap an Agent and expose various API's -// in a RESTful manner +// HTTPServer provides an HTTP api for an agent. type HTTPServer struct { - agent *Agent - mux *http.ServeMux - listener net.Listener - logger *log.Logger - addr string + *http.Server + agent *Agent } -// NewHTTPServers starts new HTTP servers to provide an interface to -// the agent. -func NewHTTPServers(agent *Agent) ([]*HTTPServer, error) { - config := agent.config - logOutput := agent.logOutput - - var servers []*HTTPServer - - if config.Ports.HTTPS > 0 { - httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS) - if err != nil { - return nil, err - } - - tlsConf := &tlsutil.Config{ - VerifyIncoming: config.VerifyIncoming || config.VerifyIncomingHTTPS, - VerifyOutgoing: config.VerifyOutgoing, - CAFile: config.CAFile, - CAPath: config.CAPath, - CertFile: config.CertFile, - KeyFile: config.KeyFile, - NodeName: config.NodeName, - ServerName: config.ServerName, - TLSMinVersion: config.TLSMinVersion, - CipherSuites: config.TLSCipherSuites, - PreferServerCipherSuites: config.TLSPreferServerCipherSuites, - } - - tlsConfig, err := tlsConf.IncomingTLSConfig() - if err != nil { - return nil, err - } - - ln, err := net.Listen(httpAddr.Network(), httpAddr.String()) - if err != nil { - return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) - } - - list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) - - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: list, - logger: log.New(logOutput, "", log.LstdFlags), - addr: httpAddr.String(), - } - srv.registerHandlers(config.EnableDebug) - - // Start the server - go http.Serve(list, mux) - servers = append(servers, srv) - } - - if config.Ports.HTTP > 0 { - httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) - if err != nil { - return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err) - } - - // Error if we are trying to bind a domain socket to an existing path - path := socketPath(config.Addresses.HTTP) - if path != "" { - if _, err := os.Stat(path); !os.IsNotExist(err) { - agent.logger.Printf("[WARN] agent: Replacing socket %q", path) - } - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - return nil, fmt.Errorf("error removing socket file: %s", err) - } - } - - ln, err := net.Listen(httpAddr.Network(), httpAddr.String()) - if err != nil { - return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) - } - - var list net.Listener - if path != "" { - // Set up ownership/permission bits on the socket file - if err := setFilePermissions(path, config.UnixSockets); err != nil { - return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err) - } - list = ln - } else { - list = tcpKeepAliveListener{ln.(*net.TCPListener)} - } - - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: list, - logger: log.New(logOutput, "", log.LstdFlags), - addr: httpAddr.String(), - } - srv.registerHandlers(config.EnableDebug) - - // Start the server - go http.Serve(list, mux) - servers = append(servers, srv) - } - - return servers, nil +func NewHTTPServer(addr string, a *Agent) *HTTPServer { + s := &HTTPServer{&http.Server{Addr: addr}, a} + s.Server.Handler = s.handler(s.agent.config.EnableDebug) + return s } -// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted -// connections. It's used by NewHttpServer so -// dead TCP connections eventually go away. -type tcpKeepAliveListener struct { - *net.TCPListener -} +// handler is used to attach our handlers to the mux +func (s *HTTPServer) handler(enableDebug bool) http.Handler { + mux := http.NewServeMux() -func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { - tc, err := ln.AcceptTCP() - if err != nil { - return - } - tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(30 * time.Second) - return tc, nil -} - -// Shutdown is used to shutdown the HTTP server -func (s *HTTPServer) Shutdown() { - if s != nil { - s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr) - s.listener.Close() - } -} - -// handleFuncMetrics takes the given pattern and handler and wraps to produce -// metrics based on the pattern and request. -func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.ResponseWriter, *http.Request)) { - // Get the parts of the pattern. We omit any initial empty for the - // leading slash, and put an underscore as a "thing" placeholder if we - // see a trailing slash, which means the part after is parsed. This lets - // us distinguish from things like /v1/query and /v1/query/. - var parts []string - for i, part := range strings.Split(pattern, "/") { - if part == "" { - if i == 0 { - continue - } else { + // handleFuncMetrics takes the given pattern and handler and wraps to produce + // metrics based on the pattern and request. + handleFuncMetrics := func(pattern string, handler http.HandlerFunc) { + // Get the parts of the pattern. We omit any initial empty for the + // leading slash, and put an underscore as a "thing" placeholder if we + // see a trailing slash, which means the part after is parsed. This lets + // us distinguish from things like /v1/query and /v1/query/. + var parts []string + for i, part := range strings.Split(pattern, "/") { + if part == "" { + if i == 0 { + continue + } part = "_" } + parts = append(parts, part) } - parts = append(parts, part) + + // Register the wrapper, which will close over the expensive-to-compute + // parts from above. + wrapper := func(resp http.ResponseWriter, req *http.Request) { + start := time.Now() + handler(resp, req) + key := append([]string{"consul", "http", req.Method}, parts...) + metrics.MeasureSince(key, start) + } + mux.HandleFunc(pattern, wrapper) } - // Register the wrapper, which will close over the expensive-to-compute - // parts from above. - wrapper := func(resp http.ResponseWriter, req *http.Request) { - start := time.Now() - handler(resp, req) - - key := append([]string{"consul", "http", req.Method}, parts...) - metrics.MeasureSince(key, start) - } - s.mux.HandleFunc(pattern, wrapper) -} - -// registerHandlers is used to attach our handlers to the mux -func (s *HTTPServer) registerHandlers(enableDebug bool) { - s.mux.HandleFunc("/", s.Index) + mux.HandleFunc("/", s.Index) // API V1. if s.agent.config.ACLDatacenter != "" { - s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) - s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) - s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy)) - s.handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet)) - s.handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone)) - s.handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList)) - s.handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) + handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) + handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) + handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy)) + handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet)) + handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone)) + handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList)) + handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) } else { - s.handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled)) } - s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) - s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) - s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) - s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) - s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) - s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) - s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) - s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) - s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) - s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) - s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) - s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) - s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) - s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) - s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) - s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) - s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) - s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) - s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) - s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) - s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) - s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) - s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) - s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) - s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) + handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) + handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) + handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) + handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) + handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) + handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) + handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) + handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) + handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) + handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) + handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) + handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) + handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) + handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) + handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) + handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) + handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) + handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) + handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) + handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) + handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) + handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) + handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) + handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) + handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) if !s.agent.config.DisableCoordinates { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) + handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) + handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) } else { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) } - s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) - s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) - s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) - s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) - s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) - s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) - s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) - s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) - s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) - s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) - s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) - s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) - s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) - s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) - s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) - s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) - s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) - s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) - s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) - s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) - s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) - s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) - s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) - s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) - s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) - s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) - s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) + handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) + handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) + handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) + handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) + handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) + handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) + handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) + handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) + handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) + handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) + handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) + handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) + handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) + handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) + handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) + handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) + handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) + handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) + handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) + handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) + handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) + handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) + handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) + handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) + handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) + handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) + handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) // Debug endpoints. if enableDebug { - s.handleFuncMetrics("/debug/pprof/", pprof.Index) - s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) - s.handleFuncMetrics("/debug/pprof/profile", pprof.Profile) - s.handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol) + handleFuncMetrics("/debug/pprof/", pprof.Index) + handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) + handleFuncMetrics("/debug/pprof/profile", pprof.Profile) + handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol) } // Use the custom UI dir if provided. if s.agent.config.UIDir != "" { - s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir)))) + mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir)))) } else if s.agent.config.EnableUI { - s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) + mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) } + return mux } // wrap is used to wrap functions to make them more convenient @@ -306,7 +167,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque // Obfuscate any tokens from appearing in the logs formVals, err := url.ParseQuery(req.URL.RawQuery) if err != nil { - s.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr) + s.agent.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr) resp.WriteHeader(http.StatusInternalServerError) // 500 return } @@ -322,7 +183,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque } handleErr := func(err error) { - s.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr) + s.agent.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr) code := http.StatusInternalServerError // 500 errMsg := err.Error() if strings.Contains(errMsg, "Permission denied") || strings.Contains(errMsg, "ACL not found") { @@ -344,7 +205,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque // Invoke the handler start := time.Now() defer func() { - s.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr) + s.agent.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr) }() obj, err := handler(resp, req) if err != nil { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index db4fa1068..d759fef7b 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -51,26 +51,13 @@ func makeHTTPServerWithACLs(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { - configTry := 0 -RECONF: - configTry++ conf := nextConfig() if cb != nil { cb(conf) } dir, agent := makeAgentLog(t, conf, l, logWriter) - servers, err := NewHTTPServers(agent) - if err != nil { - if configTry < 3 { - goto RECONF - } - t.Fatalf("err: %v", err) - } - if len(servers) == 0 { - t.Fatalf(fmt.Sprintf("Failed to make HTTP server")) - } - return dir, servers[0] + return dir, agent.httpServers[0] } func TestHTTPServer_UnixSocket(t *testing.T) { @@ -91,7 +78,6 @@ func TestHTTPServer_UnixSocket(t *testing.T) { c.UnixSockets.Perms = "0777" }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Ensure the socket was created @@ -158,12 +144,9 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) { dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) + defer agent.Shutdown() - // Try to start the server with the same path anyways. - if _, err := NewHTTPServers(agent); err != nil { - t.Fatalf("err: %s", err) - } - + defer agent.Shutdown() // Ensure the file was replaced by the socket fi, err = os.Stat(socket) if err != nil { @@ -238,7 +221,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) { { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -260,7 +242,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) { dir, srv := makeHTTPServer(t) srv.agent.config.TranslateWanAddrs = true defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -286,7 +267,6 @@ func TestHTTPAPIResponseHeaders(t *testing.T) { } defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -313,7 +293,6 @@ func TestContentTypeIsJSON(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -336,12 +315,11 @@ func TestContentTypeIsJSON(t *testing.T) { func TestHTTP_wrap_obfuscateLog(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Attach a custom logger so we can inspect it buf := &bytes.Buffer{} - srv.logger = log.New(buf, "", log.LstdFlags) + srv.agent.logger = log.New(buf, "", log.LstdFlags) resp := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/some/url?token=secret1&token=secret2", nil) @@ -367,7 +345,6 @@ func TestPrettyPrintBare(t *testing.T) { func testPrettyPrint(pretty string, t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() r := &structs.DirEntry{Key: "key"} @@ -396,7 +373,6 @@ func testPrettyPrint(pretty string, t *testing.T) { func TestParseSource(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Default is agent's DC and no node (since the user didn't care, then @@ -578,7 +554,7 @@ func TestEnableWebUI(t *testing.T) { req, _ := http.NewRequest("GET", "/ui/", nil) // Perform the request resp := httptest.NewRecorder() - s.mux.ServeHTTP(resp, req) + s.Handler.ServeHTTP(resp, req) // Check the result if resp.Code != 200 { @@ -626,7 +602,6 @@ func httpTest(t *testing.T, f func(srv *HTTPServer)) { func httpTestWithConfig(t *testing.T, f func(srv *HTTPServer), cb func(c *Config)) { dir, srv := makeHTTPServerWithConfig(t, cb) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") f(srv) diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index d66acbd7f..7701006b2 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -16,7 +16,6 @@ import ( func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -78,7 +77,6 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { func TestKVSEndpoint_Recurse(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -157,7 +155,6 @@ func TestKVSEndpoint_Recurse(t *testing.T) { func TestKVSEndpoint_DELETE_CAS(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -226,7 +223,6 @@ func TestKVSEndpoint_DELETE_CAS(t *testing.T) { func TestKVSEndpoint_CAS(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -305,7 +301,6 @@ func TestKVSEndpoint_CAS(t *testing.T) { func TestKVSEndpoint_ListKeys(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/agent/listen.go b/command/agent/listen.go new file mode 100644 index 000000000..ebe984f2e --- /dev/null +++ b/command/agent/listen.go @@ -0,0 +1,61 @@ +package agent + +import ( + "crypto/tls" + "fmt" + "net" + "os" + "time" +) + +func ListenTCP(addr string) (net.Listener, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + l = tcpKeepAliveListener{l.(*net.TCPListener)} + return l, nil +} + +func ListenTLS(addr string, cfg *tls.Config) (net.Listener, error) { + l, err := ListenTCP(addr) + if err != nil { + return nil, err + } + return tls.NewListener(l, cfg), nil +} + +func ListenUnix(addr string, perm FilePermissions) (net.Listener, error) { + // todo(fs): move this somewhere else + // if _, err := os.Stat(addr); !os.IsNotExist(err) { + // s.agent.logger.Printf("[WARN] agent: Replacing socket %q", addr) + // } + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("error removing socket file: %s", err) + } + l, err := net.Listen("unix", addr) + if err != nil { + return nil, err + } + if err := setFilePermissions(addr, perm); err != nil { + return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err) + } + return l, nil +} + +// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by NewHttpServer so +// dead TCP connections eventually go away. +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(30 * time.Second) + return tc, nil +} diff --git a/command/agent/status_endpoint_test.go b/command/agent/status_endpoint_test.go index 74a4341fd..0e354d977 100644 --- a/command/agent/status_endpoint_test.go +++ b/command/agent/status_endpoint_test.go @@ -10,7 +10,6 @@ import ( func TestStatusLeader(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -28,7 +27,6 @@ func TestStatusLeader(t *testing.T) { func TestStatusPeers(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() obj, err := srv.StatusPeers(nil, nil) diff --git a/command/agent/ui_endpoint_test.go b/command/agent/ui_endpoint_test.go index 75fdd593c..83dd63746 100644 --- a/command/agent/ui_endpoint_test.go +++ b/command/agent/ui_endpoint_test.go @@ -29,7 +29,6 @@ func TestUiIndex(t *testing.T) { c.UIDir = uiDir }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Create file @@ -41,7 +40,7 @@ func TestUiIndex(t *testing.T) { // Register node req, _ := http.NewRequest("GET", "/ui/my-file", nil) req.URL.Scheme = "http" - req.URL.Host = srv.listener.Addr().String() + req.URL.Host = srv.Addr // Make the request client := cleanhttp.DefaultClient() @@ -66,7 +65,6 @@ func TestUiIndex(t *testing.T) { func TestUiNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -106,7 +104,6 @@ func TestUiNodes(t *testing.T) { func TestUiNodeInfo(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/util_test.go b/command/util_test.go index b81282b46..4cace08a4 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -20,8 +20,6 @@ import ( "github.com/mitchellh/cli" ) -var offset uint64 - func init() { // Seed the random number generator rand.Seed(time.Now().UnixNano()) @@ -29,25 +27,23 @@ func init() { version.Version = "0.8.0" } -type agentWrapper struct { - dir string - config *agent.Config +type server struct { agent *agent.Agent - http *agent.HTTPServer + config *agent.Config httpAddr string + dir string } -func (a *agentWrapper) Shutdown() { +func (a *server) Shutdown() { a.agent.Shutdown() - a.http.Shutdown() os.RemoveAll(a.dir) } -func testAgent(t *testing.T) *agentWrapper { +func testAgent(t *testing.T) *server { return testAgentWithConfig(t, nil) } -func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) { +func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) { agent := testAgentWithConfig(t, func(c *agent.Config) {}) client, err := api.NewClient(&api.Config{Address: agent.httpAddr}) if err != nil { @@ -56,71 +52,54 @@ func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) { return agent, client } -func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { +func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server { return testAgentWithConfigReload(t, cb, nil) } -func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *agentWrapper { - lw := logger.NewLogWriter(512) +func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server { conf := nextConfig() if cb != nil { cb(conf) } - dir := testutil.TempDir(t, "agent") - conf.DataDir = dir - - a, err := agent.Create(conf, lw, nil, reloadCh) + conf.DataDir = testutil.TempDir(t, "agent") + a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh) if err != nil { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("err: %v", err)) + os.RemoveAll(conf.DataDir) + t.Fatalf("err: %v", err) } conf.Addresses.HTTP = "127.0.0.1" - httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) - http, err := agent.NewHTTPServers(a) - if err != nil { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("err: %v", err)) - } - - if http == nil || len(http) == 0 { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("Could not create HTTP server to listen on: %s", httpAddr)) - } - - return &agentWrapper{ - dir: dir, - config: conf, - agent: a, - http: http[0], - httpAddr: httpAddr, - } + addr := fmt.Sprintf("%s:%d", conf.Addresses.HTTP, conf.Ports.HTTP) + return &server{agent: a, config: conf, httpAddr: addr, dir: conf.DataDir} } -func nextConfig() *agent.Config { - idx := int(atomic.AddUint64(&offset, 1)) - conf := agent.DefaultConfig() +var nextPort uint64 = 10000 +func nextConfig() *agent.Config { nodeID, err := uuid.GenerateUUID() if err != nil { panic(err) } + port := int(atomic.AddUint64(&nextPort, 10)) + + conf := agent.DefaultConfig() conf.Bootstrap = true conf.Datacenter = "dc1" - conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeName = fmt.Sprintf("Node %d", port) conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Server = true - conf.Version = version.Version - - conf.Ports.HTTP = 10000 + 10*idx - conf.Ports.HTTPS = 10401 + 10*idx - conf.Ports.SerfLan = 10201 + 10*idx - conf.Ports.SerfWan = 10202 + 10*idx - conf.Ports.Server = 10300 + 10*idx + conf.Ports = agent.PortConfig{ + DNS: port + 1, + HTTP: port + 2, + HTTPS: port + 3, + SerfLan: port + 4, + SerfWan: port + 5, + Server: port + 6, + } cons := consul.DefaultConfig() conf.ConsulConfig = cons