From d22fe5ba62fb632175c12f083f374265855513ba Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 12 Jan 2016 21:10:25 -0800 Subject: [PATCH] Adds support for the reap lock. --- command/agent/agent.go | 9 +++++++++ command/agent/check.go | 7 +++++++ command/agent/check_test.go | 4 ++++ command/agent/command.go | 6 +++--- command/agent/remote_exec.go | 6 ++++++ command/agent/watch_handler.go | 9 ++++++++- command/agent/watch_handler_test.go | 3 ++- 7 files changed, 39 insertions(+), 5 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index b74a1dcf1..ec2de974c 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -110,6 +110,14 @@ type Agent struct { // agent methods use this, so use with care and never override // outside of a unit test. endpoints map[string]string + + // reapLock is used to prevent child process reaping from interfering + // with normal waiting for subprocesses to complete. Any time you exec + // and wait, you should take a read lock on this mutex. Only the reaper + // takes the write lock. This setup prevents us from serializing all the + // child process management with each other, it just serializes them + // with the child process reaper. + reapLock sync.RWMutex } // Create is used to create a new Agent. Returns @@ -949,6 +957,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist Script: chkType.Script, Interval: chkType.Interval, Logger: a.logger, + ReapLock: &a.reapLock, } monitor.Start() a.checkMonitors[check.CheckID] = monitor diff --git a/command/agent/check.go b/command/agent/check.go index c658bb017..189c0aeaa 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -101,6 +101,7 @@ type CheckMonitor struct { Script string Interval time.Duration Logger *log.Logger + ReapLock *sync.RWMutex stop bool stopCh chan struct{} @@ -146,6 +147,12 @@ func (c *CheckMonitor) run() { // check is invoked periodically to perform the script check func (c *CheckMonitor) check() { + // Disable child process reaping so that we can get this command's + // return value. Note that we take the read lock here since we are + // waiting on a specific PID and don't need to serialize all waits. + c.ReapLock.RLock() + defer c.ReapLock.RUnlock() + // Create the command cmd, err := ExecScript(c.Script) if err != nil { diff --git a/command/agent/check_test.go b/command/agent/check_test.go index 95045d9cf..75099c8f0 100644 --- a/command/agent/check_test.go +++ b/command/agent/check_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "os" "os/exec" + "sync" "testing" "time" @@ -42,6 +43,7 @@ func expectStatus(t *testing.T, script, status string) { Script: script, Interval: 10 * time.Millisecond, Logger: log.New(os.Stderr, "", log.LstdFlags), + ReapLock: &sync.RWMutex{}, } check.Start() defer check.Stop() @@ -90,6 +92,7 @@ func TestCheckMonitor_RandomStagger(t *testing.T) { Script: "exit 0", Interval: 25 * time.Millisecond, Logger: log.New(os.Stderr, "", log.LstdFlags), + ReapLock: &sync.RWMutex{}, } check.Start() defer check.Stop() @@ -118,6 +121,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) { Script: "od -N 81920 /dev/urandom", Interval: 25 * time.Millisecond, Logger: log.New(os.Stderr, "", log.LstdFlags), + ReapLock: &sync.RWMutex{}, } check.Start() defer check.Stop() diff --git a/command/agent/command.go b/command/agent/command.go index ffdfcef07..aa72b14f7 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -674,7 +674,7 @@ func (c *Command) Run(args []string) int { } } }() - go reap.ReapChildren(pids, errors, c.agent.shutdownCh) + go reap.ReapChildren(pids, errors, c.agent.shutdownCh, &c.agent.reapLock) } } @@ -709,7 +709,7 @@ func (c *Command) Run(args []string) int { // Register the watches for _, wp := range config.WatchPlans { go func(wp *watch.WatchPlan) { - wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"]) + wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"], &c.agent.reapLock) wp.LogOutput = c.logOutput if err := wp.Run(httpAddr.String()); err != nil { c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) @@ -896,7 +896,7 @@ func (c *Command) handleReload(config *Config) *Config { // Register the new watches for _, wp := range newConf.WatchPlans { go func(wp *watch.WatchPlan) { - wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) + wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"], &c.agent.reapLock) wp.LogOutput = c.logOutput if err := wp.Run(httpAddr.String()); err != nil { c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index 20c4bb0b9..7c8268dca 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -135,6 +135,12 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { return } + // Disable child process reaping so that we can get this command's + // return value. Note that we take the read lock here since we are + // waiting on a specific PID and don't need to serialize all waits. + a.reapLock.RLock() + defer a.reapLock.RUnlock() + // Ensure we write out an exit code exitCode := 0 defer a.remoteExecWriteExitCode(&event, &exitCode) diff --git a/command/agent/watch_handler.go b/command/agent/watch_handler.go index afc4fb94d..5ed210500 100644 --- a/command/agent/watch_handler.go +++ b/command/agent/watch_handler.go @@ -8,6 +8,7 @@ import ( "log" "os" "strconv" + "sync" "github.com/armon/circbuf" "github.com/hashicorp/consul/watch" @@ -33,10 +34,16 @@ func verifyWatchHandler(params interface{}) error { } // makeWatchHandler returns a handler for the given watch -func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc { +func makeWatchHandler(logOutput io.Writer, params interface{}, reapLock *sync.RWMutex) watch.HandlerFunc { script := params.(string) logger := log.New(logOutput, "", log.LstdFlags) fn := func(idx uint64, data interface{}) { + // Disable child process reaping so that we can get this command's + // return value. Note that we take the read lock here since we are + // waiting on a specific PID and don't need to serialize all waits. + reapLock.RLock() + defer reapLock.RUnlock() + // Create the command cmd, err := ExecScript(script) if err != nil { diff --git a/command/agent/watch_handler_test.go b/command/agent/watch_handler_test.go index 28f1e425f..3980fd26c 100644 --- a/command/agent/watch_handler_test.go +++ b/command/agent/watch_handler_test.go @@ -3,6 +3,7 @@ package agent import ( "io/ioutil" "os" + "sync" "testing" ) @@ -25,7 +26,7 @@ func TestMakeWatchHandler(t *testing.T) { defer os.Remove("handler_out") defer os.Remove("handler_index_out") script := "echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out" - handler := makeWatchHandler(os.Stderr, script) + handler := makeWatchHandler(os.Stderr, script, &sync.RWMutex{}) handler(100, []string{"foo", "bar", "baz"}) raw, err := ioutil.ReadFile("handler_out") if err != nil {