server: strip local ACL tokens from RPCs during forwarding if crossing datacenters (#7419)
Fixes #7414
This commit is contained in:
parent
e142934a9c
commit
10d3ff9a4f
|
@ -18,6 +18,10 @@ func (q *QueryOptions) TokenSecret() string {
|
||||||
return q.Token
|
return q.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *QueryOptions) SetTokenSecret(s string) {
|
||||||
|
q.Token = s
|
||||||
|
}
|
||||||
|
|
||||||
// SetToken is needed to implement the structs.QueryOptionsCompat interface
|
// SetToken is needed to implement the structs.QueryOptionsCompat interface
|
||||||
func (q *QueryOptions) SetToken(token string) {
|
func (q *QueryOptions) SetToken(token string) {
|
||||||
q.Token = token
|
q.Token = token
|
||||||
|
@ -102,6 +106,10 @@ func (w WriteRequest) TokenSecret() string {
|
||||||
return w.Token
|
return w.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WriteRequest) SetTokenSecret(s string) {
|
||||||
|
w.Token = s
|
||||||
|
}
|
||||||
|
|
||||||
// AllowStaleRead returns whether a stale read should be allowed
|
// AllowStaleRead returns whether a stale read should be allowed
|
||||||
func (w WriteRequest) AllowStaleRead() bool {
|
func (w WriteRequest) AllowStaleRead() bool {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -103,6 +103,10 @@ func (id *missingIdentity) IsExpired(asOf time.Time) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (id *missingIdentity) IsLocal() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (id *missingIdentity) EnterpriseMetadata() *structs.EnterpriseMeta {
|
func (id *missingIdentity) EnterpriseMetadata() *structs.EnterpriseMeta {
|
||||||
return structs.DefaultEnterpriseMeta()
|
return structs.DefaultEnterpriseMeta()
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,6 +161,8 @@ func joinWAN(t *testing.T, member, leader *Server) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForNewACLs(t *testing.T, server *Server) {
|
func waitForNewACLs(t *testing.T, server *Server) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
require.False(r, server.UseLegacyACLs(), "Server cannot use new ACLs")
|
require.False(r, server.UseLegacyACLs(), "Server cannot use new ACLs")
|
||||||
})
|
})
|
||||||
|
@ -169,6 +171,7 @@ func waitForNewACLs(t *testing.T, server *Server) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationType structs.ACLReplicationType, minPolicyIndex, minTokenIndex, minRoleIndex uint64) {
|
func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationType structs.ACLReplicationType, minPolicyIndex, minTokenIndex, minRoleIndex uint64) {
|
||||||
|
t.Helper()
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
status := server.getACLReplicationStatus()
|
status := server.getACLReplicationStatus()
|
||||||
require.Equal(r, expectedReplicationType, status.ReplicationType, "Server not running new replicator yet")
|
require.Equal(r, expectedReplicationType, status.ReplicationType, "Server not running new replicator yet")
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/wanfed"
|
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
|
@ -470,6 +471,23 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
|
||||||
// Handle DC forwarding
|
// Handle DC forwarding
|
||||||
dc := info.RequestDatacenter()
|
dc := info.RequestDatacenter()
|
||||||
if dc != s.config.Datacenter {
|
if dc != s.config.Datacenter {
|
||||||
|
// Local tokens only work within the current datacenter. Check to see
|
||||||
|
// if we are attempting to forward one to a remote datacenter and strip
|
||||||
|
// it, falling back on the anonymous token on the other end.
|
||||||
|
if token := info.TokenSecret(); token != "" {
|
||||||
|
done, ident, err := s.ResolveIdentityFromToken(token)
|
||||||
|
if done {
|
||||||
|
if err != nil && !acl.IsErrNotFound(err) {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if ident != nil && ident.IsLocal() {
|
||||||
|
// Strip it from the request.
|
||||||
|
info.SetTokenSecret("")
|
||||||
|
defer info.SetTokenSecret(token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err := s.forwardDC(method, dc, args, reply)
|
err := s.forwardDC(method, dc, args, reply)
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,9 +11,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
tokenStore "github.com/hashicorp/consul/agent/token"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
@ -732,3 +735,134 @@ func TestRPC_readUint32(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRPC_LocalTokenStrippedOnForward(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.PrimaryDatacenter = "dc1"
|
||||||
|
c.ACLsEnabled = true
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLEnforceVersion8 = true
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Datacenter = "dc2"
|
||||||
|
c.PrimaryDatacenter = "dc1"
|
||||||
|
c.ACLsEnabled = true
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.ACLTokenReplication = true
|
||||||
|
c.ACLEnforceVersion8 = true
|
||||||
|
c.ACLReplicationRate = 100
|
||||||
|
c.ACLReplicationBurst = 100
|
||||||
|
c.ACLReplicationApplyLimit = 1000000
|
||||||
|
})
|
||||||
|
s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
|
||||||
|
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
codec2 := rpcClient(t, s2)
|
||||||
|
defer codec2.Close()
|
||||||
|
|
||||||
|
// Try to join.
|
||||||
|
joinWAN(t, s2, s1)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||||
|
|
||||||
|
// Wait for legacy acls to be disabled so we are clear that
|
||||||
|
// legacy replication isn't meddling.
|
||||||
|
waitForNewACLs(t, s1)
|
||||||
|
waitForNewACLs(t, s2)
|
||||||
|
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
||||||
|
|
||||||
|
// create simple kv policy
|
||||||
|
kvPolicy, err := upsertTestPolicyWithRules(codec, "root", "dc1", `
|
||||||
|
key_prefix "" { policy = "write" }
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Wait for it to replicate
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, p, err := s2.fsm.State().ACLPolicyGetByID(nil, kvPolicy.ID, &structs.EnterpriseMeta{})
|
||||||
|
require.Nil(r, err)
|
||||||
|
require.NotNil(r, p)
|
||||||
|
})
|
||||||
|
|
||||||
|
// create local token that only works in DC2
|
||||||
|
localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) {
|
||||||
|
token.Local = true
|
||||||
|
token.Policies = []structs.ACLTokenPolicyLink{
|
||||||
|
{ID: kvPolicy.ID},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Try to use it locally (it should work)
|
||||||
|
arg := structs.KVSRequest{
|
||||||
|
Datacenter: "dc2",
|
||||||
|
Op: api.KVSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
|
||||||
|
}
|
||||||
|
var out bool
|
||||||
|
err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
|
||||||
|
|
||||||
|
// Try to use it remotely
|
||||||
|
arg = structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: api.KVSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
|
||||||
|
}
|
||||||
|
err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
|
||||||
|
if !acl.IsErrPermissionDenied(err) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the anon token to also be able to write to kv
|
||||||
|
{
|
||||||
|
tokenUpsertReq := structs.ACLTokenSetRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ACLToken: structs.ACLToken{
|
||||||
|
AccessorID: structs.ACLTokenAnonymousID,
|
||||||
|
Policies: []structs.ACLTokenPolicyLink{
|
||||||
|
structs.ACLTokenPolicyLink{
|
||||||
|
ID: kvPolicy.ID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
token := structs.ACLToken{}
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, token.SecretID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to use it remotely again, but this time it should fallback to anon
|
||||||
|
arg = structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: api.KVSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
|
||||||
|
}
|
||||||
|
err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
|
||||||
|
}
|
||||||
|
|
|
@ -119,6 +119,7 @@ type ACLIdentity interface {
|
||||||
EmbeddedPolicy() *ACLPolicy
|
EmbeddedPolicy() *ACLPolicy
|
||||||
ServiceIdentityList() []*ACLServiceIdentity
|
ServiceIdentityList() []*ACLServiceIdentity
|
||||||
IsExpired(asOf time.Time) bool
|
IsExpired(asOf time.Time) bool
|
||||||
|
IsLocal() bool
|
||||||
EnterpriseMetadata() *EnterpriseMeta
|
EnterpriseMetadata() *EnterpriseMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,6 +370,10 @@ func (t *ACLToken) IsExpired(asOf time.Time) bool {
|
||||||
return t.ExpirationTime.Before(asOf)
|
return t.ExpirationTime.Before(asOf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *ACLToken) IsLocal() bool {
|
||||||
|
return t.Local
|
||||||
|
}
|
||||||
|
|
||||||
func (t *ACLToken) HasExpirationTime() bool {
|
func (t *ACLToken) HasExpirationTime() bool {
|
||||||
return t.ExpirationTime != nil && !t.ExpirationTime.IsZero()
|
return t.ExpirationTime != nil && !t.ExpirationTime.IsZero()
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,6 +148,7 @@ type RPCInfo interface {
|
||||||
IsRead() bool
|
IsRead() bool
|
||||||
AllowStaleRead() bool
|
AllowStaleRead() bool
|
||||||
TokenSecret() string
|
TokenSecret() string
|
||||||
|
SetTokenSecret(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryOptions is used to specify various flags for read queries
|
// QueryOptions is used to specify various flags for read queries
|
||||||
|
@ -237,6 +238,10 @@ func (q QueryOptions) TokenSecret() string {
|
||||||
return q.Token
|
return q.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *QueryOptions) SetTokenSecret(s string) {
|
||||||
|
q.Token = s
|
||||||
|
}
|
||||||
|
|
||||||
type WriteRequest struct {
|
type WriteRequest struct {
|
||||||
// Token is the ACL token ID. If not provided, the 'anonymous'
|
// Token is the ACL token ID. If not provided, the 'anonymous'
|
||||||
// token is assumed for backwards compatibility.
|
// token is assumed for backwards compatibility.
|
||||||
|
@ -256,6 +261,10 @@ func (w WriteRequest) TokenSecret() string {
|
||||||
return w.Token
|
return w.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WriteRequest) SetTokenSecret(s string) {
|
||||||
|
w.Token = s
|
||||||
|
}
|
||||||
|
|
||||||
// QueryMeta allows a query response to include potentially
|
// QueryMeta allows a query response to include potentially
|
||||||
// useful metadata about a query
|
// useful metadata about a query
|
||||||
type QueryMeta struct {
|
type QueryMeta struct {
|
||||||
|
|
Loading…
Reference in New Issue