Merge pull request #4953 from hashicorp/b-script-context-wrapper

consul: add ScriptExecutor context wrapper
This commit is contained in:
Michael Schurter 2018-12-10 17:22:53 -08:00 committed by GitHub
commit 8808ab9cea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 24 deletions

View file

@ -18,6 +18,54 @@ type heartbeater interface {
UpdateTTL(id, output, status string) error
}
// contextExec allows canceling a ScriptExecutor with a context.
type contextExec struct {
// pctx is the parent context. A subcontext will be created with Exec's
// timeout.
pctx context.Context
// exec to be wrapped in a context
exec interfaces.ScriptExecutor
}
func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec {
return &contextExec{
pctx: ctx,
exec: exec,
}
}
type execResult struct {
buf []byte
code int
err error
}
// Exec a command until the timeout expires, the context is canceled, or the
// underlying Exec returns.
func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
resCh := make(chan execResult, 1)
// Don't trust the underlying implementation to obey timeout
ctx, cancel := context.WithTimeout(c.pctx, timeout)
defer cancel()
go func() {
output, code, err := c.exec.Exec(timeout, cmd, args)
select {
case resCh <- execResult{output, code, err}:
case <-ctx.Done():
}
}()
select {
case res := <-resCh:
return res.buf, res.code, res.err
case <-ctx.Done():
return nil, 0, ctx.Err()
}
}
// scriptHandle is returned by scriptCheck.run by cancelling a scriptCheck and
// waiting for it to shutdown.
type scriptHandle struct {
@ -74,6 +122,11 @@ func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceChe
func (s *scriptCheck) run() *scriptHandle {
ctx, cancel := context.WithCancel(context.Background())
exitCh := make(chan struct{})
// Wrap the original ScriptExecutor in one that obeys context
// cancelation.
ctxExec := newContextExec(ctx, s.exec)
go func() {
defer close(exitCh)
timer := time.NewTimer(0)
@ -93,7 +146,7 @@ func (s *scriptCheck) run() *scriptHandle {
metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1)
// Execute check script with timeout
output, code, err := s.exec.Exec(s.check.Timeout, s.check.Command, s.check.Args)
output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args)
switch err {
case context.Canceled:
// check removed during execution; exit

View file

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"os/exec"
"sync/atomic"
"testing"
"time"
@ -23,20 +24,34 @@ func TestMain(m *testing.M) {
// blockingScriptExec implements ScriptExec by running a subcommand that never
// exits.
type blockingScriptExec struct {
// pctx is canceled *only* for test cleanup. Just like real
// ScriptExecutors its Exec method cannot be canceled directly -- only
// with a timeout.
pctx context.Context
// running is ticked before blocking to allow synchronizing operations
running chan struct{}
// set to true if Exec is called and has exited
exited bool
// set to 1 with atomics if Exec is called and has exited
exited int32
}
func newBlockingScriptExec() *blockingScriptExec {
return &blockingScriptExec{running: make(chan struct{})}
// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the
// caller recvs on the b.running chan. It also returns a CancelFunc for test
// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout
// expires.
func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
exec := &blockingScriptExec{
pctx: ctx,
running: make(chan struct{}),
}
return exec, cancel
}
func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) {
b.running <- struct{}{}
ctx, cancel := context.WithTimeout(context.Background(), dur)
ctx, cancel := context.WithTimeout(b.pctx, dur)
defer cancel()
cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h")
testtask.SetCmdEnv(cmd)
@ -47,23 +62,20 @@ func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]by
code = 1
}
}
b.exited = true
atomic.StoreInt32(&b.exited, 1)
return []byte{}, code, err
}
// TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits
// any running scripts.
func TestConsulScript_Exec_Cancel(t *testing.T) {
// FIXME: This test is failing now as check process cancellation
// doesn't get propogated to the script check causing timeouts
t.Skip("FIXME: unexpected failing test")
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Hour,
}
exec := newBlockingScriptExec()
exec, cancel := newBlockingScriptExec()
defer cancel()
// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil)
@ -80,8 +92,11 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
if !exec.exited {
t.Errorf("expected script executor to run and exit but it has not")
// The underlying ScriptExecutor (newBlockScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
}
@ -106,16 +121,19 @@ func newFakeHeartbeater() *fakeHeartbeater {
return &fakeHeartbeater{updates: make(chan execStatus)}
}
// TestConsulScript_Exec_Timeout asserts a script will be killed when the
// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the
// timeout is reached.
func TestConsulScript_Exec_Timeout(t *testing.T) {
t.Parallel() // run the slow tests in parallel
func TestConsulScript_Exec_TimeoutBasic(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Second,
}
exec := newBlockingScriptExec()
exec, cancel := newBlockingScriptExec()
defer cancel()
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil)
@ -132,8 +150,11 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
if !exec.exited {
t.Errorf("expected script executor to run and exit but it has not")
// The underlying ScriptExecutor (newBlockScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
// Cancel and watch for exit
@ -160,11 +181,8 @@ func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
// the timeout is reached and always set a critical status regardless of what
// Exec returns.
func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
// FIXME: This test is failing now because we no longer mark critical
// if check succeeded
t.Skip("FIXME: unexpected failing test")
t.Parallel()
t.Parallel() // run the slow tests in parallel
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,