e2e: add NomadAgent and basic client state test

The e2e test code is absolutely hideous and leaks processes and files
on disk. NomadAgent seems useful, but the clientstate e2e tests are very
messy and slow. The last test, "Corrupt", is probably the most useful, as
it explicitly corrupts the state file, whereas the other tests attempt to
reproduce steps thought to cause corruption in earlier releases of
Nomad.
Michael Schurter 2019-01-25 16:51:20 -08:00
parent f1ce2fa1ec
commit cd87afd15f
8 changed files with 841 additions and 154 deletions
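
For orientation, the corruption step at the heart of the "Corrupt" test boils
down to deleting a task's bucket from the client's boltdb state while the
agent is stopped. A minimal sketch, assuming the client/state API used in the
test below (the logger, data dir, alloc ID, and task name are placeholders):

// corruptTaskState simulates on-disk state corruption by dropping a task's
// bucket from the client state store. The agent must be stopped first.
func corruptTaskState(logger hclog.Logger, dataDir, allocID, task string) error {
	db, err := state.NewBoltStateDB(logger, filepath.Join(dataDir, "client"))
	if err != nil {
		return err
	}
	defer db.Close() // sketch only: the test below asserts the Close error instead
	return db.DeleteTaskBucket(allocID, task)
}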

@@ -0,0 +1,468 @@
package clientstate
import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"syscall"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/execagent"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
)
func init() {
framework.AddSuites(&framework.TestSuite{
Component: "clientstate",
CanRunLocal: true,
Cases: []framework.TestCase{
&ClientStateTC{},
},
})
}
type ClientStateTC struct {
framework.TC
// bin is the path to Nomad binary
bin string
}
func (tc *ClientStateTC) BeforeAll(f *framework.F) {
bin, err := discover.NomadExecutable()
f.NoError(err)
tc.bin = bin
}
func getPID(client *api.Client, alloc *api.Allocation, path string) (int, error) {
allocfs := client.AllocFS()
r, err := allocfs.Cat(alloc, path, nil)
if err != nil {
return 0, err
}
defer r.Close()
out, err := ioutil.ReadAll(r)
if err != nil {
return 0, err
}
lines := bytes.SplitN(out, []byte{'\n'}, 2)
if len(lines) != 2 || len(lines[1]) > 0 {
return 0, fmt.Errorf("expected 1 line not %q", string(out))
}
// Capture pid
pid, err := strconv.Atoi(string(lines[0]))
if err != nil {
return 0, err
}
return pid, nil
}
// TestClientState_Kill force kills Nomad agents and restarts them in a tight
// loop to assert Nomad is crash safe.
func (tc *ClientStateTC) TestClientState_Kill(f *framework.F) {
t := f.T()
t.Parallel()
serverOut := testlog.NewPrefixWriter(t, "SERVER: ")
clientOut := testlog.NewPrefixWriter(t, "CLIENT: ")
serverAgent, clientAgent, err := execagent.NewClientServerPair(tc.bin, serverOut, clientOut)
f.NoError(err)
f.NoError(serverAgent.Start())
defer serverAgent.Destroy()
f.NoError(clientAgent.Start())
defer clientAgent.Destroy()
// Get a client for the server agent to use even while the client is
// down.
client, err := serverAgent.Client()
f.NoError(err)
jobID := "sleeper-" + uuid.Generate()[:8]
allocs := e2eutil.RegisterAndWaitForAllocs(t, client, "clientstate/sleeper.nomad", jobID)
f.Len(allocs, 1)
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
f.NoError(err)
defer func() {
if _, _, err := client.Jobs().Deregister(jobID, false, nil); err != nil {
t.Logf("error stopping job: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
sum, _, err := client.Jobs().Summary(jobID, nil)
if err != nil {
return false, err
}
if r := sum.Summary["sleeper"].Running; r > 0 {
return false, fmt.Errorf("still running: %d", r)
}
return true, nil
}, func(err error) {
f.NoError(err)
})
//XXX Must use client agent for gc'ing allocs?
clientAPI, err := clientAgent.Client()
f.NoError(err)
if err := clientAPI.Allocations().GC(alloc, nil); err != nil {
t.Logf("error garbage collecting alloc: %v", err)
}
if err := client.System().GarbageCollect(); err != nil {
t.Logf("error doing full gc: %v", err)
}
//HACK to wait until things have GC'd
time.Sleep(time.Second)
}()
assertHealthy := func() {
t.Helper()
testutil.WaitForResult(func() (bool, error) {
alloc, _, err = client.Allocations().Info(alloc.ID, nil)
f.NoError(err) // should never error
if len(alloc.TaskStates) == 0 {
return false, fmt.Errorf("waiting for tasks to start")
}
if s := alloc.TaskStates["sleeper"].State; s != "running" {
return false, fmt.Errorf("task should be running: %q", s)
}
// Restarts should never happen
f.Zero(alloc.TaskStates["sleeper"].Restarts)
return true, nil
}, func(err error) {
f.NoError(err)
})
}
assertHealthy()
// Find pid
pid := 0
testutil.WaitForResult(func() (bool, error) {
pid, err = getPID(client, alloc, "sleeper/pid")
return pid > 0, err
}, func(err error) {
f.NoError(err)
})
// Kill and restart a few times
tries := 10
for i := 0; i < tries; i++ {
t.Logf("TEST RUN %d/%d", i+1, tries)
// Kill -9 the Agent
agentPid := clientAgent.Cmd.Process.Pid
f.NoError(clientAgent.Cmd.Process.Signal(os.Kill))
state, err := clientAgent.Cmd.Process.Wait()
f.NoError(err)
f.False(state.Exited()) // kill signal != exited
f.False(state.Success())
// Assert sleeper is still running
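// (kill with signal 0 delivers nothing; it only errors if the pid no longer exists)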
f.NoError(syscall.Kill(pid, 0))
assertHealthy()
// Should not be able to reach its filesystem
_, err = getPID(client, alloc, "sleeper/pid")
f.Error(err)
// Restart the agent (have to create a new Cmd)
clientAgent.Cmd = exec.Command(clientAgent.BinPath, "agent",
"-config", clientAgent.ConfFile,
"-data-dir", clientAgent.DataDir,
"-servers", fmt.Sprintf("127.0.0.1:%d", serverAgent.Vars.RPC),
)
clientAgent.Cmd.Stdout = clientOut
clientAgent.Cmd.Stderr = clientOut
f.NoError(clientAgent.Start())
// Assert a new process did start
f.NotEqual(clientAgent.Cmd.Process.Pid, agentPid)
// Retrieving the pid should work once it restarts
testutil.WaitForResult(func() (bool, error) {
newPid, err := getPID(client, alloc, "sleeper/pid")
return newPid == pid, err
}, func(err error) {
f.NoError(err)
})
// Alloc should still be running
assertHealthy()
}
}
// TestClientState_KillDuringRestart force kills Nomad agents and restarts them
// in a tight loop to assert Nomad is crash safe while a task is restarting.
func (tc *ClientStateTC) TestClientState_KillDuringRestart(f *framework.F) {
t := f.T()
t.Parallel()
serverOut := testlog.NewPrefixWriter(t, "SERVER: ")
clientOut := testlog.NewPrefixWriter(t, "CLIENT: ")
serverAgent, clientAgent, err := execagent.NewClientServerPair(tc.bin, serverOut, clientOut)
f.NoError(err)
f.NoError(serverAgent.Start())
defer serverAgent.Destroy()
f.NoError(clientAgent.Start())
defer clientAgent.Destroy()
// Get a client for the server agent to use even while the client is
// down.
client, err := serverAgent.Client()
f.NoError(err)
jobID := "restarter-" + uuid.Generate()[:8]
allocs := e2eutil.RegisterAndWaitForAllocs(t, client, "clientstate/restarter.nomad", jobID)
f.Len(allocs, 1)
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
f.NoError(err)
defer func() {
//FIXME(schmichael): this cleanup is insufficient, but I can't
// figure out how to fix it
client.Jobs().Deregister(jobID, false, nil)
client.System().GarbageCollect()
time.Sleep(time.Second)
}()
var restarts uint64
testutil.WaitForResult(func() (bool, error) {
alloc, _, err = client.Allocations().Info(alloc.ID, nil)
f.NoError(err) // should never error
if len(alloc.TaskStates) == 0 {
return false, fmt.Errorf("waiting for tasks to start")
}
n := alloc.TaskStates["restarter"].Restarts
if n < restarts {
// Restarts should never decrease; immediately fail
f.Failf("restarts decreased", "%d < %d", n, restarts)
}
// Capture current restarts
restarts = n
return true, nil
}, func(err error) {
f.NoError(err)
})
dice := rand.New(rand.NewSource(time.Now().UnixNano()))
// Kill and restart agent a few times
i := 0
for deadline := time.Now().Add(5 * time.Minute); time.Now().Before(deadline); {
i++
sleep := time.Duration(1500+dice.Int63n(6000)) * time.Millisecond
t.Logf("[TEST] ===> Run %d (pid: %d sleeping for %v; last restarts: %d)", i, clientAgent.Cmd.Process.Pid, sleep, restarts)
time.Sleep(sleep)
// Ensure restarts are progressing
alloc, _, err = client.Allocations().Info(alloc.ID, nil)
f.NoError(err) // should never error
n := alloc.TaskStates["restarter"].Restarts
if n < restarts {
// Restarts should never decrease; immediately fail
f.Failf("restarts decreased", "%d < %d", n, restarts)
}
if i > 5 && n == 0 {
// At least one restart should have happened by now
f.Failf("no restarts", "expected at least 1 restart after %d tries", i)
}
restarts = n
// Kill -9 Agent
agentPid := clientAgent.Cmd.Process.Pid
f.NoError(clientAgent.Cmd.Process.Signal(os.Kill))
t.Logf("[TEST] ===> Killed %d", agentPid)
state, err := clientAgent.Cmd.Process.Wait()
f.NoError(err)
f.False(state.Exited()) // kill signal != exited
f.False(state.Success())
// Restart the agent (have to create a new Cmd)
clientAgent.Cmd = exec.Command(clientAgent.BinPath, "agent",
"-config", clientAgent.ConfFile,
"-data-dir", clientAgent.DataDir,
"-servers", fmt.Sprintf("127.0.0.1:%d", serverAgent.Vars.RPC),
)
clientAgent.Cmd.Stdout = clientOut
clientAgent.Cmd.Stderr = clientOut
f.NoError(clientAgent.Start())
// Assert a new process did start
f.NotEqual(clientAgent.Cmd.Process.Pid, agentPid)
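// Wait for the restarted agent's HTTP API to answer before the next kill.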
clientUrl := fmt.Sprintf("http://127.0.0.1:%d/v1/client/stats", clientAgent.Vars.HTTP)
testutil.WaitForResult(func() (bool, error) {
resp, err := http.Get(clientUrl)
if err != nil {
return false, err
}
resp.Body.Close()
return resp.StatusCode == 200, fmt.Errorf("%d != 200", resp.StatusCode)
}, func(err error) {
f.NoError(err)
})
}
t.Logf("[TEST] ===> Final restarts: %d", restarts)
}
// TestClientState_Corrupt removes task state from the client's state db to
// assert it recovers.
func (tc *ClientStateTC) TestClientState_Corrupt(f *framework.F) {
t := f.T()
t.Parallel()
serverOut := testlog.NewPrefixWriter(t, "SERVER: ")
clientOut := testlog.NewPrefixWriter(t, "CLIENT: ")
serverAgent, clientAgent, err := execagent.NewClientServerPair(tc.bin, serverOut, clientOut)
f.NoError(err)
f.NoError(serverAgent.Start())
defer serverAgent.Destroy()
f.NoError(clientAgent.Start())
defer clientAgent.Destroy()
// Get a client for the server agent to use even while the client is
// down.
client, err := serverAgent.Client()
f.NoError(err)
jobID := "sleeper-" + uuid.Generate()[:8]
allocs := e2eutil.RegisterAndWaitForAllocs(t, client, "clientstate/sleeper.nomad", jobID)
f.Len(allocs, 1)
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
f.NoError(err)
defer func() {
//FIXME(schmichael): this cleanup is insufficient, but I can't
// figure out how to fix it
client.Jobs().Deregister(jobID, false, nil)
client.System().GarbageCollect()
time.Sleep(time.Second)
}()
assertHealthy := func() {
t.Helper()
testutil.WaitForResult(func() (bool, error) {
alloc, _, err = client.Allocations().Info(alloc.ID, nil)
f.NoError(err) // should never error
if len(alloc.TaskStates) == 0 {
return false, fmt.Errorf("waiting for tasks to start")
}
if s := alloc.TaskStates["sleeper"].State; s != "running" {
return false, fmt.Errorf("task should be running: %q", s)
}
// Restarts should never happen
f.Zero(alloc.TaskStates["sleeper"].Restarts)
return true, nil
}, func(err error) {
f.NoError(err)
})
}
assertHealthy()
// Find pid
pid := 0
testutil.WaitForResult(func() (bool, error) {
pid, err = getPID(client, alloc, "sleeper/pid")
return pid > 0, err
}, func(err error) {
f.NoError(err)
})
// Kill and corrupt the state
agentPid := clientAgent.Cmd.Process.Pid
f.NoError(clientAgent.Cmd.Process.Signal(os.Interrupt))
procState, err := clientAgent.Cmd.Process.Wait()
f.NoError(err)
f.True(procState.Exited())
// Assert sleeper is still running
f.NoError(syscall.Kill(pid, 0))
assertHealthy()
// Remove task bucket from client state
db, err := state.NewBoltStateDB(testlog.HCLogger(t), filepath.Join(clientAgent.DataDir, "client"))
f.NoError(err)
f.NoError(db.DeleteTaskBucket(alloc.ID, "sleeper"))
f.NoError(db.Close())
// Restart the agent (have to create a new Cmd)
clientAgent.Cmd = exec.Command(clientAgent.BinPath, "agent",
"-config", clientAgent.ConfFile,
"-data-dir", clientAgent.DataDir,
"-servers", fmt.Sprintf("127.0.0.1:%d", serverAgent.Vars.RPC),
)
clientAgent.Cmd.Stdout = clientOut
clientAgent.Cmd.Stderr = clientOut
f.NoError(clientAgent.Start())
// Assert a new process did start
f.NotEqual(clientAgent.Cmd.Process.Pid, agentPid)
// Retrieving the pid should work once it restarts.
// Critically there are now 2 pids because the client task state was
// lost and Nomad started a new copy.
testutil.WaitForResult(func() (bool, error) {
allocfs := client.AllocFS()
r, err := allocfs.Cat(alloc, "sleeper/pid", nil)
if err != nil {
return false, err
}
defer r.Close()
out, err := ioutil.ReadAll(r)
if err != nil {
return false, err
}
lines := bytes.SplitN(out, []byte{'\n'}, 3)
if len(lines) != 3 || len(lines[2]) > 0 {
return false, fmt.Errorf("expected 2 lines not %v", lines)
}
return true, nil
}, func(err error) {
f.NoError(err)
})
// Alloc should still be running
assertHealthy()
}

@@ -0,0 +1,20 @@
# Restarter fakes being a flaky service that crashes and restarts constantly.
# Restarting the Nomad agent during task restarts was a known cause of state
# corruption in v0.8.
job "restarter" {
datacenters = ["dc1"]
group "restarter" {
restart {
attempts = 100
delay = "3s"
}
task "restarter" {
driver = "raw_exec"
config {
command = "/bin/bash"
args = ["-c", "echo $$ >> pid && sleep 1 && exit 99"]
}
}
}
}

@@ -0,0 +1,13 @@
# Sleeper is a fake service that outputs its pid to a file named `pid` to
# assert duplicate tasks are never started.
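# The append (>>) matters: if a duplicate copy of the task ever starts, the
# file gains a second pid line, which the tests detect.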
job "sleeper" {
datacenters = ["dc1"]
task "sleeper" {
driver = "raw_exec"
config {
command = "/bin/bash"
args = ["-c", "echo $$ >> pid && sleep 999999"]
}
}
}

@@ -4,6 +4,7 @@ import (
"testing"
_ "github.com/hashicorp/nomad/e2e/affinities"
_ "github.com/hashicorp/nomad/e2e/clientstate"
_ "github.com/hashicorp/nomad/e2e/consul"
_ "github.com/hashicorp/nomad/e2e/consultemplate"
_ "github.com/hashicorp/nomad/e2e/example"

@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/jobspec"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/require"
)
@@ -60,13 +61,19 @@ func RegisterAndWaitForAllocs(t *testing.T, nomadClient *api.Client, jobFile str
require.Nil(err)
job.ID = helper.StringToPtr(jobID)
g := NewGomegaWithT(t)
// Register job
jobs := nomadClient.Jobs()
resp, _, err := jobs.Register(job, nil)
require.Nil(err)
require.NotEmpty(resp.EvalID)
g := NewGomegaWithT(t)
testutil.WaitForResult(func() (bool, error) {
resp, _, err := jobs.Register(job, nil)
if err != nil {
return false, err
}
return resp.EvalID != "", fmt.Errorf("expected EvalID:%s", pretty.Sprint(resp))
}, func(err error) {
require.NoError(err)
})
// Wrap in retry to wait until placement
g.Eventually(func() []*api.AllocationListStub {

e2e/execagent/execagent.go (new file)

@@ -0,0 +1,264 @@
package execagent
import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"path/filepath"
"text/template"
"github.com/hashicorp/nomad/api"
)
type AgentMode int
const (
// AgentMode configures whether an agent runs as a client, a server, or both.
ModeClient AgentMode = 1
ModeServer AgentMode = 2
ModeBoth = ModeClient | ModeServer
)
func init() {
if d := os.Getenv("NOMAD_TEST_DIR"); d != "" {
BaseDir = d
}
}
var (
// BaseDir is where tests will store state and can be overridden by
// setting NOMAD_TEST_DIR. Defaults to "/opt/nomadtest".
BaseDir = "/opt/nomadtest"
agentTemplate = template.Must(template.New("agent").Parse(`
enable_debug = true
log_level = "{{ or .LogLevel "DEBUG" }}"
ports {
http = {{.HTTP}}
rpc = {{.RPC}}
serf = {{.Serf}}
}
{{ if .EnableServer }}
server {
enabled = true
bootstrap_expect = 1
}
{{ end }}
{{ if .EnableClient }}
client {
enabled = true
options = {
"driver.raw_exec.enable" = "1"
}
}
{{ end }}
`))
)
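// For reference, rendering agentTemplate for a client-only agent with the
// default log level and (for example) ports 4646/4647/4648 yields roughly:
//
//	enable_debug = true
//	log_level = "DEBUG"
//	ports {
//	  http = 4646
//	  rpc = 4647
//	  serf = 4648
//	}
//	client {
//	  enabled = true
//	  options = {
//	    "driver.raw_exec.enable" = "1"
//	  }
//	}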
type AgentTemplateVars struct {
HTTP int
RPC int
Serf int
EnableClient bool
EnableServer bool
LogLevel string
}
func newAgentTemplateVars() (*AgentTemplateVars, error) {
httpPort, err := getFreePort()
if err != nil {
return nil, err
}
rpcPort, err := getFreePort()
if err != nil {
return nil, err
}
serfPort, err := getFreePort()
if err != nil {
return nil, err
}
vars := AgentTemplateVars{
HTTP: httpPort,
RPC: rpcPort,
Serf: serfPort,
}
return &vars, nil
}
func writeConfig(path string, vars *AgentTemplateVars) error {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return err
}
defer f.Close()
return agentTemplate.Execute(f, vars)
}
// NomadAgent manages an external Nomad agent process.
type NomadAgent struct {
// BinPath is the path to the Nomad binary
BinPath string
// DataDir is the path state will be saved in
DataDir string
// ConfFile is the path to the agent's conf file
ConfFile string
// Cmd is the agent process
Cmd *exec.Cmd
// Vars are the config parameters used to template
Vars *AgentTemplateVars
}
// NewMixedAgent creates a new Nomad agent in mixed server+client mode but does
// not start the agent process until the Start() method is called.
func NewMixedAgent(bin string) (*NomadAgent, error) {
if err := os.MkdirAll(BaseDir, 0755); err != nil {
return nil, err
}
dir, err := ioutil.TempDir(BaseDir, "agent")
if err != nil {
return nil, err
}
vars, err := newAgentTemplateVars()
if err != nil {
return nil, err
}
vars.EnableClient = true
vars.EnableServer = true
conf := filepath.Join(dir, "config.hcl")
if err := writeConfig(conf, vars); err != nil {
return nil, err
}
na := &NomadAgent{
BinPath: bin,
DataDir: dir,
ConfFile: conf,
Vars: vars,
Cmd: exec.Command(bin, "agent", "-config", conf, "-data-dir", dir),
}
return na, nil
}
// NewClientServerPair creates a pair of Nomad agents: 1 server, 1 client.
func NewClientServerPair(bin string, serverOut, clientOut io.Writer) (
server *NomadAgent, client *NomadAgent, err error) {
if err := os.MkdirAll(BaseDir, 0755); err != nil {
return nil, nil, err
}
sdir, err := ioutil.TempDir(BaseDir, "server")
if err != nil {
return nil, nil, err
}
svars, err := newAgentTemplateVars()
if err != nil {
return nil, nil, err
}
svars.LogLevel = "WARN"
svars.EnableServer = true
sconf := filepath.Join(sdir, "config.hcl")
if err := writeConfig(sconf, svars); err != nil {
return nil, nil, err
}
server = &NomadAgent{
BinPath: bin,
DataDir: sdir,
ConfFile: sconf,
Vars: svars,
Cmd: exec.Command(bin, "agent", "-config", sconf, "-data-dir", sdir),
}
server.Cmd.Stdout = serverOut
server.Cmd.Stderr = serverOut
cdir, err := ioutil.TempDir(BaseDir, "client")
if err != nil {
return nil, nil, err
}
cvars, err := newAgentTemplateVars()
if err != nil {
return nil, nil, err
}
cvars.EnableClient = true
cconf := filepath.Join(cdir, "config.hcl")
if err := writeConfig(cconf, cvars); err != nil {
return nil, nil, err
}
client = &NomadAgent{
BinPath: bin,
DataDir: cdir,
ConfFile: cconf,
Vars: cvars,
Cmd: exec.Command(bin, "agent",
"-config", cconf,
"-data-dir", cdir,
"-servers", fmt.Sprintf("127.0.0.1:%d", svars.RPC),
),
}
client.Cmd.Stdout = clientOut
client.Cmd.Stderr = clientOut
return
}
// Start the agent command.
func (n *NomadAgent) Start() error {
return n.Cmd.Start()
}
// Stop sends an interrupt signal and returns the command's Wait error.
func (n *NomadAgent) Stop() error {
if err := n.Cmd.Process.Signal(os.Interrupt); err != nil {
return err
}
return n.Cmd.Wait()
}
// Destroy stops the agent and removes the data dir.
func (n *NomadAgent) Destroy() error {
if err := n.Stop(); err != nil {
return err
}
return os.RemoveAll(n.DataDir)
}
// Client returns an api.Client for the agent.
func (n *NomadAgent) Client() (*api.Client, error) {
conf := api.DefaultConfig()
conf.Address = fmt.Sprintf("http://127.0.0.1:%d", n.Vars.HTTP)
return api.NewClient(conf)
}
func getFreePort() (int, error) {
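// Bind an ephemeral port and immediately release it. Another process may
// grab the port before the agent binds it, so callers should treat the
// result as best-effort.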
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
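
Taken together, a test might drive these helpers roughly as follows (a sketch
against the API above, not part of this commit; imports and error handling
abbreviated):

func TestMixedAgentSketch(t *testing.T) {
	// Locate the Nomad binary under test.
	bin, err := discover.NomadExecutable()
	if err != nil {
		t.Fatal(err)
	}
	agent, err := execagent.NewMixedAgent(bin)
	if err != nil {
		t.Fatal(err)
	}
	agent.Cmd.Stdout = testlog.NewPrefixWriter(t, "AGENT: ")
	agent.Cmd.Stderr = agent.Cmd.Stdout
	if err := agent.Start(); err != nil {
		t.Fatal(err)
	}
	// Destroy stops the process and removes its data dir.
	defer agent.Destroy()

	client, err := agent.Client()
	if err != nil {
		t.Fatal(err)
	}
	_ = client // drive the cluster via the api.Client from here
}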

@@ -4,19 +4,16 @@ package nomad09upgrade
import (
"fmt"
"html/template"
"io/ioutil"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
getter "github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/execagent"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/helper/testlog"
@@ -36,7 +33,7 @@ func init() {
}
var (
nomadVersions = []string{
nomadVersions = [...]string{
"0.8.7",
"0.8.6",
"0.8.5",
@@ -46,44 +43,8 @@ var (
"0.8.1",
"0.8.0",
}
agentTemplate = `
ports {
http = {{.HTTP}}
rpc = {{.RPC}}
serf = {{.Serf}}
}
server {
enabled = true
bootstrap_expect = 1
}
client {
enabled = true
options = {
"driver.raw_exec.enable" = "1"
}
}
`
)
type templateVars struct {
HTTP int
RPC int
Serf int
}
func writeConfig(path string, vars templateVars) error {
t := template.Must(template.New("config").Parse(agentTemplate))
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
return err
}
defer f.Close()
return t.Execute(f, vars)
}
type UpgradePathTC struct {
framework.TC
@@ -91,98 +52,35 @@ type UpgradePathTC struct {
bin string
}
type nomadAgent struct {
bin string
dataDir string
cmd *exec.Cmd
vars templateVars
targetCmd *exec.Cmd
type upgradeAgents struct {
origAgent *execagent.NomadAgent
targetAgent *execagent.NomadAgent
}
func (tc *UpgradePathTC) newNomadServer(t *testing.T, ver string) (*nomadAgent, error) {
dir, err := ioutil.TempDir("/opt", "nomade2e")
func (tc *UpgradePathTC) newNomadServer(t *testing.T, ver string) (*upgradeAgents, error) {
binPath := filepath.Join(tc.binDir, ver, "nomad")
srv, err := execagent.NewMixedAgent(binPath)
if err != nil {
return nil, err
}
return tc.newNomadServerWithDataDir(t, ver, dir)
}
func (tc *UpgradePathTC) newNomadServerWithDataDir(t *testing.T, ver string, dataDir string) (*nomadAgent, error) {
srv := &nomadAgent{
bin: filepath.Join(tc.binDir, ver, "nomad"),
dataDir: dataDir,
}
conf := filepath.Join(dataDir, "config.hcl")
httpPort, err := getFreePort()
if err != nil {
return nil, err
}
rpcPort, err := getFreePort()
if err != nil {
return nil, err
}
serfPort, err := getFreePort()
if err != nil {
return nil, err
}
srv.vars = templateVars{
HTTP: httpPort,
RPC: rpcPort,
Serf: serfPort,
}
if err := writeConfig(conf, srv.vars); err != nil {
return nil, err
}
w := testlog.NewWriter(t)
srv.Cmd.Stdout = w
srv.Cmd.Stderr = w
srv.cmd = exec.Command(srv.bin, "agent", "-config", conf, "-log-level", "DEBUG", "-data-dir", srv.dataDir)
srv.cmd.Stdout = w
srv.cmd.Stderr = w
srv.targetCmd = exec.Command(tc.bin, "agent", "-config", conf, "-log-level", "TRACE", "-data-dir", srv.dataDir)
srv.targetCmd.Stdout = w
srv.targetCmd.Stderr = w
return srv, nil
}
func (n *nomadAgent) StartAgent() error {
return n.cmd.Start()
}
func (n *nomadAgent) StopAgent() error {
if err := n.cmd.Process.Signal(os.Interrupt); err != nil {
return err
// Target should copy everything but the binary to target
target, err := execagent.NewMixedAgent(tc.bin)
if err != nil {
return nil, err
}
target.Cmd.Stdout = w
target.Cmd.Stderr = w
n.cmd.Wait()
return nil
}
func (n *nomadAgent) StartTargetAgent() error {
return n.targetCmd.Start()
}
func (n *nomadAgent) StopTargetAgent() error {
if err := n.targetCmd.Process.Signal(os.Interrupt); err != nil {
return err
agents := &upgradeAgents{
origAgent: srv,
targetAgent: target,
}
n.targetCmd.Wait()
return nil
}
func (n *nomadAgent) Destroy() {
os.RemoveAll(n.dataDir)
}
func (n *nomadAgent) Nomad() (*api.Client, error) {
cfg := api.DefaultConfig()
cfg.Address = fmt.Sprintf("http://127.0.0.1:%d", n.vars.HTTP)
return api.NewClient(cfg)
return agents, nil
}
// BeforeAll downloads all of the desired nomad versions to test against
@@ -249,13 +147,13 @@ func (tc *UpgradePathTC) TestDockerTaskUpgrade(f *framework.F) {
func (tc *UpgradePathTC) testUpgradeForJob(t *testing.T, ver string, jobfile string) {
require := require.New(t)
// Start a nomad agent for the given version
srv, err := tc.newNomadServer(t, ver)
agents, err := tc.newNomadServer(t, ver)
require.NoError(err)
t.Logf("launching v%s nomad agent", ver)
require.NoError(srv.StartAgent())
require.NoError(agents.origAgent.Start())
// Wait for the agent to be ready
client, err := srv.Nomad()
client, err := agents.origAgent.Client()
require.NoError(err)
e2eutil.WaitForNodesReady(t, client, 1)
@@ -272,15 +170,13 @@ func (tc *UpgradePathTC) testUpgradeForJob(t *testing.T, ver string, jobfile str
e2eutil.WaitForAllocRunning(t, client, id)
// Stop the agent, leaving the sleep job running
require.NoError(srv.StopAgent())
require.NoError(agents.origAgent.Stop())
// Start a nomad agent with the to be tested nomad binary
t.Logf("launching test nomad agent")
require.NoError(srv.StartTargetAgent())
require.NoError(agents.targetAgent.Start())
// Wait for the agent to be ready
client, err = srv.Nomad()
require.NoError(err)
e2eutil.WaitForNodesReady(t, client, 1)
// Make sure the same allocation still exists
@@ -329,20 +225,6 @@ func (tc *UpgradePathTC) testUpgradeForJob(t *testing.T, ver string, jobfile str
})
// Cleanup
srv.StopTargetAgent()
srv.Destroy()
}
func getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
agents.targetAgent.Stop()
agents.targetAgent.Destroy()
}

@@ -4,6 +4,7 @@
package testlog
import (
"bytes"
"io"
"log"
"os"
@@ -24,12 +25,13 @@ type LogPrinter interface {
// writer implements io.Writer on top of a Logger.
type writer struct {
t LogPrinter
prefix string
t LogPrinter
}
// Write to an underlying Logger. Never returns an error.
func (w *writer) Write(p []byte) (n int, err error) {
w.t.Logf(string(p))
w.t.Logf("%s%s", w.prefix, p)
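// (The explicit format string also avoids misinterpreting any % verbs in p.)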
return len(p), nil
}
@@ -38,7 +40,16 @@ func NewWriter(t LogPrinter) io.Writer {
if UseStdout() {
return os.Stdout
}
return &writer{t}
return &writer{t: t}
}
// NewPrefixWriter creates a new io.Writer backed by a Logger with a custom
// prefix per Write.
func NewPrefixWriter(t LogPrinter, prefix string) io.Writer {
if UseStdout() {
return &prefixStdout{[]byte(prefix)}
}
return &writer{prefix, t}
}
// New returns a new test logger. See https://golang.org/pkg/log/#New
@@ -46,7 +57,7 @@ func New(t LogPrinter, prefix string, flag int) *log.Logger {
if UseStdout() {
return log.New(os.Stdout, prefix, flag)
}
return log.New(&writer{t}, prefix, flag)
return log.New(&writer{t: t}, prefix, flag)
}
// WithPrefix returns a new test logger with the Lmicroseconds flag set.
@@ -69,3 +80,24 @@ func HCLogger(t LogPrinter) hclog.Logger {
}
return hclog.New(opts)
}
type prefixStdout struct {
prefix []byte
}
// Write to stdout, prepending the prefix to any write that contains
// non-whitespace characters.
func (w *prefixStdout) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
// Skip prefix if only writing whitespace
if len(bytes.TrimSpace(p)) > 0 {
_, err := os.Stdout.Write(w.prefix)
if err != nil {
return 0, err
}
}
return os.Stdout.Write(p)
}