From 953b72318fc79d8e6a7d9da33c67c12b0139c95d Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Tue, 17 Jul 2018 16:16:43 -0400 Subject: [PATCH] Persist proxies from config files Also change how loadProxies works. Now it will load all persisted proxies into a map, then when loading config file proxies will look up the previous proxy token in that map. --- agent/agent.go | 193 +++++++++++++++++++++++++++------------- agent/agent_endpoint.go | 2 +- agent/agent_test.go | 75 ++++++++++++++-- 3 files changed, 198 insertions(+), 72 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index c77565840..906093379 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -208,6 +208,9 @@ type Agent struct { // proxyManager is the proxy process manager for managed Connect proxies. proxyManager *proxy.Manager + + // proxyLock protects proxy information in the local state from concurrent modification + proxyLock sync.Mutex } func New(c *config.RuntimeConfig) (*Agent, error) { @@ -1616,16 +1619,21 @@ func (a *Agent) purgeService(serviceID string) error { type persistedProxy struct { ProxyToken string Proxy *structs.ConnectManagedProxy + + // Set to true when the proxy information originated from the agents configuration + // as opposed to API registration. + FromFile bool } // persistProxy saves a proxy definition to a JSON file in the data dir -func (a *Agent) persistProxy(proxy *local.ManagedProxy) error { +func (a *Agent) persistProxy(proxy *local.ManagedProxy, FromFile bool) error { proxyPath := filepath.Join(a.config.DataDir, proxyDir, stringHash(proxy.Proxy.ProxyService.ID)) wrapped := persistedProxy{ ProxyToken: proxy.ProxyToken, Proxy: proxy.Proxy, + FromFile: FromFile, } encoded, err := json.Marshal(wrapped) if err != nil { @@ -2076,7 +2084,9 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { return nil } -// AddProxy adds a new local Connect Proxy instance to be managed by the agent. +// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent. +// +// This assumes that the agent's proxyLock is already held // // It REQUIRES that the service that is being proxied is already present in the // local state. Note that this is only used for agent-managed proxies so we can @@ -2091,7 +2101,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { // definitions from disk; new proxies must leave it blank to get a new token // assigned. We need to restore from disk to enable to continue authenticating // running proxies that already had that credential injected. -func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, +func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, FromFile bool, restoredProxyToken string) error { // Lookup the target service token in state if there is one. token := a.State.ServiceToken(proxy.TargetServiceID) @@ -2143,11 +2153,33 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, // Persist the proxy if persist && a.config.DataDir != "" { - return a.persistProxy(proxyState) + return a.persistProxy(proxyState, FromFile) } return nil } +// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent. +// +// It REQUIRES that the service that is being proxied is already present in the +// local state. Note that this is only used for agent-managed proxies so we can +// ensure that we always make this true. For externally managed and registered +// proxies we explicitly allow the proxy to be registered first to make +// bootstrap ordering of a new service simpler but the same is not true here +// since this is only ever called when setting up a _managed_ proxy which was +// registered as part of a service registration either from config or HTTP API +// call. +// +// The restoredProxyToken argument should only be used when restoring proxy +// definitions from disk; new proxies must leave it blank to get a new token +// assigned. We need to restore from disk to enable to continue authenticating +// running proxies that already had that credential injected. +func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool, + restoredProxyToken string) error { + a.proxyLock.Lock() + defer a.proxyLock.Unlock() + return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken) +} + // resolveProxyCheckAddress returns the best address to use for a TCP check of // the proxy's public listener. It expects the input to already have default // values populated by applyProxyConfigDefaults. It may return an empty string @@ -2290,8 +2322,10 @@ func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error { return nil } -// RemoveProxy stops and removes a local proxy instance. -func (a *Agent) RemoveProxy(proxyID string, persist bool) error { +// removeProxyLocked stops and removes a local proxy instance. +// +// It is assumed that this function is called while holding the proxyLock already +func (a *Agent) removeProxyLocked(proxyID string, persist bool) error { // Validate proxyID if proxyID == "" { return fmt.Errorf("proxyID missing") @@ -2316,6 +2350,13 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error { return nil } +// RemoveProxy stops and removes a local proxy instance. +func (a *Agent) RemoveProxy(proxyID string, persist bool) error { + a.proxyLock.Lock() + defer a.proxyLock.Unlock() + return a.removeProxyLocked(proxyID, persist) +} + // verifyProxyToken takes a token and attempts to verify it against the // targetService name. If targetProxy is specified, then the local proxy token // must exactly match the given proxy ID. cert, config, etc.). @@ -2782,9 +2823,67 @@ func (a *Agent) unloadChecks() error { return nil } +// loadPersistedProxies will load connect proxy definitions from their +// persisted state on disk and return a slice of them +// +// This does not add them to the local +func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) { + persistedProxies := make(map[string]persistedProxy) + + proxyDir := filepath.Join(a.config.DataDir, proxyDir) + files, err := ioutil.ReadDir(proxyDir) + if err != nil { + if !os.IsNotExist(err) { + return nil, fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err) + } + } + + for _, fi := range files { + // Skip all dirs + if fi.IsDir() { + continue + } + + // Skip all partially written temporary files + if strings.HasSuffix(fi.Name(), "tmp") { + return nil, fmt.Errorf("Ignoring temporary proxy file %v", fi.Name()) + } + + // Open the file for reading + file := filepath.Join(proxyDir, fi.Name()) + fh, err := os.Open(file) + if err != nil { + return nil, fmt.Errorf("failed opening proxy file %q: %s", file, err) + } + + // Read the contents into a buffer + buf, err := ioutil.ReadAll(fh) + fh.Close() + if err != nil { + return nil, fmt.Errorf("failed reading proxy file %q: %s", file, err) + } + + // Try decoding the proxy definition + var p persistedProxy + if err := json.Unmarshal(buf, &p); err != nil { + return nil, fmt.Errorf("Failed decoding proxy file %q: %s", file, err) + } + svcID := p.Proxy.TargetServiceID + + persistedProxies[svcID] = p + } + + return persistedProxies, nil +} + // loadProxies will load connect proxy definitions from configuration and // persisted definitions on disk, and load them into the local agent. func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { + a.proxyLock.Lock() + defer a.proxyLock.Unlock() + + persistedProxies, persistenceErr := a.loadPersistedProxies() + for _, svc := range conf.Services { if svc.Connect != nil { proxy, err := svc.ConnectManagedProxy() @@ -2794,78 +2893,46 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { if proxy == nil { continue } - if err := a.AddProxy(proxy, false, ""); err != nil { + restoredToken := "" + if persisted, ok := persistedProxies[proxy.TargetServiceID]; ok { + restoredToken = persisted.ProxyToken + } + + if err := a.addProxyLocked(proxy, true, true, restoredToken); err != nil { return fmt.Errorf("failed adding proxy: %s", err) } } } - // Load any persisted proxies - proxyDir := filepath.Join(a.config.DataDir, proxyDir) - files, err := ioutil.ReadDir(proxyDir) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err) - } - for _, fi := range files { - // Skip all dirs - if fi.IsDir() { - continue - } - - // Skip all partially written temporary files - if strings.HasSuffix(fi.Name(), "tmp") { - a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name()) - continue - } - - // Open the file for reading - file := filepath.Join(proxyDir, fi.Name()) - fh, err := os.Open(file) - if err != nil { - return fmt.Errorf("failed opening proxy file %q: %s", file, err) - } - - // Read the contents into a buffer - buf, err := ioutil.ReadAll(fh) - fh.Close() - if err != nil { - return fmt.Errorf("failed reading proxy file %q: %s", file, err) - } - - // Try decoding the proxy definition - var p persistedProxy - if err := json.Unmarshal(buf, &p); err != nil { - a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err) - continue - } - proxyID := p.Proxy.ProxyService.ID - - if a.State.Proxy(proxyID) != nil { - // Purge previously persisted proxy. This allows config to be preferred - // over services persisted from the API. - a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q", - proxyID, file) + for _, persisted := range persistedProxies { + proxyID := persisted.Proxy.ProxyService.ID + if persisted.FromFile && a.State.Proxy(proxyID) == nil { + // Purge proxies that were configured previously but are no longer in the config + a.logger.Printf("[DEBUG] agent: purging stale persisted proxy %q", proxyID) if err := a.purgeProxy(proxyID); err != nil { - return fmt.Errorf("failed purging proxy %q: %s", proxyID, err) + return fmt.Errorf("failed purging proxy %q: %v", proxyID, err) } - } else { - a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q", - proxyID, file) - if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil { - return fmt.Errorf("failed adding proxy %q: %s", proxyID, err) + } else if !persisted.FromFile { + if a.State.Proxy(proxyID) == nil { + a.logger.Printf("[DEBUG] agent: restored proxy definition %q", proxyID) + if err := a.addProxyLocked(persisted.Proxy, false, false, persisted.ProxyToken); err != nil { + return fmt.Errorf("failed adding proxy %q: %v", proxyID, err) + } + } else { + a.logger.Printf("[WARN] agent: proxy definition %q was overwritten by a proxy definition within a config file", proxyID) } } } - return nil + + return persistenceErr } // unloadProxies will deregister all proxies known to the local agent. func (a *Agent) unloadProxies() error { + a.proxyLock.Lock() + defer a.proxyLock.Unlock() for id := range a.State.Proxies() { - if err := a.RemoveProxy(id, false); err != nil { + if err := a.removeProxyLocked(id, false); err != nil { return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err) } } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 4653c3ae2..4c62354c1 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -637,7 +637,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } // Add proxy (which will add proxy service so do it before we trigger sync) if proxy != nil { - if err := s.agent.AddProxy(proxy, true, ""); err != nil { + if err := s.agent.AddProxy(proxy, true, false, ""); err != nil { return nil, err } } diff --git a/agent/agent_test.go b/agent/agent_test.go index f22fc88ff..d7a77943b 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/hashicorp/consul/agent/checks" + "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -1383,12 +1384,12 @@ func TestAgent_PersistProxy(t *testing.T) { file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) // Proxy is not persisted unless requested - require.NoError(a.AddProxy(proxy, false, "")) + require.NoError(a.AddProxy(proxy, false, false, "")) _, err := os.Stat(file) require.Error(err, "proxy should not be persisted") // Proxy is persisted if requested - require.NoError(a.AddProxy(proxy, true, "")) + require.NoError(a.AddProxy(proxy, true, false, "")) _, err = os.Stat(file) require.NoError(err, "proxy should be persisted") @@ -1404,7 +1405,7 @@ func TestAgent_PersistProxy(t *testing.T) { proxy.Config = map[string]interface{}{ "foo": "bar", } - require.NoError(a.AddProxy(proxy, true, "")) + require.NoError(a.AddProxy(proxy, true, false, "")) content, err = ioutil.ReadFile(file) require.NoError(err) @@ -1451,7 +1452,7 @@ func TestAgent_PurgeProxy(t *testing.T) { Command: []string{"/bin/sleep", "3600"}, } proxyID := "redis-proxy" - require.NoError(a.AddProxy(proxy, true, "")) + require.NoError(a.AddProxy(proxy, true, false, "")) file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) @@ -1461,7 +1462,7 @@ func TestAgent_PurgeProxy(t *testing.T) { require.NoError(err, "should not be removed") // Re-add the proxy - require.NoError(a.AddProxy(proxy, true, "")) + require.NoError(a.AddProxy(proxy, true, false, "")) // Removed require.NoError(a.RemoveProxy(proxyID, true)) @@ -1499,7 +1500,7 @@ func TestAgent_PurgeProxyOnDuplicate(t *testing.T) { Command: []string{"/bin/sleep", "3600"}, } proxyID := "redis-proxy" - require.NoError(a.AddProxy(proxy, true, "")) + require.NoError(a.AddProxy(proxy, true, false, "")) a.Shutdown() @@ -2753,7 +2754,7 @@ func TestAgent_AddProxy(t *testing.T) { } require.NoError(a.AddService(reg, nil, false, "")) - err := a.AddProxy(tt.proxy, false, "") + err := a.AddProxy(tt.proxy, false, false, "") if tt.wantErr { require.Error(err) return @@ -2813,7 +2814,7 @@ func TestAgent_RemoveProxy(t *testing.T) { ExecMode: structs.ProxyExecModeDaemon, Command: []string{"foo"}, } - require.NoError(a.AddProxy(pReg, false, "")) + require.NoError(a.AddProxy(pReg, false, false, "")) // Test the ID was created as we expect. gotProxy := a.State.Proxy("web-proxy") @@ -2830,3 +2831,61 @@ func TestAgent_RemoveProxy(t *testing.T) { err = a.RemoveProxy("foobar", false) require.Error(err) } + +func TestAgent_ReLoadProxiesFromConfig(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), + `node_name = "node1" + `) + defer a.Shutdown() + require := require.New(t) + + // Register a target service we can use + reg := &structs.NodeService{ + Service: "web", + Port: 8080, + } + require.NoError(a.AddService(reg, nil, false, "")) + + proxies := a.State.Proxies() + require.Len(proxies, 0) + + config := config.RuntimeConfig{ + Services: []*structs.ServiceDefinition{ + &structs.ServiceDefinition{ + Name: "web", + Connect: &structs.ServiceConnect{ + Native: false, + Proxy: &structs.ServiceDefinitionConnectProxy{}, + }, + }, + }, + } + + require.NoError(a.loadProxies(&config)) + + // ensure we loaded the proxy + proxies = a.State.Proxies() + require.Len(proxies, 1) + + // store the auto-generated token + ptok := "" + pid := "" + for id := range proxies { + pid = id + ptok = proxies[id].ProxyToken + break + } + + // reload the proxies and ensure the proxy token is the same + require.NoError(a.unloadProxies()) + require.NoError(a.loadProxies(&config)) + require.Len(proxies, 1) + require.Equal(ptok, proxies[pid].ProxyToken) + + // make sure when the config goes away so does the proxy + require.NoError(a.unloadProxies()) + // a.config contains no services or proxies + require.NoError(a.loadProxies(a.config)) + require.Len(proxies, 0) +}