Update Consul to v0.9.2 for Header and Method

This commit is contained in:
Michael Schurter 2017-08-15 16:13:29 -07:00
parent bb8d5689d8
commit 4e72a8dedf
13 changed files with 467 additions and 267 deletions

View File

@ -42,6 +42,24 @@ func (c *Client) ACL() *ACL {
return &ACL{c}
}
// Bootstrap is used to perform a one-time ACL bootstrap operation on a cluster
// to get the first management token.
func (a *ACL) Bootstrap() (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/bootstrap")
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Create is used to generate a new token with the given parameters
func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/create")

View File

@ -67,17 +67,19 @@ type AgentCheckRegistration struct {
// AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct {
Script string `json:",omitempty"`
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"`
Timeout string `json:",omitempty"`
TTL string `json:",omitempty"`
HTTP string `json:",omitempty"`
TCP string `json:",omitempty"`
Status string `json:",omitempty"`
Notes string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Script string `json:",omitempty"`
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"`
Timeout string `json:",omitempty"`
TTL string `json:",omitempty"`
HTTP string `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Status string `json:",omitempty"`
Notes string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
// In Consul 0.7 and later, checks that are associated with a service
// may also contain this optional DeregisterCriticalServiceAfter field,
@ -89,6 +91,47 @@ type AgentServiceCheck struct {
}
type AgentServiceChecks []*AgentServiceCheck
// AgentToken is used when updating ACL tokens for an agent.
type AgentToken struct {
Token string
}
// Metrics info is used to store different types of metric values from the agent.
type MetricsInfo struct {
Timestamp string
Gauges []GaugeValue
Points []PointValue
Counters []SampledValue
Samples []SampledValue
}
// GaugeValue stores one value that is updated as time goes on, such as
// the amount of memory allocated.
type GaugeValue struct {
Name string
Value float32
Labels map[string]string
}
// PointValue holds a series of points for a metric.
type PointValue struct {
Name string
Points []float32
}
// SampledValue stores info about a metric that is incremented over time,
// such as the number of requests to an HTTP endpoint.
type SampledValue struct {
Name string
Count int
Sum float64
Min float64
Max float64
Mean float64
Stddev float64
Labels map[string]string
}
// Agent can be used to query the Agent endpoints
type Agent struct {
c *Client
@ -119,6 +162,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}
// Metrics is used to query the agent we are speaking to for
// its current internal metric data
func (a *Agent) Metrics() (*MetricsInfo, error) {
r := a.c.newRequest("GET", "/v1/agent/metrics")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out *MetricsInfo
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
@ -439,8 +499,9 @@ func (a *Agent) DisableNodeMaintenance() error {
// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop the
// log stream
func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) (chan string, error) {
// log stream. An empty string will be sent down the given channel when there's
// nothing left to stream, after which the caller should close the stopCh.
func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
r := a.c.newRequest("GET", "/v1/agent/monitor")
r.setQueryOptions(q)
if loglevel != "" {
@ -464,10 +525,61 @@ func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions)
default:
}
if scanner.Scan() {
logCh <- scanner.Text()
// An empty string signals to the caller that
// the scan is done, so make sure we only emit
// that when the scanner says it's done, not if
// we happen to ingest an empty line.
if text := scanner.Text(); text != "" {
logCh <- text
} else {
logCh <- " "
}
} else {
logCh <- ""
}
}
}()
return logCh, nil
}
// UpdateACLToken updates the agent's "acl_token". See updateToken for more
// details.
func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_token", token, q)
}
// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken
// for more details.
func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_token", token, q)
}
// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See
// updateToken for more details.
func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_master_token", token, q)
}
// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
// updateToken for more details.
func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_replication_token", token, q)
}
// updateToken can be used to update an agent's ACL token after the agent has
// started. The tokens are not persisted, so will need to be updated again if
// the agent is restarted.
func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
r.setWriteOptions(q)
r.obj = &AgentToken{Token: token}
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
return wm, nil
}

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -104,6 +105,26 @@ type QueryOptions struct {
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
}
func (o *QueryOptions) Context() context.Context {
if o != nil && o.ctx != nil {
return o.ctx
}
return context.Background()
}
func (o *QueryOptions) WithContext(ctx context.Context) *QueryOptions {
o2 := new(QueryOptions)
if o != nil {
*o2 = *o
}
o2.ctx = ctx
return o2
}
// WriteOptions are used to parameterize a write
@ -120,6 +141,26 @@ type WriteOptions struct {
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
}
func (o *WriteOptions) Context() context.Context {
if o != nil && o.ctx != nil {
return o.ctx
}
return context.Background()
}
func (o *WriteOptions) WithContext(ctx context.Context) *WriteOptions {
o2 := new(WriteOptions)
if o != nil {
*o2 = *o
}
o2.ctx = ctx
return o2
}
// QueryMeta is used to return meta data about a query
@ -456,6 +497,7 @@ type request struct {
body io.Reader
header http.Header
obj interface{}
ctx context.Context
}
// setQueryOptions is used to annotate the request with
@ -493,6 +535,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.RelayFactor != 0 {
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
}
r.ctx = q.ctx
}
// durToMsec converts a duration to a millisecond specified string. If the
@ -537,6 +580,7 @@ func (r *request) setWriteOptions(q *WriteOptions) {
if q.RelayFactor != 0 {
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
}
r.ctx = q.ctx
}
// toHTTP converts the request to an HTTP request
@ -568,8 +612,11 @@ func (r *request) toHTTP() (*http.Request, error) {
if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
}
return req, nil
if r.ctx != nil {
return req.WithContext(r.ctx), nil
} else {
return req, nil
}
}
// newRequest is used to create a new request
@ -648,6 +695,8 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
} else if _, err := ioutil.ReadAll(resp.Body); err != nil {
return nil, err
}
return wm, nil
}

View File

@ -39,6 +39,10 @@ type AutopilotConfiguration struct {
// cluster before promoting them to voters.
DisableUpgradeMigration bool
// (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when
// performing upgrade migrations. If left blank, the Consul version will be used.
UpgradeVersionTag string
// CreateIndex holds the index corresponding the creation of this configuration.
// This is a read-only field.
CreateIndex uint64

View File

@ -145,7 +145,9 @@ func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta,
// RenewPeriodic is used to periodically invoke Session.Renew on a
// session until a doneCh is closed. This is meant to be used in a long running
// goroutine to ensure a session stays valid.
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error {
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh <-chan struct{}) error {
ctx := q.Context()
ttl, err := time.ParseDuration(initialTTL)
if err != nil {
return err
@ -179,6 +181,11 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d
// Attempt a session destroy
s.Destroy(id, q)
return nil
case <-ctx.Done():
// Bail immediately since attempting the destroy would
// use the canceled context in q, which would just bail.
return ctx.Err()
}
}
}

View File

@ -31,7 +31,7 @@ func DurationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter
return min, max
}
// Returns a random stagger interval between 0 and the duration
// RandomStagger returns an interval between 0 and the duration
func RandomStagger(intv time.Duration) time.Duration {
if intv == 0 {
return 0

View File

@ -58,6 +58,9 @@ func TestFoo_bar(t *testing.T) {
// Create a service
srv1.AddService(t, "redis", structs.HealthPassing, []string{"master"})
// Create a service that will be accessed in target source code
srv1.AddAccessibleService("redis", structs.HealthPassing, "127.0.0.1", 6379, []string{"master"})
// Create a service check
srv1.AddCheck(t, "service:redis", "redis", structs.HealthPassing)

61
vendor/github.com/hashicorp/consul/testutil/io.go generated vendored Normal file
View File

@ -0,0 +1,61 @@
package testutil
import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
)
// tmpdir is the base directory for all temporary directories
// and files created with TempDir and TempFile. This could be
// achieved by setting a system environment variable but then
// the test execution would depend on whether or not the
// environment variable is set.
//
// On macOS the temp base directory is quite long and that
// triggers a problem with some tests that bind to UNIX sockets
// where the filename seems to be too long. Using a shorter name
// fixes this and makes the paths more readable.
//
// It also provides a single base directory for cleanup.
var tmpdir = "/tmp/consul-test"
func init() {
if err := os.MkdirAll(tmpdir, 0755); err != nil {
fmt.Printf("Cannot create %s. Reverting to /tmp\n", tmpdir)
tmpdir = "/tmp"
}
}
// TempDir creates a temporary directory within tmpdir
// with the name 'testname-name'. If the directory cannot
// be created t.Fatal is called.
func TempDir(t *testing.T, name string) string {
if t != nil && t.Name() != "" {
name = t.Name() + "-" + name
}
name = strings.Replace(name, "/", "_", -1)
d, err := ioutil.TempDir(tmpdir, name)
if err != nil {
t.Fatalf("err: %s", err)
}
return d
}
// TempFile creates a temporary file within tmpdir
// with the name 'testname-name'. If the file cannot
// be created t.Fatal is called. If a temporary directory
// has been created before consider storing the file
// inside this directory to avoid double cleanup.
func TempFile(t *testing.T, name string) *os.File {
if t != nil && t.Name() != "" {
name = t.Name() + "-" + name
}
f, err := ioutil.TempFile(tmpdir, name)
if err != nil {
t.Fatalf("err: %s", err)
}
return f
}

View File

@ -21,9 +21,13 @@ import (
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-uuid"
"github.com/pkg/errors"
@ -56,32 +60,36 @@ type TestAddressConfig struct {
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"node_name"`
NodeID string `json:"node_id"`
NodeMeta map[string]string `json:"node_meta,omitempty"`
Performance *TestPerformanceConfig `json:"performance,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"`
Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"`
Addresses *TestAddressConfig `json:"addresses,omitempty"`
Ports *TestPortConfig `json:"ports,omitempty"`
RaftProtocol int `json:"raft_protocol,omitempty"`
ACLMasterToken string `json:"acl_master_token,omitempty"`
ACLDatacenter string `json:"acl_datacenter,omitempty"`
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"`
ACLEnforceVersion8 bool `json:"acl_enforce_version_8"`
Encrypt string `json:"encrypt,omitempty"`
CAFile string `json:"ca_file,omitempty"`
CertFile string `json:"cert_file,omitempty"`
KeyFile string `json:"key_file,omitempty"`
VerifyIncoming bool `json:"verify_incoming,omitempty"`
VerifyOutgoing bool `json:"verify_outgoing,omitempty"`
Stdout, Stderr io.Writer `json:"-"`
Args []string `json:"-"`
NodeName string `json:"node_name"`
NodeID string `json:"node_id"`
NodeMeta map[string]string `json:"node_meta,omitempty"`
Performance *TestPerformanceConfig `json:"performance,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"`
Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"`
Addresses *TestAddressConfig `json:"addresses,omitempty"`
Ports *TestPortConfig `json:"ports,omitempty"`
RaftProtocol int `json:"raft_protocol,omitempty"`
ACLMasterToken string `json:"acl_master_token,omitempty"`
ACLDatacenter string `json:"acl_datacenter,omitempty"`
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"`
ACLEnforceVersion8 bool `json:"acl_enforce_version_8"`
Encrypt string `json:"encrypt,omitempty"`
CAFile string `json:"ca_file,omitempty"`
CertFile string `json:"cert_file,omitempty"`
KeyFile string `json:"key_file,omitempty"`
VerifyIncoming bool `json:"verify_incoming,omitempty"`
VerifyIncomingRPC bool `json:"verify_incoming_rpc,omitempty"`
VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"`
VerifyOutgoing bool `json:"verify_outgoing,omitempty"`
EnableScriptChecks bool `json:"enable_script_checks,omitempty"`
ReadyTimeout time.Duration `json:"-"`
Stdout, Stderr io.Writer `json:"-"`
Args []string `json:"-"`
}
// ServerConfigCallback is a function interface which can be
@ -117,6 +125,7 @@ func defaultServerConfig() *TestServerConfig {
Server: randomPort(),
RPC: randomPort(),
},
ReadyTimeout: 10 * time.Second,
}
}
@ -162,68 +171,75 @@ type TestServer struct {
LANAddr string
WANAddr string
HttpClient *http.Client
HTTPClient *http.Client
tmpdir string
}
// NewTestServer is an easy helper method to create a new Consul
// test server with the most basic configuration.
func NewTestServer() (*TestServer, error) {
return NewTestServerConfig(nil)
return NewTestServerConfigT(nil, nil)
}
func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) {
return NewTestServerConfigT(nil, cb)
}
// NewTestServerConfig creates a new TestServer, and makes a call to an optional
// callback function to modify the configuration. If there is an error
// configuring or starting the server, the server will NOT be running when the
// function returns (thus you do not need to stop it).
func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) {
if path, err := exec.LookPath("consul"); err != nil || path == "" {
func NewTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) {
var server *TestServer
retry.Run(t, func(r *retry.R) {
var err error
server, err = newTestServerConfigT(t, cb)
if err != nil {
r.Fatalf("failed starting test server: %v", err)
}
})
return server, nil
}
// newTestServerConfigT is the internal helper for NewTestServerConfigT.
func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) {
path, err := exec.LookPath("consul")
if err != nil || path == "" {
return nil, fmt.Errorf("consul not found on $PATH - download and install " +
"consul or skip this test")
}
dataDir, err := ioutil.TempDir("", "consul")
if err != nil {
return nil, errors.Wrap(err, "failed creating tempdir")
}
configFile, err := ioutil.TempFile(dataDir, "config")
if err != nil {
defer os.RemoveAll(dataDir)
return nil, errors.Wrap(err, "failed creating temp config")
}
consulConfig := defaultServerConfig()
consulConfig.DataDir = dataDir
tmpdir := TempDir(t, "consul")
cfg := defaultServerConfig()
cfg.DataDir = filepath.Join(tmpdir, "data")
if cb != nil {
cb(consulConfig)
cb(cfg)
}
configContent, err := json.Marshal(consulConfig)
b, err := json.Marshal(cfg)
if err != nil {
return nil, errors.Wrap(err, "failed marshaling json")
}
if _, err := configFile.Write(configContent); err != nil {
defer configFile.Close()
defer os.RemoveAll(dataDir)
configFile := filepath.Join(tmpdir, "config.json")
if err := ioutil.WriteFile(configFile, b, 0644); err != nil {
defer os.RemoveAll(tmpdir)
return nil, errors.Wrap(err, "failed writing config content")
}
configFile.Close()
stdout := io.Writer(os.Stdout)
if consulConfig.Stdout != nil {
stdout = consulConfig.Stdout
if cfg.Stdout != nil {
stdout = cfg.Stdout
}
stderr := io.Writer(os.Stderr)
if consulConfig.Stderr != nil {
stderr = consulConfig.Stderr
if cfg.Stderr != nil {
stderr = cfg.Stderr
}
// Start the server
args := []string{"agent", "-config-file", configFile.Name()}
args = append(args, consulConfig.Args...)
args := []string{"agent", "-config-file", configFile}
args = append(args, cfg.Args...)
cmd := exec.Command("consul", args...)
cmd.Stdout = stdout
cmd.Stderr = stderr
@ -231,86 +247,89 @@ func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) {
return nil, errors.Wrap(err, "failed starting command")
}
var httpAddr string
var client *http.Client
if strings.HasPrefix(consulConfig.Addresses.HTTP, "unix://") {
httpAddr = consulConfig.Addresses.HTTP
trans := cleanhttp.DefaultTransport()
trans.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", httpAddr[7:])
httpAddr := fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTP)
client := cleanhttp.DefaultClient()
if strings.HasPrefix(cfg.Addresses.HTTP, "unix://") {
httpAddr = cfg.Addresses.HTTP
tr := cleanhttp.DefaultTransport()
tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", httpAddr[len("unix://"):])
}
client = &http.Client{
Transport: trans,
}
} else {
httpAddr = fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTP)
client = cleanhttp.DefaultClient()
client = &http.Client{Transport: tr}
}
server := &TestServer{
Config: consulConfig,
Config: cfg,
cmd: cmd,
HTTPAddr: httpAddr,
HTTPSAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTPS),
LANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfLan),
WANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfWan),
HTTPSAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.HTTPS),
LANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfLan),
WANAddr: fmt.Sprintf("127.0.0.1:%d", cfg.Ports.SerfWan),
HttpClient: client,
HTTPClient: client,
tmpdir: tmpdir,
}
// Wait for the server to be ready
var startErr error
if consulConfig.Bootstrap {
startErr = server.waitForLeader()
if cfg.Bootstrap {
err = server.waitForLeader()
} else {
startErr = server.waitForAPI()
err = server.waitForAPI()
}
if startErr != nil {
if err != nil {
defer server.Stop()
return nil, errors.Wrap(startErr, "failed waiting for server to start")
return nil, errors.Wrap(err, "failed waiting for server to start")
}
return server, nil
}
// Stop stops the test Consul server, and removes the Consul data
// directory once we are done.
func (s *TestServer) Stop() error {
defer os.RemoveAll(s.Config.DataDir)
if s.cmd != nil {
if s.cmd.Process != nil {
if err := s.cmd.Process.Kill(); err != nil {
return errors.Wrap(err, "failed to kill consul server")
}
}
// wait for the process to exit to be sure that the data dir can be
// deleted on all platforms.
return s.cmd.Wait()
}
defer os.RemoveAll(s.tmpdir)
// There was no process
return nil
if s.cmd == nil {
return nil
}
if s.cmd.Process != nil {
if err := s.cmd.Process.Signal(os.Interrupt); err != nil {
return errors.Wrap(err, "failed to kill consul server")
}
}
// wait for the process to exit to be sure that the data dir can be
// deleted on all platforms.
return s.cmd.Wait()
}
type failer struct {
failed bool
}
func (f *failer) Log(args ...interface{}) { fmt.Println(args) }
func (f *failer) FailNow() { f.failed = true }
// waitForAPI waits for only the agent HTTP endpoint to start
// responding. This is an indication that the agent has started,
// but will likely return before a leader is elected.
func (s *TestServer) waitForAPI() error {
if err := WaitForResult(func() (bool, error) {
resp, err := s.HttpClient.Get(s.url("/v1/agent/self"))
f := &failer{}
retry.Run(f, func(r *retry.R) {
resp, err := s.HTTPClient.Get(s.url("/v1/agent/self"))
if err != nil {
return false, errors.Wrap(err, "failed http get")
r.Fatal(err)
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
return false, errors.Wrap(err, "failed OK response")
r.Fatal("failed OK respose", err)
}
return true, nil
}); err != nil {
return errors.Wrap(err, "failed waiting for API")
})
if f.failed {
return errors.New("failed waiting for API")
}
return nil
}
@ -320,50 +339,55 @@ func (s *TestServer) waitForAPI() error {
// 1 or more to be observed to confirm leader election is done.
// It then waits to ensure the anti-entropy sync has completed.
func (s *TestServer) waitForLeader() error {
f := &failer{}
timer := &retry.Timer{
Timeout: s.Config.ReadyTimeout,
Wait: 250 * time.Millisecond,
}
var index int64
if err := WaitForResult(func() (bool, error) {
retry.RunWith(timer, f, func(r *retry.R) {
// Query the API and check the status code.
url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d&wait=2s", index))
resp, err := s.HttpClient.Get(url)
url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d", index))
resp, err := s.HTTPClient.Get(url)
if err != nil {
return false, errors.Wrap(err, "failed http get")
r.Fatal("failed http get", err)
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
return false, errors.Wrap(err, "failed OK response")
r.Fatal("failed OK response", err)
}
// Ensure we have a leader and a node registration.
if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" {
return false, fmt.Errorf("Consul leader status: %#v", leader)
r.Fatalf("Consul leader status: %#v", leader)
}
index, err = strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64)
if err != nil {
return false, errors.Wrap(err, "bad consul index")
r.Fatal("bad consul index", err)
}
if index == 0 {
return false, fmt.Errorf("consul index is 0")
r.Fatal("consul index is 0")
}
// Watch for the anti-entropy sync to finish.
var parsed []map[string]interface{}
var v []map[string]interface{}
dec := json.NewDecoder(resp.Body)
if err := dec.Decode(&parsed); err != nil {
return false, err
if err := dec.Decode(&v); err != nil {
r.Fatal(err)
}
if len(parsed) < 1 {
return false, fmt.Errorf("No nodes")
if len(v) < 1 {
r.Fatal("No nodes")
}
taggedAddresses, ok := parsed[0]["TaggedAddresses"].(map[string]interface{})
taggedAddresses, ok := v[0]["TaggedAddresses"].(map[string]interface{})
if !ok {
return false, fmt.Errorf("Missing tagged addresses")
r.Fatal("Missing tagged addresses")
}
if _, ok := taggedAddresses["lan"]; !ok {
return false, fmt.Errorf("No lan tagged addresses")
r.Fatal("No lan tagged addresses")
}
return true, nil
}); err != nil {
return errors.Wrap(err, "failed waiting for leader")
})
if f.failed {
return errors.New("failed waiting for leader")
}
return nil
}

View File

@ -11,10 +11,18 @@ import (
"net/http"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/pkg/errors"
)
// copied from testutil to break circular dependency
const (
HealthAny = "any"
HealthPassing = "passing"
HealthWarning = "warning"
HealthCritical = "critical"
HealthMaint = "maintenance"
)
// JoinLAN is used to join local datacenters together.
func (s *TestServer) JoinLAN(t *testing.T, addr string) {
resp := s.get(t, "/v1/agent/join/"+addr)
@ -101,9 +109,20 @@ func (s *TestServer) ListKV(t *testing.T, prefix string) []string {
// automatically adds a health check with the given status, which
// can be one of "passing", "warning", or "critical".
func (s *TestServer) AddService(t *testing.T, name, status string, tags []string) {
s.AddAddressableService(t, name, status, "", 0, tags) // set empty address and 0 as port for non-accessible service
}
// AddAddressableService adds a new service to the Consul instance by
// passing "address" and "port". It is helpful when you need to prepare a fakeService
// that maybe accessed with in target source code.
// It also automatically adds a health check with the given status, which
// can be one of "passing", "warning", or "critical", just like `AddService` does.
func (s *TestServer) AddAddressableService(t *testing.T, name, status, address string, port int, tags []string) {
svc := &TestService{
Name: name,
Tags: tags,
Name: name,
Tags: tags,
Address: address,
Port: port,
}
payload, err := s.encodePayload(svc)
if err != nil {
@ -124,11 +143,11 @@ func (s *TestServer) AddService(t *testing.T, name, status string, tags []string
s.put(t, "/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
case HealthPassing:
s.put(t, "/v1/agent/check/pass/"+chkName, nil)
case structs.HealthWarning:
case HealthWarning:
s.put(t, "/v1/agent/check/warn/"+chkName, nil)
case structs.HealthCritical:
case HealthCritical:
s.put(t, "/v1/agent/check/fail/"+chkName, nil)
default:
t.Fatalf("Unrecognized status: %s", status)
@ -155,11 +174,11 @@ func (s *TestServer) AddCheck(t *testing.T, name, serviceID, status string) {
s.put(t, "/v1/agent/check/register", payload)
switch status {
case structs.HealthPassing:
case HealthPassing:
s.put(t, "/v1/agent/check/pass/"+name, nil)
case structs.HealthWarning:
case HealthWarning:
s.put(t, "/v1/agent/check/warn/"+name, nil)
case structs.HealthCritical:
case HealthCritical:
s.put(t, "/v1/agent/check/fail/"+name, nil)
default:
t.Fatalf("Unrecognized status: %s", status)
@ -172,7 +191,7 @@ func (s *TestServer) put(t *testing.T, path string, body io.Reader) *http.Respon
if err != nil {
t.Fatalf("failed to create PUT request: %s", err)
}
resp, err := s.HttpClient.Do(req)
resp, err := s.HTTPClient.Do(req)
if err != nil {
t.Fatalf("failed to make PUT request: %s", err)
}
@ -185,7 +204,7 @@ func (s *TestServer) put(t *testing.T, path string, body io.Reader) *http.Respon
// get performs a new HTTP GET request.
func (s *TestServer) get(t *testing.T, path string) *http.Response {
resp, err := s.HttpClient.Get(s.url(path))
resp, err := s.HTTPClient.Get(s.url(path))
if err != nil {
t.Fatalf("failed to create GET request: %s", err)
}

View File

@ -1,8 +1,6 @@
package testutil
import (
"testing"
)
import "testing"
type WrappedServer struct {
s *TestServer
@ -19,78 +17,49 @@ type WrappedServer struct {
// This is useful when you are calling multiple functions and save the wrapped
// value as another variable to reduce the inclusion of "t".
func (s *TestServer) Wrap(t *testing.T) *WrappedServer {
return &WrappedServer{
s: s,
t: t,
}
return &WrappedServer{s, t}
}
// See Also
//
// TestServer.JoinLAN()
func (w *WrappedServer) JoinLAN(addr string) {
w.s.JoinLAN(w.t, addr)
}
// See Also
//
// TestServer.JoinWAN()
func (w *WrappedServer) JoinWAN(addr string) {
w.s.JoinWAN(w.t, addr)
}
// See Also
//
// TestServer.SetKV()
func (w *WrappedServer) SetKV(key string, val []byte) {
w.s.SetKV(w.t, key, val)
}
// See Also
//
// TestServer.SetKVString()
func (w *WrappedServer) SetKVString(key string, val string) {
w.s.SetKVString(w.t, key, val)
}
// See Also
//
// TestServer.GetKV()
func (w *WrappedServer) GetKV(key string) []byte {
return w.s.GetKV(w.t, key)
}
// See Also
//
// TestServer.GetKVString()
func (w *WrappedServer) GetKVString(key string) string {
return w.s.GetKVString(w.t, key)
}
// See Also
//
// TestServer.PopulateKV()
func (w *WrappedServer) PopulateKV(data map[string][]byte) {
w.s.PopulateKV(w.t, data)
}
// See Also
//
// TestServer.ListKV()
func (w *WrappedServer) ListKV(prefix string) []string {
return w.s.ListKV(w.t, prefix)
}
// See Also
//
// TestServer.AddService()
func (w *WrappedServer) AddService(name, status string, tags []string) {
w.s.AddService(w.t, name, status, tags)
}
// See Also
//
// TestServer.AddCheck()
func (w *WrappedServer) AddAddressableService(name, status, address string, port int, tags []string) {
w.s.AddAddressableService(w.t, name, status, address, port, tags)
}
func (w *WrappedServer) AddCheck(name, serviceID, status string) {
w.s.AddCheck(w.t, name, serviceID, status)
}

View File

@ -1,66 +0,0 @@
package testutil
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/pkg/errors"
)
type testFn func() (bool, error)
const (
baseWait = 1 * time.Millisecond
maxWait = 100 * time.Millisecond
)
func WaitForResult(try testFn) error {
var err error
wait := baseWait
for retries := 100; retries > 0; retries-- {
var success bool
success, err = try()
if success {
time.Sleep(25 * time.Millisecond)
return nil
}
time.Sleep(wait)
wait *= 2
if wait > maxWait {
wait = maxWait
}
}
if err != nil {
return errors.Wrap(err, "timed out with error")
} else {
return fmt.Errorf("timed out")
}
}
type rpcFn func(string, interface{}, interface{}) error
func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes {
var out structs.IndexedNodes
if err := WaitForResult(func() (bool, error) {
// Ensure we have a leader and a node registration.
args := &structs.DCSpecificRequest{
Datacenter: dc,
}
if err := rpc("Catalog.ListNodes", args, &out); err != nil {
return false, fmt.Errorf("Catalog.ListNodes failed: %v", err)
}
if !out.QueryMeta.KnownLeader {
return false, fmt.Errorf("No leader")
}
if out.Index == 0 {
return false, fmt.Errorf("Consul index is 0")
}
return true, nil
}); err != nil {
t.Fatalf("failed to find leader: %v", err)
}
return out
}

26
vendor/vendor.json vendored
View File

@ -653,14 +653,14 @@
{
"checksumSHA1": "jfELEMRhiTcppZmRH+ZwtkVS5Uw=",
"path": "github.com/hashicorp/consul/acl",
"revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a",
"revisionTime": "2017-04-17T18:01:43Z"
"revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4",
"revisionTime": "2017-08-10T00:46:41Z"
},
{
"checksumSHA1": "RmhTKLvlDtxNPKZFnPYnfG/HzrI=",
"checksumSHA1": "AgdPCR9/+M0kkVpaeZ0PnATxfSc=",
"path": "github.com/hashicorp/consul/api",
"revision": "eea8f4ce75e8e6ff97c9913d89f687e8f8489ce6",
"revisionTime": "2017-05-30T15:52:51Z"
"revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4",
"revisionTime": "2017-08-10T00:46:41Z"
},
{
"checksumSHA1": "Z1N3jX/5B7GbLNfNp5GTxrsJItc=",
@ -669,22 +669,22 @@
"revisionTime": "2017-04-17T18:01:43Z"
},
{
"checksumSHA1": "XTA8JEhsuJGTUTchjM++oEG7B14=",
"checksumSHA1": "X3kV+a3rz+kw6SJupDVjHmUEUCQ=",
"path": "github.com/hashicorp/consul/lib",
"revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a",
"revisionTime": "2017-04-17T18:01:43Z"
"revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4",
"revisionTime": "2017-08-10T00:46:41Z"
},
{
"checksumSHA1": "yG7c7ZInBE36kw8IPvYb0smctRw=",
"checksumSHA1": "++0PVBxbpylmllyCxSa7cdc6dDc=",
"path": "github.com/hashicorp/consul/testutil",
"revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a",
"revisionTime": "2017-04-17T18:01:43Z"
"revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4",
"revisionTime": "2017-08-10T00:46:41Z"
},
{
"checksumSHA1": "bYK/7DsyTM3YDjvc0RRUH4I+jic=",
"path": "github.com/hashicorp/consul/types",
"revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a",
"revisionTime": "2017-04-17T18:01:43Z"
"revision": "75ca2cace08e38de8af1731ee8614d0533d5a4d4",
"revisionTime": "2017-08-10T00:46:41Z"
},
{
"path": "github.com/hashicorp/errwrap",