register TCP check for managed proxies

This commit is contained in:
Paul Banks 2018-06-19 12:11:42 +01:00 committed by Jack Pearkes
parent d1810ba338
commit 1d6e1ace11
4 changed files with 113 additions and 32 deletions

View File

@ -2123,8 +2123,23 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
}
proxyService := proxyState.Proxy.ProxyService
// TODO(banks): register proxy health checks.
err = a.AddService(proxyService, nil, persist, token)
// Register proxy TCP check. The built in proxy doesn't listen publically
// until it's loaded certs so this ensures we won't route traffic until it's
// ready.
proxyCfg, err := a.applyProxyConfigDefaults(proxyState.Proxy)
if err != nil {
return err
}
chkTypes := []*structs.CheckType{
&structs.CheckType{
Name: "Connect Proxy Listening",
TCP: fmt.Sprintf("%s:%d", proxyCfg["bind_address"],
proxyCfg["bind_port"]),
Interval: 10 * time.Second,
},
}
err = a.AddService(proxyService, chkTypes, persist, token)
if err != nil {
// Remove the state too
a.State.RemoveProxy(proxyService.ID)
@ -2138,6 +2153,56 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
return nil
}
// applyProxyConfigDefaults takes a *structs.ConnectManagedProxy and returns
// it's Config map merged with any defaults from the Agent's config. It would be
// nicer if this were defined as a method on structs.ConnectManagedProxy but we
// can't do that because ot the import cycle it causes with agent/config.
func (a *Agent) applyProxyConfigDefaults(p *structs.ConnectManagedProxy) (map[string]interface{}, error) {
if p == nil || p.ProxyService == nil {
// Should never happen but protect from panic
return nil, fmt.Errorf("invalid proxy state")
}
// Lookup the target service
target := a.State.Service(p.TargetServiceID)
if target == nil {
// Can happen during deregistration race between proxy and scheduler.
return nil, fmt.Errorf("unknown target service ID: %s", p.TargetServiceID)
}
// Merge globals defaults
config := make(map[string]interface{})
for k, v := range a.config.ConnectProxyDefaultConfig {
if _, ok := config[k]; !ok {
config[k] = v
}
}
// Copy config from the proxy
for k, v := range p.Config {
config[k] = v
}
// Set defaults for anything that is still not specified but required.
// Note that these are not included in the content hash. Since we expect
// them to be static in general but some like the default target service
// port might not be. In that edge case services can set that explicitly
// when they re-register which will be caught though.
if _, ok := config["bind_port"]; !ok {
config["bind_port"] = p.ProxyService.Port
}
if _, ok := config["bind_address"]; !ok {
// Default to binding to the same address the agent is configured to
// bind to.
config["bind_address"] = a.config.BindAddr.String()
}
if _, ok := config["local_service_address"]; !ok {
// Default to localhost and the port the service registered with
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d", target.Port)
}
return config, nil
}
// applyProxyDefaults modifies the given proxy by applying any configured
// defaults, such as the default execution mode, command, etc.
func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {

View File

@ -1033,34 +1033,10 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
}
contentHash := fmt.Sprintf("%x", hash)
// Merge globals defaults
config := make(map[string]interface{})
for k, v := range s.agent.config.ConnectProxyDefaultConfig {
if _, ok := config[k]; !ok {
config[k] = v
}
}
// Set defaults for anything that is still not specified but required.
// Note that these are not included in the content hash. Since we expect
// them to be static in general but some like the default target service
// port might not be. In that edge case services can set that explicitly
// when they re-register which will be caught though.
for k, v := range proxy.Proxy.Config {
config[k] = v
}
if _, ok := config["bind_port"]; !ok {
config["bind_port"] = proxy.Proxy.ProxyService.Port
}
if _, ok := config["bind_address"]; !ok {
// Default to binding to the same address the agent is configured to
// bind to.
config["bind_address"] = s.agent.config.BindAddr.String()
}
if _, ok := config["local_service_address"]; !ok {
// Default to localhost and the port the service registered with
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d",
target.Port)
// Set defaults
config, err := s.agent.applyProxyConfigDefaults(proxy.Proxy)
if err != nil {
return "", nil, err
}
// Only merge in telemetry config from agent if the requested is

View File

@ -2577,6 +2577,11 @@ func TestAgent_AddProxy(t *testing.T) {
script_command = ["bar", "foo"]
}
}
ports {
proxy_min_port = 20000
proxy_max_port = 20000
}
`)
defer a.Shutdown()
@ -2590,6 +2595,7 @@ func TestAgent_AddProxy(t *testing.T) {
tests := []struct {
desc string
proxy, wantProxy *structs.ConnectManagedProxy
wantTCPCheck string
wantErr bool
}{
{
@ -2606,7 +2612,7 @@ func TestAgent_AddProxy(t *testing.T) {
wantErr: true,
},
{
desc: "basic proxy adding, unregistered service",
desc: "basic proxy adding, registered service",
proxy: &structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: []string{"consul", "connect", "proxy"},
@ -2656,6 +2662,21 @@ func TestAgent_AddProxy(t *testing.T) {
},
wantErr: false,
},
{
desc: "managed proxy with custom bind port",
proxy: &structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: []string{"consul", "connect", "proxy"},
Config: map[string]interface{}{
"foo": "bar",
"bind_address": "127.10.10.10",
"bind_port": 1234,
},
TargetServiceID: "web",
},
wantTCPCheck: "127.10.10.10:1234",
wantErr: false,
},
}
for _, tt := range tests {
@ -2677,6 +2698,26 @@ func TestAgent_AddProxy(t *testing.T) {
}
wantProxy.ProxyService = got.Proxy.ProxyService
require.Equal(wantProxy, got.Proxy)
// Ensure a TCP check was created for the service.
gotCheck := a.State.Check("service:web-proxy")
require.NotNil(gotCheck)
require.Equal("Connect Proxy Listening", gotCheck.Name)
// Confusingly, a.State.Check("service:web-proxy") will return the state
// but it's Definition field will be empty. This appears to be expected
// when adding Checks as part of `AddService`. Notice how `AddService`
// tests in this file don't assert on that state but instead look at the
// agent's check state directly to ensure the right thing was registered.
// We'll do the same for now.
gotTCP, ok := a.checkTCPs["service:web-proxy"]
require.True(ok)
wantTCPCheck := tt.wantTCPCheck
if wantTCPCheck == "" {
wantTCPCheck = "127.0.0.1:20000"
}
require.Equal(wantTCPCheck, gotTCP.TCP)
require.Equal(10*time.Second, gotTCP.Interval)
})
}
}

View File

@ -360,7 +360,6 @@ func (p *Daemon) Close() error {
// If we're already stopped or never started, then no problem.
if p.stopped || p.process == nil {
p.stopped = true
p.lock.Unlock()
return nil
}