Fix legacy management tokens in unupgraded secondary dcs (#7908)

The ACL.GetPolicy RPC endpoint was supposed to return the “parent” policy and not always the default policy. In the case of legacy management tokens the parent policy was supposed to be “manage”. The result of us not sending this properly was that operations that required specifically a management token such as saving a snapshot would not work in secondary DCs until they were upgraded.
This commit is contained in:
Matt Keeler 2020-06-03 11:22:22 -04:00 committed by GitHub
parent e7428634f2
commit 2c615807af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 28 deletions

View File

@ -1154,16 +1154,17 @@ func (r *ACLResolver) ACLsEnabled() bool {
return true return true
} }
func (r *ACLResolver) GetMergedPolicyForToken(token string) (*acl.Policy, error) { func (r *ACLResolver) GetMergedPolicyForToken(token string) (structs.ACLIdentity, *acl.Policy, error) {
policies, err := r.resolveTokenToPolicies(token) ident, policies, err := r.resolveTokenToIdentityAndPolicies(token)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if len(policies) == 0 { if len(policies) == 0 {
return nil, acl.ErrNotFound return nil, nil, acl.ErrNotFound
} }
return policies.Merge(r.cache, r.aclConf) policy, err := policies.Merge(r.cache, r.aclConf)
return ident, policy, err
} }
// aclFilter is used to filter results from our state store based on ACL rules // aclFilter is used to filter results from our state store based on ACL rules

View File

@ -1321,11 +1321,15 @@ func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *stru
// Get the policy via the cache // Get the policy via the cache
parent := a.srv.config.ACLDefaultPolicy parent := a.srv.config.ACLDefaultPolicy
policy, err := a.srv.acls.GetMergedPolicyForToken(args.ACL) ident, policy, err := a.srv.acls.GetMergedPolicyForToken(args.ACL)
if err != nil { if err != nil {
return err return err
} }
if token, ok := ident.(*structs.ACLToken); ok && token.Type == structs.ACLTokenTypeManagement {
parent = "manage"
}
// translates the structures internals to most closely match what could be expressed in the original rule language // translates the structures internals to most closely match what could be expressed in the original rule language
policy = policy.ConvertToLegacy() policy = policy.ConvertToLegacy()

View File

@ -453,6 +453,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
c.ACLDatacenter = "dc1" c.ACLDatacenter = "dc1"
c.ACLsEnabled = true c.ACLsEnabled = true
c.ACLMasterToken = "root" c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -471,9 +472,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
var out string var out string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out))
t.Fatalf("err: %v", err)
}
getR := structs.ACLPolicyResolveLegacyRequest{ getR := structs.ACLPolicyResolveLegacyRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -482,32 +481,43 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
var acls structs.ACLPolicyResolveLegacyResponse var acls structs.ACLPolicyResolveLegacyResponse
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
require.NoError(r, msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls))
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls); err != nil { require.NotNil(t, acls.Policy)
r.Fatalf("err: %v", err) require.Equal(t, "deny", acls.Parent)
} require.Equal(t, 30*time.Second, acls.TTL)
if acls.Policy == nil {
r.Fatalf("Bad: %v", acls)
}
if acls.TTL != 30*time.Second {
r.Fatalf("bad: %v", acls)
}
}) })
// Do a conditional lookup with etag // Do a conditional lookup with etag
getR.ETag = acls.ETag getR.ETag = acls.ETag
var out2 structs.ACLPolicyResolveLegacyResponse var out2 structs.ACLPolicyResolveLegacyResponse
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &out2); err != nil { require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &out2))
t.Fatalf("err: %v", err)
require.Nil(t, out2.Policy)
require.Equal(t, 30*time.Second, out2.TTL)
}
func TestACLEndpoint_GetPolicy_Management(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, testServerACLConfig(nil))
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// wait for leader election and leader establishment to finish.
// after this the global management policy, master token and
// anonymous token will have been injected into the state store
// and we will be ready to resolve the master token
waitForLeaderEstablishment(t, s1)
req := structs.ACLPolicyResolveLegacyRequest{
Datacenter: s1.config.Datacenter,
ACL: TestDefaultMasterToken,
} }
if out2.Policy != nil { var resp structs.ACLPolicyResolveLegacyResponse
t.Fatalf("Bad: %v", out2) require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &req, &resp))
} require.Equal(t, "manage", resp.Parent)
if out2.TTL != 30*time.Second {
t.Fatalf("bad: %v", out2)
}
} }
func TestACLEndpoint_List(t *testing.T) { func TestACLEndpoint_List(t *testing.T) {