From fae8dc895117522601202146c943b33da5a7400c Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 30 Apr 2018 21:12:55 -0700 Subject: [PATCH] agent/local: add Notify mechanism for proxy changes --- agent/local/proxy.go | 46 ------------------- agent/local/state.go | 96 +++++++++++++++++++++++++-------------- agent/local/state_test.go | 35 ++++++++++++++ 3 files changed, 98 insertions(+), 79 deletions(-) delete mode 100644 agent/local/proxy.go diff --git a/agent/local/proxy.go b/agent/local/proxy.go deleted file mode 100644 index 37484a32f..000000000 --- a/agent/local/proxy.go +++ /dev/null @@ -1,46 +0,0 @@ -package local - -import ( - "fmt" - "os/exec" - - "github.com/hashicorp/consul/agent/proxy" - "github.com/hashicorp/consul/agent/structs" -) - -// newProxyProcess returns the proxy.Proxy for the given ManagedProxy -// state entry. proxy.Proxy is the actual managed process. The returned value -// is the initialized struct but isn't explicitly started. -func (s *State) newProxyProcess(p *structs.ConnectManagedProxy, pToken string) (proxy.Proxy, error) { - switch p.ExecMode { - case structs.ProxyExecModeDaemon: - command := p.Command - if len(command) == 0 { - command = p.CommandDefault - } - - // This should never happen since validation should happen upstream - // but verify it because the alternative is to panic below. - if len(command) == 0 { - return nil, fmt.Errorf("daemon mode managed proxy requires command") - } - - // Build the command to execute. - var cmd exec.Cmd - cmd.Path = command[0] - cmd.Args = command[1:] - - // Build the daemon structure - return &proxy.Daemon{ - Command: &cmd, - ProxyToken: pToken, - Logger: s.logger, - }, nil - - case structs.ProxyExecModeScript: - return &proxy.Noop{}, nil - - default: - return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode) - } -} diff --git a/agent/local/state.go b/agent/local/state.go index ecd3299fd..03dbbd96c 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/proxy" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -127,9 +126,6 @@ type ManagedProxy struct { // use service-scoped ACL tokens distributed externally. ProxyToken string - // ProxyProcess is the managed proxy itself that is running. - ProxyProcess proxy.Proxy - // WatchCh is a close-only chan that is closed when the proxy is removed or // updated. WatchCh chan struct{} @@ -187,19 +183,24 @@ type State struct { // registration) do not appear here as the agent doesn't need to manage their // process nor config. The _do_ still exist in services above though as // services with Kind == connect-proxy. - managedProxies map[string]*ManagedProxy + // + // managedProxyHandlers is a map of registered channel listeners that + // are sent a message each time a proxy changes via Add or RemoveProxy. + managedProxies map[string]*ManagedProxy + managedProxyHandlers map[chan<- struct{}]struct{} } // NewState creates a new local state for the agent. func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { l := &State{ - config: c, - logger: lg, - services: make(map[string]*ServiceState), - checks: make(map[types.CheckID]*CheckState), - metadata: make(map[string]string), - tokens: tokens, - managedProxies: make(map[string]*ManagedProxy), + config: c, + logger: lg, + services: make(map[string]*ServiceState), + checks: make(map[types.CheckID]*CheckState), + metadata: make(map[string]string), + tokens: tokens, + managedProxies: make(map[string]*ManagedProxy), + managedProxyHandlers: make(map[chan<- struct{}]struct{}), } l.SetDiscardCheckOutput(c.DiscardCheckOutput) return l @@ -577,22 +578,22 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { // AddProxy returns the newly added proxy, any replaced proxy, and an error. // The second return value (replaced proxy) can be used to determine if // the process needs to be updated or not. -func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, *ManagedProxy, error) { +func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, error) { if proxy == nil { - return nil, nil, fmt.Errorf("no proxy") + return nil, fmt.Errorf("no proxy") } // Lookup the local service target := l.Service(proxy.TargetServiceID) if target == nil { - return nil, nil, fmt.Errorf("target service ID %s not registered", + return nil, fmt.Errorf("target service ID %s not registered", proxy.TargetServiceID) } // Get bind info from config cfg, err := proxy.ParseConfig() if err != nil { - return nil, nil, err + return nil, err } // Construct almost all of the NodeService that needs to be registered by the @@ -608,15 +609,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man pToken, err := uuid.GenerateUUID() if err != nil { - return nil, nil, err - } - - // Initialize the managed proxy process. This doesn't start anything, - // it only sets up the structures we'll use. To start the proxy, the - // caller should call Proxy and use the returned ManagedProxy instance. - proxyProcess, err := l.newProxyProcess(proxy, pToken) - if err != nil { - return nil, nil, err + return nil, err } // Lock now. We can't lock earlier as l.Service would deadlock and shouldn't @@ -650,7 +643,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man } // If no ports left (or auto ports disabled) fail if svc.Port < 1 { - return nil, nil, fmt.Errorf("no port provided for proxy bind_port and none "+ + return nil, fmt.Errorf("no port provided for proxy bind_port and none "+ " left in the allocated range [%d, %d]", l.config.ProxyBindMinPort, l.config.ProxyBindMaxPort) } @@ -658,8 +651,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man proxy.ProxyService = svc // All set, add the proxy and return the service - old, ok := l.managedProxies[svc.ID] - if ok { + if old, ok := l.managedProxies[svc.ID]; ok { // Notify watchers of the existing proxy config that it's changing. Note // this is safe here even before the map is updated since we still hold the // state lock and the watcher can't re-read the new config until we return @@ -667,14 +659,22 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man close(old.WatchCh) } l.managedProxies[svc.ID] = &ManagedProxy{ - Proxy: proxy, - ProxyToken: pToken, - ProxyProcess: proxyProcess, - WatchCh: make(chan struct{}), + Proxy: proxy, + ProxyToken: pToken, + WatchCh: make(chan struct{}), + } + + // Notify + for ch := range l.managedProxyHandlers { + // Do not block + select { + case ch <- struct{}{}: + default: + } } // No need to trigger sync as proxy state is local only. - return l.managedProxies[svc.ID], old, nil + return l.managedProxies[svc.ID], nil } // RemoveProxy is used to remove a proxy entry from the local state. @@ -692,6 +692,15 @@ func (l *State) RemoveProxy(id string) (*ManagedProxy, error) { // Notify watchers of the existing proxy config that it's changed. close(p.WatchCh) + // Notify + for ch := range l.managedProxyHandlers { + // Do not block + select { + case ch <- struct{}{}: + default: + } + } + // No need to trigger sync as proxy state is local only. return p, nil } @@ -715,6 +724,27 @@ func (l *State) Proxies() map[string]*ManagedProxy { return m } +// NotifyProxy will register a channel to receive messages when the +// configuration or set of proxies changes. This will not block on +// channel send so ensure the channel has a large enough buffer. +// +// NOTE(mitchellh): This could be more generalized but for my use case I +// only needed proxy events. In the future if it were to be generalized I +// would add a new Notify method and remove the proxy-specific ones. +func (l *State) NotifyProxy(ch chan<- struct{}) { + l.Lock() + defer l.Unlock() + l.managedProxyHandlers[ch] = struct{}{} +} + +// StopNotifyProxy will deregister a channel receiving proxy notifications. +// Pair this with all calls to NotifyProxy to clean up state. +func (l *State) StopNotifyProxy(ch chan<- struct{}) { + l.Lock() + defer l.Unlock() + delete(l.managedProxyHandlers, ch) +} + // Metadata returns the local node metadata fields that the // agent is aware of and are being kept in sync with the server func (l *State) Metadata() map[string]string { diff --git a/agent/local/state_test.go b/agent/local/state_test.go index f79249a73..800c017d6 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1737,6 +1737,13 @@ func TestStateProxyManagement(t *testing.T) { assert.Equal(svc.Port, svcDup.Port) } + // Let's register a notifier now + notifyCh := make(chan struct{}, 1) + state.NotifyProxy(notifyCh) + defer state.StopNotifyProxy(notifyCh) + assert.Empty(notifyCh) + drainCh(notifyCh) + // Second proxy should claim other port p2 := p1 p2.TargetServiceID = "cache" @@ -1746,6 +1753,10 @@ func TestStateProxyManagement(t *testing.T) { assert.Contains([]int{20000, 20001}, svc2.Port) assert.NotEqual(svc.Port, svc2.Port) + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + // Store this for later p2token := state.Proxy(svc2.ID).ProxyToken @@ -1755,6 +1766,9 @@ func TestStateProxyManagement(t *testing.T) { _, err = state.AddProxy(&p3, "fake-token") require.Error(err) + // Should have a notification but we'll do nothing so that the next + // receive should block (we set cap == 1 above) + // But if we set a port explicitly it should be OK p3.Config = map[string]interface{}{ "bind_port": 1234, @@ -1766,6 +1780,10 @@ func TestStateProxyManagement(t *testing.T) { require.Equal("0.0.0.0", svc3.Address) require.Equal(1234, svc3.Port) + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + // Update config of an already registered proxy should work p3updated := p3 p3updated.Config["foo"] = "bar" @@ -1785,10 +1803,16 @@ func TestStateProxyManagement(t *testing.T) { assert.False(ws.Watch(time.After(500*time.Millisecond)), "watch should have fired so ws.Watch should not timeout") + drainCh(notifyCh) + // Remove one of the auto-assigned proxies _, err = state.RemoveProxy(svc2.ID) require.NoError(err) + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + // Should be able to create a new proxy for that service with the port (it // should have been "freed"). p4 := p2 @@ -1829,3 +1853,14 @@ func TestStateProxyManagement(t *testing.T) { } } } + +// drainCh drains a channel by reading messages until it would block. +func drainCh(ch chan struct{}) { + for { + select { + case <-ch: + default: + return + } + } +}