open-consul/agent/consul/leader_test.go

2346 lines
59 KiB
Go
Raw Normal View History

package consul
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)
func TestLeader_RegisterMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Status != api.HealthPassing {
t.Fatalf("bad check: %v", checks[0])
}
// Server should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatalf("server not registered")
}
})
// Service should be registered
_, services, err := state.NodeServices(nil, s1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
}
func TestLeader_FailedMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
// Fail the member
c1.Shutdown()
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
retry.Run(t, func(r *retry.R) {
_, checks, err = state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got status %q want %q", got, want)
}
})
}
func TestLeader_LeftMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Node should leave
c1.Leave()
c1.Shutdown()
// Should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("client still registered")
}
})
}
2014-03-20 19:51:49 +00:00
func TestLeader_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
2014-03-20 19:51:49 +00:00
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
2014-03-20 19:51:49 +00:00
state := s1.fsm.State()
2014-03-20 19:51:49 +00:00
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
2014-03-20 19:51:49 +00:00
// Simulate a node reaping
mems := s1.LANMembersInAgentPartition()
2014-03-20 19:51:49 +00:00
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
c1mem = m
c1mem.Status = StatusReap
break
}
}
s1.reconcileCh <- c1mem
// Should be deregistered; we have to poll quickly here because
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
reaped = true
break
}
}
if !reaped {
t.Fatalf("client should not be registered")
}
2014-03-20 19:51:49 +00:00
}
func TestLeader_CheckServersMeta(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
consulService := &structs.NodeService{
ID: "consul",
Service: "consul",
}
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, service, err := state.NodeService(s3.config.NodeName, "consul", &consulService.EnterpriseMeta)
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
if service.Meta["non_voter"] != "false" {
r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"])
}
})
member := serf.Member{}
for _, m := range s1.serfLAN.Members() {
if m.Name == s3.config.NodeName {
member = m
member.Tags = make(map[string]string)
for key, value := range m.Tags {
member.Tags[key] = value
}
}
}
if member.Name != s3.config.NodeName {
t.Fatal("could not find node in serf members")
}
versionToExpect := "19.7.9"
retry.Run(t, func(r *retry.R) {
// DEPRECATED - remove nonvoter tag in favor of read_replica in a future version of consul
member.Tags["nonvoter"] = "1"
member.Tags["read_replica"] = "1"
member.Tags["build"] = versionToExpect
err := s1.handleAliveMember(member, nil)
if err != nil {
r.Fatalf("Unexpected error :%v", err)
}
_, service, err := state.NodeService(s3.config.NodeName, "consul", &consulService.EnterpriseMeta)
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
// DEPRECATED - remove non_voter in favor of read_replica in a future version of consul
if service.Meta["non_voter"] != "true" {
r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"])
}
if service.Meta["read_replica"] != "true" {
r.Fatalf("Expected to be read_replica == true, was: %s", service.Meta["non_voter"])
}
newVersion := service.Meta["version"]
if newVersion != versionToExpect {
r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion)
}
})
}
func TestLeader_ReapServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// call reconcileReaped with a map that does not contain s3
knownMembers := make(map[string]struct{})
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers, nil)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
}
})
}
func TestLeader_Reconcile_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// Node should be gone
state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Join before we have a leader, this should cause a reconcile!
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
// Should not be registered
state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
}
func TestLeader_Reconcile_Races(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
2017-05-05 10:29:49 +00:00
joinLAN(t, c1, s1)
// Wait for the server to reconcile the client and register it.
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
nodeAddr = node.Address
})
// Add in some metadata via the catalog (as if the agent synced it
// there). We also set the serfHealth check to failing so the reconcile
// will attempt to flip it back
req := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: c1.config.NodeName,
ID: c1.config.NodeID,
Address: nodeAddr,
NodeMeta: map[string]string{"hello": "world"},
Check: &structs.HealthCheck{
Node: c1.config.NodeName,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: "",
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconcile and make sure the metadata stuck around.
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
// Fail the member and wait for the health to go critical.
c1.Shutdown()
retry.Run(t, func(r *retry.R) {
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
}
func TestLeader_LeftServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s3, s1}
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill any server
servers[0].Shutdown()
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until the remaining servers show only 2 peers.
for _, s := range servers[1:] {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
s1.Shutdown()
}
func TestLeader_LeftLeader(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the leader!
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
var remain *Server
for _, s := range servers {
if s == leader {
continue
}
remain = s
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("leader should be deregistered")
}
})
}
func TestLeader_MultiBootstrap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
servers := []*Server{s1, s2}
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.serfLAN.Members()), 2; got != want {
r.Fatalf("got %d peers want %d", got, want)
}
})
}
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.autopilot.NumVoters()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
}
func TestLeader_TombstoneGC_Reset(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
// Check that the leader has a pending GC expiration
if !leader.tombstoneGC.PendingExpiration() {
t.Fatalf("should have pending expiration")
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
retry.Run(t, func(r *retry.R) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return
}
}
r.Fatal("no leader")
})
retry.Run(t, func(r *retry.R) {
if !leader.tombstoneGC.PendingExpiration() {
r.Fatal("leader has no pending GC expiration")
}
})
}
func TestLeader_ReapTombstones(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
c.TombstoneTTL = 50 * time.Millisecond
c.TombstoneTTLGranularity = 10 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create a KV entry
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "test",
Value: []byte("test"),
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned).
arg.Op = api.KVDelete
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-13 18:48:35 +00:00
// Make sure there's a tombstone.
state := s1.fsm.State()
2019-07-12 15:52:26 +00:00
retry.Run(t, func(r *retry.R) {
2015-10-13 18:48:35 +00:00
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
2015-10-13 18:48:35 +00:00
if err != nil {
2019-07-12 15:52:26 +00:00
r.Fatalf("err: %s", err)
2015-10-13 18:48:35 +00:00
}
if stones.Next() == nil {
2019-07-12 15:52:26 +00:00
r.Fatalf("missing tombstones")
}
if stones.Next() != nil {
2019-07-12 15:52:26 +00:00
r.Fatalf("unexpected extra tombstones")
2015-10-13 18:48:35 +00:00
}
2019-07-12 15:52:26 +00:00
})
// Check that the new leader has a pending GC expiration by
2015-10-13 18:48:35 +00:00
// watching for the tombstone to get removed.
retry.Run(t, func(r *retry.R) {
2015-10-13 18:48:35 +00:00
snap := state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
r.Fatal(err)
}
if stones.Next() != nil {
r.Fatal("should have no tombstones")
}
})
}
2017-02-22 20:53:32 +00:00
func TestLeader_RollRaftServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
2017-02-22 20:53:32 +00:00
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
2017-02-22 20:53:32 +00:00
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
2017-02-22 20:53:32 +00:00
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
2017-02-22 20:53:32 +00:00
}
// Kill the v1 server
s2.Shutdown()
for _, s := range []*Server{s1, s3} {
retry.Run(t, func(r *retry.R) {
// autopilot should force removal of the shutdown node
r.Check(wantPeers(s, 2))
})
2017-02-22 20:53:32 +00:00
}
// Replace the dead server with a new one
2017-02-22 20:53:32 +00:00
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
2017-05-05 10:29:49 +00:00
joinLAN(t, s4, s1)
2017-02-22 20:53:32 +00:00
servers[1] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s, 3))
})
2017-02-22 20:53:32 +00:00
}
}
func TestLeader_ChangeServerID(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join and wait for all servers to get promoted
2017-05-05 10:29:49 +00:00
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
testrpc.WaitForTestAgent(t, s.RPC, "dc1")
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Shut down a server, freeing up its address/port
s3.Shutdown()
retry.Run(t, func(r *retry.R) {
alive := 0
for _, m := range s1.LANMembersInAgentPartition() {
if m.Status == serf.StatusAlive {
alive++
}
}
if got, want := alive, 2; got != want {
r.Fatalf("got %d alive members want %d", got, want)
}
})
// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.SerfLANConfig.MemberlistConfig = s3.config.SerfLANConfig.MemberlistConfig
c.RPCAddr = s3.config.RPCAddr
c.RPCAdvertise = s3.config.RPCAdvertise
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
2019-07-12 15:52:26 +00:00
2017-05-05 10:29:49 +00:00
joinLAN(t, s4, s1)
2019-07-12 15:52:26 +00:00
testrpc.WaitForLeader(t, s4.RPC, "dc1")
servers[2] = s4
// While integrating #3327 it uncovered that this test was flaky. The
// connection pool would use the same TCP connection to the old server
// which would give EOF errors to the autopilot health check RPC call.
// To make this more reliable we changed the connection pool to throw
// away the connection if it sees an EOF error, since there's no way
// that connection is going to work again. This made this test reliable
// since it will make a new connection to s4.
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
2019-07-12 15:52:26 +00:00
// Make sure the dead server is removed and we're back below 4
r.Check(wantPeers(s, 3))
}
})
}
func TestLeader_ChangeNodeID(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
testrpc.WaitForTestAgent(t, s.RPC, "dc1")
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Shut down a server, freeing up its address/port
s3.Shutdown()
// wait for s1.LANMembersInAgentPartition() to show s3 as StatusFailed or StatusLeft on
retry.Run(t, func(r *retry.R) {
var gone bool
for _, m := range s1.LANMembersInAgentPartition() {
if m.Name == s3.config.NodeName && (m.Status == serf.StatusFailed || m.Status == serf.StatusLeft) {
gone = true
}
}
require.True(r, gone, "s3 has not been detected as failed or left after shutdown")
})
// Bring up a new server with s3's name that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.NodeName = s3.config.NodeName
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
servers[2] = s4
// Make sure the dead server is gone from both Raft and Serf and we're back to 3 total peers
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
retry.Run(t, func(r *retry.R) {
for _, m := range s1.LANMembersInAgentPartition() {
require.Equal(r, serf.StatusAlive, m.Status)
}
})
}
func TestLeader_ACL_Initialization(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
tests := []struct {
name string
build string
initialManagement string
bootstrap bool
}{
{"old version, no initial management", "0.8.0", "", true},
{"old version, initial management", "0.8.0", "root", false},
{"new version, no initial management", "0.9.1", "", true},
{"new version, initial management", "0.9.1", "root", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := func(c *Config) {
c.Build = tt.build
c.Bootstrap = true
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
c.ACLsEnabled = true
c.ACLInitialManagementToken = tt.initialManagement
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
if tt.initialManagement != "" {
_, initialManagement, err := s1.fsm.State().ACLTokenGetBySecret(nil, tt.initialManagement, nil)
require.NoError(t, err)
require.NotNil(t, initialManagement)
}
_, anon, err := s1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken, nil)
require.NoError(t, err)
require.NotNil(t, anon)
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
canBootstrap, _, err := s1.fsm.State().CanBootstrapACLToken()
require.NoError(t, err)
require.Equal(t, tt.bootstrap, canBootstrap)
_, policy, err := s1.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, nil)
require.NoError(t, err)
require.NotNil(t, policy)
})
}
}
func TestLeader_ACLUpgrade_IsStickyEvenIfSerfTagsRegress(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
// We test this by having two datacenters with one server each. They
// initially come up and complete the migration, then we power them both
// off. We leave the primary off permanently, and then we stand up the
// secondary. Hopefully it should transition to ENABLED instead of being
// stuck in LEGACY.
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = false
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
testrpc.WaitForLeader(t, s2.RPC, "dc2")
waitForLeaderEstablishment(t, s2)
// Create the WAN link
joinWAN(t, s2, s1)
waitForLeaderEstablishment(t, s1)
waitForLeaderEstablishment(t, s2)
waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0)
// Everybody has the management policy.
retry.Run(t, func(r *retry.R) {
_, policy1, err := s1.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(r, err)
require.NotNil(r, policy1)
_, policy2, err := s2.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(r, err)
require.NotNil(r, policy2)
})
// Shutdown s1 and s2.
s1.Shutdown()
s2.Shutdown()
// Restart just s2
dir2new, s2new := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLTokenReplication = false
c.ACLReplicationRate = 100
c.ACLReplicationBurst = 100
c.ACLReplicationApplyLimit = 1000000
c.DataDir = s2.config.DataDir
c.NodeName = s2.config.NodeName
c.NodeID = s2.config.NodeID
})
defer os.RemoveAll(dir2new)
defer s2new.Shutdown()
waitForLeaderEstablishment(t, s2new)
}
func TestLeader_ConfigEntryBootstrap(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
global_entry_init := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"foo": "bar",
"bar": int64(1),
},
}
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Build = "1.5.0"
c.ConfigEntryBootstrap = []structs.ConfigEntry{
global_entry_init,
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
retry.Run(t, func(t *retry.R) {
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
require.NotNil(t, entry)
global, ok := entry.(*structs.ProxyConfigEntry)
require.True(t, ok)
require.Equal(t, global_entry_init.Kind, global.Kind)
require.Equal(t, global_entry_init.Name, global.Name)
require.Equal(t, global_entry_init.Config, global.Config)
})
}
func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
type testcase struct {
name string
entries []structs.ConfigEntry
serverCB func(c *Config)
expectMessage string
}
cases := []testcase{
{
name: "service-splitter without L7 protocol",
entries: []structs.ConfigEntry{
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 100, Service: "web"},
},
},
},
expectMessage: `Failed to apply configuration entry "service-splitter" / "web": discovery chain "web" uses a protocol "tcp" that does not permit advanced routing or splitting behavior"`,
},
{
name: "service-intentions without migration",
entries: []structs.ConfigEntry{
&structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "web",
Sources: []*structs.SourceIntention{
{
Name: "debug",
Action: structs.IntentionActionAllow,
},
},
},
},
serverCB: func(c *Config) {
c.OverrideInitialSerfTags = func(tags map[string]string) {
tags["ft_si"] = "0"
}
},
expectMessage: `Refusing to apply configuration entry "service-intentions" / "web" because intentions are still being migrated to config entries`,
},
{
name: "service-intentions without Connect",
entries: []structs.ConfigEntry{
&structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "web",
Sources: []*structs.SourceIntention{
{
Name: "debug",
Action: structs.IntentionActionAllow,
},
},
},
},
serverCB: func(c *Config) {
c.ConnectEnabled = false
},
expectMessage: `Refusing to apply configuration entry "service-intentions" / "web" because Connect must be enabled to bootstrap intentions"`,
2020-07-29 20:05:51 +00:00
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
pr, pw := io.Pipe()
defer pw.Close()
var (
ch = make(chan string, 1)
applyErrorLine string
)
go func() {
defer pr.Close()
scan := bufio.NewScanner(pr)
for scan.Scan() {
line := scan.Text()
if strings.Contains(line, "failed to establish leadership") {
applyErrorLine = line
ch <- ""
return
}
if strings.Contains(line, "successfully established leadership") {
ch <- "leadership should not have gotten here if config entries properly failed"
return
}
}
if scan.Err() != nil {
ch <- fmt.Sprintf("ERROR: %v", scan.Err())
} else {
ch <- "should not get here"
}
}()
_, config := testServerConfig(t)
config.Build = "1.6.0"
config.ConfigEntryBootstrap = tc.entries
if tc.serverCB != nil {
tc.serverCB(config)
}
2020-09-11 16:43:29 +00:00
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: config.NodeName,
Level: testutil.TestLogLevel,
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
})
deps := newDefaultDeps(t, config)
deps.Logger = logger
2020-07-29 20:05:51 +00:00
srv, err := NewServer(config, deps)
require.NoError(t, err)
defer srv.Shutdown()
select {
case result := <-ch:
require.Empty(t, result)
if tc.expectMessage != "" {
require.Contains(t, applyErrorLine, tc.expectMessage)
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for a result from tailing logs")
}
})
2020-07-29 20:05:51 +00:00
}
}
func TestDatacenterSupportsFederationStates(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
addGateway := func(t *testing.T, srv *Server, dc, node string) {
t.Helper()
arg := structs.RegisterRequest{
Datacenter: dc,
Node: node,
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Port: 8080,
},
}
var out struct{}
require.NoError(t, srv.RPC("Catalog.Register", &arg, &out))
}
t.Run("one node primary with old version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
updateSerfTags(s1, "ft_fs", "0")
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
})
t.Run("one node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.FederationStateQuery{
Datacenter: "dc1",
TargetDatacenter: "dc1",
}
var out structs.FederationStateResponse
require.NoError(r, s1.RPC("FederationState.Get", &arg, &out))
require.NotNil(r, out.State)
require.Len(r, out.State.MeshGateways, 1)
})
})
t.Run("two node primary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
updateSerfTags(s1, "ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
waitForLeaderEstablishment(t, s1)
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
t.Run("two node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
addGateway(t, s1, "dc1", "node1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 1)
require.Len(r, out.States[0].MeshGateways, 1)
})
})
t.Run("primary and secondary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 didn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 didn't activate fedstates")
}
})
// Wait until after AE runs at least once for both.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
// Wait until after replication runs for the secondary.
retry.Run(t, func(r *retry.R) {
arg := structs.DCSpecificRequest{
Datacenter: "dc2",
}
var out structs.IndexedFederationStates
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
require.Len(r, out.States, 2)
require.Len(r, out.States[0].MeshGateways, 1)
require.Len(r, out.States[1].MeshGateways, 1)
})
})
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
updateSerfTags(s1, "ft_fs", "0")
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
addGateway(t, s1, "dc1", "node1")
addGateway(t, s2, "dc2", "node2")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsFederationStates() {
r.Fatal("server 1 shouldn't activate fedstates")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsFederationStates() {
r.Fatal("server 2 shouldn't activate fedstates")
}
})
})
}
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
addLegacyIntention := func(srv *Server, dc, src, dest string, allow bool) error {
ixn := &structs.Intention{
SourceNS: structs.IntentionDefaultNamespace,
SourceName: src,
DestinationNS: structs.IntentionDefaultNamespace,
DestinationName: dest,
SourceType: structs.IntentionSourceConsul,
Meta: map[string]string{},
}
if allow {
ixn.Action = structs.IntentionActionAllow
} else {
ixn.Action = structs.IntentionActionDeny
}
//nolint:staticcheck
ixn.UpdatePrecedence()
//nolint:staticcheck
ixn.SetHash()
arg := structs.IntentionRequest{
Datacenter: dc,
Op: structs.IntentionOpCreate,
Intention: ixn,
}
var id string
return srv.RPC("Intention.Apply", &arg, &id)
}
getConfigEntry := func(srv *Server, dc, kind, name string) (structs.ConfigEntry, error) {
arg := structs.ConfigEntryQuery{
Datacenter: dc,
Kind: kind,
Name: name,
}
var reply structs.ConfigEntryResponse
if err := srv.RPC("ConfigEntry.Get", &arg, &reply); err != nil {
return nil, err
}
return reply.Entry, nil
}
disableServiceIntentions := func(tags map[string]string) {
tags["ft_si"] = "0"
}
defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
t.Run("one node primary with old version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
t.Run("one node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
// try to write a using the legacy API and it should work
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
got, ok := raw.(*structs.ServiceIntentionsConfigEntry)
require.True(t, ok)
require.Len(t, got.Sources, 1)
expect := &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "api",
EnterpriseMeta: *defaultEntMeta,
Sources: []*structs.SourceIntention{
{
Name: "web",
EnterpriseMeta: *defaultEntMeta,
Action: structs.IntentionActionAllow,
Type: structs.IntentionSourceConsul,
Precedence: 9,
LegacyMeta: map[string]string{},
LegacyID: got.Sources[0].LegacyID,
// steal
LegacyCreateTime: got.Sources[0].LegacyCreateTime,
LegacyUpdateTime: got.Sources[0].LegacyUpdateTime,
},
},
RaftIndex: got.RaftIndex,
}
require.Equal(t, expect, got)
})
t.Run("two node primary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
waitForLeaderEstablishment(t, s1)
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
testutil.RequireErrorContains(t,
addLegacyIntention(s2, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
t.Run("two node primary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s1}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 didn't activate service-intentions")
}
})
// try to write a using the legacy API and it should work from both sides
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
require.NoError(t, addLegacyIntention(s2, "dc1", "web2", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
raw, err = getConfigEntry(s2, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
})
t.Run("primary and secondary with new version", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
retry.Run(t, func(r *retry.R) {
if !s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 didn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if !s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 didn't activate service-intentions")
}
})
// try to write a using the legacy API
require.NoError(t, addLegacyIntention(s1, "dc1", "web", "api", true))
// read it back as a config entry and that should work too
raw, err := getConfigEntry(s1, "dc1", structs.ServiceIntentions, "api")
require.NoError(t, err)
require.NotNil(t, raw)
// Wait until after replication runs for the secondary.
retry.Run(t, func(r *retry.R) {
raw, err = getConfigEntry(s2, "dc1", structs.ServiceIntentions, "api")
require.NoError(r, err)
require.NotNil(r, raw)
})
})
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node1"
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.OverrideInitialSerfTags = disableServiceIntentions
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "node2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
// Try to join
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
retry.Run(t, func(r *retry.R) {
if s1.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 1 shouldn't activate service-intentions")
}
})
retry.Run(t, func(r *retry.R) {
if s2.DatacenterSupportsIntentionsAsConfigEntries() {
r.Fatal("server 2 shouldn't activate service-intentions")
}
})
testutil.RequireErrorContains(t,
addLegacyIntention(s1, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
testutil.RequireErrorContains(t,
addLegacyIntention(s2, "dc1", "web", "api", true),
ErrIntentionsNotUpgradedYet.Error(),
)
})
}
func TestLeader_EnableVirtualIPs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
orig := virtualIPVersionCheckInterval
virtualIPVersionCheckInterval = 50 * time.Millisecond
t.Cleanup(func() { virtualIPVersionCheckInterval = orig })
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.Build = "1.11.2"
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
conf(c)
c.Build = "1.10.0"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Should have nothing stored.
state := s1.fsm.State()
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(t, err)
require.Nil(t, entry)
state = s1.fsm.State()
_, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled)
require.NoError(t, err)
require.Nil(t, entry)
// Register a connect-native service and make sure we don't have a virtual IP yet.
err = state.EnsureRegistration(10, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil))
require.NoError(t, err)
require.Equal(t, "", vip)
// Register a terminating gateway.
err = state.EnsureRegistration(11, &structs.RegisterRequest{
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
Service: "tgate1",
ID: "tgate1",
Kind: structs.ServiceKindTerminatingGateway,
},
})
require.NoError(t, err)
err = state.EnsureConfigEntry(12, &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "bar",
},
},
})
require.NoError(t, err)
// Make sure the service referenced in the terminating gateway config doesn't have
// a virtual IP yet.
vip, err = state.VirtualIPForService(structs.NewServiceName("bar", nil))
require.NoError(t, err)
require.Equal(t, "", vip)
// Leave s3 and wait for the version to get updated.
require.NoError(t, s3.Leave())
retry.Run(t, func(r *retry.R) {
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(r, err)
require.NotNil(r, entry)
require.Equal(r, "true", entry.Value)
_, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled)
require.NoError(r, err)
require.NotNil(r, entry)
require.Equal(r, "true", entry.Value)
})
// Update the connect-native service - now there should be a virtual IP assigned.
err = state.EnsureRegistration(20, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.2",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
})
require.NoError(t, err)
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
require.NoError(t, err)
require.Equal(t, "240.0.0.1", vip)
// Update the terminating gateway config entry - now there should be a virtual IP assigned.
err = state.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "api",
},
{
Name: "baz",
},
},
})
require.NoError(t, err)
_, node, err := state.NodeService("bar", "tgate1", nil)
require.NoError(t, err)
sn := structs.ServiceName{Name: "api"}
key := structs.ServiceGatewayVirtualIPTag(sn)
require.Contains(t, node.TaggedAddresses, key)
require.Equal(t, node.TaggedAddresses[key].Address, "240.0.0.1")
// Make sure the baz service (only referenced in the config entry so far)
// has a virtual IP.
vip, err = state.VirtualIPForService(structs.NewServiceName("baz", nil))
require.NoError(t, err)
require.Equal(t, "240.0.0.2", vip)
}
func TestLeader_ACL_Initialization_AnonymousToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.Datacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
_, anon, err := s1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken, nil)
require.NoError(r, err)
require.NotNil(r, anon)
require.Len(r, anon.Policies, 0)
})
reqToken := structs.ACLTokenSetRequest{
Datacenter: "dc1",
ACLToken: structs.ACLToken{
AccessorID: structs.ACLTokenAnonymousID,
SecretID: anonymousToken,
Description: "Anonymous Token",
CreateTime: time.Now(),
Policies: []structs.ACLTokenPolicyLink{
{
ID: structs.ACLPolicyGlobalManagementID,
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var respToken structs.ACLToken
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &reqToken, &respToken))
// Restart the server to re-initialize ACLs when establishing leadership
require.NoError(t, s1.Shutdown())
dir2, newS1 := testServerWithConfig(t, func(c *Config) {
// Keep existing data dir and node info since it's a restart
c.DataDir = s1.config.DataDir
c.NodeName = s1.config.NodeName
c.NodeID = s1.config.NodeID
c.Bootstrap = true
c.Datacenter = "dc1"
c.ACLsEnabled = true
})
defer os.RemoveAll(dir2)
defer newS1.Shutdown()
testrpc.WaitForTestAgent(t, newS1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
_, anon, err := newS1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken, nil)
require.NoError(r, err)
require.NotNil(r, anon)
// Existing token should not have been purged during ACL initialization
require.Len(r, anon.Policies, 1)
require.Equal(r, structs.ACLPolicyGlobalManagementID, anon.Policies[0].ID)
})
}