agent/proxy: Manager.Close also has to stop all proxy watchers
This commit is contained in:
parent
18e64dafbc
commit
56f5924f3e
|
@ -394,7 +394,7 @@ func (p *Daemon) Stop() error {
|
||||||
|
|
||||||
case <-time.After(gracefulWait):
|
case <-time.After(gracefulWait):
|
||||||
// Interrupt didn't work
|
// Interrupt didn't work
|
||||||
p.Logger.Printf("[DEBUG] agent/proxy: gracefull wait of %s passed, "+
|
p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
|
||||||
"killing", gracefulWait)
|
"killing", gracefulWait)
|
||||||
}
|
}
|
||||||
} else if isProcessAlreadyFinishedErr(err) {
|
} else if isProcessAlreadyFinishedErr(err) {
|
||||||
|
@ -413,9 +413,9 @@ func (p *Daemon) Stop() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// stopKeepAlive is like Stop but keeps the process running. This is
|
// Close implements Proxy by stopping the run loop but not killing the process.
|
||||||
// used only for tests.
|
// One Close is called, Stop has no effect.
|
||||||
func (p *Daemon) stopKeepAlive() error {
|
func (p *Daemon) Close() error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -432,7 +432,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
||||||
snap := d.MarshalSnapshot()
|
snap := d.MarshalSnapshot()
|
||||||
|
|
||||||
// Stop the original daemon but keep it alive
|
// Stop the original daemon but keep it alive
|
||||||
require.NoError(d.stopKeepAlive())
|
require.NoError(d.Close())
|
||||||
|
|
||||||
// Restore the second daemon
|
// Restore the second daemon
|
||||||
d2 := &Daemon{Logger: testLogger}
|
d2 := &Daemon{Logger: testLogger}
|
||||||
|
|
|
@ -151,7 +151,35 @@ func (m *Manager) Close() error {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
return m.stop(func(p Proxy) error {
|
||||||
|
return p.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kill will Close the manager and Kill all proxies that were being managed.
|
||||||
|
// Only ONE of Kill or Close must be called. If Close has been called already
|
||||||
|
// then this will have no effect.
|
||||||
|
func (m *Manager) Kill() error {
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
return m.stop(func(p Proxy) error {
|
||||||
|
return p.Stop()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop stops the run loop and cleans up all the proxies by calling
|
||||||
|
// the given cleaner. If the cleaner returns an error the proxy won't be
|
||||||
|
// removed from the map.
|
||||||
|
//
|
||||||
|
// The lock must be held while this is called.
|
||||||
|
func (m *Manager) stop(cleaner func(Proxy) error) error {
|
||||||
for {
|
for {
|
||||||
|
// Special case state that exits the for loop
|
||||||
|
if m.runState == managerStateStopped {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
switch m.runState {
|
switch m.runState {
|
||||||
case managerStateIdle:
|
case managerStateIdle:
|
||||||
// Idle so just set it to stopped and return. We notify
|
// Idle so just set it to stopped and return. We notify
|
||||||
|
@ -170,29 +198,13 @@ func (m *Manager) Close() error {
|
||||||
case managerStateStopping:
|
case managerStateStopping:
|
||||||
// Still stopping, wait...
|
// Still stopping, wait...
|
||||||
m.cond.Wait()
|
m.cond.Wait()
|
||||||
|
|
||||||
case managerStateStopped:
|
|
||||||
// Stopped, target state reached
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Kill will Close the manager and Kill all proxies that were being managed.
|
|
||||||
//
|
|
||||||
// This is safe to call with Close already called since Close is idempotent.
|
|
||||||
func (m *Manager) Kill() error {
|
|
||||||
// Close first so that we aren't getting changes in proxies
|
|
||||||
if err := m.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
|
|
||||||
|
// Clean up all the proxies
|
||||||
var err error
|
var err error
|
||||||
for id, proxy := range m.proxies {
|
for id, proxy := range m.proxies {
|
||||||
if err := proxy.Stop(); err != nil {
|
if err := cleaner(proxy); err != nil {
|
||||||
err = multierror.Append(
|
err = multierror.Append(
|
||||||
err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
|
err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
||||||
|
|
||||||
// Add a second proxy so that we can determine when we're up
|
// Add a second proxy so that we can determine when we're up
|
||||||
// and running.
|
// and running.
|
||||||
path2 := filepath.Join(td, "file")
|
path2 := filepath.Join(td, "file2")
|
||||||
testStateProxy(t, state, "db", helperProcess("start-stop", path2))
|
testStateProxy(t, state, "db", helperProcess("start-stop", path2))
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
_, err := os.Stat(path2)
|
_, err := os.Stat(path2)
|
||||||
|
@ -343,7 +343,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.Fatalf("file still exists")
|
r.Fatalf("file still exists: %s", path)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ type Noop struct{}
|
||||||
|
|
||||||
func (p *Noop) Start() error { return nil }
|
func (p *Noop) Start() error { return nil }
|
||||||
func (p *Noop) Stop() error { return nil }
|
func (p *Noop) Stop() error { return nil }
|
||||||
|
func (p *Noop) Close() error { return nil }
|
||||||
func (p *Noop) Equal(Proxy) bool { return true }
|
func (p *Noop) Equal(Proxy) bool { return true }
|
||||||
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
|
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
|
||||||
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
|
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
|
||||||
|
|
|
@ -40,12 +40,18 @@ type Proxy interface {
|
||||||
Start() error
|
Start() error
|
||||||
|
|
||||||
// Stop stops the proxy and disallows it from ever being started again.
|
// Stop stops the proxy and disallows it from ever being started again.
|
||||||
|
// This should also clean up any resources used by this Proxy.
|
||||||
//
|
//
|
||||||
// If the proxy is not started yet, this should not return an error, but
|
// If the proxy is not started yet, this should not return an error, but
|
||||||
// it should disallow Start from working again. If the proxy is already
|
// it should disallow Start from working again. If the proxy is already
|
||||||
// stopped, this should not return an error.
|
// stopped, this should not return an error.
|
||||||
Stop() error
|
Stop() error
|
||||||
|
|
||||||
|
// Close should clean up any resources associated with this proxy but
|
||||||
|
// keep it running in the background. Only one of Close or Stop can be
|
||||||
|
// called.
|
||||||
|
Close() error
|
||||||
|
|
||||||
// Equal returns true if the argument is equal to the proxy being called.
|
// Equal returns true if the argument is equal to the proxy being called.
|
||||||
// This is called by the manager to determine if a change in configuration
|
// This is called by the manager to determine if a change in configuration
|
||||||
// results in a proxy that needs to be restarted or not. If Equal returns
|
// results in a proxy that needs to be restarted or not. If Equal returns
|
||||||
|
|
Loading…
Reference in a new issue