Merge branch 'master' into release/1-6
commit fa15914813
@@ -247,8 +247,8 @@ func TestCacheNotifyPolling(t *testing.T) {
 	// wait for the next batch of responses
 	events := make([]UpdateEvent, 0)
-	// 110 is needed to allow for the jitter
-	timeout := time.After(110 * time.Millisecond)
+	// At least 110ms is needed to allow for the jitter
+	timeout := time.After(150 * time.Millisecond)

 	for i := 0; i < 2; i++ {
 		select {
@@ -480,14 +480,14 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
 	retry.Run(t, func(r *retry.R) {

 		if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls); err != nil {
-			t.Fatalf("err: %v", err)
+			r.Fatalf("err: %v", err)
 		}

 		if acls.Policy == nil {
-			t.Fatalf("Bad: %v", acls)
+			r.Fatalf("Bad: %v", acls)
 		}
 		if acls.TTL != 30*time.Second {
-			t.Fatalf("bad: %v", acls)
+			r.Fatalf("bad: %v", acls)
 		}
 	})
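Editor's note: the hunk above and most hunks below share one pattern. Assertions against eventually-consistent cluster state move inside retry.Run, and the failure calls switch from the *testing.T to the *retry.R argument. That switch is what makes the retry work: r.Fatalf fails only the current attempt and lets retry.Run try again, while t.Fatalf aborts the entire test on the first miss. A minimal sketch of the pattern, using the same sdk retry package; the leaderElected condition is a hypothetical stand-in, not part of this diff:

package example

import (
	"testing"
	"time"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

// leaderElected stands in for any condition that becomes true only after
// the cluster converges (leader election, replication, anti-entropy sync).
var testStart = time.Now()

func leaderElected() bool { return time.Since(testStart) > 100*time.Millisecond }

func TestEventuallyConsistent(t *testing.T) {
	retry.Run(t, func(r *retry.R) {
		if !leaderElected() {
			// Failing through r marks only this attempt as failed;
			// retry.Run re-runs the function until it passes or the
			// retryer times out. t.Fatalf here would kill the whole
			// test on the first attempt, defeating the retry.
			r.Fatalf("no leader yet")
		}
	})
}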
@@ -668,7 +668,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
 	t.Run("expired tokens are filtered", func(t *testing.T) {
 		// insert a token that will expire
 		token, err := upsertTestToken(codec, "root", "dc1", func(t *structs.ACLToken) {
-			t.ExpirationTTL = 20 * time.Millisecond
+			t.ExpirationTTL = 200 * time.Millisecond
 		})
 		require.NoError(t, err)
@@ -686,8 +686,6 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
 		require.Equal(t, token, resp.Token)
 	})

-	time.Sleep(50 * time.Millisecond)
-
 	t.Run("not returned when expired", func(t *testing.T) {
 		req := structs.ACLTokenGetRequest{
 			Datacenter: "dc1",
@@ -698,8 +696,10 @@ func TestACLEndpoint_TokenRead(t *testing.T) {

 		resp := structs.ACLTokenResponse{}

-		require.NoError(t, acl.TokenRead(&req, &resp))
-		require.Nil(t, resp.Token)
+		retry.Run(t, func(r *retry.R) {
+			require.NoError(r, acl.TokenRead(&req, &resp))
+			require.Nil(r, resp.Token)
+		})
 	})
 })
@@ -4339,7 +4339,7 @@ func TestACLEndpoint_SecureIntroEndpoints_OnlyCreateLocalData(t *testing.T) {
 	}
 	resp := structs.ACLToken{}

-	require.NoError(t, acl.Login(&req, &resp))
+	require.NoError(t, acl2.Login(&req, &resp))
 	remoteToken = &resp

 	// present in dc2
@@ -380,7 +380,7 @@ func TestACLReplication_Tokens(t *testing.T) {
 	retry.Run(t, func(r *retry.R) {
 		_, policy, err := s2.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID)
 		require.NoError(r, err)
-		require.NotNil(t, policy)
+		require.NotNil(r, policy)
 	})

 	// add some local tokens to the secondary DC
@@ -1046,26 +1046,28 @@ func TestCatalog_ListNodes_StaleRead(t *testing.T) {
 		QueryOptions: structs.QueryOptions{AllowStale: true},
 	}
 	var out structs.IndexedNodes
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	found := false
-	for _, n := range out.Nodes {
-		if n.Node == "foo" {
-			found = true
-		}
-	}
-	if !found {
-		t.Fatalf("failed to find foo in %#v", out.Nodes)
-	}
-
-	if out.QueryMeta.LastContact == 0 {
-		t.Fatalf("should have a last contact time")
-	}
-	if !out.QueryMeta.KnownLeader {
-		t.Fatalf("should have known leader")
-	}
+	retry.Run(t, func(r *retry.R) {
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		found := false
+		for _, n := range out.Nodes {
+			if n.Node == "foo" {
+				found = true
+			}
+		}
+		if !found {
+			r.Fatalf("failed to find foo in %#v", out.Nodes)
+		}
+		if out.QueryMeta.LastContact == 0 {
+			r.Fatalf("should have a last contact time")
+		}
+		if !out.QueryMeta.KnownLeader {
+			r.Fatalf("should have known leader")
+		}
+	})
 }

 func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) {
@@ -1621,18 +1623,19 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
-	waitForLeader(s1, s2)
+	testrpc.WaitForLeader(t, s2.RPC, "dc1")

-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// Should find the services
-	if len(out.Services) != 1 {
-		t.Fatalf("bad: %#v", out.Services)
-	}
-
-	if !out.KnownLeader {
-		t.Fatalf("should have a leader: %v", out)
-	}
+	retry.Run(t, func(r *retry.R) {
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
+			r.Fatalf("err: %v", err)
+		}
+		// Should find the services
+		if len(out.Services) != 1 {
+			r.Fatalf("bad: %#v", out.Services)
+		}
+		if !out.KnownLeader {
+			r.Fatalf("should have a leader: %v", out)
+		}
+	})

 	s1.Leave()
 	s1.Shutdown()
@@ -11,7 +11,7 @@ import (
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
-	"github.com/hashicorp/net-rpc-msgpackrpc"
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/serf/serf"
 	"github.com/stretchr/testify/require"
 )
@@ -756,20 +756,20 @@ func TestLeader_ReapTombstones(t *testing.T) {

 	// Make sure there's a tombstone.
 	state := s1.fsm.State()
-	func() {
+	retry.Run(t, func(r *retry.R) {
 		snap := state.Snapshot()
 		defer snap.Close()
 		stones, err := snap.Tombstones()
 		if err != nil {
-			t.Fatalf("err: %s", err)
+			r.Fatalf("err: %s", err)
 		}
 		if stones.Next() == nil {
-			t.Fatalf("missing tombstones")
+			r.Fatalf("missing tombstones")
 		}
 		if stones.Next() != nil {
-			t.Fatalf("unexpected extra tombstones")
+			r.Fatalf("unexpected extra tombstones")
 		}
-	}()
+	})

 	// Check that the new leader has a pending GC expiration by
 	// watching for the tombstone to get removed.
@@ -930,9 +930,9 @@ func TestLeader_ChangeServerID(t *testing.T) {
 	})
 	defer os.RemoveAll(dir4)
 	defer s4.Shutdown()

 	joinLAN(t, s4, s1)
 	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
-	testrpc.WaitForTestAgent(t, s4.RPC, "dc1")
+	testrpc.WaitForLeader(t, s4.RPC, "dc1")
 	servers[2] = s4

 	// While integrating #3327 it uncovered that this test was flaky. The
@@ -942,11 +942,10 @@ func TestLeader_ChangeServerID(t *testing.T) {
 	// away the connection if it sees an EOF error, since there's no way
 	// that connection is going to work again. This made this test reliable
 	// since it will make a new connection to s4.
-
-	// Make sure the dead server is removed and we're back to 3 total peers
 	retry.Run(t, func(r *retry.R) {
 		r.Check(wantRaft(servers))
 		for _, s := range servers {
+			// Make sure the dead server is removed and we're back below 4
 			r.Check(wantPeers(s, 3))
 		}
 	})
@@ -61,7 +61,6 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
 	if config.Logger == nil {
 		config.Logger = log.New(os.Stderr, "", log.LstdFlags)
 	}
-	ctx, cancel := context.WithCancel(context.Background())
 	limiter := rate.NewLimiter(rate.Limit(config.Rate), config.Burst)

 	maxWait := config.MaxRetryWait
@@ -77,8 +76,6 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
 	return &Replicator{
 		name:      config.Name,
 		running:   false,
-		cancel:    cancel,
-		ctx:       ctx,
 		limiter:   limiter,
 		waiter:    waiter,
 		replicate: config.ReplicateFn,
@@ -94,6 +91,8 @@ func (r *Replicator) Start() {
 		return
 	}

+	r.ctx, r.cancel = context.WithCancel(context.Background())
+
 	go r.run()

 	r.running = true
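Editor's note: the net effect of the three Replicator hunks is that the context and its cancel function are no longer created once in NewReplicator but on every Start, so a Stop/Start cycle gets a fresh context instead of reusing one that Stop already canceled (the new regression test in the next hunk drives exactly that cycle). A minimal sketch of the restart-safe shape; the type and method bodies are illustrative, not the actual Replicator:

package example

import (
	"context"
	"sync"
)

// restartable is a hypothetical worker with the same Start/Stop contract.
type restartable struct {
	mu      sync.Mutex
	running bool
	ctx     context.Context
	cancel  context.CancelFunc
}

func (w *restartable) Start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.running {
		return
	}
	// A fresh context per Start: a prior Stop canceled the old one, so
	// reusing it would make the restarted worker exit immediately (or,
	// if ctx were never set, dereference nil on cancel).
	w.ctx, w.cancel = context.WithCancel(context.Background())
	go w.run(w.ctx)
	w.running = true
}

func (w *restartable) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.running {
		return
	}
	w.cancel()
	w.running = false
}

func (w *restartable) run(ctx context.Context) { <-ctx.Done() }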
@@ -0,0 +1,28 @@
+package consul
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestReplicationRestart(t *testing.T) {
+	config := ReplicatorConfig{
+		Name: "mock",
+		ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
+			return 1, false, nil
+		},
+		Rate:  1,
+		Burst: 1,
+	}
+
+	repl, err := NewReplicator(&config)
+	require.NoError(t, err)
+
+	repl.Start()
+	repl.Stop()
+	repl.Start()
+	// Previously this would have segfaulted
+	repl.Stop()
+}
@@ -158,10 +158,18 @@ func testServerWithConfig(t *testing.T, cb func(*Config)) (string, *Server) {
 	if cb != nil {
 		cb(config)
 	}
-	srv, err := newServer(config)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+
+	var srv *Server
+	var err error
+
+	// Retry added to avoid cases where bind addr is already in use
+	retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
+		srv, err = newServer(config)
+		if err != nil {
+			os.RemoveAll(dir)
+			r.Fatalf("err: %v", err)
+		}
+	})
 	return dir, srv
 }
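Editor's note: the hunk above lets testServerWithConfig survive a bind address that is already in use by retrying construction a bounded number of times. retry.RunWith with the ThreeTimes counter (both from the sdk retry package, as used verbatim in the hunk) is the shape of that fix. A small self-contained sketch, with a TCP listener standing in for newServer:

package example

import (
	"net"
	"testing"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

func TestBoundedRetryOnBind(t *testing.T) {
	var ln net.Listener
	// ThreeTimes bounds the attempts; each r.Fatalf fails one attempt.
	retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
		var err error
		ln, err = net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			// The real helper also cleans up its data dir here
			// (os.RemoveAll(dir)) before failing the attempt.
			r.Fatalf("err: %v", err)
		}
	})
	defer ln.Close()
}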
@@ -686,16 +694,42 @@ func TestServer_Expect(t *testing.T) {
 		r.Check(wantPeers(s3, 3))
 	})

-	// Make sure a leader is elected, grab the current term and then add in
-	// the fourth server.
-	testrpc.WaitForLeader(t, s1.RPC, "dc1")
-	termBefore := s1.raft.Stats()["last_log_term"]
+	// Join the fourth node.
 	joinLAN(t, s4, s1)

 	// Wait for the new server to see itself added to the cluster.
 	retry.Run(t, func(r *retry.R) {
 		r.Check(wantRaft([]*Server{s1, s2, s3, s4}))
 	})
+}
+
+// Should not trigger bootstrap and new election when s3 joins, since cluster exists
+func TestServer_AvoidReBootstrap(t *testing.T) {
+	dir1, s1 := testServerDCExpect(t, "dc1", 2)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerDCExpect(t, "dc1", 0)
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	dir3, s3 := testServerDCExpect(t, "dc1", 2)
+	defer os.RemoveAll(dir3)
+	defer s3.Shutdown()
+
+	// Join the first two servers
+	joinLAN(t, s2, s1)
+
+	// Make sure a leader is elected, grab the current term and then add in
+	// the third server.
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+	termBefore := s1.raft.Stats()["last_log_term"]
+	joinLAN(t, s3, s1)
+
+	// Wait for the new server to see itself added to the cluster.
+	retry.Run(t, func(r *retry.R) {
+		r.Check(wantRaft([]*Server{s1, s2, s3}))
+	})

 	// Make sure there's still a leader and that the term didn't change,
 	// so we know an election didn't occur.
@@ -12,7 +12,7 @@ import (
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
-	"github.com/hashicorp/net-rpc-msgpackrpc"
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 )

 // verifySnapshot is a helper that does a snapshot and restore.
@@ -294,9 +294,14 @@ func TestSnapshot_ACLDeny(t *testing.T) {
 }

 func TestSnapshot_Forward_Leader(t *testing.T) {
 	t.Parallel()
 	dir1, s1 := testServerWithConfig(t, func(c *Config) {
 		c.Bootstrap = true
 		c.SerfWANConfig = nil
+
+		// Effectively disable autopilot
+		// Changes in server config leads flakiness because snapshotting
+		// fails if there are config changes outstanding
+		c.AutopilotInterval = 50 * time.Second

 		// Since we are doing multiple restores to the same leader,
 		// the default short time for a reconcile can cause the
@@ -306,17 +311,19 @@ func TestSnapshot_Forward_Leader(t *testing.T) {
 	})
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
 	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")

 	dir2, s2 := testServerWithConfig(t, func(c *Config) {
 		c.Bootstrap = false
 		c.SerfWANConfig = nil
+		c.AutopilotInterval = 50 * time.Second
 	})
 	defer os.RemoveAll(dir2)
 	defer s2.Shutdown()
+	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")

 	// Try to join.
 	joinLAN(t, s2, s1)
-	testrpc.WaitForTestAgent(t, s2.RPC, "dc1")
+	testrpc.WaitForLeader(t, s2.RPC, "dc1")

 	// Run against the leader and the follower to ensure we forward. When
 	// we changed to Raft protocol version 3, since we only have two servers,
@@ -107,6 +107,10 @@ func (f *StatsFetcher) Fetch(ctx context.Context, members []serf.Member) map[str
 		case <-ctx.Done():
 			f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
 				workItem.server.Name, ctx.Err())
+
+			f.inflightLock.Lock()
+			delete(f.inflight, workItem.server.ID)
+			f.inflightLock.Unlock()
 		}
 	}
 	return replies
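Editor's note: the StatsFetcher hunk adds the missing cleanup path. A fetch that dies with the context must still release its slot in the in-flight map, otherwise that server is treated as permanently busy and skipped by every later Fetch. A sketch of the invariant, with hypothetical names:

package example

import "sync"

// inflightSet sketches the bookkeeping: one outstanding probe per server.
// Every exit path, success, error, or context timeout, must call release,
// or that server is skipped by all future fetches.
type inflightSet struct {
	mu sync.Mutex
	m  map[string]struct{}
}

func newInflightSet() *inflightSet {
	return &inflightSet{m: make(map[string]struct{})}
}

func (s *inflightSet) tryAcquire(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, busy := s.m[id]; busy {
		return false
	}
	s.m[id] = struct{}{}
	return true
}

func (s *inflightSet) release(id string) {
	s.mu.Lock()
	delete(s.m, id)
	s.mu.Unlock()
}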
@@ -13,7 +13,6 @@ import (
 )

 func TestStatsFetcher(t *testing.T) {
-	t.Parallel()
 	dir1, s1 := testServerDCExpect(t, "dc1", 3)
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
@@ -53,18 +52,18 @@ func TestStatsFetcher(t *testing.T) {
 		defer cancel()
 		stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
 		if len(stats) != 3 {
-			t.Fatalf("bad: %#v", stats)
+			r.Fatalf("bad: %#v", stats)
 		}
 		for id, stat := range stats {
 			switch types.NodeID(id) {
 			case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID:
 				// OK
 			default:
-				t.Fatalf("bad: %s", id)
+				r.Fatalf("bad: %s", id)
 			}

 			if stat == nil || stat.LastTerm == 0 {
-				t.Fatalf("bad: %#v", stat)
+				r.Fatalf("bad: %#v", stat)
 			}
 		}
 	})
@@ -81,20 +80,20 @@ func TestStatsFetcher(t *testing.T) {
 		defer cancel()
 		stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers())
 		if len(stats) != 2 {
-			t.Fatalf("bad: %#v", stats)
+			r.Fatalf("bad: %#v", stats)
 		}
 		for id, stat := range stats {
 			switch types.NodeID(id) {
 			case s1.config.NodeID, s2.config.NodeID:
 				// OK
 			case s3.config.NodeID:
-				t.Fatalf("bad")
+				r.Fatalf("bad")
 			default:
-				t.Fatalf("bad: %s", id)
+				r.Fatalf("bad: %s", id)
 			}

 			if stat == nil || stat.LastTerm == 0 {
-				t.Fatalf("bad: %#v", stat)
+				r.Fatalf("bad: %#v", stat)
 			}
 		}
 	})
@@ -892,11 +892,12 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
 		},
 	}

 	testrpc.WaitForLeader(t, a.RPC, dc)
-	var out struct{}
-	if err := a.RPC("Catalog.Register", args, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	retry.Run(t, func(r *retry.R) {
+		var out struct{}
+		if err := a.RPC("Catalog.Register", args, &out); err != nil {
+			r.Fatalf("err: %v", err)
+		}
+	})

 	t.Run("bc_no_query_value", func(t *testing.T) {
 		req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing", nil)
@@ -21,7 +21,6 @@ import (
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/types"
-	"github.com/pascaldekloe/goe/verify"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -165,9 +164,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 	addrs := services.NodeServices.Node.TaggedAddresses
 	meta := services.NodeServices.Node.Meta
 	delete(meta, structs.MetaSegmentKey) // Added later, not in config.
-	verify.Values(t, "node id", id, a.Config.NodeID)
-	verify.Values(t, "tagged addrs", addrs, a.Config.TaggedAddresses)
-	verify.Values(t, "node meta", meta, a.Config.NodeMeta)
+	assert.Equal(t, a.Config.NodeID, id)
+	assert.Equal(t, a.Config.TaggedAddresses, addrs)
+	assert.Equal(t, a.Config.NodeMeta, meta)

 	// We should have 6 services (consul included)
 	if len(services.NodeServices.Services) != 6 {
@@ -593,46 +592,44 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 	}
 	var services structs.IndexedNodeServices

-	if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// All the services should match
-	for id, serv := range services.NodeServices.Services {
-		serv.CreateIndex, serv.ModifyIndex = 0, 0
-		switch id {
-		case "svc_id1":
-			// tags should be modified but not the port
-			got := serv
-			want := &structs.NodeService{
-				ID:                "svc_id1",
-				Service:           "svc1",
-				Tags:              []string{"tag1_mod"},
-				Port:              6100,
-				EnableTagOverride: true,
-				Weights: &structs.Weights{
-					Passing: 1,
-					Warning: 1,
-				},
-			}
-			if !verify.Values(t, "", got, want) {
-				t.FailNow()
-			}
-		case "svc_id2":
-			got, want := serv, srv2
-			if !verify.Values(t, "", got, want) {
-				t.FailNow()
-			}
-		case structs.ConsulServiceID:
-			// ignore
-		default:
-			t.Fatalf("unexpected service: %v", id)
-		}
-	}
-
-	if err := servicesInSync(a.State, 2); err != nil {
-		t.Fatal(err)
-	}
+	retry.Run(t, func(r *retry.R) {
+		if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
+			r.Fatalf("err: %v", err)
+		}
+
+		// All the services should match
+		for id, serv := range services.NodeServices.Services {
+			serv.CreateIndex, serv.ModifyIndex = 0, 0
+			switch id {
+			case "svc_id1":
+				// tags should be modified but not the port
+				got := serv
+				want := &structs.NodeService{
+					ID:                "svc_id1",
+					Service:           "svc1",
+					Tags:              []string{"tag1_mod"},
+					Port:              6100,
+					EnableTagOverride: true,
+					Weights: &structs.Weights{
+						Passing: 1,
+						Warning: 1,
+					},
+				}
+				assert.Equal(r, want, got)
+			case "svc_id2":
+				got, want := serv, srv2
+				assert.Equal(r, want, got)
+			case structs.ConsulServiceID:
+				// ignore
+			default:
+				r.Fatalf("unexpected service: %v", id)
+			}
+		}
+
+		if err := servicesInSync(a.State, 2); err != nil {
+			r.Fatal(err)
+		}
+	})
 }

 func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
@@ -1060,9 +1057,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 		addrs := services.NodeServices.Node.TaggedAddresses
 		meta := services.NodeServices.Node.Meta
 		delete(meta, structs.MetaSegmentKey) // Added later, not in config.
-		verify.Values(t, "node id", id, a.Config.NodeID)
-		verify.Values(t, "tagged addrs", addrs, a.Config.TaggedAddresses)
-		verify.Values(t, "node meta", meta, a.Config.NodeMeta)
+		assert.Equal(t, a.Config.NodeID, id)
+		assert.Equal(t, a.Config.TaggedAddresses, addrs)
+		assert.Equal(t, a.Config.NodeMeta, meta)
 	}

 	// Remove one of the checks
@@ -15,6 +15,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/sdk/testutil"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -52,11 +53,20 @@ func makeClientWithConfig(
 	if cb1 != nil {
 		cb1(conf)
 	}

 	// Create server
-	server, err := testutil.NewTestServerConfigT(t, cb2)
-	if err != nil {
-		t.Fatal(err)
+	var server *testutil.TestServer
+	var err error
+	retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
+		server, err = testutil.NewTestServerConfigT(t, cb2)
+		if err != nil {
+			r.Fatal(err)
+		}
+	})
+	if server.Config.Bootstrap {
+		server.WaitForLeader(t)
 	}

 	conf.Address = server.HTTPAddr

 	// Create client
@@ -34,7 +34,7 @@ func TestAPI_CatalogNodes(t *testing.T) {

 	s.WaitForSerfCheck(t)
 	catalog := c.Catalog()
-	retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
+	retry.Run(t, func(r *retry.R) {
 		nodes, meta, err := catalog.Nodes(nil)
 		// We're not concerned about the createIndex of an agent
 		// Hence we're setting it to the default value
@@ -352,10 +352,10 @@ func TestAPI_HealthService_SingleTag(t *testing.T) {
 	defer agent.ServiceDeregister("foo1")
 	retry.Run(t, func(r *retry.R) {
 		services, meta, err := health.Service("foo", "bar", true, nil)
-		require.NoError(t, err)
-		require.NotEqual(t, meta.LastIndex, 0)
-		require.Len(t, services, 1)
-		require.Equal(t, services[0].Service.ID, "foo1")
+		require.NoError(r, err)
+		require.NotEqual(r, meta.LastIndex, 0)
+		require.Len(r, services, 1)
+		require.Equal(r, services[0].Service.ID, "foo1")
 	})
 }
 func TestAPI_HealthService_MultipleTags(t *testing.T) {
@@ -397,19 +397,19 @@ func TestAPI_HealthService_MultipleTags(t *testing.T) {
 	retry.Run(t, func(r *retry.R) {
 		services, meta, err := health.ServiceMultipleTags("foo", []string{"bar"}, true, nil)

-		require.NoError(t, err)
-		require.NotEqual(t, meta.LastIndex, 0)
-		require.Len(t, services, 2)
+		require.NoError(r, err)
+		require.NotEqual(r, meta.LastIndex, 0)
+		require.Len(r, services, 2)
 	})

 	// Test searching with two tags (one result)
 	retry.Run(t, func(r *retry.R) {
 		services, meta, err := health.ServiceMultipleTags("foo", []string{"bar", "v2"}, true, nil)

-		require.NoError(t, err)
-		require.NotEqual(t, meta.LastIndex, 0)
-		require.Len(t, services, 1)
-		require.Equal(t, services[0].Service.ID, "foo2")
+		require.NoError(r, err)
+		require.NotEqual(r, meta.LastIndex, 0)
+		require.Len(r, services, 1)
+		require.Equal(r, services[0].Service.ID, "foo2")
 	})
 }
@@ -421,15 +421,17 @@ func TestAPI_HealthService_NodeMetaFilter(t *testing.T) {
 	})
 	defer s.Stop()

+	s.WaitForSerfCheck(t)
+
 	health := c.Health()
 	retry.Run(t, func(r *retry.R) {
 		// consul service should always exist...
 		checks, meta, err := health.Service("consul", "", true, &QueryOptions{NodeMeta: meta})
-		require.NoError(t, err)
-		require.NotEqual(t, meta.LastIndex, 0)
-		require.NotEqual(t, len(checks), 0)
-		require.Equal(t, checks[0].Node.Datacenter, "dc1")
-		require.Contains(t, checks[0].Node.TaggedAddresses, "wan")
+		require.NoError(r, err)
+		require.NotEqual(r, meta.LastIndex, 0)
+		require.NotEqual(r, len(checks), 0)
+		require.Equal(r, checks[0].Node.Datacenter, "dc1")
+		require.Contains(r, checks[0].Node.TaggedAddresses, "wan")
 	})
 }
@@ -17,6 +17,8 @@ func TestAPI_ClientTxn(t *testing.T) {
 	c, s := makeClient(t)
 	defer s.Stop()

+	s.WaitForSerfCheck(t)
+
 	session := c.Session()
 	txn := c.Txn()
@@ -2,19 +2,16 @@ package agent

 import (
 	"fmt"
-	"github.com/hashicorp/consul/testrpc"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"strings"
-	"sync"
 	"testing"

+	"github.com/hashicorp/consul/testrpc"
+
 	"github.com/hashicorp/consul/agent"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/version"
 	"github.com/mitchellh/cli"
 )
@@ -82,55 +79,29 @@ func TestConfigFail(t *testing.T) {
 }

 func TestRetryJoin(t *testing.T) {
 	t.Parallel()
 	a := agent.NewTestAgent(t, t.Name(), "")
 	defer a.Shutdown()

-	shutdownCh := make(chan struct{})
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		tmpDir := testutil.TempDir(t, "consul")
-		defer os.RemoveAll(tmpDir)
-
-		args := []string{
-			"-server",
-			"-bind", a.Config.BindAddr.String(),
-			"-data-dir", tmpDir,
-			"-node", "Node 11111111-1111-1111-1111-111111111111",
-			"-node-id", "11111111-1111-1111-1111-111111111111",
-			"-advertise", a.Config.BindAddr.String(),
-			"-retry-join", a.Config.SerfBindAddrLAN.String(),
-			"-retry-interval", "1s",
-			"-retry-join-wan", a.Config.SerfBindAddrWAN.String(),
-			"-retry-interval-wan", "1s",
-		}
-
-		ui := cli.NewMockUi()
-		cmd := New(ui, "", version.Version, "", "", shutdownCh)
-		// closing shutdownCh triggers a SIGINT which triggers shutdown without leave
-		// which will return 1
-		if code := cmd.Run(args); code != 1 {
-			t.Log(ui.ErrorWriter.String())
-			t.Fatalf("bad: %d", code)
-		}
-	}()
+	b := agent.NewTestAgent(t, t.Name(), `
+		retry_join = ["`+a.Config.SerfBindAddrLAN.String()+`"]
+		retry_join_wan = ["`+a.Config.SerfBindAddrWAN.String()+`"]
+		retry_interval = "100ms"
+	`)
+	defer b.Shutdown()
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")

 	retry.Run(t, func(r *retry.R) {
 		if got, want := len(a.LANMembers()), 2; got != want {
 			r.Fatalf("got %d LAN members want %d", got, want)
 		}
 	})

 	retry.Run(t, func(r *retry.R) {
 		if got, want := len(a.WANMembers()), 2; got != want {
 			r.Fatalf("got %d WAN members want %d", got, want)
 		}
 	})
-
-	close(shutdownCh)
-	wg.Wait()
 }

 func TestRetryJoinFail(t *testing.T) {
@@ -89,12 +89,14 @@ func TestExecCommand_CrossDC(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	if got, want := len(a1.WANMembers()), 2; got != want {
-		t.Fatalf("got %d WAN members on a1 want %d", got, want)
-	}
-	if got, want := len(a2.WANMembers()), 2; got != want {
-		t.Fatalf("got %d WAN members on a2 want %d", got, want)
-	}
+	retry.Run(t, func(r *retry.R) {
+		if got, want := len(a1.WANMembers()), 2; got != want {
+			r.Fatalf("got %d WAN members on a1 want %d", got, want)
+		}
+		if got, want := len(a2.WANMembers()), 2; got != want {
+			r.Fatalf("got %d WAN members on a2 want %d", got, want)
+		}
+	})

 	ui := cli.NewMockUi()
 	c := New(ui, nil)
@@ -7,6 +7,7 @@ import (
 	"testing"
 	"time"

+	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -91,9 +92,11 @@ func TestConn(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, "ping 2\n", got)

-	tx, rx := c.Stats()
-	assert.Equal(t, uint64(7), tx)
-	assert.Equal(t, uint64(7), rx)
+	retry.Run(t, func(r *retry.R) {
+		tx, rx := c.Stats()
+		assert.Equal(r, uint64(7), tx)
+		assert.Equal(r, uint64(7), rx)
+	})

 	_, err = src.Write([]byte("pong 1\n"))
 	require.Nil(t, err)
|
|||
require.Nil(t, err)
|
||||
require.Equal(t, "pong 2\n", got)
|
||||
|
||||
tx, rx = c.Stats()
|
||||
assert.Equal(t, uint64(14), tx)
|
||||
assert.Equal(t, uint64(14), rx)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
tx, rx := c.Stats()
|
||||
assert.Equal(r, uint64(14), tx)
|
||||
assert.Equal(r, uint64(14), rx)
|
||||
})
|
||||
|
||||
c.Close()
|
||||
|
||||
|
|
|
@@ -236,7 +236,16 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e
 			"consul or skip this test")
 	}

-	tmpdir := TempDir(t, "consul")
+	prefix := "consul"
+	if t != nil {
+		// Use test name for tmpdir if available
+		prefix = strings.Replace(t.Name(), "/", "_", -1)
+	}
+	tmpdir, err := ioutil.TempDir("", prefix)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create tempdir")
+	}

 	cfg := defaultServerConfig()
 	cfg.DataDir = filepath.Join(tmpdir, "data")
 	if cb != nil {
@@ -245,13 +254,14 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e

 	b, err := json.Marshal(cfg)
 	if err != nil {
+		os.RemoveAll(tmpdir)
 		return nil, errors.Wrap(err, "failed marshaling json")
 	}

+	log.Printf("CONFIG JSON: %s", string(b))
 	configFile := filepath.Join(tmpdir, "config.json")
 	if err := ioutil.WriteFile(configFile, b, 0644); err != nil {
-		defer os.RemoveAll(tmpdir)
+		os.RemoveAll(tmpdir)
 		return nil, errors.Wrap(err, "failed writing config content")
 	}
@@ -271,6 +281,7 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e
 	cmd.Stdout = stdout
 	cmd.Stderr = stderr
 	if err := cmd.Start(); err != nil {
+		os.RemoveAll(tmpdir)
 		return nil, errors.Wrap(err, "failed starting command")
 	}
@@ -300,15 +311,11 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e
 	}

 	// Wait for the server to be ready
-	if cfg.Bootstrap {
-		err = server.waitForLeader()
-	} else {
-		err = server.waitForAPI()
-	}
-	if err != nil {
-		defer server.Stop()
-		return nil, errors.Wrap(err, "failed waiting for server to start")
+	if err := server.waitForAPI(); err != nil {
+		server.Stop()
+		return nil, err
 	}

 	return server, nil
 }
@@ -333,51 +340,49 @@ func (s *TestServer) Stop() error {
 	return s.cmd.Wait()
 }

-type failer struct {
-	failed bool
-}
-
-func (f *failer) Log(args ...interface{}) { fmt.Println(args...) }
-func (f *failer) FailNow()                { f.failed = true }
-
 // waitForAPI waits for only the agent HTTP endpoint to start
 // responding. This is an indication that the agent has started,
 // but will likely return before a leader is elected.
 func (s *TestServer) waitForAPI() error {
-	f := &failer{}
-	retry.Run(f, func(r *retry.R) {
+	var failed bool
+
+	// This retry replicates the logic of retry.Run to allow for nested retries.
+	// By returning an error we can wrap TestServer creation with retry.Run
+	// in makeClientWithConfig.
+	timer := retry.TwoSeconds()
+	deadline := time.Now().Add(timer.Timeout)
+	for !time.Now().After(deadline) {
+		time.Sleep(timer.Wait)
+
 		resp, err := s.HTTPClient.Get(s.url("/v1/agent/self"))
 		if err != nil {
-			r.Fatal(err)
+			failed = true
+			continue
 		}
-		defer resp.Body.Close()
-		if err := s.requireOK(resp); err != nil {
-			r.Fatal("failed OK response", err)
+		resp.Body.Close()
+
+		if err = s.requireOK(resp); err != nil {
+			failed = true
+			continue
 		}
-	})
-	if f.failed {
-		return errors.New("failed waiting for API")
+
+		failed = false
 	}
+	if failed {
+		return fmt.Errorf("api unavailable")
+	}
 	return nil
 }

 // waitForLeader waits for the Consul server's HTTP API to become
 // available, and then waits for a known leader and an index of
-// 1 or more to be observed to confirm leader election is done.
-// It then waits to ensure the anti-entropy sync has completed.
-func (s *TestServer) waitForLeader() error {
-	f := &failer{}
-	timer := &retry.Timer{
-		Timeout: s.Config.ReadyTimeout,
-		Wait:    250 * time.Millisecond,
-	}
-	var index int64
-	retry.RunWith(timer, f, func(r *retry.R) {
+// 2 or more to be observed to confirm leader election is done.
+func (s *TestServer) WaitForLeader(t *testing.T) {
+	retry.Run(t, func(r *retry.R) {
 		// Query the API and check the status code.
-		url := s.url(fmt.Sprintf("/v1/catalog/nodes?index=%d", index))
+		url := s.url("/v1/catalog/nodes")
 		resp, err := s.HTTPClient.Get(url)
 		if err != nil {
-			r.Fatal("failed http get", err)
+			r.Fatalf("failed http get '%s': %v", url, err)
 		}
 		defer resp.Body.Close()
 		if err := s.requireOK(resp); err != nil {
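Editor's note: waitForAPI drops the failer/retry.Run machinery for a hand-rolled deadline loop. The code comment in the hunk gives the reason: retry.Run reports failure through the Failer it was given, so it does not compose when nested inside the retry.RunWith call added to makeClientWithConfig, whereas a plain loop that returns an error does. A compacted sketch of that shape; the helper name and the early return on success are mine (the diffed code tracks a failed flag and checks it after the deadline):

package example

import (
	"errors"
	"time"
)

// waitUntil polls probe until it succeeds or the deadline passes, and
// reports the outcome as an error so a caller can wrap the whole thing
// in its own retry loop.
func waitUntil(timeout, interval time.Duration, probe func() error) error {
	deadline := time.Now().Add(timeout)
	for !time.Now().After(deadline) {
		time.Sleep(interval)
		if err := probe(); err != nil {
			continue
		}
		return nil
	}
	return errors.New("condition not met before deadline")
}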
@@ -388,35 +393,14 @@ func (s *TestServer) waitForLeader() error {
 		if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" {
 			r.Fatalf("Consul leader status: %#v", leader)
 		}
-		index, err = strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64)
+		index, err := strconv.ParseInt(resp.Header.Get("X-Consul-Index"), 10, 64)
 		if err != nil {
 			r.Fatal("bad consul index", err)
 		}
-		if index == 0 {
-			r.Fatal("consul index is 0")
-		}
-
-		// Watch for the anti-entropy sync to finish.
-		var v []map[string]interface{}
-		dec := json.NewDecoder(resp.Body)
-		if err := dec.Decode(&v); err != nil {
-			r.Fatal(err)
-		}
-		if len(v) < 1 {
-			r.Fatal("No nodes")
-		}
-		taggedAddresses, ok := v[0]["TaggedAddresses"].(map[string]interface{})
-		if !ok {
-			r.Fatal("Missing tagged addresses")
-		}
-		if _, ok := taggedAddresses["lan"]; !ok {
-			r.Fatal("No lan tagged addresses")
-		}
+		if index < 2 {
+			r.Fatal("consul index should be at least 2")
+		}
 	})
-	if f.failed {
-		return errors.New("failed waiting for leader")
-	}
-	return nil
 }

 // WaitForSerfCheck ensures we have a node with serfHealth check registered