agent/local: add Notify mechanism for proxy changes

Mitchell Hashimoto 2018-04-30 21:12:55 -07:00
parent f64a002f68
commit fae8dc8951
3 changed files with 98 additions and 79 deletions
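
The commit drops the proxy.Proxy process handle from local state and instead exposes a notification API: callers register a channel with NotifyProxy, AddProxy and RemoveProxy signal it with a non-blocking send, and StopNotifyProxy deregisters it. A minimal sketch of how a consumer might drive the new API; watchProxies, reconcile, and shutdownCh are illustrative names, not part of this commit:

```go
package example

import "github.com/hashicorp/consul/agent/local"

// watchProxies is a sketch of a consumer of the new notification API.
// The function itself, reconcile, and shutdownCh are assumptions for
// illustration; only NotifyProxy, StopNotifyProxy, and Proxies come from
// the commit.
func watchProxies(state *local.State, shutdownCh <-chan struct{},
	reconcile func(map[string]*local.ManagedProxy)) {

	// Buffer of 1: the sends in AddProxy/RemoveProxy never block, so an
	// unbuffered (or full) channel would simply miss the signal.
	notifyCh := make(chan struct{}, 1)
	state.NotifyProxy(notifyCh)
	defer state.StopNotifyProxy(notifyCh)

	for {
		select {
		case <-notifyCh:
			// Something changed; re-read the full proxy set and reconcile.
			reconcile(state.Proxies())
		case <-shutdownCh:
			return
		}
	}
}
```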

@@ -1,46 +0,0 @@
package local
import (
"fmt"
"os/exec"
"github.com/hashicorp/consul/agent/proxy"
"github.com/hashicorp/consul/agent/structs"
)
// newProxyProcess returns the proxy.Proxy for the given ManagedProxy
// state entry. proxy.Proxy is the actual managed process. The returned value
// is the initialized struct but isn't explicitly started.
func (s *State) newProxyProcess(p *structs.ConnectManagedProxy, pToken string) (proxy.Proxy, error) {
switch p.ExecMode {
case structs.ProxyExecModeDaemon:
command := p.Command
if len(command) == 0 {
command = p.CommandDefault
}
// This should never happen since validation should happen upstream
// but verify it because the alternative is to panic below.
if len(command) == 0 {
return nil, fmt.Errorf("daemon mode managed proxy requires command")
}
// Build the command to execute.
var cmd exec.Cmd
cmd.Path = command[0]
cmd.Args = command[1:]
// Build the daemon structure
return &proxy.Daemon{
Command: &cmd,
ProxyToken: pToken,
Logger: s.logger,
}, nil
case structs.ProxyExecModeScript:
return &proxy.Noop{}, nil
default:
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
}
}

@@ -14,7 +14,6 @@ import (
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxy"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@@ -127,9 +126,6 @@ type ManagedProxy struct {
// use service-scoped ACL tokens distributed externally.
ProxyToken string
// ProxyProcess is the managed proxy itself that is running.
ProxyProcess proxy.Proxy
// WatchCh is a close-only chan that is closed when the proxy is removed or
// updated.
WatchCh chan struct{}
@@ -187,19 +183,24 @@ type State struct {
// registration) do not appear here as the agent doesn't need to manage their
// process nor config. They _do_ still exist in services above though as
// services with Kind == connect-proxy.
managedProxies map[string]*ManagedProxy
//
// managedProxyHandlers is a map of registered channel listeners that
// are sent a message each time a proxy changes via Add or RemoveProxy.
managedProxies map[string]*ManagedProxy
managedProxyHandlers map[chan<- struct{}]struct{}
}
// NewState creates a new local state for the agent.
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
l := &State{
config: c,
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
metadata: make(map[string]string),
tokens: tokens,
managedProxies: make(map[string]*ManagedProxy),
config: c,
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
metadata: make(map[string]string),
tokens: tokens,
managedProxies: make(map[string]*ManagedProxy),
managedProxyHandlers: make(map[chan<- struct{}]struct{}),
}
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
return l
@@ -577,22 +578,22 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
// AddProxy returns the newly added proxy, any replaced proxy, and an error.
// The second return value (replaced proxy) can be used to determine if
// the process needs to be updated or not.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, *ManagedProxy, error) {
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, error) {
if proxy == nil {
return nil, nil, fmt.Errorf("no proxy")
return nil, fmt.Errorf("no proxy")
}
// Lookup the local service
target := l.Service(proxy.TargetServiceID)
if target == nil {
return nil, nil, fmt.Errorf("target service ID %s not registered",
return nil, fmt.Errorf("target service ID %s not registered",
proxy.TargetServiceID)
}
// Get bind info from config
cfg, err := proxy.ParseConfig()
if err != nil {
return nil, nil, err
return nil, err
}
// Construct almost all of the NodeService that needs to be registered by the
@@ -608,15 +609,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
pToken, err := uuid.GenerateUUID()
if err != nil {
return nil, nil, err
}
// Initialize the managed proxy process. This doesn't start anything,
// it only sets up the structures we'll use. To start the proxy, the
// caller should call Proxy and use the returned ManagedProxy instance.
proxyProcess, err := l.newProxyProcess(proxy, pToken)
if err != nil {
return nil, nil, err
return nil, err
}
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
@@ -650,7 +643,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
}
// If no ports left (or auto ports disabled) fail
if svc.Port < 1 {
return nil, nil, fmt.Errorf("no port provided for proxy bind_port and none "+
return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
" left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
l.config.ProxyBindMaxPort)
}
@@ -658,8 +651,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
proxy.ProxyService = svc
// All set, add the proxy and return the service
old, ok := l.managedProxies[svc.ID]
if ok {
if old, ok := l.managedProxies[svc.ID]; ok {
// Notify watchers of the existing proxy config that it's changing. Note
// this is safe here even before the map is updated since we still hold the
// state lock and the watcher can't re-read the new config until we return
@@ -667,14 +659,22 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
close(old.WatchCh)
}
l.managedProxies[svc.ID] = &ManagedProxy{
Proxy: proxy,
ProxyToken: pToken,
ProxyProcess: proxyProcess,
WatchCh: make(chan struct{}),
Proxy: proxy,
ProxyToken: pToken,
WatchCh: make(chan struct{}),
}
// Notify
for ch := range l.managedProxyHandlers {
// Do not block
select {
case ch <- struct{}{}:
default:
}
}
// No need to trigger sync as proxy state is local only.
return l.managedProxies[svc.ID], old, nil
return l.managedProxies[svc.ID], nil
}
// RemoveProxy is used to remove a proxy entry from the local state.
@@ -692,6 +692,15 @@ func (l *State) RemoveProxy(id string) (*ManagedProxy, error) {
// Notify watchers of the existing proxy config that it's changed.
close(p.WatchCh)
// Notify
for ch := range l.managedProxyHandlers {
// Do not block
select {
case ch <- struct{}{}:
default:
}
}
// No need to trigger sync as proxy state is local only.
return p, nil
}
@@ -715,6 +724,27 @@ func (l *State) Proxies() map[string]*ManagedProxy {
return m
}
// NotifyProxy will register a channel to receive messages when the
// configuration or set of proxies changes. This will not block on
// channel send so ensure the channel has a large enough buffer.
//
// NOTE(mitchellh): This could be more generalized but for my use case I
// only needed proxy events. In the future if it were to be generalized I
// would add a new Notify method and remove the proxy-specific ones.
func (l *State) NotifyProxy(ch chan<- struct{}) {
l.Lock()
defer l.Unlock()
l.managedProxyHandlers[ch] = struct{}{}
}
// StopNotifyProxy will deregister a channel receiving proxy notifications.
// Pair this with all calls to NotifyProxy to clean up state.
func (l *State) StopNotifyProxy(ch chan<- struct{}) {
l.Lock()
defer l.Unlock()
delete(l.managedProxyHandlers, ch)
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *State) Metadata() map[string]string {
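
Both AddProxy and RemoveProxy fan the signal out while still holding the state lock, using a non-blocking send so a slow or unbuffered listener can never stall proxy registration; consecutive changes coalesce into at most one pending signal per listener. A self-contained sketch of the same pattern, with hypothetical names rather than Consul's:

```go
package example

import "sync"

// notifier is a stripped-down sketch of the pattern used in local.State:
// listeners register write-only channels and every event is delivered with
// a non-blocking send. The type and method names are illustrative only.
type notifier struct {
	mu       sync.Mutex
	handlers map[chan<- struct{}]struct{}
}

func newNotifier() *notifier {
	return &notifier{handlers: make(map[chan<- struct{}]struct{})}
}

func (n *notifier) Notify(ch chan<- struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.handlers[ch] = struct{}{}
}

func (n *notifier) StopNotify(ch chan<- struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.handlers, ch)
}

// broadcast signals every handler without blocking: if a listener's buffer
// is full the signal is dropped, which is acceptable because one pending
// signal already means "re-read the state".
func (n *notifier) broadcast() {
	n.mu.Lock()
	defer n.mu.Unlock()
	for ch := range n.handlers {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}
```

A receive therefore only means "something changed since you last looked", which is why consumers re-read Proxies() rather than counting events.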

@@ -1737,6 +1737,13 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal(svc.Port, svcDup.Port)
}
// Let's register a notifier now
notifyCh := make(chan struct{}, 1)
state.NotifyProxy(notifyCh)
defer state.StopNotifyProxy(notifyCh)
assert.Empty(notifyCh)
drainCh(notifyCh)
// Second proxy should claim other port
p2 := p1
p2.TargetServiceID = "cache"
@@ -1746,6 +1753,10 @@ func TestStateProxyManagement(t *testing.T) {
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.NotEqual(svc.Port, svc2.Port)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Store this for later
p2token := state.Proxy(svc2.ID).ProxyToken
@@ -1755,6 +1766,9 @@ func TestStateProxyManagement(t *testing.T) {
_, err = state.AddProxy(&p3, "fake-token")
require.Error(err)
// Should have a notification but we'll do nothing so that the next
// receive should block (we set cap == 1 above)
// But if we set a port explicitly it should be OK
p3.Config = map[string]interface{}{
"bind_port": 1234,
@@ -1766,6 +1780,10 @@ func TestStateProxyManagement(t *testing.T) {
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Update config of an already registered proxy should work
p3updated := p3
p3updated.Config["foo"] = "bar"
@@ -1785,10 +1803,16 @@ func TestStateProxyManagement(t *testing.T) {
assert.False(ws.Watch(time.After(500*time.Millisecond)),
"watch should have fired so ws.Watch should not timeout")
drainCh(notifyCh)
// Remove one of the auto-assigned proxies
_, err = state.RemoveProxy(svc2.ID)
require.NoError(err)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Should be able to create a new proxy for that service with the port (it
// should have been "freed").
p4 := p2
@@ -1829,3 +1853,14 @@ func TestStateProxyManagement(t *testing.T) {
}
}
}
// drainCh drains a channel by reading messages until it would block.
func drainCh(ch chan struct{}) {
for {
select {
case <-ch:
default:
return
}
}
}
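
The test asserts on a queued signal with assert.NotEmpty, which inspects the channel without receiving from it, and drainCh then empties the buffer so each later step starts clean. When the change under test happens on another goroutine, a blocking receive with a timeout is a common alternative; a hypothetical helper in that style, not part of this commit:

```go
package local

import (
	"testing"
	"time"
)

// waitNotified is a hypothetical companion to drainCh (not in this commit):
// it blocks until one notification arrives or fails the test after the
// timeout elapses.
func waitNotified(t *testing.T, ch <-chan struct{}, timeout time.Duration) {
	t.Helper()
	select {
	case <-ch:
	case <-time.After(timeout):
		t.Fatal("timed out waiting for proxy notification")
	}
}
```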