09e4c2995b
* Fix CA pruning when CA config uses string durations. The tl;dr here is: - Configuring LeafCertTTL with a string like "72h" is how we do it by default and should be supported - Most of our tests managed to escape this by defining them as time.Duration directly - Our actual default value is a string - Since this is stored in a map[string]interface{} config, when it is written to Raft it goes through a msgpack encode/decode cycle (even though it's written from server not over RPC). - msgpack decode leaves the string as a `[]uint8` - Some of our parsers required string and failed - So after 1 hour, a default configured server would throw an error about pruning old CAs - If a new CA was configured that set LeafCertTTL as a time.Duration, things might be OK after that, but if a new CA was just configured from config file, initialization would cause same issue but always fail still so would never prune the old CA. - Mostly this is just a janky error that got passed tests due to many levels of complicated encoding/decoding. tl;dr of the tl;dr: Yay for type safety. Map[string]interface{} combined with msgpack always goes wrong but we somehow get bitten every time in a new way :D We already fixed this once! The main CA config had the same problem so @kyhavlov already wrote the mapstructure DecodeHook that fixes it. It wasn't used in several places it needed to be and one of those is now in `structs` which caused a dependency cycle so I've moved them. This adds a whole new test that explicitly tests the case that broke here. It also adds tests that would have failed in other places before (Consul and Vault provider parsing functions). I'm not sure if they would ever be affected as it is now as we've not seen things broken with them but it seems better to explicitly test that and support it to not be bitten a third time! * Typo fix * Fix bad Uint8 usage
1152 lines
26 KiB
Go
1152 lines
26 KiB
Go
package consul
|
|
|
|
import (
|
|
"os"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
"github.com/hashicorp/consul/testutil/retry"
|
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/hashicorp/serf/serf"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestLeader_RegisterMember(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
// Try to join
|
|
joinLAN(t, c1, s1)
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Client should be registered
|
|
state := s1.fsm.State()
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
|
|
// Should have a check
|
|
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(checks) != 1 {
|
|
t.Fatalf("client missing check")
|
|
}
|
|
if checks[0].CheckID != structs.SerfCheckID {
|
|
t.Fatalf("bad check: %v", checks[0])
|
|
}
|
|
if checks[0].Name != structs.SerfCheckName {
|
|
t.Fatalf("bad check: %v", checks[0])
|
|
}
|
|
if checks[0].Status != api.HealthPassing {
|
|
t.Fatalf("bad check: %v", checks[0])
|
|
}
|
|
|
|
// Server should be registered
|
|
_, node, err := state.GetNode(s1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("server not registered")
|
|
}
|
|
|
|
// Service should be registered
|
|
_, services, err := state.NodeServices(nil, s1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if _, ok := services.Services["consul"]; !ok {
|
|
t.Fatalf("consul service not registered: %v", services)
|
|
}
|
|
}
|
|
|
|
func TestLeader_FailedMember(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Try to join
|
|
joinLAN(t, c1, s1)
|
|
|
|
// Fail the member
|
|
c1.Shutdown()
|
|
|
|
// Should be registered
|
|
state := s1.fsm.State()
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
|
|
// Should have a check
|
|
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(checks) != 1 {
|
|
t.Fatalf("client missing check")
|
|
}
|
|
if checks[0].CheckID != structs.SerfCheckID {
|
|
t.Fatalf("bad check: %v", checks[0])
|
|
}
|
|
if checks[0].Name != structs.SerfCheckName {
|
|
t.Fatalf("bad check: %v", checks[0])
|
|
}
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, checks, err = state.NodeChecks(nil, c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if got, want := checks[0].Status, api.HealthCritical; got != want {
|
|
r.Fatalf("got status %q want %q", got, want)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_LeftMember(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
// Try to join
|
|
joinLAN(t, c1, s1)
|
|
|
|
state := s1.fsm.State()
|
|
|
|
// Should be registered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
|
|
// Node should leave
|
|
c1.Leave()
|
|
c1.Shutdown()
|
|
|
|
// Should be deregistered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node != nil {
|
|
r.Fatal("client still registered")
|
|
}
|
|
})
|
|
}
|
|
func TestLeader_ReapMember(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
// Try to join
|
|
joinLAN(t, c1, s1)
|
|
|
|
state := s1.fsm.State()
|
|
|
|
// Should be registered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
|
|
// Simulate a node reaping
|
|
mems := s1.LANMembers()
|
|
var c1mem serf.Member
|
|
for _, m := range mems {
|
|
if m.Name == c1.config.NodeName {
|
|
c1mem = m
|
|
c1mem.Status = StatusReap
|
|
break
|
|
}
|
|
}
|
|
s1.reconcileCh <- c1mem
|
|
|
|
// Should be deregistered; we have to poll quickly here because
|
|
// anti-entropy will put it back.
|
|
reaped := false
|
|
for start := time.Now(); time.Since(start) < 5*time.Second; {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
reaped = true
|
|
break
|
|
}
|
|
}
|
|
if !reaped {
|
|
t.Fatalf("client should not be registered")
|
|
}
|
|
}
|
|
|
|
func TestLeader_ReapServer(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "allow"
|
|
c.ACLEnforceVersion8 = true
|
|
c.Bootstrap = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "allow"
|
|
c.ACLEnforceVersion8 = true
|
|
c.Bootstrap = false
|
|
})
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "allow"
|
|
c.ACLEnforceVersion8 = true
|
|
c.Bootstrap = false
|
|
})
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
|
|
// Try to join
|
|
joinLAN(t, s1, s2)
|
|
joinLAN(t, s1, s3)
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
|
testrpc.WaitForLeader(t, s3.RPC, "dc1")
|
|
state := s1.fsm.State()
|
|
|
|
// s3 should be registered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(s3.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
|
|
// call reconcileReaped with a map that does not contain s3
|
|
knownMembers := make(map[string]struct{})
|
|
knownMembers[s1.config.NodeName] = struct{}{}
|
|
knownMembers[s2.config.NodeName] = struct{}{}
|
|
|
|
err := s1.reconcileReaped(knownMembers)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error :%v", err)
|
|
}
|
|
// s3 should be deregistered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(s3.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node != nil {
|
|
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
|
|
}
|
|
})
|
|
|
|
}
|
|
|
|
func TestLeader_Reconcile_ReapMember(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Register a non-existing member
|
|
dead := structs.RegisterRequest{
|
|
Datacenter: s1.config.Datacenter,
|
|
Node: "no-longer-around",
|
|
Address: "127.1.1.1",
|
|
Check: &structs.HealthCheck{
|
|
Node: "no-longer-around",
|
|
CheckID: structs.SerfCheckID,
|
|
Name: structs.SerfCheckName,
|
|
Status: api.HealthCritical,
|
|
},
|
|
WriteRequest: structs.WriteRequest{
|
|
Token: "root",
|
|
},
|
|
}
|
|
var out struct{}
|
|
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Force a reconciliation
|
|
if err := s1.reconcile(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Node should be gone
|
|
state := s1.fsm.State()
|
|
_, node, err := state.GetNode("no-longer-around")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node != nil {
|
|
t.Fatalf("client registered")
|
|
}
|
|
}
|
|
|
|
func TestLeader_Reconcile(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.ACLEnforceVersion8 = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
// Join before we have a leader, this should cause a reconcile!
|
|
joinLAN(t, c1, s1)
|
|
|
|
// Should not be registered
|
|
state := s1.fsm.State()
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node != nil {
|
|
t.Fatalf("client registered")
|
|
}
|
|
|
|
// Should be registered
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_Reconcile_Races(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
dir2, c1 := testClient(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer c1.Shutdown()
|
|
|
|
joinLAN(t, c1, s1)
|
|
|
|
// Wait for the server to reconcile the client and register it.
|
|
state := s1.fsm.State()
|
|
var nodeAddr string
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
r.Fatal("client not registered")
|
|
}
|
|
nodeAddr = node.Address
|
|
})
|
|
|
|
// Add in some metadata via the catalog (as if the agent synced it
|
|
// there). We also set the serfHealth check to failing so the reconcile
|
|
// will attempt to flip it back
|
|
req := structs.RegisterRequest{
|
|
Datacenter: s1.config.Datacenter,
|
|
Node: c1.config.NodeName,
|
|
ID: c1.config.NodeID,
|
|
Address: nodeAddr,
|
|
NodeMeta: map[string]string{"hello": "world"},
|
|
Check: &structs.HealthCheck{
|
|
Node: c1.config.NodeName,
|
|
CheckID: structs.SerfCheckID,
|
|
Name: structs.SerfCheckName,
|
|
Status: api.HealthCritical,
|
|
Output: "",
|
|
},
|
|
}
|
|
var out struct{}
|
|
if err := s1.RPC("Catalog.Register", &req, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Force a reconcile and make sure the metadata stuck around.
|
|
if err := s1.reconcile(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
_, node, err := state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("bad")
|
|
}
|
|
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
|
|
t.Fatalf("bad")
|
|
}
|
|
|
|
// Fail the member and wait for the health to go critical.
|
|
c1.Shutdown()
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if got, want := checks[0].Status, api.HealthCritical; got != want {
|
|
r.Fatalf("got state %q want %q", got, want)
|
|
}
|
|
})
|
|
|
|
// Make sure the metadata didn't get clobbered.
|
|
_, node, err = state.GetNode(c1.config.NodeName)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("bad")
|
|
}
|
|
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestLeader_LeftServer(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
|
|
// Put s1 last so we don't trigger a leader election.
|
|
servers := []*Server{s2, s3, s1}
|
|
|
|
// Try to join
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
|
}
|
|
|
|
// Kill any server
|
|
servers[0].Shutdown()
|
|
|
|
// Force remove the non-leader (transition to left state)
|
|
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Wait until the remaining servers show only 2 peers.
|
|
for _, s := range servers[1:] {
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
|
|
}
|
|
s1.Shutdown()
|
|
}
|
|
|
|
func TestLeader_LeftLeader(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Try to join
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
|
}
|
|
|
|
// Kill the leader!
|
|
var leader *Server
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
break
|
|
}
|
|
}
|
|
if leader == nil {
|
|
t.Fatalf("Should have a leader")
|
|
}
|
|
if !leader.isReadyForConsistentReads() {
|
|
t.Fatalf("Expected leader to be ready for consistent reads ")
|
|
}
|
|
leader.Leave()
|
|
if leader.isReadyForConsistentReads() {
|
|
t.Fatalf("Expected consistent read state to be false ")
|
|
}
|
|
leader.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
var remain *Server
|
|
for _, s := range servers {
|
|
if s == leader {
|
|
continue
|
|
}
|
|
remain = s
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
|
|
}
|
|
|
|
// Verify the old leader is deregistered
|
|
state := remain.fsm.State()
|
|
retry.Run(t, func(r *retry.R) {
|
|
_, node, err := state.GetNode(leader.config.NodeName)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
if node != nil {
|
|
r.Fatal("leader should be deregistered")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_MultiBootstrap(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServer(t)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
servers := []*Server{s1, s2}
|
|
|
|
// Try to join
|
|
joinLAN(t, s2, s1)
|
|
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) {
|
|
if got, want := len(s.serfLAN.Members()), 2; got != want {
|
|
r.Fatalf("got %d peers want %d", got, want)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Ensure we don't have multiple raft peers
|
|
for _, s := range servers {
|
|
peers, _ := s.numPeers()
|
|
if peers != 1 {
|
|
t.Fatalf("should only have 1 raft peer!")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestLeader_TombstoneGC_Reset(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Try to join
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
|
}
|
|
|
|
var leader *Server
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
break
|
|
}
|
|
}
|
|
if leader == nil {
|
|
t.Fatalf("Should have a leader")
|
|
}
|
|
|
|
// Check that the leader has a pending GC expiration
|
|
if !leader.tombstoneGC.PendingExpiration() {
|
|
t.Fatalf("should have pending expiration")
|
|
}
|
|
|
|
// Kill the leader
|
|
leader.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Wait for a new leader
|
|
leader = nil
|
|
retry.Run(t, func(r *retry.R) {
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
return
|
|
}
|
|
}
|
|
r.Fatal("no leader")
|
|
})
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
if !leader.tombstoneGC.PendingExpiration() {
|
|
r.Fatal("leader has no pending GC expiration")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_ReapTombstones(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.TombstoneTTL = 50 * time.Millisecond
|
|
c.TombstoneTTLGranularity = 10 * time.Millisecond
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Create a KV entry
|
|
arg := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "test",
|
|
Value: []byte("test"),
|
|
},
|
|
WriteRequest: structs.WriteRequest{
|
|
Token: "root",
|
|
},
|
|
}
|
|
var out bool
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Delete the KV entry (tombstoned).
|
|
arg.Op = api.KVDelete
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Make sure there's a tombstone.
|
|
state := s1.fsm.State()
|
|
func() {
|
|
snap := state.Snapshot()
|
|
defer snap.Close()
|
|
stones, err := snap.Tombstones()
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if stones.Next() == nil {
|
|
t.Fatalf("missing tombstones")
|
|
}
|
|
if stones.Next() != nil {
|
|
t.Fatalf("unexpected extra tombstones")
|
|
}
|
|
}()
|
|
|
|
// Check that the new leader has a pending GC expiration by
|
|
// watching for the tombstone to get removed.
|
|
retry.Run(t, func(r *retry.R) {
|
|
snap := state.Snapshot()
|
|
defer snap.Close()
|
|
stones, err := snap.Tombstones()
|
|
if err != nil {
|
|
r.Fatal(err)
|
|
}
|
|
if stones.Next() != nil {
|
|
r.Fatal("should have no tombstones")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_RollRaftServer(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = true
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 2
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 1
|
|
})
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 2
|
|
})
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Try to join
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
|
}
|
|
|
|
// Kill the v1 server
|
|
s2.Shutdown()
|
|
|
|
for _, s := range []*Server{s1, s3} {
|
|
retry.Run(t, func(r *retry.R) {
|
|
minVer, err := s.autopilot.MinRaftProtocol()
|
|
if err != nil {
|
|
r.Fatal(err)
|
|
}
|
|
if got, want := minVer, 2; got != want {
|
|
r.Fatalf("got min raft version %d want %d", got, want)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Replace the dead server with one running raft protocol v3
|
|
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer os.RemoveAll(dir4)
|
|
defer s4.Shutdown()
|
|
joinLAN(t, s4, s1)
|
|
servers[1] = s4
|
|
|
|
// Make sure the dead server is removed and we're back to 3 total peers
|
|
for _, s := range servers {
|
|
retry.Run(t, func(r *retry.R) {
|
|
addrs := 0
|
|
ids := 0
|
|
future := s.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
r.Fatal(err)
|
|
}
|
|
for _, server := range future.Configuration().Servers {
|
|
if string(server.ID) == string(server.Address) {
|
|
addrs++
|
|
} else {
|
|
ids++
|
|
}
|
|
}
|
|
if got, want := addrs, 2; got != want {
|
|
r.Fatalf("got %d server addresses want %d", got, want)
|
|
}
|
|
if got, want := ids, 1; got != want {
|
|
r.Fatalf("got %d server ids want %d", got, want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLeader_ChangeServerID(t *testing.T) {
|
|
t.Parallel()
|
|
conf := func(c *Config) {
|
|
c.Bootstrap = false
|
|
c.BootstrapExpect = 3
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
}
|
|
dir1, s1 := testServerWithConfig(t, conf)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerWithConfig(t, conf)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerWithConfig(t, conf)
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Try to join and wait for all servers to get promoted
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
for _, s := range servers {
|
|
testrpc.WaitForTestAgent(t, s.RPC, "dc1")
|
|
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
|
}
|
|
|
|
// Shut down a server, freeing up its address/port
|
|
s3.Shutdown()
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
alive := 0
|
|
for _, m := range s1.LANMembers() {
|
|
if m.Status == serf.StatusAlive {
|
|
alive++
|
|
}
|
|
}
|
|
if got, want := alive, 2; got != want {
|
|
r.Fatalf("got %d alive members want %d", got, want)
|
|
}
|
|
})
|
|
|
|
// Bring up a new server with s3's address that will get a different ID
|
|
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
c.BootstrapExpect = 3
|
|
c.Datacenter = "dc1"
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
c.SerfLANConfig.MemberlistConfig = s3.config.SerfLANConfig.MemberlistConfig
|
|
c.RPCAddr = s3.config.RPCAddr
|
|
c.RPCAdvertise = s3.config.RPCAdvertise
|
|
})
|
|
defer os.RemoveAll(dir4)
|
|
defer s4.Shutdown()
|
|
joinLAN(t, s4, s1)
|
|
servers[2] = s4
|
|
|
|
// While integrating #3327 it uncovered that this test was flaky. The
|
|
// connection pool would use the same TCP connection to the old server
|
|
// which would give EOF errors to the autopilot health check RPC call.
|
|
// To make this more reliable we changed the connection pool to throw
|
|
// away the connection if it sees an EOF error, since there's no way
|
|
// that connection is going to work again. This made this test reliable
|
|
// since it will make a new connection to s4.
|
|
|
|
// Make sure the dead server is removed and we're back to 3 total peers
|
|
retry.Run(t, func(r *retry.R) {
|
|
r.Check(wantRaft(servers))
|
|
for _, s := range servers {
|
|
r.Check(wantPeers(s, 3))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestLeader_ACL_Initialization(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
tests := []struct {
|
|
name string
|
|
build string
|
|
master string
|
|
init bool
|
|
bootstrap bool
|
|
}{
|
|
{"old version, no master", "0.8.0", "", false, false},
|
|
{"old version, master", "0.8.0", "root", false, false},
|
|
{"new version, no master", "0.9.1", "", true, true},
|
|
{"new version, master", "0.9.1", "root", true, false},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
conf := func(c *Config) {
|
|
c.Build = tt.build
|
|
c.Bootstrap = true
|
|
c.Datacenter = "dc1"
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = tt.master
|
|
}
|
|
dir1, s1 := testServerWithConfig(t, conf)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
if tt.master != "" {
|
|
_, master, err := s1.fsm.State().ACLGet(nil, tt.master)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if master == nil {
|
|
t.Fatalf("master token wasn't created")
|
|
}
|
|
}
|
|
|
|
_, anon, err := s1.fsm.State().ACLGet(nil, anonymousToken)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if anon == nil {
|
|
t.Fatalf("anonymous token wasn't created")
|
|
}
|
|
|
|
bs, err := s1.fsm.State().ACLGetBootstrap()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if !tt.init {
|
|
if bs != nil {
|
|
t.Fatalf("bootstrap should not be initialized")
|
|
}
|
|
} else {
|
|
if bs == nil {
|
|
t.Fatalf("bootstrap should be initialized")
|
|
}
|
|
if got, want := bs.AllowBootstrap, tt.bootstrap; got != want {
|
|
t.Fatalf("got %v want %v", got, want)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLeader_CARootPruning(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
caRootPruneInterval = 200 * time.Millisecond
|
|
|
|
require := require.New(t)
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
|
|
|
// Get the current root
|
|
rootReq := &structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
}
|
|
var rootList structs.IndexedCARoots
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
|
|
require.Len(rootList.Roots, 1)
|
|
oldRoot := rootList.Roots[0]
|
|
|
|
// Update the provider config to use a new private key, which should
|
|
// cause a rotation.
|
|
_, newKey, err := connect.GeneratePrivateKey()
|
|
require.NoError(err)
|
|
newConfig := &structs.CAConfiguration{
|
|
Provider: "consul",
|
|
Config: map[string]interface{}{
|
|
"LeafCertTTL": "500ms",
|
|
"PrivateKey": newKey,
|
|
"RootCert": "",
|
|
"RotationPeriod": "2160h",
|
|
"SkipValidate": true,
|
|
},
|
|
}
|
|
{
|
|
args := &structs.CARequest{
|
|
Datacenter: "dc1",
|
|
Config: newConfig,
|
|
}
|
|
var reply interface{}
|
|
|
|
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
|
|
}
|
|
|
|
// Should have 2 roots now.
|
|
_, roots, err := s1.fsm.State().CARoots(nil)
|
|
require.NoError(err)
|
|
require.Len(roots, 2)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
|
|
// Now the old root should be pruned.
|
|
_, roots, err = s1.fsm.State().CARoots(nil)
|
|
require.NoError(err)
|
|
require.Len(roots, 1)
|
|
require.True(roots[0].Active)
|
|
require.NotEqual(roots[0].ID, oldRoot.ID)
|
|
}
|
|
|
|
func TestLeader_PersistIntermediateCAs(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
require := require.New(t)
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
|
|
defer os.RemoveAll(dir3)
|
|
defer s3.Shutdown()
|
|
|
|
joinLAN(t, s2, s1)
|
|
joinLAN(t, s3, s1)
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Get the current root
|
|
rootReq := &structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
}
|
|
var rootList structs.IndexedCARoots
|
|
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
|
|
require.Len(rootList.Roots, 1)
|
|
|
|
// Update the provider config to use a new private key, which should
|
|
// cause a rotation.
|
|
_, newKey, err := connect.GeneratePrivateKey()
|
|
require.NoError(err)
|
|
newConfig := &structs.CAConfiguration{
|
|
Provider: "consul",
|
|
Config: map[string]interface{}{
|
|
"PrivateKey": newKey,
|
|
"RootCert": "",
|
|
"RotationPeriod": 90 * 24 * time.Hour,
|
|
},
|
|
}
|
|
{
|
|
args := &structs.CARequest{
|
|
Datacenter: "dc1",
|
|
Config: newConfig,
|
|
}
|
|
var reply interface{}
|
|
|
|
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
|
|
}
|
|
|
|
// Get the active root before leader change.
|
|
_, root := s1.getCAProvider()
|
|
require.Len(root.IntermediateCerts, 1)
|
|
|
|
// Force a leader change and make sure the root CA values are preserved.
|
|
s1.Leave()
|
|
s1.Shutdown()
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
var leader *Server
|
|
for _, s := range []*Server{s2, s3} {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
break
|
|
}
|
|
}
|
|
if leader == nil {
|
|
r.Fatal("no leader")
|
|
}
|
|
|
|
_, newLeaderRoot := leader.getCAProvider()
|
|
if !reflect.DeepEqual(newLeaderRoot, root) {
|
|
r.Fatalf("got %v, want %v", newLeaderRoot, root)
|
|
}
|
|
})
|
|
}
|