diff --git a/command/agent/consul/script.go b/command/agent/consul/script.go index 1f3213be3..b4a99c847 100644 --- a/command/agent/consul/script.go +++ b/command/agent/consul/script.go @@ -18,6 +18,54 @@ type heartbeater interface { UpdateTTL(id, output, status string) error } +// contextExec allows canceling a ScriptExecutor with a context. +type contextExec struct { + // pctx is the parent context. A subcontext will be created with Exec's + // timeout. + pctx context.Context + + // exec to be wrapped in a context + exec interfaces.ScriptExecutor +} + +func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec { + return &contextExec{ + pctx: ctx, + exec: exec, + } +} + +type execResult struct { + buf []byte + code int + err error +} + +// Exec a command until the timeout expires, the context is canceled, or the +// underlying Exec returns. +func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + resCh := make(chan execResult, 1) + + // Don't trust the underlying implementation to obey timeout + ctx, cancel := context.WithTimeout(c.pctx, timeout) + defer cancel() + + go func() { + output, code, err := c.exec.Exec(timeout, cmd, args) + select { + case resCh <- execResult{output, code, err}: + case <-ctx.Done(): + } + }() + + select { + case res := <-resCh: + return res.buf, res.code, res.err + case <-ctx.Done(): + return nil, 0, ctx.Err() + } +} + // scriptHandle is returned by scriptCheck.run by cancelling a scriptCheck and // waiting for it to shutdown. type scriptHandle struct { @@ -74,6 +122,11 @@ func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceChe func (s *scriptCheck) run() *scriptHandle { ctx, cancel := context.WithCancel(context.Background()) exitCh := make(chan struct{}) + + // Wrap the original ScriptExecutor in one that obeys context + // cancelation. 
+ ctxExec := newContextExec(ctx, s.exec) + go func() { defer close(exitCh) timer := time.NewTimer(0) @@ -93,7 +146,7 @@ func (s *scriptCheck) run() *scriptHandle { metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1) // Execute check script with timeout - output, code, err := s.exec.Exec(s.check.Timeout, s.check.Command, s.check.Args) + output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args) switch err { case context.Canceled: // check removed during execution; exit diff --git a/command/agent/consul/script_test.go b/command/agent/consul/script_test.go index 42bd1aed4..25b6329b3 100644 --- a/command/agent/consul/script_test.go +++ b/command/agent/consul/script_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/exec" + "sync/atomic" "testing" "time" @@ -23,20 +24,34 @@ func TestMain(m *testing.M) { // blockingScriptExec implements ScriptExec by running a subcommand that never // exits. type blockingScriptExec struct { + // pctx is canceled *only* for test cleanup. Just like real + // ScriptExecutors its Exec method cannot be canceled directly -- only + // with a timeout. + pctx context.Context + // running is ticked before blocking to allow synchronizing operations running chan struct{} - // set to true if Exec is called and has exited - exited bool + // set to 1 with atomics if Exec is called and has exited + exited int32 } -func newBlockingScriptExec() *blockingScriptExec { - return &blockingScriptExec{running: make(chan struct{})} +// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the +// caller recvs on the b.running chan. It also returns a CancelFunc for test +// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout +// expires. 
+func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + exec := &blockingScriptExec{ + pctx: ctx, + running: make(chan struct{}), + } + return exec, cancel } func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) { b.running <- struct{}{} - ctx, cancel := context.WithTimeout(context.Background(), dur) + ctx, cancel := context.WithTimeout(b.pctx, dur) defer cancel() cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h") testtask.SetCmdEnv(cmd) @@ -47,23 +62,20 @@ func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]by code = 1 } } - b.exited = true + atomic.StoreInt32(&b.exited, 1) return []byte{}, code, err } // TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits // any running scripts. func TestConsulScript_Exec_Cancel(t *testing.T) { - // FIXME: This test is failing now as check process cancellation - // doesn't get propogated to the script check causing timeouts - t.Skip("FIXME: unexpected failing test") - serviceCheck := structs.ServiceCheck{ Name: "sleeper", Interval: time.Hour, Timeout: time.Hour, } - exec := newBlockingScriptExec() + exec, cancel := newBlockingScriptExec() + defer cancel() // pass nil for heartbeater as it shouldn't be called check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil) @@ -80,8 +92,11 @@ func TestConsulScript_Exec_Cancel(t *testing.T) { case <-time.After(3 * time.Second): t.Fatalf("timed out waiting for script check to exit") } - if !exec.exited { - t.Errorf("expected script executor to run and exit but it has not") + + // The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. 
+ if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected script executor to still be running after timeout") } } @@ -106,16 +121,19 @@ func newFakeHeartbeater() *fakeHeartbeater { return &fakeHeartbeater{updates: make(chan execStatus)} } -// TestConsulScript_Exec_Timeout asserts a script will be killed when the +// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the // timeout is reached. -func TestConsulScript_Exec_Timeout(t *testing.T) { - t.Parallel() // run the slow tests in parallel +func TestConsulScript_Exec_TimeoutBasic(t *testing.T) { + t.Parallel() + serviceCheck := structs.ServiceCheck{ Name: "sleeper", Interval: time.Hour, Timeout: time.Second, } - exec := newBlockingScriptExec() + + exec, cancel := newBlockingScriptExec() + defer cancel() hb := newFakeHeartbeater() check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil) @@ -132,8 +150,11 @@ func TestConsulScript_Exec_Timeout(t *testing.T) { case <-time.After(3 * time.Second): t.Fatalf("timed out waiting for script check to exit") } - if !exec.exited { - t.Errorf("expected script executor to run and exit but it has not") + + // The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected script executor to still be running after timeout") } // Cancel and watch for exit @@ -160,11 +181,8 @@ func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) { // the timeout is reached and always set a critical status regardless of what // Exec returns. 
func TestConsulScript_Exec_TimeoutCritical(t *testing.T) { - // FIXME: This test is failing now because we no longer mark critical - // if check succeeded - t.Skip("FIXME: unexpected failing test") + t.Parallel() - t.Parallel() // run the slow tests in parallel serviceCheck := structs.ServiceCheck{ Name: "sleeper", Interval: time.Hour,