open-consul/agent/consul/leader_test.go

2026 lines
50 KiB
Go

package consul
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
)
func TestLeader_RegisterMember(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Status != api.HealthPassing {
t.Fatalf("bad check: %v", checks[0])
}
// Server should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatalf("server not registered")
}
})
// Service should be registered
_, services, err := state.NodeServices(nil, s1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
}
func TestLeader_FailedMember(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Try to join
joinLAN(t, c1, s1)
// Fail the member
c1.Shutdown()
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
retry.Run(t, func(r *retry.R) {
_, checks, err = state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got status %q want %q", got, want)
}
})
}
func TestLeader_LeftMember(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Node should leave
c1.Leave()
c1.Shutdown()
// Should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("client still registered")
}
})
}
func TestLeader_ReapMember(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Simulate a node reaping
mems := s1.LANMembers()
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
c1mem = m
c1mem.Status = StatusReap
break
}
}
s1.reconcileCh <- c1mem
// Should be deregistered; we have to poll quickly here because
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
reaped = true
break
}
}
if !reaped {
t.Fatalf("client should not be registered")
}
}
func TestLeader_CheckServersMeta(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
consulService := &structs.NodeService{
ID: "consul",
Service: "consul",
}
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, service, err := state.NodeService(s3.config.NodeName, "consul", &consulService.EnterpriseMeta)
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
if service.Meta["non_voter"] != "false" {
r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"])
}
})
member := serf.Member{}
for _, m := range s1.serfLAN.Members() {
if m.Name == s3.config.NodeName {
member = m
member.Tags = make(map[string]string)
for key, value := range m.Tags {
member.Tags[key] = value
}
}
}
if member.Name != s3.config.NodeName {
t.Fatal("could not find node in serf members")
}
versionToExpect := "19.7.9"
retry.Run(t, func(r *retry.R) {
member.Tags["nonvoter"] = "1"
member.Tags["build"] = versionToExpect
err := s1.handleAliveMember(member)
if err != nil {
r.Fatalf("Unexpected error :%v", err)
}
_, service, err := state.NodeService(s3.config.NodeName, "consul", &consulService.EnterpriseMeta)
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
if service.Meta["non_voter"] != "true" {
r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"])
}
newVersion := service.Meta["version"]
if newVersion != versionToExpect {
r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion)
}
})
}
func TestLeader_ReapServer(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// call reconcileReaped with a map that does not contain s3
knownMembers := make(map[string]struct{})
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
}
})
}
func TestLeader_Reconcile_ReapMember(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// Node should be gone
state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Join before we have a leader, this should cause a reconcile!
joinLAN(t, c1, s1)
// Should not be registered
state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
}
func TestLeader_Reconcile_Races(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1)
// Wait for the server to reconcile the client and register it.
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
nodeAddr = node.Address
})
// Add in some metadata via the catalog (as if the agent synced it
// there). We also set the serfHealth check to failing so the reconcile
// will attempt to flip it back
req := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: c1.config.NodeName,
ID: c1.config.NodeID,
Address: nodeAddr,
NodeMeta: map[string]string{"hello": "world"},
Check: &structs.HealthCheck{
Node: c1.config.NodeName,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: "",
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconcile and make sure the metadata stuck around.
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
// Fail the member and wait for the health to go critical.
c1.Shutdown()
retry.Run(t, func(r *retry.R) {
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
}
func TestLeader_LeftServer(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s3, s1}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill any server
servers[0].Shutdown()
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until the remaining servers show only 2 peers.
for _, s := range servers[1:] {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
s1.Shutdown()
}
func TestLeader_LeftLeader(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the leader!
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
var remain *Server
for _, s := range servers {
if s == leader {
continue
}
remain = s
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("leader should be deregistered")
}
})
}
func TestLeader_MultiBootstrap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
servers := []*Server{s1, s2}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.serfLAN.Members()), 2; got != want {
r.Fatalf("got %d peers want %d", got, want)
}
})
}
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.autopilot.NumVoters()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
}
func TestLeader_TombstoneGC_Reset(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
// Check that the leader has a pending GC expiration
if !leader.tombstoneGC.PendingExpiration() {
t.Fatalf("should have pending expiration")
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
retry.Run(t, func(r *retry.R) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return
}
}
r.Fatal("no leader")
})
retry.Run(t, func(r *retry.R) {
if !leader.tombstoneGC.PendingExpiration() {
r.Fatal("leader has no pending GC expiration")
}
})
}
func TestLeader_ReapTombstones(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.TombstoneTTL = 50 * time.Millisecond
c.TombstoneTTLGranularity = 10 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create a KV entry
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "test",
Value: []byte("test"),
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned).
arg.Op = api.KVDelete
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure there's a tombstone.
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
r.Fatalf("err: %s", err)
}
if stones.Next() == nil {
r.Fatalf("missing tombstones")
}
if stones.Next() != nil {
r.Fatalf("unexpected extra tombstones")
}
})
// Check that the new leader has a pending GC expiration by
// watching for the tombstone to get removed.
retry.Run(t, func(r *retry.R) {
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
r.Fatal(err)
}
if stones.Next() != nil {
r.Fatal("should have no tombstones")
}
})
}
func TestLeader_RollRaftServer(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the v1 server
s2.Shutdown()
for _, s := range []*Server{s1, s3} {
retry.Run(t, func(r *retry.R) {
// autopilot should force removal of the shutdown node
r.Check(wantPeers(s, 2))
})
}
// Replace the dead server with a new one
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s, 3))
})
}
}
func TestLeader_ChangeServerID(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
testrpc.WaitForTestAgent(t, s.RPC, "dc1")
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Shut down a server, freeing up its address/port
s3.Shutdown()
retry.Run(t, func(r *retry.R) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
if got, want := alive, 2; got != want {
r.Fatalf("got %d alive members want %d", got, want)
}
})
// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.SerfLANConfig.MemberlistConfig = s3.config.SerfLANConfig.MemberlistConfig
c.RPCAddr = s3.config.RPCAddr
c.RPCAdvertise = s3.config.RPCAdvertise
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
testrpc.WaitForLeader(t, s4.RPC, "dc1")
servers[2] = s4
// While integrating #3327 it uncovered that this test was flaky. The
// connection pool would use the same TCP connection to the old server
// which would give EOF errors to the autopilot health check RPC call.
// To make this more reliable we changed the connection pool to throw
// away the connection if it sees an EOF error, since there's no way
// that connection is going to work again. This made this test reliable
// since it will make a new connection to s4.
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
// Make sure the dead server is removed and we're back below 4
r.Check(wantPeers(s, 3))
}
})
}
func TestLeader_ChangeNodeID(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
testrpc.WaitForTestAgent(t, s.RPC, "dc1")
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Shut down a server, freeing up its address/port
s3.Shutdown()
// wait for s1.LANMembers() to show s3 as StatusFailed or StatusLeft on
retry.Run(t, func(r *retry.R) {
var gone bool
for _, m := range s1.LANMembers() {
if m.Name == s3.config.NodeName && (m.Status == serf.StatusFailed || m.Status == serf.StatusLeft) {
gone = true
}
}
require.True(r, gone, "s3 has not been detected as failed or left after shutdown")
})
// Bring up a new server with s3's name that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.NodeName = s3.config.NodeName
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
servers[2] = s4
// Make sure the dead server is gone from both Raft and Serf and we're back to 3 total peers
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
retry.Run(t, func(r *retry.R) {
for _, m := range s1.LANMembers() {
require.Equal(r, serf.StatusAlive, m.Status)
}
})
}
func TestLeader_ACL_Initialization(t *testing.T) {
t.Parallel()
tests := []struct {
name string
build string
master string
bootstrap bool
}{
{"old version, no master", "0.8.0", "", true},
{"old version, master", "0.8.0", "root", false},
{"new version, no master", "0.9.1", "", true},
{"new version, master", "0.9.1", "root", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := func(c *Config) {
c.Build = tt.build
c.Bootstrap = true
c.Datacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = tt.master
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
if tt.master != "" {
_, master, err := s1.fsm.State().ACLTokenGetBySecret(nil, tt.master, nil)
require.NoError(t, err)
require.NotNil(t, master)
}
_, anon, err := s1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken, nil)
require.NoError(t, err)
require.NotNil(t, anon)
canBootstrap, _, err := s1.fsm.State().CanBootstrapACLToken()
require.NoError(t, err)
require.Equal(t, tt.bootstrap, canBootstrap)
_, policy, err := s1.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, nil)
require.NoError(t, err)
require.NotNil(t, policy)
})
}
}
func TestLeader_ACLUpgrade(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLsEnabled = true
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// create a legacy management ACL
mgmt := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "Management token",
Type: structs.ACLTokenTypeManagement,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var mgmt_id string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &mgmt, &mgmt_id))
// wait for it to be upgraded
retry.Run(t, func(t *retry.R) {
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, mgmt_id, nil)
require.NoError(t, err)
require.NotNil(t, token)
require.NotEqual(t, "", token.AccessorID)
require.Equal(t, structs.ACLTokenTypeManagement, token.Type)
require.Len(t, token.Policies, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, token.Policies[0].ID)
})
// create a legacy management ACL
client := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "Management token",
Type: structs.ACLTokenTypeClient,
Rules: `node "" { policy = "read"}`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var client_id string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &client, &client_id))
// wait for it to be upgraded
retry.Run(t, func(t *retry.R) {
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, client_id, nil)
require.NoError(t, err)
require.NotNil(t, token)
require.NotEqual(t, "", token.AccessorID)
require.Len(t, token.Policies, 0)
require.Equal(t, structs.ACLTokenTypeClient, token.Type)
require.Equal(t, client.ACL.Rules, token.Rules)
})
}
func TestLeader_ConfigEntryBootstrap(t *testing.T) {
t.Parallel()
global_entry_init := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"foo": "bar",
"bar": int64(1),
},
}
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Build = "1.5.0"
c.ConfigEntryBootstrap = []structs.ConfigEntry{
global_entry_init,
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
retry.Run(t, func(t *retry.R) {
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
require.NoError(t, err)
require.NotNil(t, entry)
global, ok := entry.(*structs.ProxyConfigEntry)
require.True(t, ok)
require.Equal(t, global_entry_init.Kind, global.Kind)
require.Equal(t, global_entry_init.Name, global.Name)
require.Equal(t, global_entry_init.Config, global.Config)
})
}
func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
t.Parallel()
type testcase struct {
name string
entries []structs.ConfigEntry
serverCB func(c *Config)
expectMessage string
}
cases := []testcase{
{
name: "service-splitter without L7 protocol",
entries: []structs.ConfigEntry{
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 100, Service: "web"},
},
},
},
expectMessage: `Failed to apply configuration entry "service-splitter" / "web": discovery chain "web" uses a protocol "tcp" that does not permit advanced routing or splitting behavior"`,
},
{
name: "service-intentions without migration",
entries: []structs.ConfigEntry{
&structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "web",
Sources: []*structs.SourceIntention{
{
Name: "debug",
Action: structs.IntentionActionAllow,
},
},
},
},
serverCB: func(c *Config) {
c.OverrideInitialSerfTags = func(tags map[string]string) {
tags["ft_si"] = "0"
}
},
expectMessage: `Refusing to apply configuration entry "service-intentions" / "web" because intentions are still being migrated to config entries`,
},
{
name: "service-intentions without Connect",
entries: []structs.ConfigEntry{
&structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "web",
Sources: []*structs.SourceIntention{
{
Name: "debug",
Action: structs.IntentionActionAllow,
},
},
},
},
serverCB: func(c *Config) {
c.ConnectEnabled = false
},
expectMessage: `Refusing to apply configuration entry "service-intentions" / "web" because Connect must be enabled to bootstrap intentions"`,
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
pr, pw := io.Pipe()
defer pw.Close()
var (
ch = make(chan string, 1)
applyErrorLine string
)
go func() {
defer pr.Close()
scan := bufio.NewScanner(pr)
for scan.Scan() {
line := scan.Text()
if strings.Contains(line, "failed to establish leadership") {
applyErrorLine = line
ch <- ""
return
}
if strings.Contains(line, "successfully established leadership") {
ch <- "leadership should not have gotten here if config entries properly failed"
return
}
}
if scan.Err() != nil {
ch <- fmt.Sprintf("ERROR: %v", scan.Err())
} else {
ch <- "should not get here"
}
}()
_, config := testServerConfig(t)
config.Build = "1.6.0"
config.ConfigEntryBootstrap = tc.entries
if tc.serverCB != nil {
tc.serverCB(config)
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: config.NodeName,
Level: hclog.Debug,
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
})
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps)
require.NoError(t, err)
defer srv.Shutdown()
select {
case result := <-ch:
require.Empty(t, result)
if tc.expectMessage != "" {
require.Contains(t, applyErrorLine, tc.expectMessage)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for a result from tailing logs")
}
})
}
}
func TestLeader_ACLLegacyReplication(t *testing.T) {
t.Parallel()
// This test relies on configuring a secondary DC with no route to the primary DC
// Having no route will cause the ACL mode checking of the primary to "fail". In this
// scenario legacy ACL replication should be enabled without also running new ACL
// replication routines.
cb := func(c *Config) {
c.Datacenter = "dc2"
c.ACLTokenReplication = true
}
_, srv, _ := testACLServerWithConfig(t, cb, true)
waitForLeaderEstablishment(t, srv)
require.True(t, srv.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName))
require.False(t, srv.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName))
require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName))
require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName))
}
func TestDatacenterSupportsFederationStates(t *testing.T) {
addGateway := func(t *testing.T, srv *Server, dc, node string) {
t.Helper()
arg := structs.RegisterRequest{
Datacenter: dc,
Node: node,
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Port: 8080,
},
}
var out struct{}
require.NoError(t, srv.RPC("Catalog.Register", &arg, &out))
}
t.Run("one node primary with old version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
})
t.Run("one node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.FederationStateQuery{
Datacenter: "dc1",
TargetDatacenter: "dc1",
}
var out structs.FederationStateResponse
require.NoError(r, s1.RPC("FederationState.Get", &arg, &out))
require.NotNil(r, out.State)
require.Len(r, out.State.MeshGateways, 1)
})
})
t.Run("two node primary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
t.Run("two node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 1)
require.Len(r, out.States[0].MeshGateways, 1)
})
})
t.Run("primary and secondary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once for both.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
// Wait until after replication runs for the secondary.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc2",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
})
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.updateSerfTags("ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
}
func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
addLegacyIntention := func(srv *Server, dc, src, dest string, allow bool) error {
ixn := &structs.Intention{
SourceNS: structs.IntentionDefaultNamespace,
SourceName: src,
DestinationNS: structs.IntentionDefaultNamespace,
DestinationName: dest,
SourceType: structs.IntentionSourceConsul,
Meta: map[string]string{},
}
if allow {
ixn.Action = structs.IntentionActionAllow
} else {
ixn.Action = structs.IntentionActionDeny
}
//nolint:staticcheck
ixn.UpdatePrecedence()
//nolint:staticcheck
ixn.SetHash()
arg := structs.IntentionRequest{
Datacenter: dc,
Op: structs.IntentionOpCreate,
Intention: ixn,
}
var id string
return srv.RPC("Intention.Apply", &arg, &id)
}
getConfigEntry := func(srv *Server, dc, kind, name string) (structs.ConfigEntry, error) {
arg := structs.ConfigEntryQuery{
Datacenter: dc,
Kind: kind,
Name: name,
}
var reply structs.ConfigEntryResponse
if err := srv.RPC("ConfigEntry.Get", &arg, &reply); err != nil {
return nil, err
}
return reply.Entry, nil
}
disableServiceIntentions := func(tags map[string]string) {
tags["ft_si"] = "0"
}
defaultEntMeta := structs.DefaultEnterpriseMeta()
t.Run("one node primary with old version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
t.Run("one node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
// try to write a using the legacy API and it should work
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
got, ok := raw.(*structs.ServiceIntentionsConfigEntry)
require.True(t, ok)
require.Len(t, got.Sources, 1)
expect := &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "api",
EnterpriseMeta: *defaultEntMeta,
Sources: []*structs.SourceIntention{
{
Name: "web",
EnterpriseMeta: *defaultEntMeta,
Action: structs.IntentionActionAllow,
Type: structs.IntentionSourceConsul,
Precedence: 9,
LegacyMeta: map[string]string{},
LegacyID: got.Sources[0].LegacyID,
// steal
LegacyCreateTime: got.Sources[0].LegacyCreateTime,
LegacyUpdateTime: got.Sources[0].LegacyUpdateTime,
},
},
RaftIndex: got.RaftIndex,
}
require.Equal(t, expect, got)
})
t.Run("two node primary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
testutil.RequireErrorContains(t,
addLegacyIntention(s2, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
t.Run("two node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 didn't activate service-intentions")
}
})
// try to write a using the legacy API and it should work from both sides
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
require.NoError(t, addLegacyIntention(s2, "dc1", "web2", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
raw, err = getConfigEntry(s2, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
})
t.Run("primary and secondary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 didn't activate service-intentions")
}
})
// try to write a using the legacy API
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
// Wait until after replication runs for the secondary.
retry.Run(t, func(r *retry.R) {
raw, err = getConfigEntry(s2, "dc1", structs.ServiceIntentions, "api")
require.NoError(r, err)
require.NotNil(r, raw)
})
})
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
testutil.RequireErrorContains(t,
addLegacyIntention(s2, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
}