Reduce coupling with testing.T

This reduces the coupling with testing.T, allowing many of the
server's startup-related functions to return an error. This makes them
more re-usable.
This commit is contained in:
Seth Vargo 2017-03-23 16:26:05 -04:00
parent 306acf9054
commit 3077d0f68c
No known key found for this signature in database
GPG Key ID: C921994F9C27E0FF
5 changed files with 412 additions and 249 deletions

View File

@ -25,41 +25,51 @@ import (
"github.com/hashicorp/consul/testutil"
)
func TestMain(t *testing.T) {
func TestFoo_bar(t *testing.T) {
// Create a test Consul server
srv1 := testutil.NewTestServer(t)
srv1, err := testutil.NewTestServer()
if err != nil {
t.Fatal(err)
}
defer srv1.Stop()
// Create a secondary server, passing in configuration
// to avoid bootstrapping as we are forming a cluster.
srv2 := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
srv2, err := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
c.Bootstrap = false
})
if err != nil {
t.Fatal(err)
}
defer srv2.Stop()
// Join the servers together
srv1.JoinLAN(srv2.LANAddr)
srv1.JoinLAN(t, srv2.LANAddr)
// Create a test key/value pair
srv1.SetKV("foo", []byte("bar"))
srv1.SetKV(t, "foo", []byte("bar"))
// Create lots of test key/value pairs
srv1.PopulateKV(map[string][]byte{
srv1.PopulateKV(t, map[string][]byte{
"bar": []byte("123"),
"baz": []byte("456"),
})
// Create a service
srv1.AddService("redis", structs.HealthPassing, []string{"master"})
srv1.AddService(t, "redis", structs.HealthPassing, []string{"master"})
// Create a service check
srv1.AddCheck("service:redis", "redis", structs.HealthPassing)
srv1.AddCheck(t, "service:redis", "redis", structs.HealthPassing)
// Create a node check
srv1.AddCheck("mem", "", structs.HealthCritical)
srv1.AddCheck(t, "mem", "", structs.HealthCritical)
// The HTTPAddr field contains the address of the Consul
// API on the new test server instance.
println(srv1.HTTPAddr)
// All functions also have a wrapper method to limit the passing of "t"
wrap := srv1.Wrap(t)
wrap.SetKV("foo", []byte("bar"))
}
```

View File

@ -12,9 +12,7 @@ package testutil
// otherwise cause an import cycle.
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
@ -26,8 +24,8 @@ import (
"strconv"
"strings"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-cleanhttp"
"github.com/pkg/errors"
)
// TestPerformanceConfig configures the performance parameters.
@ -129,15 +127,6 @@ type TestCheck struct {
TTL string `json:",omitempty"`
}
// TestingT is an interface wrapper around TestingT
type TestingT interface {
Logf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Fatal(args ...interface{})
Skip(args ...interface{})
}
// TestKVResponse is what we use to decode KV data.
type TestKVResponse struct {
Value string
@ -147,7 +136,6 @@ type TestKVResponse struct {
type TestServer struct {
cmd *exec.Cmd
Config *TestServerConfig
t TestingT
HTTPAddr string
LANAddr string
@ -158,27 +146,29 @@ type TestServer struct {
// NewTestServer is an easy helper method to create a new Consul
// test server with the most basic configuration.
func NewTestServer(t TestingT) *TestServer {
return NewTestServerConfig(t, nil)
func NewTestServer() (*TestServer, error) {
return NewTestServerConfig(nil)
}
// NewTestServerConfig creates a new TestServer, and makes a call to
// an optional callback function to modify the configuration.
func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
// NewTestServerConfig creates a new TestServer, and makes a call to an optional
// callback function to modify the configuration. If there is an error
// configuring or starting the server, the server will NOT be running when the
// function returns (thus you do not need to stop it).
func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) {
if path, err := exec.LookPath("consul"); err != nil || path == "" {
t.Fatal("consul not found on $PATH - download and install " +
return nil, fmt.Errorf("consul not found on $PATH - download and install " +
"consul or skip this test")
}
dataDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
return nil, errors.Wrap(err, "failed creating tempdir")
}
configFile, err := ioutil.TempFile(dataDir, "config")
if err != nil {
defer os.RemoveAll(dataDir)
t.Fatalf("err: %s", err)
return nil, errors.Wrap(err, "failed creating temp config")
}
consulConfig := defaultServerConfig()
@ -190,11 +180,13 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
configContent, err := json.Marshal(consulConfig)
if err != nil {
t.Fatalf("err: %s", err)
return nil, errors.Wrap(err, "failed marshaling json")
}
if _, err := configFile.Write(configContent); err != nil {
t.Fatalf("err: %s", err)
defer configFile.Close()
defer os.RemoveAll(dataDir)
return nil, errors.Wrap(err, "failed writing config content")
}
configFile.Close()
@ -215,7 +207,7 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
cmd.Stdout = stdout
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {
t.Fatalf("err: %s", err)
return nil, errors.Wrap(err, "failed starting command")
}
var httpAddr string
@ -237,7 +229,6 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
server := &TestServer{
Config: consulConfig,
cmd: cmd,
t: t,
HTTPAddr: httpAddr,
LANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfLan),
@ -247,65 +238,77 @@ func NewTestServerConfig(t TestingT, cb ServerConfigCallback) *TestServer {
}
// Wait for the server to be ready
var startErr error
if consulConfig.Bootstrap {
server.waitForLeader()
startErr = server.waitForLeader()
} else {
server.waitForAPI()
startErr = server.waitForAPI()
}
if startErr != nil {
defer server.Stop()
return nil, errors.Wrap(err, "failed waiting for server to start")
}
return server
return server, nil
}
// Stop stops the test Consul server, and removes the Consul data
// directory once we are done.
func (s *TestServer) Stop() {
func (s *TestServer) Stop() error {
defer os.RemoveAll(s.Config.DataDir)
if err := s.cmd.Process.Kill(); err != nil {
s.t.Errorf("err: %s", err)
if s.cmd != nil {
if s.cmd.Process != nil {
if err := s.cmd.Process.Kill(); err != nil {
return errors.Wrap(err, "failed to kill consul server")
}
}
// wait for the process to exit to be sure that the data dir can be
// deleted on all platforms.
return s.cmd.Wait()
}
// wait for the process to exit to be sure that the data dir can be
// deleted on all platforms.
s.cmd.Wait()
// There was no process
return nil
}
// waitForAPI waits for only the agent HTTP endpoint to start
// responding. This is an indication that the agent has started,
// but will likely return before a leader is elected.
func (s *TestServer) waitForAPI() {
WaitForResult(func() (bool, error) {
func (s *TestServer) waitForAPI() error {
if err := WaitForResult(func() (bool, error) {
resp, err := s.HttpClient.Get(s.url("/v1/agent/self"))
if err != nil {
return false, err
return false, errors.Wrap(err, "failed http get")
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
return false, err
return false, errors.Wrap(err, "failed OK response")
}
return true, nil
}, func(err error) {
defer s.Stop()
s.t.Fatalf("err: %s", err)
})
}); err != nil {
return errors.Wrap(err, "failed waiting for API")
}
return nil
}
// waitForLeader waits for the Consul server's HTTP API to become
// available, and then waits for a known leader and an index of
// 1 or more to be observed to confirm leader election is done.
// It then waits to ensure the anti-entropy sync has completed.
func (s *TestServer) waitForLeader() {
func (s *TestServer) waitForLeader() error {
var index int64
WaitForResult(func() (bool, error) {
if err := WaitForResult(func() (bool, error) {
// Query the API and check the status code.
url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d&wait=2s", index))
resp, err := s.HttpClient.Get(url)
if err != nil {
return false, err
return false, errors.Wrap(err, "failed http get")
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
return false, err
return false, errors.Wrap(err, "failed OK response")
}
// Ensure we have a leader and a node registration.
@ -314,10 +317,10 @@ func (s *TestServer) waitForLeader() {
}
index, err = strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64)
if err != nil {
return false, fmt.Errorf("Consul index was bad: %v", err)
return false, errors.Wrap(err, "bad consul index")
}
if index == 0 {
return false, fmt.Errorf("Consul index is 0")
return false, fmt.Errorf("consul index is 0")
}
// Watch for the anti-entropy sync to finish.
@ -337,192 +340,8 @@ func (s *TestServer) waitForLeader() {
return false, fmt.Errorf("No lan tagged addresses")
}
return true, nil
}, func(err error) {
defer s.Stop()
s.t.Fatalf("err: %s", err)
})
}
// url is a helper function which takes a relative URL and
// makes it into a proper URL against the local Consul server.
func (s *TestServer) url(path string) string {
return fmt.Sprintf("http://127.0.0.1:%d%s", s.Config.Ports.HTTP, path)
}
// requireOK checks the HTTP response code and ensures it is acceptable.
func (s *TestServer) requireOK(resp *http.Response) error {
if resp.StatusCode != 200 {
return fmt.Errorf("Bad status code: %d", resp.StatusCode)
}); err != nil {
return errors.Wrap(err, "failed waiting for leader")
}
return nil
}
// put performs a new HTTP PUT request.
func (s *TestServer) put(path string, body io.Reader) *http.Response {
req, err := http.NewRequest("PUT", s.url(path), body)
if err != nil {
s.t.Fatalf("err: %s", err)
}
resp, err := s.HttpClient.Do(req)
if err != nil {
s.t.Fatalf("err: %s", err)
}
if err := s.requireOK(resp); err != nil {
defer resp.Body.Close()
s.t.Fatal(err)
}
return resp
}
// get performs a new HTTP GET request.
func (s *TestServer) get(path string) *http.Response {
resp, err := s.HttpClient.Get(s.url(path))
if err != nil {
s.t.Fatalf("err: %s", err)
}
if err := s.requireOK(resp); err != nil {
defer resp.Body.Close()
s.t.Fatal(err)
}
return resp
}
// encodePayload returns a new io.Reader wrapping the encoded contents
// of the payload, suitable for passing directly to a new request.
func (s *TestServer) encodePayload(payload interface{}) io.Reader {
var encoded bytes.Buffer
enc := json.NewEncoder(&encoded)
if err := enc.Encode(payload); err != nil {
s.t.Fatalf("err: %s", err)
}
return &encoded
}
// JoinLAN is used to join nodes within the same datacenter.
func (s *TestServer) JoinLAN(addr string) {
resp := s.get("/v1/agent/join/" + addr)
resp.Body.Close()
}
// JoinWAN is used to join remote datacenters together.
func (s *TestServer) JoinWAN(addr string) {
resp := s.get("/v1/agent/join/" + addr + "?wan=1")
resp.Body.Close()
}
// SetKV sets an individual key in the K/V store.
func (s *TestServer) SetKV(key string, val []byte) {
resp := s.put("/v1/kv/"+key, bytes.NewBuffer(val))
resp.Body.Close()
}
// GetKV retrieves a single key and returns its value
func (s *TestServer) GetKV(key string) []byte {
resp := s.get("/v1/kv/" + key)
defer resp.Body.Close()
raw, err := ioutil.ReadAll(resp.Body)
if err != nil {
s.t.Fatalf("err: %s", err)
}
var result []*TestKVResponse
if err := json.Unmarshal(raw, &result); err != nil {
s.t.Fatalf("err: %s", err)
}
if len(result) < 1 {
s.t.Fatalf("key does not exist: %s", key)
}
v, err := base64.StdEncoding.DecodeString(result[0].Value)
if err != nil {
s.t.Fatalf("err: %s", err)
}
return v
}
// PopulateKV fills the Consul KV with data from a generic map.
func (s *TestServer) PopulateKV(data map[string][]byte) {
for k, v := range data {
s.SetKV(k, v)
}
}
// ListKV returns a list of keys present in the KV store. This will list all
// keys under the given prefix recursively and return them as a slice.
func (s *TestServer) ListKV(prefix string) []string {
resp := s.get("/v1/kv/" + prefix + "?keys")
defer resp.Body.Close()
raw, err := ioutil.ReadAll(resp.Body)
if err != nil {
s.t.Fatalf("err: %s", err)
}
var result []string
if err := json.Unmarshal(raw, &result); err != nil {
s.t.Fatalf("err: %s", err)
}
return result
}
// AddService adds a new service to the Consul instance. It also
// automatically adds a health check with the given status, which
// can be one of "passing", "warning", or "critical".
func (s *TestServer) AddService(name, status string, tags []string) {
svc := &TestService{
Name: name,
Tags: tags,
}
payload := s.encodePayload(svc)
s.put("/v1/agent/service/register", payload)
chkName := "service:" + name
chk := &TestCheck{
Name: chkName,
ServiceID: name,
TTL: "10m",
}
payload = s.encodePayload(chk)
s.put("/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
s.put("/v1/agent/check/pass/"+chkName, nil)
case structs.HealthWarning:
s.put("/v1/agent/check/warn/"+chkName, nil)
case structs.HealthCritical:
s.put("/v1/agent/check/fail/"+chkName, nil)
default:
s.t.Fatalf("Unrecognized status: %s", status)
}
}
// AddCheck adds a check to the Consul instance. If the serviceID is
// left empty (""), then the check will be associated with the node.
// The check status may be "passing", "warning", or "critical".
func (s *TestServer) AddCheck(name, serviceID, status string) {
chk := &TestCheck{
ID: name,
Name: name,
TTL: "10m",
}
if serviceID != "" {
chk.ServiceID = serviceID
}
payload := s.encodePayload(chk)
s.put("/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
s.put("/v1/agent/check/pass/"+name, nil)
case structs.HealthWarning:
s.put("/v1/agent/check/warn/"+name, nil)
case structs.HealthCritical:
s.put("/v1/agent/check/fail/"+name, nil)
default:
s.t.Fatalf("Unrecognized status: %s", status)
}
}

237
testutil/server_methods.go Normal file
View File

@ -0,0 +1,237 @@
package testutil
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/pkg/errors"
)
// JoinLAN is used to join local datacenters together.
func (s *TestServer) JoinLAN(t *testing.T, addr string) {
resp := s.get(t, "/v1/agent/join/"+addr)
defer resp.Body.Close()
}
// JoinWAN is used to join remote datacenters together.
func (s *TestServer) JoinWAN(t *testing.T, addr string) {
resp := s.get(t, "/v1/agent/join/"+addr+"?wan=1")
resp.Body.Close()
}
// SetKV sets an individual key in the K/V store.
func (s *TestServer) SetKV(t *testing.T, key string, val []byte) {
resp := s.put(t, "/v1/kv/"+key, bytes.NewBuffer(val))
resp.Body.Close()
}
// SetKVString sets an individual key in the K/V store, but accepts a string
// instead of []byte.
func (s *TestServer) SetKVString(t *testing.T, key string, val string) {
resp := s.put(t, "/v1/kv/"+key, bytes.NewBufferString(val))
resp.Body.Close()
}
// GetKV retrieves a single key and returns its value
func (s *TestServer) GetKV(t *testing.T, key string) []byte {
resp := s.get(t, "/v1/kv/"+key)
defer resp.Body.Close()
raw, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read body: %s", err)
}
var result []*TestKVResponse
if err := json.Unmarshal(raw, &result); err != nil {
t.Fatalf("failed to unmarshal: %s", err)
}
if len(result) < 1 {
t.Fatalf("key does not exist: %s", key)
}
v, err := base64.StdEncoding.DecodeString(result[0].Value)
if err != nil {
t.Fatalf("failed to base64 decode: %s", err)
}
return v
}
// GetKVString retrieves a value from the store, but returns as a string instead
// of []byte.
func (s *TestServer) GetKVString(t *testing.T, key string) string {
return string(s.GetKV(t, key))
}
// PopulateKV fills the Consul KV with data from a generic map.
func (s *TestServer) PopulateKV(t *testing.T, data map[string][]byte) {
for k, v := range data {
s.SetKV(t, k, v)
}
}
// ListKV returns a list of keys present in the KV store. This will list all
// keys under the given prefix recursively and return them as a slice.
func (s *TestServer) ListKV(t *testing.T, prefix string) []string {
resp := s.get(t, "/v1/kv/"+prefix+"?keys")
defer resp.Body.Close()
raw, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read body: %s", err)
}
var result []string
if err := json.Unmarshal(raw, &result); err != nil {
t.Fatalf("failed to unmarshal: %s", err)
}
return result
}
// AddService adds a new service to the Consul instance. It also
// automatically adds a health check with the given status, which
// can be one of "passing", "warning", or "critical".
func (s *TestServer) AddService(t *testing.T, name, status string, tags []string) {
svc := &TestService{
Name: name,
Tags: tags,
}
payload, err := s.encodePayload(svc)
if err != nil {
t.Fatal(err)
}
s.put(t, "/v1/agent/service/register", payload)
chkName := "service:" + name
chk := &TestCheck{
Name: chkName,
ServiceID: name,
TTL: "10m",
}
payload, err = s.encodePayload(chk)
if err != nil {
t.Fatal(err)
}
s.put(t, "/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
s.put(t, "/v1/agent/check/pass/"+chkName, nil)
case structs.HealthWarning:
s.put(t, "/v1/agent/check/warn/"+chkName, nil)
case structs.HealthCritical:
s.put(t, "/v1/agent/check/fail/"+chkName, nil)
default:
t.Fatalf("Unrecognized status: %s", status)
}
}
// AddCheck adds a check to the Consul instance. If the serviceID is
// left empty (""), then the check will be associated with the node.
// The check status may be "passing", "warning", or "critical".
func (s *TestServer) AddCheck(t *testing.T, name, serviceID, status string) {
chk := &TestCheck{
ID: name,
Name: name,
TTL: "10m",
}
if serviceID != "" {
chk.ServiceID = serviceID
}
payload, err := s.encodePayload(chk)
if err != nil {
t.Fatal(err)
}
s.put(t, "/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
s.put(t, "/v1/agent/check/pass/"+name, nil)
case structs.HealthWarning:
s.put(t, "/v1/agent/check/warn/"+name, nil)
case structs.HealthCritical:
s.put(t, "/v1/agent/check/fail/"+name, nil)
default:
t.Fatalf("Unrecognized status: %s", status)
}
}
// put performs a new HTTP PUT request.
func (s *TestServer) put(t *testing.T, path string, body io.Reader) *http.Response {
req, err := http.NewRequest("PUT", s.url(path), body)
if err != nil {
t.Fatalf("failed to create PUT request: %s", err)
}
resp, err := s.HttpClient.Do(req)
if err != nil {
t.Fatalf("failed to make PUT request: %s", err)
}
if err := s.requireOK(resp); err != nil {
defer resp.Body.Close()
t.Fatalf("not OK PUT: %s", err)
}
return resp
}
// get performs a new HTTP GET request.
func (s *TestServer) get(t *testing.T, path string) *http.Response {
resp, err := s.HttpClient.Get(s.url(path))
if err != nil {
t.Fatalf("failed to create GET request: %s", err)
}
if err := s.requireOK(resp); err != nil {
defer resp.Body.Close()
t.Fatalf("not OK GET: %s", err)
}
return resp
}
// encodePayload returns a new io.Reader wrapping the encoded contents
// of the payload, suitable for passing directly to a new request.
func (s *TestServer) encodePayload(payload interface{}) (io.Reader, error) {
var encoded bytes.Buffer
enc := json.NewEncoder(&encoded)
if err := enc.Encode(payload); err != nil {
return nil, errors.Wrap(err, "failed to encode payload")
}
return &encoded, nil
}
// url is a helper function which takes a relative URL and
// makes it into a proper URL against the local Consul server.
func (s *TestServer) url(path string) string {
if s == nil {
log.Fatal("s is nil")
}
if s.Config == nil {
log.Fatal("s.Config is nil")
}
if s.Config.Ports == nil {
log.Fatal("s.Config.Ports is nil")
}
if s.Config.Ports.HTTP == 0 {
log.Fatal("s.Config.Ports.HTTP is 0")
}
if path == "" {
log.Fatal("path is empty")
}
return fmt.Sprintf("http://127.0.0.1:%d%s", s.Config.Ports.HTTP, path)
}
// requireOK checks the HTTP response code and ensures it is acceptable.
func (s *TestServer) requireOK(resp *http.Response) error {
if resp.StatusCode != 200 {
return fmt.Errorf("Bad status code: %d", resp.StatusCode)
}
return nil
}

View File

@ -0,0 +1,96 @@
package testutil
import (
"testing"
)
type WrappedServer struct {
s *TestServer
t *testing.T
}
// Wrap wraps the test server in a `testing.t` for convenience.
//
// For example, the following code snippets are equivalent.
//
// server.JoinLAN(t, "1.2.3.4")
// server.Wrap(t).JoinLAN("1.2.3.4")
//
// This is useful when you are calling multiple functions and save the wrapped
// value as another variable to reduce the inclusion of "t".
func (s *TestServer) Wrap(t *testing.T) *WrappedServer {
return &WrappedServer{
s: s,
t: t,
}
}
// See Also
//
// TestServer.JoinLAN()
func (w *WrappedServer) JoinLAN(addr string) {
w.s.JoinLAN(w.t, addr)
}
// See Also
//
// TestServer.JoinWAN()
func (w *WrappedServer) JoinWAN(addr string) {
w.s.JoinWAN(w.t, addr)
}
// See Also
//
// TestServer.SetKV()
func (w *WrappedServer) SetKV(key string, val []byte) {
w.s.SetKV(w.t, key, val)
}
// See Also
//
// TestServer.SetKVString()
func (w *WrappedServer) SetKVString(key string, val string) {
w.s.SetKVString(w.t, key, val)
}
// See Also
//
// TestServer.GetKV()
func (w *WrappedServer) GetKV(key string) []byte {
return w.s.GetKV(w.t, key)
}
// See Also
//
// TestServer.GetKVString()
func (w *WrappedServer) GetKVString(key string) string {
return w.s.GetKVString(w.t, key)
}
// See Also
//
// TestServer.PopulateKV()
func (w *WrappedServer) PopulateKV(data map[string][]byte) {
w.s.PopulateKV(w.t, data)
}
// See Also
//
// TestServer.ListKV()
func (w *WrappedServer) ListKV(prefix string) []string {
return w.s.ListKV(w.t, prefix)
}
// See Also
//
// TestServer.AddService()
func (w *WrappedServer) AddService(name, status string, tags []string) {
w.s.AddService(w.t, name, status, tags)
}
// See Also
//
// TestServer.AddCheck()
func (w *WrappedServer) AddCheck(name, serviceID, status string) {
w.s.AddCheck(w.t, name, serviceID, status)
}

View File

@ -16,7 +16,7 @@ const (
maxWait = 100 * time.Millisecond
)
func WaitForResult(try testFn, fail errorFn) {
func WaitForResult(try testFn) error {
var err error
wait := baseWait
for retries := 100; retries > 0; retries-- {
@ -24,7 +24,7 @@ func WaitForResult(try testFn, fail errorFn) {
success, err = try()
if success {
time.Sleep(25 * time.Millisecond)
return
return nil
}
time.Sleep(wait)
@ -33,14 +33,15 @@ func WaitForResult(try testFn, fail errorFn) {
wait = maxWait
}
}
fail(err)
return err
}
type rpcFn func(string, interface{}, interface{}) error
func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes {
var out structs.IndexedNodes
WaitForResult(func() (bool, error) {
if err := WaitForResult(func() (bool, error) {
// Ensure we have a leader and a node registration.
args := &structs.DCSpecificRequest{
Datacenter: dc,
@ -55,8 +56,8 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes {
return false, fmt.Errorf("Consul index is 0")
}
return true, nil
}, func(err error) {
}); err != nil {
t.Fatalf("failed to find leader: %v", err)
})
}
return out
}