open-nomad/client/alloc_runner_test.go

380 lines
10 KiB
Go

package client
import (
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
ctestutil "github.com/hashicorp/nomad/client/testutil"
)
type MockAllocStateUpdater struct {
Count int
Allocs []*structs.Allocation
}
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.Count += 1
m.Allocs = append(m.Allocs, alloc)
}
func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
logger := testLogger()
conf := DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
if !restarts {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
}
ar := NewAllocRunner(logger, conf, upd.Update, alloc)
return upd, ar
}
func TestAllocRunner_SimpleRun(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
go ar.Run()
defer ar.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Update the alloc to be terminal which should cause the alloc runner to
// stop the tasks and wait for a destroy.
update := ar.alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
}
// Check the alloc directory still exists
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Send the destroy signal and ensure the AllocRunner cleans up.
ar.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
start := time.Now()
// Begin the tear down
go func() {
time.Sleep(100 * time.Millisecond)
ar.Destroy()
}()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}
func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
_, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)
// Check the alloc runner stores the update allocation.
testutil.WaitForResult(func() (bool, error) {
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v", err, ar.Alloc())
})
}
func TestAllocRunner_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
// Snapshot state
testutil.WaitForResult(func() (bool, error) {
return len(ar.tasks) == 1, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})
err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID})
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()
// Destroy and wait
ar2.Destroy()
start := time.Now()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus != structs.AllocClientStatusPending, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})
if time.Since(start) > time.Duration(testutil.TestMultiplier()*15)*time.Second {
t.Fatalf("took too long to terminate")
}
}
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Update the alloc to be terminal which should cause the alloc runner to
// stop the tasks and wait for a destroy.
update := ar.alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)
testutil.WaitForResult(func() (bool, error) {
return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure both alloc runners don't destroy
ar.destroy = true
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID})
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()
testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
}
// Check the alloc directory still exists
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})
// Send the destroy signal and ensure the AllocRunner cleans up.
ar2.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}