Persist proxies from config files

Also change how loadProxies works. Now it will load all persisted proxies into a map, then when loading config file proxies will look up the previous proxy token in that map.
This commit is contained in:
Matt Keeler 2018-07-17 16:16:43 -04:00
parent 60dff7fc19
commit 953b72318f
3 changed files with 198 additions and 72 deletions

View File

@ -208,6 +208,9 @@ type Agent struct {
// proxyManager is the proxy process manager for managed Connect proxies. // proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxy.Manager proxyManager *proxy.Manager
// proxyLock protects proxy information in the local state from concurrent modification
proxyLock sync.Mutex
} }
func New(c *config.RuntimeConfig) (*Agent, error) { func New(c *config.RuntimeConfig) (*Agent, error) {
@ -1616,16 +1619,21 @@ func (a *Agent) purgeService(serviceID string) error {
type persistedProxy struct { type persistedProxy struct {
ProxyToken string ProxyToken string
Proxy *structs.ConnectManagedProxy Proxy *structs.ConnectManagedProxy
// Set to true when the proxy information originated from the agents configuration
// as opposed to API registration.
FromFile bool
} }
// persistProxy saves a proxy definition to a JSON file in the data dir // persistProxy saves a proxy definition to a JSON file in the data dir
func (a *Agent) persistProxy(proxy *local.ManagedProxy) error { func (a *Agent) persistProxy(proxy *local.ManagedProxy, FromFile bool) error {
proxyPath := filepath.Join(a.config.DataDir, proxyDir, proxyPath := filepath.Join(a.config.DataDir, proxyDir,
stringHash(proxy.Proxy.ProxyService.ID)) stringHash(proxy.Proxy.ProxyService.ID))
wrapped := persistedProxy{ wrapped := persistedProxy{
ProxyToken: proxy.ProxyToken, ProxyToken: proxy.ProxyToken,
Proxy: proxy.Proxy, Proxy: proxy.Proxy,
FromFile: FromFile,
} }
encoded, err := json.Marshal(wrapped) encoded, err := json.Marshal(wrapped)
if err != nil { if err != nil {
@ -2076,7 +2084,9 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
return nil return nil
} }
// AddProxy adds a new local Connect Proxy instance to be managed by the agent. // addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
//
// This assumes that the agent's proxyLock is already held
// //
// It REQUIRES that the service that is being proxied is already present in the // It REQUIRES that the service that is being proxied is already present in the
// local state. Note that this is only used for agent-managed proxies so we can // local state. Note that this is only used for agent-managed proxies so we can
@ -2091,7 +2101,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
// definitions from disk; new proxies must leave it blank to get a new token // definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating // assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected. // running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string) error { restoredProxyToken string) error {
// Lookup the target service token in state if there is one. // Lookup the target service token in state if there is one.
token := a.State.ServiceToken(proxy.TargetServiceID) token := a.State.ServiceToken(proxy.TargetServiceID)
@ -2143,11 +2153,33 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
// Persist the proxy // Persist the proxy
if persist && a.config.DataDir != "" { if persist && a.config.DataDir != "" {
return a.persistProxy(proxyState) return a.persistProxy(proxyState, FromFile)
} }
return nil return nil
} }
// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
//
// It REQUIRES that the service that is being proxied is already present in the
// local state. Note that this is only used for agent-managed proxies so we can
// ensure that we always make this true. For externally managed and registered
// proxies we explicitly allow the proxy to be registered first to make
// bootstrap ordering of a new service simpler but the same is not true here
// since this is only ever called when setting up a _managed_ proxy which was
// registered as part of a service registration either from config or HTTP API
// call.
//
// The restoredProxyToken argument should only be used when restoring proxy
// definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken)
}
// resolveProxyCheckAddress returns the best address to use for a TCP check of // resolveProxyCheckAddress returns the best address to use for a TCP check of
// the proxy's public listener. It expects the input to already have default // the proxy's public listener. It expects the input to already have default
// values populated by applyProxyConfigDefaults. It may return an empty string // values populated by applyProxyConfigDefaults. It may return an empty string
@ -2290,8 +2322,10 @@ func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {
return nil return nil
} }
// RemoveProxy stops and removes a local proxy instance. // removeProxyLocked stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error { //
// It is assumed that this function is called while holding the proxyLock already
func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// Validate proxyID // Validate proxyID
if proxyID == "" { if proxyID == "" {
return fmt.Errorf("proxyID missing") return fmt.Errorf("proxyID missing")
@ -2316,6 +2350,13 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
return nil return nil
} }
// RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
return a.removeProxyLocked(proxyID, persist)
}
// verifyProxyToken takes a token and attempts to verify it against the // verifyProxyToken takes a token and attempts to verify it against the
// targetService name. If targetProxy is specified, then the local proxy token // targetService name. If targetProxy is specified, then the local proxy token
// must exactly match the given proxy ID. cert, config, etc.). // must exactly match the given proxy ID. cert, config, etc.).
@ -2782,9 +2823,67 @@ func (a *Agent) unloadChecks() error {
return nil return nil
} }
// loadPersistedProxies will load connect proxy definitions from their
// persisted state on disk and return a slice of them
//
// This does not add them to the local
func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) {
persistedProxies := make(map[string]persistedProxy)
proxyDir := filepath.Join(a.config.DataDir, proxyDir)
files, err := ioutil.ReadDir(proxyDir)
if err != nil {
if !os.IsNotExist(err) {
return nil, fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
}
}
for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
continue
}
// Skip all partially written temporary files
if strings.HasSuffix(fi.Name(), "tmp") {
return nil, fmt.Errorf("Ignoring temporary proxy file %v", fi.Name())
}
// Open the file for reading
file := filepath.Join(proxyDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return nil, fmt.Errorf("failed opening proxy file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return nil, fmt.Errorf("failed reading proxy file %q: %s", file, err)
}
// Try decoding the proxy definition
var p persistedProxy
if err := json.Unmarshal(buf, &p); err != nil {
return nil, fmt.Errorf("Failed decoding proxy file %q: %s", file, err)
}
svcID := p.Proxy.TargetServiceID
persistedProxies[svcID] = p
}
return persistedProxies, nil
}
// loadProxies will load connect proxy definitions from configuration and // loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent. // persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
persistedProxies, persistenceErr := a.loadPersistedProxies()
for _, svc := range conf.Services { for _, svc := range conf.Services {
if svc.Connect != nil { if svc.Connect != nil {
proxy, err := svc.ConnectManagedProxy() proxy, err := svc.ConnectManagedProxy()
@ -2794,78 +2893,46 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
if proxy == nil { if proxy == nil {
continue continue
} }
if err := a.AddProxy(proxy, false, ""); err != nil { restoredToken := ""
if persisted, ok := persistedProxies[proxy.TargetServiceID]; ok {
restoredToken = persisted.ProxyToken
}
if err := a.addProxyLocked(proxy, true, true, restoredToken); err != nil {
return fmt.Errorf("failed adding proxy: %s", err) return fmt.Errorf("failed adding proxy: %s", err)
} }
} }
} }
// Load any persisted proxies for _, persisted := range persistedProxies {
proxyDir := filepath.Join(a.config.DataDir, proxyDir) proxyID := persisted.Proxy.ProxyService.ID
files, err := ioutil.ReadDir(proxyDir) if persisted.FromFile && a.State.Proxy(proxyID) == nil {
if err != nil { // Purge proxies that were configured previously but are no longer in the config
if os.IsNotExist(err) { a.logger.Printf("[DEBUG] agent: purging stale persisted proxy %q", proxyID)
return nil
}
return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
}
for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
continue
}
// Skip all partially written temporary files
if strings.HasSuffix(fi.Name(), "tmp") {
a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name())
continue
}
// Open the file for reading
file := filepath.Join(proxyDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed opening proxy file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return fmt.Errorf("failed reading proxy file %q: %s", file, err)
}
// Try decoding the proxy definition
var p persistedProxy
if err := json.Unmarshal(buf, &p); err != nil {
a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err)
continue
}
proxyID := p.Proxy.ProxyService.ID
if a.State.Proxy(proxyID) != nil {
// Purge previously persisted proxy. This allows config to be preferred
// over services persisted from the API.
a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q",
proxyID, file)
if err := a.purgeProxy(proxyID); err != nil { if err := a.purgeProxy(proxyID); err != nil {
return fmt.Errorf("failed purging proxy %q: %s", proxyID, err) return fmt.Errorf("failed purging proxy %q: %v", proxyID, err)
}
} else if !persisted.FromFile {
if a.State.Proxy(proxyID) == nil {
a.logger.Printf("[DEBUG] agent: restored proxy definition %q", proxyID)
if err := a.addProxyLocked(persisted.Proxy, false, false, persisted.ProxyToken); err != nil {
return fmt.Errorf("failed adding proxy %q: %v", proxyID, err)
} }
} else { } else {
a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q", a.logger.Printf("[WARN] agent: proxy definition %q was overwritten by a proxy definition within a config file", proxyID)
proxyID, file)
if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil {
return fmt.Errorf("failed adding proxy %q: %s", proxyID, err)
} }
} }
} }
return nil
return persistenceErr
} }
// unloadProxies will deregister all proxies known to the local agent. // unloadProxies will deregister all proxies known to the local agent.
func (a *Agent) unloadProxies() error { func (a *Agent) unloadProxies() error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
for id := range a.State.Proxies() { for id := range a.State.Proxies() {
if err := a.RemoveProxy(id, false); err != nil { if err := a.removeProxyLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err) return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err)
} }
} }

