From 1d6e1ace116101c88b28ff18b17da8a8ab43d7ca Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 19 Jun 2018 12:11:42 +0100 Subject: [PATCH] register TCP check for managed proxies --- agent/agent.go | 69 +++++++++++++++++++++++++++++++++++++++-- agent/agent_endpoint.go | 32 +++---------------- agent/agent_test.go | 43 ++++++++++++++++++++++++- agent/proxy/daemon.go | 1 - 4 files changed, 113 insertions(+), 32 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index d3f672448..2d546c0f0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2123,8 +2123,23 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, } proxyService := proxyState.Proxy.ProxyService - // TODO(banks): register proxy health checks. - err = a.AddService(proxyService, nil, persist, token) + // Register proxy TCP check. The built in proxy doesn't listen publically + // until it's loaded certs so this ensures we won't route traffic until it's + // ready. + proxyCfg, err := a.applyProxyConfigDefaults(proxyState.Proxy) + if err != nil { + return err + } + chkTypes := []*structs.CheckType{ + &structs.CheckType{ + Name: "Connect Proxy Listening", + TCP: fmt.Sprintf("%s:%d", proxyCfg["bind_address"], + proxyCfg["bind_port"]), + Interval: 10 * time.Second, + }, + } + + err = a.AddService(proxyService, chkTypes, persist, token) if err != nil { // Remove the state too a.State.RemoveProxy(proxyService.ID) @@ -2138,6 +2153,56 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, return nil } +// applyProxyConfigDefaults takes a *structs.ConnectManagedProxy and returns +// it's Config map merged with any defaults from the Agent's config. It would be +// nicer if this were defined as a method on structs.ConnectManagedProxy but we +// can't do that because ot the import cycle it causes with agent/config. +func (a *Agent) applyProxyConfigDefaults(p *structs.ConnectManagedProxy) (map[string]interface{}, error) { + if p == nil || p.ProxyService == nil { + // Should never happen but protect from panic + return nil, fmt.Errorf("invalid proxy state") + } + + // Lookup the target service + target := a.State.Service(p.TargetServiceID) + if target == nil { + // Can happen during deregistration race between proxy and scheduler. + return nil, fmt.Errorf("unknown target service ID: %s", p.TargetServiceID) + } + + // Merge globals defaults + config := make(map[string]interface{}) + for k, v := range a.config.ConnectProxyDefaultConfig { + if _, ok := config[k]; !ok { + config[k] = v + } + } + + // Copy config from the proxy + for k, v := range p.Config { + config[k] = v + } + + // Set defaults for anything that is still not specified but required. + // Note that these are not included in the content hash. Since we expect + // them to be static in general but some like the default target service + // port might not be. In that edge case services can set that explicitly + // when they re-register which will be caught though. + if _, ok := config["bind_port"]; !ok { + config["bind_port"] = p.ProxyService.Port + } + if _, ok := config["bind_address"]; !ok { + // Default to binding to the same address the agent is configured to + // bind to. + config["bind_address"] = a.config.BindAddr.String() + } + if _, ok := config["local_service_address"]; !ok { + // Default to localhost and the port the service registered with + config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d", target.Port) + } + return config, nil +} + // applyProxyDefaults modifies the given proxy by applying any configured // defaults, such as the default execution mode, command, etc. func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error { diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 4b1c2ff00..4653c3ae2 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1033,34 +1033,10 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http } contentHash := fmt.Sprintf("%x", hash) - // Merge globals defaults - config := make(map[string]interface{}) - for k, v := range s.agent.config.ConnectProxyDefaultConfig { - if _, ok := config[k]; !ok { - config[k] = v - } - } - - // Set defaults for anything that is still not specified but required. - // Note that these are not included in the content hash. Since we expect - // them to be static in general but some like the default target service - // port might not be. In that edge case services can set that explicitly - // when they re-register which will be caught though. - for k, v := range proxy.Proxy.Config { - config[k] = v - } - if _, ok := config["bind_port"]; !ok { - config["bind_port"] = proxy.Proxy.ProxyService.Port - } - if _, ok := config["bind_address"]; !ok { - // Default to binding to the same address the agent is configured to - // bind to. - config["bind_address"] = s.agent.config.BindAddr.String() - } - if _, ok := config["local_service_address"]; !ok { - // Default to localhost and the port the service registered with - config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d", - target.Port) + // Set defaults + config, err := s.agent.applyProxyConfigDefaults(proxy.Proxy) + if err != nil { + return "", nil, err } // Only merge in telemetry config from agent if the requested is diff --git a/agent/agent_test.go b/agent/agent_test.go index 9a7c2a8de..005d91d70 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -2577,6 +2577,11 @@ func TestAgent_AddProxy(t *testing.T) { script_command = ["bar", "foo"] } } + + ports { + proxy_min_port = 20000 + proxy_max_port = 20000 + } `) defer a.Shutdown() @@ -2590,6 +2595,7 @@ func TestAgent_AddProxy(t *testing.T) { tests := []struct { desc string proxy, wantProxy *structs.ConnectManagedProxy + wantTCPCheck string wantErr bool }{ { @@ -2606,7 +2612,7 @@ func TestAgent_AddProxy(t *testing.T) { wantErr: true, }, { - desc: "basic proxy adding, unregistered service", + desc: "basic proxy adding, registered service", proxy: &structs.ConnectManagedProxy{ ExecMode: structs.ProxyExecModeDaemon, Command: []string{"consul", "connect", "proxy"}, @@ -2656,6 +2662,21 @@ func TestAgent_AddProxy(t *testing.T) { }, wantErr: false, }, + { + desc: "managed proxy with custom bind port", + proxy: &structs.ConnectManagedProxy{ + ExecMode: structs.ProxyExecModeDaemon, + Command: []string{"consul", "connect", "proxy"}, + Config: map[string]interface{}{ + "foo": "bar", + "bind_address": "127.10.10.10", + "bind_port": 1234, + }, + TargetServiceID: "web", + }, + wantTCPCheck: "127.10.10.10:1234", + wantErr: false, + }, } for _, tt := range tests { @@ -2677,6 +2698,26 @@ func TestAgent_AddProxy(t *testing.T) { } wantProxy.ProxyService = got.Proxy.ProxyService require.Equal(wantProxy, got.Proxy) + + // Ensure a TCP check was created for the service. + gotCheck := a.State.Check("service:web-proxy") + require.NotNil(gotCheck) + require.Equal("Connect Proxy Listening", gotCheck.Name) + + // Confusingly, a.State.Check("service:web-proxy") will return the state + // but it's Definition field will be empty. This appears to be expected + // when adding Checks as part of `AddService`. Notice how `AddService` + // tests in this file don't assert on that state but instead look at the + // agent's check state directly to ensure the right thing was registered. + // We'll do the same for now. + gotTCP, ok := a.checkTCPs["service:web-proxy"] + require.True(ok) + wantTCPCheck := tt.wantTCPCheck + if wantTCPCheck == "" { + wantTCPCheck = "127.0.0.1:20000" + } + require.Equal(wantTCPCheck, gotTCP.TCP) + require.Equal(10*time.Second, gotTCP.Interval) }) } } diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go index c00b98136..8b37ffb19 100644 --- a/agent/proxy/daemon.go +++ b/agent/proxy/daemon.go @@ -360,7 +360,6 @@ func (p *Daemon) Close() error { // If we're already stopped or never started, then no problem. if p.stopped || p.process == nil { p.stopped = true - p.lock.Unlock() return nil }