diff --git a/agent/consul/acl_endpoint_test.go b/agent/consul/acl_endpoint_test.go index daeb27c88..bba3e1172 100644 --- a/agent/consul/acl_endpoint_test.go +++ b/agent/consul/acl_endpoint_test.go @@ -5314,6 +5314,18 @@ func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter str return &out, nil } +func deleteTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, tokenAccessor string) error { + arg := structs.ACLTokenDeleteRequest{ + Datacenter: datacenter, + TokenID: tokenAccessor, + WriteRequest: structs.WriteRequest{Token: masterToken}, + } + + var ignored string + err := msgpackrpc.CallWithCodec(codec, "ACL.TokenDelete", &arg, &ignored) + return err +} + func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error { arg := structs.ACLPolicyDeleteRequest{ Datacenter: datacenter, diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 5a2e707ff..2b2118fc7 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -574,13 +574,15 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) { // the intentions there to the local state. func (s *Server) replicateIntentions(stopCh <-chan struct{}) { args := structs.DCSpecificRequest{ - Datacenter: s.config.PrimaryDatacenter, - QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()}, + Datacenter: s.config.PrimaryDatacenter, } s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter) retryLoopBackoff(stopCh, func() error { + // Always use the latest replication token value in case it changed while looping. + args.QueryOptions.Token = s.tokens.ReplicationToken() + var remote structs.IndexedIntentions if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil { return err diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index fde0622f8..82c6b68fa 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" uuid "github.com/hashicorp/go-uuid" @@ -463,28 +464,56 @@ func TestLeader_ReplicateIntentions(t *testing.T) { assert := assert.New(t) require := require.New(t) - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.PrimaryDatacenter = "dc1" + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) defer os.RemoveAll(dir1) defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() testrpc.WaitForLeader(t, s1.RPC, "dc1") + s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig) + + // create some tokens + replToken1, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`) + require.NoError(err) + + replToken2, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`) + require.NoError(err) + // dc2 as a secondary DC dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc2" c.PrimaryDatacenter = "dc1" + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLDefaultPolicy = "deny" + c.ACLTokenReplication = false }) defer os.RemoveAll(dir2) defer s2.Shutdown() + s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig) + + // start out with one token + s2.tokens.UpdateReplicationToken(replToken1.SecretID, tokenStore.TokenSourceConfig) + // Create the WAN link joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s2.RPC, "dc2") // Create an intention in dc1 ixn := structs.IntentionRequest{ - Datacenter: "dc1", - Op: structs.IntentionOpCreate, + Datacenter: "dc1", + WriteRequest: structs.WriteRequest{Token: "root"}, + Op: structs.IntentionOpCreate, Intention: &structs.Intention{ SourceNS: structs.IntentionDefaultNamespace, SourceName: "test", @@ -504,8 +533,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) { ixn.Intention.ID = reply retry.Run(t, func(r *retry.R) { req := &structs.IntentionQueryRequest{ - Datacenter: "dc2", - IntentionID: ixn.Intention.ID, + Datacenter: "dc2", + QueryOptions: structs.QueryOptions{Token: "root"}, + IntentionID: ixn.Intention.ID, } var resp structs.IndexedIntentions r.Check(s2.RPC("Intention.Get", req, &resp)) @@ -519,6 +549,12 @@ func TestLeader_ReplicateIntentions(t *testing.T) { // Sleep a bit so that the UpdatedAt field will definitely be different time.Sleep(1 * time.Millisecond) + // delete underlying acl token being used for replication + require.NoError(deleteTestToken(codec, "root", "dc1", replToken1.AccessorID)) + + // switch to the other token + s2.tokens.UpdateReplicationToken(replToken2.SecretID, tokenStore.TokenSourceConfig) + // Update the intention in dc1 ixn.Op = structs.IntentionOpUpdate ixn.Intention.ID = reply @@ -530,8 +566,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) { var resp structs.IndexedIntentions retry.Run(t, func(r *retry.R) { req := &structs.IntentionQueryRequest{ - Datacenter: "dc2", - IntentionID: ixn.Intention.ID, + Datacenter: "dc2", + QueryOptions: structs.QueryOptions{Token: "root"}, + IntentionID: ixn.Intention.ID, } r.Check(s2.RPC("Intention.Get", req, &resp)) if len(resp.Intentions) != 1 { @@ -559,8 +596,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) { // Wait for the delete to be replicated retry.Run(t, func(r *retry.R) { req := &structs.IntentionQueryRequest{ - Datacenter: "dc2", - IntentionID: ixn.Intention.ID, + Datacenter: "dc2", + QueryOptions: structs.QueryOptions{Token: "root"}, + IntentionID: ixn.Intention.ID, } var resp structs.IndexedIntentions err := s2.RPC("Intention.Get", req, &resp)