View File

@ -637,7 +637,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
} }
// Add proxy (which will add proxy service so do it before we trigger sync) // Add proxy (which will add proxy service so do it before we trigger sync)
if proxy != nil { if proxy != nil {
if err := s.agent.AddProxy(proxy, true, ""); err != nil { if err := s.agent.AddProxy(proxy, true, false, ""); err != nil {
return nil, err return nil, err
} }
} }

View File

@ -16,6 +16,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -1383,12 +1384,12 @@ func TestAgent_PersistProxy(t *testing.T) {
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
// Proxy is not persisted unless requested // Proxy is not persisted unless requested
require.NoError(a.AddProxy(proxy, false, "")) require.NoError(a.AddProxy(proxy, false, false, ""))
_, err := os.Stat(file) _, err := os.Stat(file)
require.Error(err, "proxy should not be persisted") require.Error(err, "proxy should not be persisted")
// Proxy is persisted if requested // Proxy is persisted if requested
require.NoError(a.AddProxy(proxy, true, "")) require.NoError(a.AddProxy(proxy, true, false, ""))
_, err = os.Stat(file) _, err = os.Stat(file)
require.NoError(err, "proxy should be persisted") require.NoError(err, "proxy should be persisted")
@ -1404,7 +1405,7 @@ func TestAgent_PersistProxy(t *testing.T) {
proxy.Config = map[string]interface{}{ proxy.Config = map[string]interface{}{
"foo": "bar", "foo": "bar",
} }
require.NoError(a.AddProxy(proxy, true, "")) require.NoError(a.AddProxy(proxy, true, false, ""))
content, err = ioutil.ReadFile(file) content, err = ioutil.ReadFile(file)
require.NoError(err) require.NoError(err)
@ -1451,7 +1452,7 @@ func TestAgent_PurgeProxy(t *testing.T) {
Command: []string{"/bin/sleep", "3600"}, Command: []string{"/bin/sleep", "3600"},
} }
proxyID := "redis-proxy" proxyID := "redis-proxy"
require.NoError(a.AddProxy(proxy, true, "")) require.NoError(a.AddProxy(proxy, true, false, ""))
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
@ -1461,7 +1462,7 @@ func TestAgent_PurgeProxy(t *testing.T) {
require.NoError(err, "should not be removed") require.NoError(err, "should not be removed")
// Re-add the proxy // Re-add the proxy
require.NoError(a.AddProxy(proxy, true, "")) require.NoError(a.AddProxy(proxy, true, false, ""))
// Removed // Removed
require.NoError(a.RemoveProxy(proxyID, true)) require.NoError(a.RemoveProxy(proxyID, true))
@ -1499,7 +1500,7 @@ func TestAgent_PurgeProxyOnDuplicate(t *testing.T) {
Command: []string{"/bin/sleep", "3600"}, Command: []string{"/bin/sleep", "3600"},
} }
proxyID := "redis-proxy" proxyID := "redis-proxy"
require.NoError(a.AddProxy(proxy, true, "")) require.NoError(a.AddProxy(proxy, true, false, ""))
a.Shutdown() a.Shutdown()
@ -2753,7 +2754,7 @@ func TestAgent_AddProxy(t *testing.T) {
} }
require.NoError(a.AddService(reg, nil, false, "")) require.NoError(a.AddService(reg, nil, false, ""))
err := a.AddProxy(tt.proxy, false, "") err := a.AddProxy(tt.proxy, false, false, "")
if tt.wantErr { if tt.wantErr {
require.Error(err) require.Error(err)
return return
@ -2813,7 +2814,7 @@ func TestAgent_RemoveProxy(t *testing.T) {
ExecMode: structs.ProxyExecModeDaemon, ExecMode: structs.ProxyExecModeDaemon,
Command: []string{"foo"}, Command: []string{"foo"},
} }
require.NoError(a.AddProxy(pReg, false, "")) require.NoError(a.AddProxy(pReg, false, false, ""))
// Test the ID was created as we expect. // Test the ID was created as we expect.
gotProxy := a.State.Proxy("web-proxy") gotProxy := a.State.Proxy("web-proxy")
@ -2830,3 +2831,61 @@ func TestAgent_RemoveProxy(t *testing.T) {
err = a.RemoveProxy("foobar", false) err = a.RemoveProxy("foobar", false)
require.Error(err) require.Error(err)
} }
func TestAgent_ReLoadProxiesFromConfig(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(),
`node_name = "node1"
`)
defer a.Shutdown()
require := require.New(t)
// Register a target service we can use
reg := &structs.NodeService{
Service: "web",
Port: 8080,
}
require.NoError(a.AddService(reg, nil, false, ""))
proxies := a.State.Proxies()
require.Len(proxies, 0)
config := config.RuntimeConfig{
Services: []*structs.ServiceDefinition{
&structs.ServiceDefinition{
Name: "web",
Connect: &structs.ServiceConnect{
Native: false,
Proxy: &structs.ServiceDefinitionConnectProxy{},
},
},
},
}
require.NoError(a.loadProxies(&config))
// ensure we loaded the proxy
proxies = a.State.Proxies()
require.Len(proxies, 1)
// store the auto-generated token
ptok := ""
pid := ""
for id := range proxies {
pid = id
ptok = proxies[id].ProxyToken
break
}
// reload the proxies and ensure the proxy token is the same
require.NoError(a.unloadProxies())
require.NoError(a.loadProxies(&config))
require.Len(proxies, 1)
require.Equal(ptok, proxies[pid].ProxyToken)
// make sure when the config goes away so does the proxy
require.NoError(a.unloadProxies())
// a.config contains no services or proxies
require.NoError(a.loadProxies(a.config))
require.Len(proxies, 0)
}