connect: ensure intention replication continues to work when the replication ACL token changes (#6288)

This commit is contained in:
R.B. Boyer 2019-08-07 11:34:09 -05:00 committed by GitHub
parent 3ac803da5e
commit 357ca39868
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 11 deletions

View File

@ -5314,6 +5314,18 @@ func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter str
return &out, nil return &out, nil
} }
func deleteTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, tokenAccessor string) error {
arg := structs.ACLTokenDeleteRequest{
Datacenter: datacenter,
TokenID: tokenAccessor,
WriteRequest: structs.WriteRequest{Token: masterToken},
}
var ignored string
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenDelete", &arg, &ignored)
return err
}
func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error { func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error {
arg := structs.ACLPolicyDeleteRequest{ arg := structs.ACLPolicyDeleteRequest{
Datacenter: datacenter, Datacenter: datacenter,

View File

@ -574,13 +574,15 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
// the intentions there to the local state. // the intentions there to the local state.
func (s *Server) replicateIntentions(stopCh <-chan struct{}) { func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
args := structs.DCSpecificRequest{ args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter, Datacenter: s.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()},
} }
s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter) s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)
retryLoopBackoff(stopCh, func() error { retryLoopBackoff(stopCh, func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()
var remote structs.IndexedIntentions var remote structs.IndexedIntentions
if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil { if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
return err return err

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
uuid "github.com/hashicorp/go-uuid" uuid "github.com/hashicorp/go-uuid"
@ -463,28 +464,56 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
require := require.New(t) require := require.New(t)
dir1, s1 := testServer(t) dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
// create some tokens
replToken1, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)
replToken2, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)
// dc2 as a secondary DC // dc2 as a secondary DC
dir2, s2 := testServerWithConfig(t, func(c *Config) { dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1" c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLDefaultPolicy = "deny"
c.ACLTokenReplication = false
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
// start out with one token
s2.tokens.UpdateReplicationToken(replToken1.SecretID, tokenStore.TokenSourceConfig)
// Create the WAN link // Create the WAN link
joinWAN(t, s2, s1) joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s2.RPC, "dc2") testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Create an intention in dc1 // Create an intention in dc1
ixn := structs.IntentionRequest{ ixn := structs.IntentionRequest{
Datacenter: "dc1", Datacenter: "dc1",
Op: structs.IntentionOpCreate, WriteRequest: structs.WriteRequest{Token: "root"},
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{ Intention: &structs.Intention{
SourceNS: structs.IntentionDefaultNamespace, SourceNS: structs.IntentionDefaultNamespace,
SourceName: "test", SourceName: "test",
@ -504,8 +533,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
ixn.Intention.ID = reply ixn.Intention.ID = reply
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
IntentionID: ixn.Intention.ID, QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
} }
var resp structs.IndexedIntentions var resp structs.IndexedIntentions
r.Check(s2.RPC("Intention.Get", req, &resp)) r.Check(s2.RPC("Intention.Get", req, &resp))
@ -519,6 +549,12 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Sleep a bit so that the UpdatedAt field will definitely be different // Sleep a bit so that the UpdatedAt field will definitely be different
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
// delete underlying acl token being used for replication
require.NoError(deleteTestToken(codec, "root", "dc1", replToken1.AccessorID))
// switch to the other token
s2.tokens.UpdateReplicationToken(replToken2.SecretID, tokenStore.TokenSourceConfig)
// Update the intention in dc1 // Update the intention in dc1
ixn.Op = structs.IntentionOpUpdate ixn.Op = structs.IntentionOpUpdate
ixn.Intention.ID = reply ixn.Intention.ID = reply
@ -530,8 +566,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
var resp structs.IndexedIntentions var resp structs.IndexedIntentions
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
IntentionID: ixn.Intention.ID, QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
} }
r.Check(s2.RPC("Intention.Get", req, &resp)) r.Check(s2.RPC("Intention.Get", req, &resp))
if len(resp.Intentions) != 1 { if len(resp.Intentions) != 1 {
@ -559,8 +596,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Wait for the delete to be replicated // Wait for the delete to be replicated
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
IntentionID: ixn.Intention.ID, QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID,
} }
var resp structs.IndexedIntentions var resp structs.IndexedIntentions
err := s2.RPC("Intention.Get", req, &resp) err := s2.RPC("Intention.Get", req, &resp)