From 44afb5c69906856a02b414258e359535876e1a19 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Wed, 18 Apr 2018 21:05:30 +0100 Subject: [PATCH] Agent Connect Proxy config endpoint with hash-based blocking --- agent/agent_endpoint.go | 119 +++++++++++++++++++++++++ agent/agent_endpoint_test.go | 168 ++++++++++++++++++++++++++++++++++- agent/local/state.go | 47 +++++----- agent/local/state_test.go | 72 +++++++++++---- agent/structs/connect.go | 25 ++++++ 5 files changed, 386 insertions(+), 45 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 43013785f..e7cec596a 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -7,6 +7,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/checks" @@ -26,6 +27,7 @@ import ( // NOTE(mitcehllh): This is temporary while certs are stubbed out. "github.com/mitchellh/go-testing-interface" + "github.com/mitchellh/hashstructure" ) type Self struct { @@ -896,6 +898,123 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http. return &reply, nil } +// GET /v1/agent/connect/proxy/:proxy_service_id +// +// Returns the local proxy config for the identified proxy. Requires token= +// param with the correct local ProxyToken (not ACL token). +func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Get the proxy ID. Note that this is the ID of a proxy's service instance. + id := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/proxy/") + + // Maybe block + var queryOpts structs.QueryOptions + if parseWait(resp, req, &queryOpts) { + // parseWait returns an error itself + return nil, nil + } + + // Parse hash specially since it's only this endpoint that uses it currently. + // Eventually this should happen in parseWait and end up in QueryOptions but I + // didn't want to make very general changes right away. + hash := req.URL.Query().Get("hash") + + return s.agentLocalBlockingQuery(hash, &queryOpts, + func(updateCh chan struct{}) (string, interface{}, error) { + // Retrieve the proxy specified + proxy := s.agent.State.Proxy(id) + if proxy == nil { + resp.WriteHeader(http.StatusNotFound) + fmt.Fprintf(resp, "unknown proxy service ID: %s", id) + return "", nil, nil + } + + // Lookup the target service as a convenience + target := s.agent.State.Service(proxy.Proxy.TargetServiceID) + if target == nil { + // Not found since this endpoint is only useful for agent-managed proxies so + // service missing means the service was deregistered racily with this call. + resp.WriteHeader(http.StatusNotFound) + fmt.Fprintf(resp, "unknown target service ID: %s", proxy.Proxy.TargetServiceID) + return "", nil, nil + } + + // Setup "watch" on the proxy being modified and respond on chan if it is. + go func() { + select { + case <-updateCh: + // blocking query timedout or was cancelled. Abort + return + case <-proxy.WatchCh: + // Proxy was updated or removed, report it + updateCh <- struct{}{} + } + }() + + hash, err := hashstructure.Hash(proxy.Proxy, nil) + if err != nil { + return "", nil, err + } + contentHash := fmt.Sprintf("%x", hash) + + reply := &structs.ConnectManageProxyResponse{ + ProxyServiceID: proxy.Proxy.ProxyService.ID, + TargetServiceID: target.ID, + TargetServiceName: target.Service, + ContentHash: contentHash, + ExecMode: proxy.Proxy.ExecMode.String(), + Command: proxy.Proxy.Command, + Config: proxy.Proxy.Config, + } + return contentHash, reply, nil + }) + return nil, nil +} + +type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, error) + +func (s *HTTPServer) agentLocalBlockingQuery(hash string, + queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) { + + var timer *time.Timer + + if hash != "" { + // TODO(banks) at least define these defaults somewhere in a const. Would be + // nice not to duplicate the ones in consul/rpc.go too... + wait := queryOpts.MaxQueryTime + if wait == 0 { + wait = 5 * time.Minute + } + if wait > 10*time.Minute { + wait = 10 * time.Minute + } + // Apply a small amount of jitter to the request. + wait += lib.RandomStagger(wait / 16) + timer = time.NewTimer(wait) + } + + ch := make(chan struct{}) + + for { + curHash, curResp, err := fn(ch) + if err != nil { + return curResp, err + } + // Hash was passed and matches current one, wait for update or timeout. + if timer != nil && hash == curHash { + select { + case <-ch: + // Update happened, loop to fetch a new value + continue + case <-timer.C: + // Timeout, stop the watcher goroutine and return what we have + close(ch) + break + } + } + return curResp, err + } +} + // AgentConnectAuthorize // // POST /v1/agent/connect/authorize diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 9d8591126..4e73556ec 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" + "github.com/mitchellh/copystructure" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1428,9 +1429,9 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) { // Ensure proxy itself was registered proxy := a.State.Proxy("web-proxy") require.NotNil(proxy) - assert.Equal(structs.ProxyExecModeScript, proxy.ExecMode) - assert.Equal("proxy.sh", proxy.Command) - assert.Equal(args.Connect.Proxy.Config, proxy.Config) + assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode) + assert.Equal("proxy.sh", proxy.Proxy.Command) + assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config) // Ensure the token was configured assert.Equal("abc123", a.State.ServiceToken("web")) @@ -2200,6 +2201,167 @@ func TestAgentConnectCALeafCert_good(t *testing.T) { // TODO(mitchellh): verify the private key matches the cert } +func TestAgentConnectProxy(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Define a local service with a managed proxy. It's registered in the test + // loop to make sure agent state is predictable whatever order tests execute + // since some alter this service config. + reg := &structs.ServiceDefinition{ + Name: "test", + Address: "127.0.0.1", + Port: 8000, + Check: structs.CheckType{ + TTL: 15 * time.Second, + }, + Connect: &structs.ServiceDefinitionConnect{ + Proxy: &structs.ServiceDefinitionConnectProxy{ + Config: map[string]interface{}{ + "bind_port": 1234, + "connect_timeout_ms": 500, + "upstreams": []map[string]interface{}{ + { + "destination_name": "db", + "local_port": 3131, + }, + }, + }, + }, + }, + } + + expectedResponse := &structs.ConnectManageProxyResponse{ + ProxyServiceID: "test-proxy", + TargetServiceID: "test", + TargetServiceName: "test", + ContentHash: "a15dccb216d38a6e", + ExecMode: "daemon", + Command: "", + Config: map[string]interface{}{ + "upstreams": []interface{}{ + map[string]interface{}{ + "destination_name": "db", + "local_port": float64(3131), + }, + }, + "bind_port": float64(1234), + "connect_timeout_ms": float64(500), + }, + } + + ur, err := copystructure.Copy(expectedResponse) + require.NoError(t, err) + updatedResponse := ur.(*structs.ConnectManageProxyResponse) + updatedResponse.ContentHash = "22bc9233a52c08fd" + upstreams := updatedResponse.Config["upstreams"].([]interface{}) + upstreams = append(upstreams, + map[string]interface{}{ + "destination_name": "cache", + "local_port": float64(4242), + }) + updatedResponse.Config["upstreams"] = upstreams + + tests := []struct { + name string + url string + updateFunc func() + wantWait time.Duration + wantCode int + wantErr bool + wantResp *structs.ConnectManageProxyResponse + }{ + { + name: "simple fetch", + url: "/v1/agent/connect/proxy/test-proxy", + wantCode: 200, + wantErr: false, + wantResp: expectedResponse, + }, + { + name: "blocking fetch timeout, no change", + url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e&wait=100ms", + wantWait: 100 * time.Millisecond, + wantCode: 200, + wantErr: false, + wantResp: expectedResponse, + }, + { + name: "blocking fetch old hash should return immediately", + url: "/v1/agent/connect/proxy/test-proxy?hash=123456789abcd&wait=10m", + wantCode: 200, + wantErr: false, + wantResp: expectedResponse, + }, + { + name: "blocking fetch returns change", + url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e", + updateFunc: func() { + time.Sleep(100 * time.Millisecond) + // Re-register with new proxy config + r2, err := copystructure.Copy(reg) + require.NoError(t, err) + reg2 := r2.(*structs.ServiceDefinition) + reg2.Connect.Proxy.Config = updatedResponse.Config + req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(r2)) + resp := httptest.NewRecorder() + _, err = a.srv.AgentRegisterService(resp, req) + require.NoError(t, err) + require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String()) + }, + wantWait: 100 * time.Millisecond, + wantCode: 200, + wantErr: false, + wantResp: updatedResponse, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + // Register the basic service to ensure it's in a known state to start. + { + req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg)) + resp := httptest.NewRecorder() + _, err := a.srv.AgentRegisterService(resp, req) + require.NoError(err) + require.Equal(200, resp.Code, "body: %s", resp.Body.String()) + } + + req, _ := http.NewRequest("GET", tt.url, nil) + resp := httptest.NewRecorder() + if tt.updateFunc != nil { + go tt.updateFunc() + } + start := time.Now() + obj, err := a.srv.AgentConnectProxyConfig(resp, req) + elapsed := time.Now().Sub(start) + + if tt.wantErr { + require.Error(err) + } else { + require.NoError(err) + } + if tt.wantCode != 0 { + require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String()) + } + if tt.wantWait != 0 { + assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+ + "took %s", tt.wantWait, elapsed) + } else { + assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+ + "took %s", elapsed) + } + + assert.Equal(tt.wantResp, obj) + }) + } +} + func TestAgentConnectAuthorize_badBody(t *testing.T) { t.Parallel() diff --git a/agent/local/state.go b/agent/local/state.go index 47a006943..839b3cdb2 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -125,6 +125,10 @@ type ManagedProxy struct { // be exposed any other way. Unmanaged proxies will never see this and need to // use service-scoped ACL tokens distributed externally. ProxyToken string + + // WatchCh is a close-only chan that is closed when the proxy is removed or + // updated. + WatchCh chan struct{} } // State is used to represent the node's services, @@ -171,7 +175,7 @@ type State struct { // tokens contains the ACL tokens tokens *token.Store - // managedProxies is a map of all manged connect proxies registered locally on + // managedProxies is a map of all managed connect proxies registered locally on // this agent. This is NOT kept in sync with servers since it's agent-local // config only. Proxy instances have separate service registrations in the // services map above which are kept in sync via anti-entropy. Un-managed @@ -633,9 +637,17 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str proxy.ProxyService = svc // All set, add the proxy and return the service + if old, ok := l.managedProxies[svc.ID]; ok { + // Notify watchers of the existing proxy config that it's changing. Note + // this is safe here even before the map is updated since we still hold the + // state lock and the watcher can't re-read the new config until we return + // anyway. + close(old.WatchCh) + } l.managedProxies[svc.ID] = &ManagedProxy{ Proxy: proxy, ProxyToken: pToken, + WatchCh: make(chan struct{}), } // No need to trigger sync as proxy state is local only. @@ -653,49 +665,32 @@ func (l *State) RemoveProxy(id string) error { } delete(l.managedProxies, id) + // Notify watchers of the existing proxy config that it's changed. + close(p.WatchCh) + // No need to trigger sync as proxy state is local only. return nil } // Proxy returns the local proxy state. -func (l *State) Proxy(id string) *structs.ConnectManagedProxy { +func (l *State) Proxy(id string) *ManagedProxy { l.RLock() defer l.RUnlock() - - p := l.managedProxies[id] - if p == nil { - return nil - } - return p.Proxy + return l.managedProxies[id] } // Proxies returns the locally registered proxies. -func (l *State) Proxies() map[string]*structs.ConnectManagedProxy { +func (l *State) Proxies() map[string]*ManagedProxy { l.RLock() defer l.RUnlock() - m := make(map[string]*structs.ConnectManagedProxy) + m := make(map[string]*ManagedProxy) for id, p := range l.managedProxies { - m[id] = p.Proxy + m[id] = p } return m } -// ProxyToken returns the local proxy token for a given proxy. Note this is not -// an ACL token so it won't fallback to using the agent-configured default ACL -// token. If the proxy doesn't exist an error is returned, otherwise the token -// is guaranteed to exist. -func (l *State) ProxyToken(id string) (string, error) { - l.RLock() - defer l.RUnlock() - - p := l.managedProxies[id] - if p == nil { - return "", fmt.Errorf("proxy %s not registered", id) - } - return p.ProxyToken, nil -} - // Metadata returns the local node metadata fields that the // agent is aware of and are being kept in sync with the server func (l *State) Metadata() map[string]string { diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 6950cd477..a8890a540 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -6,6 +6,7 @@ import ( "log" "os" "reflect" + "sync" "testing" "time" @@ -1673,8 +1674,8 @@ func TestStateProxyManagement(t *testing.T) { t.Parallel() state := local.NewState(local.Config{ - ProxyPortRangeStart: 20000, - ProxyPortRangeEnd: 20002, + ProxyBindMinPort: 20000, + ProxyBindMaxPort: 20001, }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) // Stub state syncing @@ -1707,6 +1708,20 @@ func TestStateProxyManagement(t *testing.T) { }, "fake-token-db") require.NoError(err) + // Record initial local modify index + lastModifyIndex := state.LocalModifyIndex() + assertModIndexUpdate := func(id string) { + t.Helper() + nowIndex := state.LocalModifyIndex() + assert.True(lastModifyIndex < nowIndex) + if id != "" { + p := state.Proxy(id) + require.NotNil(p) + assert.True(lastModifyIndex < p.ModifyIndex) + } + lastModifyIndex = nowIndex + } + // Should work now svc, err := state.AddProxy(&p1, "fake-token") require.NoError(err) @@ -1718,6 +1733,7 @@ func TestStateProxyManagement(t *testing.T) { assert.Equal("", svc.Address, "should have empty address by default") // Port is non-deterministic but could be either of 20000 or 20001 assert.Contains([]int{20000, 20001}, svc.Port) + assertModIndexUpdate(svc.ID) // Second proxy should claim other port p2 := p1 @@ -1726,10 +1742,10 @@ func TestStateProxyManagement(t *testing.T) { require.NoError(err) assert.Contains([]int{20000, 20001}, svc2.Port) assert.NotEqual(svc.Port, svc2.Port) + assertModIndexUpdate(svc2.ID) - // Just saving this for later... - p2Token, err := state.ProxyToken(svc2.ID) - require.NoError(err) + // Store this for later + p2token := state.Proxy(svc2.ID).ProxyToken // Third proxy should fail as all ports are used p3 := p1 @@ -1746,6 +1762,32 @@ func TestStateProxyManagement(t *testing.T) { require.NoError(err) require.Equal("0.0.0.0", svc3.Address) require.Equal(1234, svc3.Port) + assertModIndexUpdate(svc3.ID) + + // Update config of an already registered proxy should work + p3updated := p3 + p3updated.Config["foo"] = "bar" + // Setup multiple watchers who should all witness the change + gotP3 := state.Proxy(svc3.ID) + require.NotNil(gotP3) + var watchWg sync.WaitGroup + for i := 0; i < 3; i++ { + watchWg.Add(1) + go func() { + <-gotP3.WatchCh + watchWg.Done() + }() + } + svc3, err = state.AddProxy(&p3updated, "fake-token") + require.NoError(err) + require.Equal("0.0.0.0", svc3.Address) + require.Equal(1234, svc3.Port) + gotProxy3 := state.Proxy(svc3.ID) + require.NotNil(gotProxy3) + require.Equal(p3updated.Config, gotProxy3.Proxy.Config) + assertModIndexUpdate(svc3.ID) // update must change mod index + // All watchers should have fired so this should not hang the test! + watchWg.Wait() // Remove one of the auto-assigned proxies err = state.RemoveProxy(svc2.ID) @@ -1758,31 +1800,29 @@ func TestStateProxyManagement(t *testing.T) { require.NoError(err) assert.Contains([]int{20000, 20001}, svc2.Port) assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed") + assertModIndexUpdate(svc4.ID) // Remove a proxy that doesn't exist should error err = state.RemoveProxy("nope") require.Error(err) - assert.Equal(&p4, state.Proxy(p4.ProxyService.ID), + assert.Equal(&p4, state.Proxy(p4.ProxyService.ID).Proxy, "should fetch the right proxy details") assert.Nil(state.Proxy("nope")) proxies := state.Proxies() assert.Len(proxies, 3) - assert.Equal(&p1, proxies[svc.ID]) - assert.Equal(&p4, proxies[svc4.ID]) - assert.Equal(&p3, proxies[svc3.ID]) + assert.Equal(&p1, proxies[svc.ID].Proxy) + assert.Equal(&p4, proxies[svc4.ID].Proxy) + assert.Equal(&p3, proxies[svc3.ID].Proxy) tokens := make([]string, 4) - tokens[0], err = state.ProxyToken(svc.ID) - require.NoError(err) + tokens[0] = state.Proxy(svc.ID).ProxyToken // p2 not registered anymore but lets make sure p4 got a new token when it // re-registered with same ID. - tokens[1] = p2Token - tokens[2], err = state.ProxyToken(svc3.ID) - require.NoError(err) - tokens[3], err = state.ProxyToken(svc4.ID) - require.NoError(err) + tokens[1] = p2token + tokens[2] = state.Proxy(svc2.ID).ProxyToken + tokens[3] = state.Proxy(svc3.ID).ProxyToken // Quick check all are distinct for i := 0; i < len(tokens)-1; i++ { diff --git a/agent/structs/connect.go b/agent/structs/connect.go index 6f11c5fe3..d879718b2 100644 --- a/agent/structs/connect.go +++ b/agent/structs/connect.go @@ -32,6 +32,18 @@ const ( ProxyExecModeScript ) +// String implements Stringer +func (m ProxyExecMode) String() string { + switch m { + case ProxyExecModeDaemon: + return "daemon" + case ProxyExecModeScript: + return "script" + default: + return "unknown" + } +} + // ConnectManagedProxy represents the agent-local state for a configured proxy // instance. This is never stored or sent to the servers and is only used to // store the config for the proxy that the agent needs to track. For now it's @@ -91,3 +103,16 @@ func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error) } return &cfg, nil } + +// ConnectManageProxyResponse is the public response object we return for +// queries on local proxy config state. It's similar to ConnectManagedProxy but +// with some fields re-arranged. +type ConnectManageProxyResponse struct { + ProxyServiceID string + TargetServiceID string + TargetServiceName string + ContentHash string + ExecMode string + Command string + Config map[string]interface{} +}