Merge pull request #3434 from hashicorp/f-flaky

Fix flaky tests
This commit is contained in:
Alex Dadgar 2017-10-25 10:49:54 -07:00 committed by GitHub
commit 3327dc8d2d
22 changed files with 296 additions and 143 deletions

View file

@ -221,7 +221,6 @@ test: ## Run the Nomad test suite and/or the Nomad UI test suite
fi
.PHONY: test-nomad
test-nomad: LOCAL_PACKAGES = $(shell go list ./... | grep -v '/vendor/')
test-nomad: dev ## Run Nomad test suites
@echo "==> Running Nomad test suites:"
@NOMAD_TEST_RKT=1 \
@ -229,7 +228,7 @@ test-nomad: dev ## Run Nomad test suites
-cover \
-timeout=900s \
-tags="nomad_test $(if $(HAS_LXC),lxc)" \
$(LOCAL_PACKAGES)
./...
.PHONY: clean
clean: GOPATH=$(shell go env GOPATH)

View file

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/lib/freeport"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/fingerprint"
@ -28,10 +29,6 @@ import (
ctestutil "github.com/hashicorp/nomad/client/testutil"
)
func getPort() int {
return 1030 + int(rand.Int31n(6440))
}
func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken) {
server, addr := testServer(t, func(c *nomad.Config) {
c.ACLEnabled = true
@ -78,12 +75,13 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
}
for i := 10; i >= 0; i-- {
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: getPort(),
Port: ports[0],
}
config.NodeName = fmt.Sprintf("Node %d", config.RPCAddr.Port)
config.SerfConfig.MemberlistConfig.BindPort = getPort()
config.SerfConfig.MemberlistConfig.BindPort = ports[1]
// Create server
server, err := nomad.NewServer(config, catalog, logger)

View file

@ -16,6 +16,7 @@ import (
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/lib/freeport"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@ -42,17 +43,12 @@ func dockerIsRemote(t *testing.T) bool {
return false
}
// Ports used by tests
var (
docker_reserved = 2000 + int(rand.Int31n(10000))
docker_dynamic = 2000 + int(rand.Int31n(10000))
)
// Returns a task with a reserved and dynamic port. The ports are returned
// respectively.
func dockerTask() (*structs.Task, int, int) {
docker_reserved += 1
docker_dynamic += 1
func dockerTask(t *testing.T) (*structs.Task, int, int) {
ports := freeport.GetT(t, 2)
dockerReserved := ports[0]
dockerDynamic := ports[1]
return &structs.Task{
Name: "redis-demo",
Driver: "docker",
@ -72,12 +68,12 @@ func dockerTask() (*structs.Task, int, int) {
Networks: []*structs.NetworkResource{
{
IP: "127.0.0.1",
ReservedPorts: []structs.Port{{Label: "main", Value: docker_reserved}},
DynamicPorts: []structs.Port{{Label: "REDIS", Value: docker_dynamic}},
ReservedPorts: []structs.Port{{Label: "main", Value: dockerReserved}},
DynamicPorts: []structs.Port{{Label: "REDIS", Value: dockerDynamic}},
},
},
},
}, docker_reserved, docker_dynamic
}, dockerReserved, dockerDynamic
}
// dockerSetup does all of the basic setup you need to get a running docker
@ -563,9 +559,9 @@ func TestDockerDriver_StartN(t *testing.T) {
t.Skip("Docker not connected")
}
task1, _, _ := dockerTask()
task2, _, _ := dockerTask()
task3, _, _ := dockerTask()
task1, _, _ := dockerTask(t)
task2, _, _ := dockerTask(t)
task3, _, _ := dockerTask(t)
taskList := []*structs.Task{task1, task2, task3}
handles := make([]DriverHandle, len(taskList))
@ -617,15 +613,15 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
t.Skip("Docker not connected")
}
task1, _, _ := dockerTask()
task1, _, _ := dockerTask(t)
task1.Config["image"] = "busybox"
task1.Config["load"] = "busybox.tar"
task2, _, _ := dockerTask()
task2, _, _ := dockerTask(t)
task2.Config["image"] = "busybox:musl"
task2.Config["load"] = "busybox_musl.tar"
task3, _, _ := dockerTask()
task3, _, _ := dockerTask(t)
task3.Config["image"] = "busybox:glibc"
task3.Config["load"] = "busybox_glibc.tar"
@ -795,7 +791,7 @@ func TestDockerDriver_Labels(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["labels"] = []map[string]string{
{
"label1": "value1",
@ -830,7 +826,7 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["force_pull"] = "nothing"
ctx := testDockerDriverContexts(t, task)
@ -851,7 +847,7 @@ func TestDockerDriver_ForcePull(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["force_pull"] = "true"
client, handle, cleanup := dockerSetup(t, task)
@ -873,7 +869,7 @@ func TestDockerDriver_SecurityOpt(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["security_opt"] = []string{"seccomp=unconfined"}
client, handle, cleanup := dockerSetup(t, task)
@ -899,7 +895,7 @@ func TestDockerDriver_DNS(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["dns_servers"] = []string{"8.8.8.8", "8.8.4.4"}
task.Config["dns_search_domains"] = []string{"example.com", "example.org", "example.net"}
task.Config["dns_options"] = []string{"ndots:1"}
@ -935,7 +931,7 @@ func TestDockerDriver_MACAddress(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["mac_address"] = "00:16:3e:00:00:00"
client, handle, cleanup := dockerSetup(t, task)
@ -961,7 +957,7 @@ func TestDockerWorkDir(t *testing.T) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask()
task, _, _ := dockerTask(t)
task.Config["work_dir"] = "/some/path"
client, handle, cleanup := dockerSetup(t, task)
@ -994,7 +990,7 @@ func TestDockerDriver_PortsNoMap(t *testing.T) {
t.Skip("Docker not connected")
}
task, res, dyn := dockerTask()
task, res, dyn := dockerTask(t)
client, handle, cleanup := dockerSetup(t, task)
defer cleanup()
@ -1051,7 +1047,7 @@ func TestDockerDriver_PortsMapping(t *testing.T) {
t.Skip("Docker not connected")
}
task, res, dyn := dockerTask()
task, res, dyn := dockerTask(t)
task.Config["port_map"] = []map[string]string{
{
"main": "8080",

View file

@ -265,7 +265,7 @@ func TestRawExecDriver_HandlerExec(t *testing.T) {
Driver: "raw_exec",
Config: map[string]interface{}{
"command": testtask.Path(),
"args": []string{"sleep", "9000"},
"args": []string{"sleep", "9000s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
@ -310,6 +310,12 @@ func TestRawExecDriver_HandlerExec(t *testing.T) {
t.Fatalf("expected output to contain %q but found: %q", expected, out)
}
select {
case res := <-resp.Handle.WaitCh():
t.Fatalf("Shouldn't be exited: %v", res.String())
default:
}
if err := resp.Handle.Kill(); err != nil {
t.Fatalf("error killing exec handle: %v", err)
}

View file

@ -4,7 +4,6 @@ import (
"encoding/json"
"io/ioutil"
"log"
"net"
"os"
"strings"
"testing"
@ -15,20 +14,6 @@ import (
"github.com/stretchr/testify/assert"
)
func getPort() int {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
panic(err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
func tmpDir(t testing.TB) string {
dir, err := ioutil.TempDir("", "nomad")
if err != nil {
@ -39,7 +24,7 @@ func tmpDir(t testing.TB) string {
func TestAgent_RPCPing(t *testing.T) {
t.Parallel()
agent := NewTestAgent(t.Name(), nil)
agent := NewTestAgent(t, t.Name(), nil)
defer agent.Shutdown()
var out struct{}

View file

@ -82,7 +82,7 @@ func TestCommand_Args(t *testing.T) {
// TODO Why is this failing
func TestRetryJoin(t *testing.T) {
t.Parallel()
agent := NewTestAgent(t.Name(), nil)
agent := NewTestAgent(t, t.Name(), nil)
defer agent.Shutdown()
doneCh := make(chan struct{})

View file

@ -27,7 +27,7 @@ import (
// makeHTTPServer returns a test server whose logs will be written to
// the passed writer. If the writer is nil, the logs are written to stderr.
func makeHTTPServer(t testing.TB, cb func(c *Config)) *TestAgent {
return NewTestAgent(t.Name(), cb)
return NewTestAgent(t, t.Name(), cb)
}
func BenchmarkHTTPRequests(b *testing.B) {

View file

@ -13,7 +13,7 @@ func TestAgent_LoadKeyrings(t *testing.T) {
key := "tbLJg26ZJyJ9pK3qhc9jig=="
// Should be no configured keyring file by default
agent1 := NewTestAgent(t.Name(), nil)
agent1 := NewTestAgent(t, t.Name(), nil)
defer agent1.Shutdown()
c := agent1.server.GetConfig()
@ -26,6 +26,7 @@ func TestAgent_LoadKeyrings(t *testing.T) {
// Server should auto-load WAN keyring files
agent2 := &TestAgent{
T: t,
Name: t.Name() + "2",
Key: key,
}

View file

@ -6,6 +6,7 @@ import (
"testing"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
)
@ -41,12 +42,17 @@ func TestHTTP_Metrics(t *testing.T) {
assert.Nil(err)
respW = httptest.NewRecorder()
resp, err := s.Server.MetricsRequest(respW, req)
assert.Nil(err)
testutil.WaitForResult(func() (bool, error) {
resp, err := s.Server.MetricsRequest(respW, req)
if err != nil {
return false, err
}
respW.Flush()
res := resp.(metrics.MetricsSummary)
gauges := res.Gauges
assert.NotEqual(0, len(gauges))
res := resp.(metrics.MetricsSummary)
return len(res.Gauges) != 0, nil
}, func(err error) {
t.Fatalf("should have metrics: %v", err)
})
})
}

View file

@ -13,7 +13,10 @@ import (
"strings"
"time"
"github.com/mitchellh/go-testing-interface"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad"
@ -30,11 +33,13 @@ func init() {
// TempDir defines the base dir for temporary directories.
var TempDir = os.TempDir()
// TestAgent encapsulates an Agent with a default configuration and
// startup procedure suitable for testing. It panics if there are errors
// during creation or startup instead of returning errors. It manages a
// temporary data directory which is removed after shutdown.
// TestAgent encapsulates an Agent with a default configuration and startup
// procedure suitable for testing. It manages a temporary data directory which
// is removed after shutdown.
type TestAgent struct {
// T is the testing object
T testing.T
// Name is an optional name of the agent.
Name string
@ -74,19 +79,23 @@ type TestAgent struct {
}
// NewTestAgent returns a started agent with the given name and
// configuration. It panics if the agent could not be started. The
// caller should call Shutdown() to stop the agent and remove temporary
// directories.
func NewTestAgent(name string, configCallback func(*Config)) *TestAgent {
a := &TestAgent{Name: name, ConfigCallback: configCallback}
// configuration. The caller should call Shutdown() to stop the agent and
// remove temporary directories.
func NewTestAgent(t testing.T, name string, configCallback func(*Config)) *TestAgent {
a := &TestAgent{
T: t,
Name: name,
ConfigCallback: configCallback,
}
a.Start()
return a
}
// Start starts a test agent. It panics if the agent could not be started.
// Start starts a test agent.
func (a *TestAgent) Start() *TestAgent {
if a.Agent != nil {
panic("TestAgent already started")
a.T.Fatalf("TestAgent already started")
}
if a.Config == nil {
a.Config = a.config()
@ -99,7 +108,7 @@ func (a *TestAgent) Start() *TestAgent {
name = strings.Replace(name, "/", "_", -1)
d, err := ioutil.TempDir(TempDir, name)
if err != nil {
panic(fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
a.T.Fatalf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err)
}
a.DataDir = d
a.Config.DataDir = d
@ -107,7 +116,7 @@ func (a *TestAgent) Start() *TestAgent {
}
for i := 10; i >= 0; i-- {
pickRandomPorts(a.Config)
a.pickRandomPorts(a.Config)
if a.Config.NodeName == "" {
a.Config.NodeName = fmt.Sprintf("Node %d", a.Config.Ports.RPC)
}
@ -117,7 +126,7 @@ func (a *TestAgent) Start() *TestAgent {
writeKey := func(key, filename string) {
path := filepath.Join(a.Config.DataDir, filename)
if err := initKeyring(path, key); err != nil {
panic(fmt.Sprintf("Error creating keyring %s: %s", path, err))
a.T.Fatalf("Error creating keyring %s: %s", path, err)
}
}
writeKey(a.Key, serfKeyring)
@ -157,7 +166,7 @@ func (a *TestAgent) Start() *TestAgent {
err := a.RPC("Status.Leader", args, &leader)
return leader != "", err
}, func(err error) {
panic(fmt.Sprintf("failed to find leader: %v", err))
a.T.Fatalf("failed to find leader: %v", err)
})
} else {
testutil.WaitForResult(func() (bool, error) {
@ -166,7 +175,7 @@ func (a *TestAgent) Start() *TestAgent {
_, err := a.Server.AgentSelfRequest(resp, req)
return err == nil && resp.Code == 200, err
}, func(err error) {
panic(fmt.Sprintf("failed OK response: %v", err))
a.T.Fatalf("failed OK response: %v", err)
})
}
@ -177,7 +186,7 @@ func (a *TestAgent) Start() *TestAgent {
a.RootToken = mock.ACLManagementToken()
state := a.Agent.server.State()
if err := state.BootstrapACLTokens(1, 0, a.RootToken); err != nil {
panic(fmt.Sprintf("token bootstrap failed: %v", err))
a.T.Fatalf("token bootstrap failed: %v", err)
}
}
return a
@ -236,17 +245,11 @@ func (a *TestAgent) Client() *api.Client {
conf.Address = a.HTTPAddr()
c, err := api.NewClient(conf)
if err != nil {
panic(fmt.Sprintf("Error creating Nomad API client: %s", err))
a.T.Fatalf("Error creating Nomad API client: %s", err)
}
return c
}
// FivePorts returns the first port number of a block of
// five random ports.
func FivePorts() int {
return 1030 + int(rand.Int31n(6440))*5
}
// pickRandomPorts selects random ports from fixed size random blocks of
// ports. This does not eliminate the chance for port conflict but
// reduces it significanltly with little overhead. Furthermore, asking
@ -255,14 +258,14 @@ func FivePorts() int {
// chance of port conflicts for concurrently executed test binaries.
// Instead of relying on one set of ports to be sufficient we retry
// starting the agent with different ports on port conflict.
func pickRandomPorts(c *Config) {
port := FivePorts()
c.Ports.HTTP = port + 1
c.Ports.RPC = port + 2
c.Ports.Serf = port + 3
func (a *TestAgent) pickRandomPorts(c *Config) {
ports := freeport.GetT(a.T, 3)
c.Ports.HTTP = ports[0]
c.Ports.RPC = ports[1]
c.Ports.Serf = ports[2]
if err := c.normalizeAddrs(); err != nil {
panic(fmt.Sprintf("error normalizing config: %v", err))
a.T.Fatalf("error normalizing config: %v", err)
}
}

View file

@ -188,13 +188,13 @@ job "job1" {
stdinW.Close()
}()
args := []string{"-"}
args := []string{"-address=nope", "-"}
if code := cmd.Run(args); code != 1 {
t.Fatalf("expected exit code 1, got %d: %q", code, ui.ErrorWriter.String())
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "connection refused") {
t.Fatalf("expected connection refused error, got: %s", out)
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error submitting job") {
t.Fatalf("expected submission error, got: %s", out)
}
ui.ErrorWriter.Reset()
}

View file

@ -10,7 +10,7 @@ import (
func testServer(t *testing.T, runClient bool, cb func(*agent.Config)) (*agent.TestAgent, *api.Client, string) {
// Make a new test server
a := agent.NewTestAgent(t.Name(), func(config *agent.Config) {
a := agent.NewTestAgent(t, t.Name(), func(config *agent.Config) {
config.Client.Enabled = runClient
if cb != nil {

View file

@ -88,7 +88,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
s1.heartbeatTimersLock.Lock()
s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond)
s1.resetHeartbeatTimerLocked("foo", 30*time.Millisecond)
s1.heartbeatTimersLock.Unlock()
if _, ok := s1.heartbeatTimers["foo"]; !ok {
@ -99,23 +99,23 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
// Renew the heartbeat
s1.heartbeatTimersLock.Lock()
s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond)
s1.resetHeartbeatTimerLocked("foo", 30*time.Millisecond)
s1.heartbeatTimersLock.Unlock()
renew := time.Now()
// Watch for invalidation
for time.Now().Sub(renew) < time.Duration(testutil.TestMultiplier()*20)*time.Millisecond {
for time.Now().Sub(renew) < time.Duration(testutil.TestMultiplier()*100)*time.Millisecond {
s1.heartbeatTimersLock.Lock()
_, ok := s1.heartbeatTimers["foo"]
s1.heartbeatTimersLock.Unlock()
if !ok {
end := time.Now()
if diff := end.Sub(renew); diff < 5*time.Millisecond {
if diff := end.Sub(renew); diff < 30*time.Millisecond {
t.Fatalf("early invalidate %v", diff)
}
return
}
time.Sleep(time.Millisecond)
time.Sleep(2 * time.Millisecond)
}
t.Fatalf("should have expired")
}

View file

@ -40,17 +40,30 @@ func TestLeader_LeftServer(t *testing.T) {
}
// Kill any server
servers[0].Shutdown()
var peer *Server
for _, s := range servers {
if !s.IsLeader() {
peer = s
break
}
}
if peer == nil {
t.Fatalf("Should have a non-leader")
}
peer.Shutdown()
name := fmt.Sprintf("%s.%s", peer.config.NodeName, peer.config.Region)
testutil.WaitForResult(func() (bool, error) {
// Force remove the non-leader (transition to left state)
name := fmt.Sprintf("%s.%s",
servers[0].config.NodeName, servers[0].config.Region)
if err := servers[1].RemoveFailedNode(name); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
if s == peer {
continue
}
// Force remove the non-leader (transition to left state)
if err := s.RemoveFailedNode(name); err != nil {
return false, err
}
for _, s := range servers[1:] {
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%v", peers))
}

View file

@ -6,6 +6,7 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
@ -128,7 +129,7 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
// Try to remove a peer that's not there.
arg := structs.RaftPeerByAddressRequest{
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.GetT(t, 1)[0])),
}
arg.Region = s1.config.Region
var reply struct{}
@ -188,7 +189,7 @@ func TestOperator_RaftRemovePeerByAddress_ACL(t *testing.T) {
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
arg := structs.RaftPeerByAddressRequest{
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.GetT(t, 1)[0])),
}
arg.Region = s1.config.Region

View file

@ -97,6 +97,7 @@ func TestNomad_ReapPeer(t *testing.T) {
dir := tmpDir(t)
defer os.RemoveAll(dir)
s1 := testServer(t, func(c *Config) {
c.NodeName = "node1"
c.BootstrapExpect = 3
c.DevMode = false
c.DevDisableBootstrap = true
@ -104,6 +105,7 @@ func TestNomad_ReapPeer(t *testing.T) {
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.NodeName = "node2"
c.BootstrapExpect = 3
c.DevMode = false
c.DevDisableBootstrap = true
@ -111,6 +113,7 @@ func TestNomad_ReapPeer(t *testing.T) {
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.NodeName = "node3"
c.BootstrapExpect = 3
c.DevMode = false
c.DevDisableBootstrap = true
@ -120,14 +123,16 @@ func TestNomad_ReapPeer(t *testing.T) {
testJoin(t, s1, s2, s3)
testutil.WaitForResult(func() (bool, error) {
// Retry the join to decrease flakiness
testJoin(t, s1, s2, s3)
if members := s1.Members(); len(members) != 3 {
return false, fmt.Errorf("bad: %#v", members)
return false, fmt.Errorf("bad s1: %#v", members)
}
if members := s2.Members(); len(members) != 3 {
return false, fmt.Errorf("bad: %#v", members)
return false, fmt.Errorf("bad s2: %#v", members)
}
if members := s3.Members(); len(members) != 3 {
return false, fmt.Errorf("bad: %#v", members)
return false, fmt.Errorf("bad s3: %#v", members)
}
return true, nil
}, func(err error) {
@ -210,6 +215,8 @@ func TestNomad_BootstrapExpect(t *testing.T) {
testJoin(t, s1, s2, s3)
testutil.WaitForResult(func() (bool, error) {
// Retry the join to decrease flakiness
testJoin(t, s1, s2, s3)
peers, err := s1.numPeers()
if err != nil {
return false, err
@ -259,14 +266,23 @@ func TestNomad_BootstrapExpect(t *testing.T) {
// the fourth server.
testutil.WaitForLeader(t, s1.RPC)
termBefore := s1.raft.Stats()["last_log_term"]
addr := fmt.Sprintf("127.0.0.1:%d", s1.config.SerfConfig.MemberlistConfig.BindPort)
if _, err := s4.Join([]string{addr}); err != nil {
var addresses []string
for _, s := range []*Server{s1, s2, s3} {
addr := fmt.Sprintf("127.0.0.1:%d", s.config.SerfConfig.MemberlistConfig.BindPort)
addresses = append(addresses, addr)
}
if _, err := s4.Join(addresses); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the new server to see itself added to the cluster.
var p4 int
testutil.WaitForResult(func() (bool, error) {
// Retry join to reduce flakiness
if _, err := s4.Join(addresses); err != nil {
t.Fatalf("err: %v", err)
}
p4, _ = s4.numPeers()
return p4 == 4, errors.New(fmt.Sprintf("%d", p4))
}, func(err error) {

View file

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -24,10 +25,6 @@ var (
nodeNumber uint32 = 0
)
func getPort() int {
return 1030 + int(rand.Int31n(6440))
}
func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
}
@ -99,11 +96,12 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
for i := 10; i >= 0; i-- {
// Get random ports
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: getPort(),
Port: ports[0],
}
config.SerfConfig.MemberlistConfig.BindPort = getPort()
config.SerfConfig.MemberlistConfig.BindPort = ports[1]
// Create server
server, err := NewServer(config, catalog, logger)

View file

@ -697,15 +697,16 @@ func TestVaultClient_LookupToken_RateLimit(t *testing.T) {
}
client.SetActive(true)
defer client.Stop()
client.setLimit(rate.Limit(1.0))
waitForConnection(client, t)
client.setLimit(rate.Limit(1.0))
// Spin up many requests. These should block
ctx, cancel := context.WithCancel(context.Background())
cancels := 0
numRequests := 10
numRequests := 20
unblock := make(chan struct{})
for i := 0; i < numRequests; i++ {
go func() {
@ -734,13 +735,13 @@ func TestVaultClient_LookupToken_RateLimit(t *testing.T) {
desired := numRequests - 1
testutil.WaitForResult(func() (bool, error) {
if cancels != desired {
if desired-cancels > 2 {
return false, fmt.Errorf("Incorrect number of cancels; got %d; want %d", cancels, desired)
}
return true, nil
}, func(err error) {
t.Fatalf("Connection not established")
t.Fatal(err)
})
}

View file

@ -20,16 +20,13 @@ import (
"net/http"
"os"
"os/exec"
"sync/atomic"
"github.com/hashicorp/consul/lib/freeport"
cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/nomad/helper/discover"
testing "github.com/mitchellh/go-testing-interface"
)
// offset is used to atomically increment the port numbers.
var offset uint64
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"name,omitempty"`
@ -88,11 +85,10 @@ type ServerConfigCallback func(c *TestServerConfig)
// defaultServerConfig returns a new TestServerConfig struct
// with all of the listen ports incremented by one.
func defaultServerConfig() *TestServerConfig {
idx := int(atomic.AddUint64(&offset, 1))
func defaultServerConfig(t testing.T) *TestServerConfig {
ports := freeport.GetT(t, 3)
return &TestServerConfig{
NodeName: fmt.Sprintf("node%d", idx),
NodeName: fmt.Sprintf("node-%d", ports[0]),
DisableCheckpoint: true,
LogLevel: "DEBUG",
// Advertise can't be localhost
@ -102,9 +98,9 @@ func defaultServerConfig() *TestServerConfig {
Serf: "169.254.42.42",
},
Ports: &PortsConfig{
HTTP: 20000 + idx,
RPC: 21000 + idx,
Serf: 22000 + idx,
HTTP: ports[0],
RPC: ports[1],
Serf: ports[2],
},
Server: &ServerConfig{
Enabled: true,
@ -161,7 +157,7 @@ func NewTestServer(t testing.T, cb ServerConfigCallback) *TestServer {
}
defer configFile.Close()
nomadConfig := defaultServerConfig()
nomadConfig := defaultServerConfig(t)
nomadConfig.DataDir = dataDir
if cb != nil {

View file

@ -7,6 +7,7 @@ import (
"os/exec"
"time"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs/config"
vapi "github.com/hashicorp/vault/api"
@ -36,7 +37,7 @@ type TestVault struct {
// NewTestVault returns a new TestVault instance that has yet to be started
func NewTestVault(t testing.T) *TestVault {
for i := 10; i >= 0; i-- {
port := getPort()
port := freeport.GetT(t, 1)[0]
token := uuid.Generate()
bind := fmt.Sprintf("-dev-listen-address=127.0.0.1:%d", port)
http := fmt.Sprintf("http://127.0.0.1:%d", port)
@ -117,7 +118,7 @@ func NewTestVault(t testing.T) *TestVault {
// Start must be called and it is the callers responsibility to deal with any
// port conflicts that may occur and retry accordingly.
func NewTestVaultDelayed(t testing.T) *TestVault {
port := getPort()
port := freeport.GetT(t, 1)[0]
token := uuid.Generate()
bind := fmt.Sprintf("-dev-listen-address=127.0.0.1:%d", port)
http := fmt.Sprintf("http://127.0.0.1:%d", port)
@ -210,10 +211,6 @@ func (tv *TestVault) waitForAPI() error {
return waitErr
}
func getPort() int {
return 1030 + int(rand.Int31n(6440))
}
// VaultVersion returns the Vault version as a string or an error if it couldn't
// be determined
func VaultVersion() (string, error) {

View file

@ -0,0 +1,131 @@
// Package freeport provides a helper for allocating free ports across multiple
// processes on the same machine.
package freeport
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/mitchellh/go-testing-interface"
)
const (
// blockSize is the size of the allocated port block. ports are given out
// consecutively from that block with roll-over for the lifetime of the
// application/test run.
blockSize = 500
// maxBlocks is the number of available port blocks.
// lowPort + maxBlocks * blockSize must be less than 65535.
maxBlocks = 30
// lowPort is the lowest port number that should be used.
lowPort = 10000
// attempts is how often we try to allocate a port block
// before giving up.
attempts = 10
)
var (
// firstPort is the first port of the allocated block.
firstPort int
// lockLn is the system-wide mutex for the port block.
lockLn net.Listener
// mu guards nextPort
mu sync.Mutex
// port is the last allocated port.
port int
)
func init() {
if lowPort+maxBlocks*blockSize > 65535 {
panic("freeport: block size too big or too many blocks requested")
}
rand.Seed(time.Now().UnixNano())
firstPort, lockLn = alloc()
}
// alloc reserves a port block for exclusive use for the lifetime of the
// application. lockLn serves as a system-wide mutex for the port block and is
// implemented as a TCP listener which is bound to the firstPort and which will
// be automatically released when the application terminates.
func alloc() (int, net.Listener) {
for i := 0; i < attempts; i++ {
block := int(rand.Int31n(int32(maxBlocks)))
firstPort := lowPort + block*blockSize
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", firstPort))
if err != nil {
continue
}
// log.Printf("[DEBUG] freeport: allocated port block %d (%d-%d)", block, firstPort, firstPort+blockSize-1)
return firstPort, ln
}
panic("freeport: cannot allocate port block")
}
func tcpAddr(ip string, port int) *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP(ip), Port: port}
}
// Get wraps the Free function and panics on any failure retrieving ports.
func Get(n int) (ports []int) {
ports, err := Free(n)
if err != nil {
panic(err)
}
return ports
}
// GetT is suitable for use when retrieving unused ports in tests. If there is
// an error retrieving free ports, the test will be failed.
func GetT(t testing.T, n int) (ports []int) {
ports, err := Free(n)
if err != nil {
t.Fatalf("Failed retrieving free port: %v", err)
}
return ports
}
// Free returns a list of free ports from the allocated port block. It is safe
// to call this method concurrently. Ports have been tested to be available on
// 127.0.0.1 TCP but there is no guarantee that they will remain free in the
// future.
func Free(n int) (ports []int, err error) {
mu.Lock()
defer mu.Unlock()
if n > blockSize-1 {
return nil, fmt.Errorf("freeport: block size too small")
}
for len(ports) < n {
port++
// roll-over the port
if port < firstPort+1 || port >= firstPort+blockSize {
port = firstPort + 1
}
// if the port is in use then skip it
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", port))
if err != nil {
// log.Println("[DEBUG] freeport: port already in use: ", port)
continue
}
ln.Close()
ports = append(ports, port)
}
// log.Println("[DEBUG] freeport: free ports:", ports)
return ports, nil
}

6
vendor/vendor.json vendored
View file

@ -760,6 +760,12 @@
"revision": "51ea240df8476e02215d53fbfad5838bf0d44d21",
"revisionTime": "2017-10-16T16:22:40Z"
},
{
"checksumSHA1": "XUc/5Wg49jT0dGHRv7FhzDosj2Q=",
"path": "github.com/hashicorp/consul/lib/freeport",
"revision": "be18f97531edb0b75a91e61c7e26a66224a46468",
"revisionTime": "2017-10-23T23:34:27Z"
},
{
"checksumSHA1": "5XjgqE4UIfwXvkq5VssGNc7uPhQ=",
"path": "github.com/hashicorp/consul/test/porter",