Accept Workload Identities for Client RPCs (#16254)
This change resolves policies for workload identities when calling Client RPCs. Previously only ACL tokens could be used for Client RPCs. Since the same cache is used for both bearer tokens (ACL and Workload ID), the token cache size was doubled. --------- Co-authored-by: James Rasell <jrasell@users.noreply.github.com>
This commit is contained in:
parent
39e3a1ac3e
commit
bd7b60712e
|
@ -1,5 +1,7 @@
|
|||
rules:
|
||||
# Check potentially unauthenticated RPC endpoints
|
||||
# Check potentially unauthenticated RPC endpoints. Technically more
|
||||
# authorization (authz) oriented than authn, but before Nomad 1.4/1.5 that
|
||||
# distinction wasn't as important.
|
||||
- id: "rpc-potentially-unauthenticated"
|
||||
patterns:
|
||||
- pattern: |
|
||||
|
@ -89,6 +91,7 @@ rules:
|
|||
patterns:
|
||||
# Endpoints that are expected not to have authentication.
|
||||
- pattern-not: '"ACL.Bootstrap"'
|
||||
- pattern-not: '"ACL.GetClaimPolicies"'
|
||||
- pattern-not: '"ACL.ResolveToken"'
|
||||
- pattern-not: '"ACL.UpsertOneTimeToken"'
|
||||
- pattern-not: '"ACL.ExchangeOneTimeToken"'
|
||||
|
|
113
client/acl.go
113
client/acl.go
|
@ -17,9 +17,10 @@ const (
|
|||
// construction cost, so we keep the hot objects cached to reduce the ACL token resolution time.
|
||||
aclCacheSize = 64
|
||||
|
||||
// tokenCacheSize is the number of ACL tokens to keep cached. Tokens have a fetching cost,
|
||||
// so we keep the hot tokens cached to reduce the lookups.
|
||||
tokenCacheSize = 64
|
||||
// tokenCacheSize is the number of bearer tokens, ACL and workload identity,
|
||||
// to keep cached. Tokens have a fetching cost, so we keep the hot tokens
|
||||
// cached to reduce the lookups.
|
||||
tokenCacheSize = 128
|
||||
|
||||
// roleCacheSize is the number of ACL roles to keep cached. Looking up
|
||||
// roles requires an RPC call, so we keep the hot roles cached to reduce
|
||||
|
@ -37,7 +38,7 @@ type clientACLResolver struct {
|
|||
policyCache *structs.ACLCache[*structs.ACLPolicy]
|
||||
|
||||
// tokenCache is used to maintain the fetched token objects
|
||||
tokenCache *structs.ACLCache[*structs.ACLToken]
|
||||
tokenCache *structs.ACLCache[*structs.AuthenticatedIdentity]
|
||||
|
||||
// roleCache is used to maintain a cache of the fetched ACL roles. Each
|
||||
// entry is keyed by the role ID.
|
||||
|
@ -48,23 +49,18 @@ type clientACLResolver struct {
|
|||
func (c *clientACLResolver) init() {
|
||||
c.aclCache = structs.NewACLCache[*acl.ACL](aclCacheSize)
|
||||
c.policyCache = structs.NewACLCache[*structs.ACLPolicy](policyCacheSize)
|
||||
c.tokenCache = structs.NewACLCache[*structs.ACLToken](tokenCacheSize)
|
||||
c.tokenCache = structs.NewACLCache[*structs.AuthenticatedIdentity](tokenCacheSize)
|
||||
c.roleCache = structs.NewACLCache[*structs.ACLRole](roleCacheSize)
|
||||
}
|
||||
|
||||
// ResolveToken is used to translate an ACL Token Secret ID into
|
||||
// an ACL object, nil if ACLs are disabled, or an error.
|
||||
func (c *Client) ResolveToken(secretID string) (*acl.ACL, error) {
|
||||
a, _, err := c.resolveTokenAndACL(secretID)
|
||||
// ResolveToken is used to translate an ACL Token Secret ID or workload
|
||||
// identity into an ACL object, nil if ACLs are disabled, or an error.
|
||||
func (c *Client) ResolveToken(bearerToken string) (*acl.ACL, error) {
|
||||
a, _, err := c.resolveTokenAndACL(bearerToken)
|
||||
return a, err
|
||||
}
|
||||
|
||||
func (c *Client) ResolveSecretToken(secretID string) (*structs.ACLToken, error) {
|
||||
_, t, err := c.resolveTokenAndACL(secretID)
|
||||
return t, err
|
||||
}
|
||||
|
||||
func (c *Client) resolveTokenAndACL(secretID string) (*acl.ACL, *structs.ACLToken, error) {
|
||||
func (c *Client) resolveTokenAndACL(bearerToken string) (*acl.ACL, *structs.AuthenticatedIdentity, error) {
|
||||
// Fast-path if ACLs are disabled
|
||||
if !c.GetConfig().ACLEnabled {
|
||||
return nil, nil, nil
|
||||
|
@ -72,39 +68,61 @@ func (c *Client) resolveTokenAndACL(secretID string) (*acl.ACL, *structs.ACLToke
|
|||
defer metrics.MeasureSince([]string{"client", "acl", "resolve_token"}, time.Now())
|
||||
|
||||
// Resolve the token value
|
||||
token, err := c.resolveTokenValue(secretID)
|
||||
ident, err := c.resolveTokenValue(bearerToken)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if token == nil {
|
||||
|
||||
// Only allow ACLs and workload identities to call client RPCs
|
||||
if ident.ACLToken == nil && ident.Claims == nil {
|
||||
return nil, nil, structs.ErrTokenNotFound
|
||||
}
|
||||
|
||||
// Give the token expiry some slight leeway in case the client and server
|
||||
// clocks are skewed.
|
||||
if token.IsExpired(time.Now().Add(2 * time.Second)) {
|
||||
if ident.IsExpired(time.Now().Add(2 * time.Second)) {
|
||||
return nil, nil, structs.ErrTokenExpired
|
||||
}
|
||||
|
||||
// Check if this is a management token
|
||||
if token.Type == structs.ACLManagementToken {
|
||||
return acl.ManagementACL, token, nil
|
||||
}
|
||||
var policies []*structs.ACLPolicy
|
||||
|
||||
// Resolve the policy links within the token ACL roles.
|
||||
policyNames, err := c.resolveTokenACLRoles(secretID, token.Roles)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Resolve token policies
|
||||
if token := ident.ACLToken; token != nil {
|
||||
// Check if this is a management token
|
||||
if ident.ACLToken.Type == structs.ACLManagementToken {
|
||||
return acl.ManagementACL, ident, nil
|
||||
}
|
||||
|
||||
// Generate a slice of all policy names included within the token, taken
|
||||
// from both the ACL roles and the direct assignments.
|
||||
policyNames = append(policyNames, token.Policies...)
|
||||
// Resolve the policy links within the token ACL roles.
|
||||
policyNames, err := c.resolveTokenACLRoles(bearerToken, token.Roles)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Resolve the policies
|
||||
policies, err := c.resolvePolicies(token.SecretID, policyNames)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
// Generate a slice of all policy names included within the token, taken
|
||||
// from both the ACL roles and the direct assignments.
|
||||
policyNames = append(policyNames, token.Policies...)
|
||||
|
||||
// Resolve ACL token policies
|
||||
if policies, err = c.resolvePolicies(token.SecretID, policyNames); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
// Resolve policies for workload identities
|
||||
policyArgs := structs.GenericRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
AuthToken: bearerToken,
|
||||
Region: c.Region(),
|
||||
},
|
||||
}
|
||||
policyReply := structs.ACLPolicySetResponse{}
|
||||
if err := c.RPC("ACL.GetClaimPolicies", &policyArgs, &policyReply); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
policies = make([]*structs.ACLPolicy, 0, len(policyReply.Policies))
|
||||
for _, p := range policyReply.Policies {
|
||||
policies = append(policies, p)
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve the ACL object
|
||||
|
@ -112,20 +130,21 @@ func (c *Client) resolveTokenAndACL(secretID string) (*acl.ACL, *structs.ACLToke
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return aclObj, token, nil
|
||||
return aclObj, ident, nil
|
||||
}
|
||||
|
||||
// resolveTokenValue is used to translate a secret ID into an ACL token with caching
|
||||
// We use a local cache up to the TTL limit, and then resolve via a server. If we cannot
|
||||
// resolveTokenValue is used to translate a bearer token, either an ACL token's
|
||||
// secret or a workload identity, into an ACL token with caching We use a local
|
||||
// cache up to the TTL limit, and then resolve via a server. If we cannot
|
||||
// reach a server, but have a cached value we extend the TTL to gracefully handle outages.
|
||||
func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
|
||||
func (c *Client) resolveTokenValue(bearerToken string) (*structs.AuthenticatedIdentity, error) {
|
||||
// Hot-path the anonymous token
|
||||
if secretID == "" {
|
||||
return structs.AnonymousACLToken, nil
|
||||
if bearerToken == "" {
|
||||
return &structs.AuthenticatedIdentity{ACLToken: structs.AnonymousACLToken}, nil
|
||||
}
|
||||
|
||||
// Lookup the token entry in the cache
|
||||
entry, ok := c.tokenCache.Get(secretID)
|
||||
entry, ok := c.tokenCache.Get(bearerToken)
|
||||
if ok {
|
||||
if entry.Age() <= c.GetConfig().ACLTokenTTL {
|
||||
return entry.Get(), nil
|
||||
|
@ -133,15 +152,15 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
|
|||
}
|
||||
|
||||
// Lookup the token
|
||||
req := structs.ResolveACLTokenRequest{
|
||||
SecretID: secretID,
|
||||
req := structs.GenericRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
AuthToken: bearerToken,
|
||||
Region: c.Region(),
|
||||
AllowStale: true,
|
||||
},
|
||||
}
|
||||
var resp structs.ResolveACLTokenResponse
|
||||
if err := c.RPC("ACL.ResolveToken", &req, &resp); err != nil {
|
||||
var resp structs.ACLWhoAmIResponse
|
||||
if err := c.RPC("ACL.WhoAmI", &req, &resp); err != nil {
|
||||
// If we encounter an error but have a cached value, mask the error and extend the cache
|
||||
if ok {
|
||||
c.logger.Warn("failed to resolve token, using expired cached value", "error", err)
|
||||
|
@ -151,8 +170,8 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
|
|||
}
|
||||
|
||||
// Cache the response (positive or negative)
|
||||
c.tokenCache.Add(secretID, resp.Token)
|
||||
return resp.Token, nil
|
||||
c.tokenCache.Add(bearerToken, resp.Identity)
|
||||
return resp.Identity, nil
|
||||
}
|
||||
|
||||
// resolvePolicies is used to translate a set of named ACL policies into the objects.
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/shoenig/test"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_clientACLResolver_init(t *testing.T) {
|
||||
|
@ -47,33 +47,29 @@ func TestClient_ACL_resolveTokenValue(t *testing.T) {
|
|||
token2.Type = structs.ACLManagementToken
|
||||
token2.Policies = nil
|
||||
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Test the client resolution
|
||||
out0, err := c1.resolveTokenValue("")
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out0)
|
||||
assert.Equal(t, structs.AnonymousACLToken, out0)
|
||||
test.Nil(t, err)
|
||||
must.NotNil(t, out0)
|
||||
test.Eq(t, structs.AnonymousACLToken, out0.ACLToken)
|
||||
|
||||
// Test the client resolution
|
||||
out1, err := c1.resolveTokenValue(token.SecretID)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out1)
|
||||
assert.Equal(t, token, out1)
|
||||
test.Nil(t, err)
|
||||
must.NotNil(t, out1)
|
||||
test.Eq(t, token, out1.ACLToken)
|
||||
|
||||
out2, err := c1.resolveTokenValue(token2.SecretID)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out2)
|
||||
assert.Equal(t, token2, out2)
|
||||
test.Nil(t, err)
|
||||
must.NotNil(t, out2)
|
||||
test.Eq(t, token2, out2.ACLToken)
|
||||
|
||||
out3, err := c1.resolveTokenValue(token.SecretID)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out3)
|
||||
if out1 != out3 {
|
||||
t.Fatalf("bad caching")
|
||||
}
|
||||
test.Nil(t, err)
|
||||
must.Eq(t, out1, out3, must.Sprintf("bad caching"))
|
||||
}
|
||||
|
||||
func TestClient_ACL_resolvePolicies(t *testing.T) {
|
||||
|
@ -98,19 +94,19 @@ func TestClient_ACL_resolvePolicies(t *testing.T) {
|
|||
token2.Type = structs.ACLManagementToken
|
||||
token2.Policies = nil
|
||||
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Test the client resolution
|
||||
out, err := c1.resolvePolicies(root.SecretID, []string{policy.Name, policy2.Name})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 2, len(out))
|
||||
must.NoError(t, err)
|
||||
test.Len(t, 2, out)
|
||||
|
||||
// Test caching
|
||||
out2, err := c1.resolvePolicies(root.SecretID, []string{policy.Name, policy2.Name})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 2, len(out2))
|
||||
must.NoError(t, err)
|
||||
test.Len(t, 2, out2)
|
||||
|
||||
// Check we get the same objects back (ignore ordering)
|
||||
if out[0] != out2[0] && out[0] != out2[1] {
|
||||
|
@ -173,8 +169,8 @@ func TestClient_ACL_ResolveToken_Disabled(t *testing.T) {
|
|||
|
||||
// Should always get nil when disabled
|
||||
aclObj, err := c1.ResolveToken("blah")
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, aclObj)
|
||||
must.NoError(t, err)
|
||||
must.Nil(t, aclObj)
|
||||
}
|
||||
|
||||
func TestClient_ACL_ResolveToken(t *testing.T) {
|
||||
|
@ -199,36 +195,32 @@ func TestClient_ACL_ResolveToken(t *testing.T) {
|
|||
token2.Type = structs.ACLManagementToken
|
||||
token2.Policies = nil
|
||||
err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{policy, policy2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token, token2})
|
||||
assert.Nil(t, err)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Test the client resolution
|
||||
out, err := c1.ResolveToken(token.SecretID)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, out)
|
||||
must.NoError(t, err)
|
||||
test.NotNil(t, out)
|
||||
|
||||
// Test caching
|
||||
out2, err := c1.ResolveToken(token.SecretID)
|
||||
assert.Nil(t, err)
|
||||
if out != out2 {
|
||||
t.Fatalf("should be cached")
|
||||
}
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, out, out2, must.Sprintf("should be cached"))
|
||||
|
||||
// Test management token
|
||||
out3, err := c1.ResolveToken(token2.SecretID)
|
||||
assert.Nil(t, err)
|
||||
if acl.ManagementACL != out3 {
|
||||
t.Fatalf("should be management")
|
||||
}
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, acl.ManagementACL, out3)
|
||||
|
||||
// Test bad token
|
||||
out4, err := c1.ResolveToken(uuid.Generate())
|
||||
assert.Equal(t, structs.ErrTokenNotFound, err)
|
||||
assert.Nil(t, out4)
|
||||
test.EqError(t, err, structs.ErrPermissionDenied.Error())
|
||||
test.Nil(t, out4)
|
||||
}
|
||||
|
||||
func TestClient_ACL_ResolveSecretToken(t *testing.T) {
|
||||
func TestClient_ACL_ResolveToken_Expired(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, _, _, cleanupS1 := testACLServer(t, nil)
|
||||
|
@ -241,25 +233,118 @@ func TestClient_ACL_ResolveSecretToken(t *testing.T) {
|
|||
})
|
||||
defer cleanup()
|
||||
|
||||
token := mock.ACLToken()
|
||||
|
||||
err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 110, []*structs.ACLToken{token})
|
||||
assert.Nil(t, err)
|
||||
|
||||
respToken, err := c1.ResolveSecretToken(token.SecretID)
|
||||
assert.Nil(t, err)
|
||||
if assert.NotNil(t, respToken) {
|
||||
assert.NotEmpty(t, respToken.AccessorID)
|
||||
}
|
||||
|
||||
// Create and upsert a token which has just expired.
|
||||
mockExpiredToken := mock.ACLToken()
|
||||
mockExpiredToken.ExpirationTime = pointer.Of(time.Now().Add(-5 * time.Minute))
|
||||
|
||||
err = s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 120, []*structs.ACLToken{mockExpiredToken})
|
||||
err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 120, []*structs.ACLToken{mockExpiredToken})
|
||||
must.NoError(t, err)
|
||||
|
||||
expiredTokenResp, err := c1.ResolveSecretToken(mockExpiredToken.SecretID)
|
||||
expiredTokenResp, err := c1.ResolveToken(mockExpiredToken.SecretID)
|
||||
must.Nil(t, expiredTokenResp)
|
||||
must.StrContains(t, err.Error(), "ACL token expired")
|
||||
must.ErrorContains(t, err, "ACL token expired")
|
||||
}
|
||||
|
||||
// TestClient_ACL_ResolveToken_Claims asserts that ResolveToken
|
||||
// properly resolves valid workload identity claims.
|
||||
func TestClient_ACL_ResolveToken_Claims(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, _, rootToken, cleanupS1 := testACLServer(t, nil)
|
||||
defer cleanupS1()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
c1, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.RPCHandler = s1
|
||||
c.ACLEnabled = true
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
// Create a minimal job
|
||||
job := mock.MinJob()
|
||||
|
||||
// Add a job policy
|
||||
polArgs := structs.ACLPolicyUpsertRequest{
|
||||
Policies: []*structs.ACLPolicy{
|
||||
{
|
||||
Name: "nw",
|
||||
Description: "test job can write to nodes",
|
||||
Rules: `node { policy = "write" }`,
|
||||
JobACL: &structs.JobACL{
|
||||
Namespace: job.Namespace,
|
||||
JobID: job.ID,
|
||||
},
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: job.Region,
|
||||
AuthToken: rootToken.SecretID,
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
polReply := structs.GenericResponse{}
|
||||
must.NoError(t, s1.RPC("ACL.UpsertPolicies", &polArgs, &polReply))
|
||||
must.NonZero(t, polReply.WriteMeta.Index)
|
||||
|
||||
allocs := testutil.WaitForRunningWithToken(t, s1.RPC, job, rootToken.SecretID)
|
||||
must.Len(t, 1, allocs)
|
||||
|
||||
alloc, err := s1.State().AllocByID(nil, allocs[0].ID)
|
||||
must.NoError(t, err)
|
||||
must.MapContainsKey(t, alloc.SignedIdentities, "t")
|
||||
wid := alloc.SignedIdentities["t"]
|
||||
|
||||
aclObj, err := c1.ResolveToken(wid)
|
||||
must.NoError(t, err)
|
||||
must.True(t, aclObj.AllowNodeWrite(), must.Sprintf("expected workload id to allow node write"))
|
||||
}
|
||||
|
||||
// TestClient_ACL_ResolveToken_InvalidClaims asserts that ResolveToken properly
|
||||
// rejects invalid workload identity claims.
|
||||
func TestClient_ACL_ResolveToken_InvalidClaims(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, _, rootToken, cleanupS1 := testACLServer(t, nil)
|
||||
defer cleanupS1()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
c1, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.RPCHandler = s1
|
||||
c.ACLEnabled = true
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
// Create a minimal job
|
||||
job := mock.MinJob()
|
||||
allocs := testutil.WaitForRunningWithToken(t, s1.RPC, job, rootToken.SecretID)
|
||||
must.Len(t, 1, allocs)
|
||||
|
||||
// Get wid while it's still running
|
||||
alloc, err := s1.State().AllocByID(nil, allocs[0].ID)
|
||||
must.NoError(t, err)
|
||||
must.MapContainsKey(t, alloc.SignedIdentities, "t")
|
||||
wid := alloc.SignedIdentities["t"]
|
||||
|
||||
// Stop job
|
||||
deregArgs := structs.JobDeregisterRequest{
|
||||
JobID: job.ID,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: job.Region,
|
||||
Namespace: job.Namespace,
|
||||
AuthToken: rootToken.SecretID,
|
||||
},
|
||||
}
|
||||
deregReply := structs.JobDeregisterResponse{}
|
||||
must.NoError(t, s1.RPC("Job.Deregister", &deregArgs, &deregReply))
|
||||
|
||||
cond := map[string]int{
|
||||
structs.AllocClientStatusComplete: 1,
|
||||
}
|
||||
allocs = testutil.WaitForJobAllocStatusWithToken(t, s1.RPC, job, cond, rootToken.SecretID)
|
||||
must.Len(t, 1, allocs)
|
||||
|
||||
// ResolveToken should error now that alloc is dead
|
||||
aclObj, err := c1.ResolveToken(wid)
|
||||
must.ErrorContains(t, err, "allocation is terminal")
|
||||
must.Nil(t, aclObj)
|
||||
}
|
||||
|
|
|
@ -204,23 +204,33 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
|
|||
}
|
||||
alloc := ar.Alloc()
|
||||
|
||||
aclObj, token, err := a.c.resolveTokenAndACL(req.QueryOptions.AuthToken)
|
||||
aclObj, ident, err := a.c.resolveTokenAndACL(req.QueryOptions.AuthToken)
|
||||
{
|
||||
// log access
|
||||
tokenName, tokenID := "", ""
|
||||
if token != nil {
|
||||
tokenName, tokenID = token.Name, token.AccessorID
|
||||
}
|
||||
|
||||
a.c.logger.Info("task exec session starting",
|
||||
logArgs := []any{
|
||||
"exec_id", execID,
|
||||
"alloc_id", req.AllocID,
|
||||
"task", req.Task,
|
||||
"command", req.Cmd,
|
||||
"tty", req.Tty,
|
||||
"access_token_name", tokenName,
|
||||
"access_token_id", tokenID,
|
||||
)
|
||||
}
|
||||
if ident != nil {
|
||||
if ident.ACLToken != nil {
|
||||
logArgs = append(logArgs,
|
||||
"access_token_name", ident.ACLToken.Name,
|
||||
"access_token_id", ident.ACLToken.AccessorID,
|
||||
)
|
||||
} else if ident.Claims != nil {
|
||||
logArgs = append(logArgs,
|
||||
"ns", ident.Claims.Namespace,
|
||||
"job", ident.Claims.JobID,
|
||||
"alloc", ident.Claims.AllocationID,
|
||||
"task", ident.Claims.TaskName,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
a.c.logger.Info("task exec session starting", logArgs...)
|
||||
}
|
||||
|
||||
// Check alloc-exec permission.
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package e2eutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/api"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/shoenig/test"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
// ApplyJobPolicy applies an ACL job policy or noops if ACLs are disabled.
|
||||
// Registers a cleanup function to delete the policy.
|
||||
func ApplyJobPolicy(t *testing.T, nomad *api.Client, ns, j, g, task, rules string) *api.ACLPolicy {
|
||||
|
||||
policy := &api.ACLPolicy{
|
||||
Name: j + uuid.Short(),
|
||||
Description: fmt.Sprintf("Policy for test=%s ns=%s job=%s group=%s task=%s rules=%s",
|
||||
t.Name(), ns, j, g, task, rules),
|
||||
Rules: rules,
|
||||
JobACL: &api.JobACL{
|
||||
Namespace: ns,
|
||||
JobID: j,
|
||||
Group: g,
|
||||
Task: task,
|
||||
},
|
||||
}
|
||||
|
||||
wm, err := nomad.ACLPolicies().Upsert(policy, nil)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "ACL support disabled") {
|
||||
t.Logf("ACL support disabled. Skipping ApplyJobPolicy(t, c, %q, %q, %q, %q, %q)",
|
||||
ns, j, g, task, rules)
|
||||
return nil
|
||||
}
|
||||
must.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
_, err := nomad.ACLPolicies().Delete(policy.Name, nil)
|
||||
test.NoError(t, err)
|
||||
})
|
||||
|
||||
policy.CreateIndex = wm.LastIndex
|
||||
policy.ModifyIndex = wm.LastIndex
|
||||
return policy
|
||||
}
|
|
@ -70,6 +70,20 @@ func testDynamicNodeMetadata(t *testing.T) {
|
|||
})
|
||||
must.NoError(t, err)
|
||||
job.ID = pointer.Of(jobID)
|
||||
|
||||
// Setup ACLs
|
||||
for _, task := range job.TaskGroups[0].Tasks {
|
||||
p := e2eutil.ApplyJobPolicy(t, nomad, "default",
|
||||
jobID, *job.TaskGroups[0].Name, task.Name, `node { policy = "write" }`)
|
||||
|
||||
if p == nil {
|
||||
t.Logf("skipping policy for %s as ACLs are disabled", task.Name)
|
||||
} else {
|
||||
t.Logf("created policy %s for %s", p.Name, task.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Register job
|
||||
_, _, err = nomad.Jobs().Register(job, nil)
|
||||
must.NoError(t, err)
|
||||
|
||||
|
|
|
@ -416,6 +416,48 @@ func (a *ACL) GetPolicies(args *structs.ACLPolicySetRequest, reply *structs.ACLP
|
|||
return a.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// GetClaimPolicies return the ACLPolicy objects for a workload identity.
|
||||
// Similar to GetPolicies except an error will *not* be returned if ACLs are
|
||||
// disabled.
|
||||
func (a *ACL) GetClaimPolicies(args *structs.GenericRequest, reply *structs.ACLPolicySetResponse) error {
|
||||
authErr := a.srv.Authenticate(a.ctx, args)
|
||||
if done, err := a.srv.forward("ACL.GetClaimPolicies", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
a.srv.MeasureRPCRate("acl", structs.RateMetricList, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "acl", "get_claim_policies"}, time.Now())
|
||||
|
||||
// Should only be called using a workload identity
|
||||
claims := args.GetIdentity().Claims
|
||||
if claims == nil {
|
||||
// Calling this RPC without a workload identity is either a bug or an
|
||||
// attacker as this RPC is not exposed to users directly.
|
||||
a.logger.Debug("ACL.GetClaimPolicies called without a workload identity", "id", args.GetIdentity())
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
policies, err := a.srv.resolvePoliciesForClaims(claims)
|
||||
if err != nil {
|
||||
// Likely only hit if a job/alloc has been GC'd on the server but the
|
||||
// client hasn't stopped it yet. Return Permission Denied as there's no way
|
||||
// this call should error that leaves the claims valid.
|
||||
a.logger.Warn("Policies could not be resolved for claims", "error", err, "id", args.GetIdentity())
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
reply.Policies = make(map[string]*structs.ACLPolicy, len(policies))
|
||||
for _, p := range policies {
|
||||
if p.ModifyIndex > reply.QueryMeta.Index {
|
||||
reply.QueryMeta.Index = p.ModifyIndex
|
||||
}
|
||||
reply.Policies[p.Name] = p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Bootstrap is used to bootstrap the initial token
|
||||
func (a *ACL) Bootstrap(args *structs.ACLTokenBootstrapRequest, reply *structs.ACLTokenUpsertResponse) error {
|
||||
// Ensure ACLs are enabled, and always flow modification requests to the authoritative region
|
||||
|
@ -988,7 +1030,12 @@ func (a *ACL) GetTokens(args *structs.ACLTokenSetRequest, reply *structs.ACLToke
|
|||
return a.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// ResolveToken is used to lookup a specific token by a secret ID. This is used for enforcing ACLs by clients.
|
||||
// ResolveToken is used to lookup a specific token by a secret ID.
|
||||
//
|
||||
// Deprecated: Prior to Nomad 1.5 this RPC was used by clients for
|
||||
// authenticating local RPCs. Since Nomad 1.5 added workload identity support,
|
||||
// clients now use the more flexible ACL.WhoAmI RPC. The /v1/acl/token/self API
|
||||
// is the only remaining caller and should be switched to ACL.WhoAmI.
|
||||
func (a *ACL) ResolveToken(args *structs.ResolveACLTokenRequest, reply *structs.ResolveACLTokenResponse) error {
|
||||
if !a.srv.config.ACLEnabled {
|
||||
return aclDisabled
|
||||
|
@ -996,6 +1043,7 @@ func (a *ACL) ResolveToken(args *structs.ResolveACLTokenRequest, reply *structs.
|
|||
if done, err := a.srv.forward("ACL.ResolveToken", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
a.srv.MeasureRPCRate("acl", structs.RateMetricRead, args)
|
||||
defer metrics.MeasureSince([]string{"nomad", "acl", "resolve_token"}, time.Now())
|
||||
|
||||
// Setup the query meta
|
||||
|
|
|
@ -122,6 +122,36 @@ func Job() *structs.Job {
|
|||
return job
|
||||
}
|
||||
|
||||
// MinJob returns a minimal service job with a mock driver task.
|
||||
func MinJob() *structs.Job {
|
||||
job := &structs.Job{
|
||||
ID: "j" + uuid.Short(),
|
||||
Name: "j",
|
||||
Region: "global",
|
||||
Type: "service",
|
||||
TaskGroups: []*structs.TaskGroup{
|
||||
{
|
||||
Name: "g",
|
||||
Count: 1,
|
||||
Tasks: []*structs.Task{
|
||||
{
|
||||
Name: "t",
|
||||
Driver: "mock_driver",
|
||||
Config: map[string]any{
|
||||
// An empty config actually causes an error, so set a reasonably
|
||||
// long run_for duration.
|
||||
"run_for": "10m",
|
||||
},
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
job.Canonicalize()
|
||||
return job
|
||||
}
|
||||
|
||||
func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
|
||||
job := Job()
|
||||
policy := &structs.ScalingPolicy{
|
||||
|
|
|
@ -281,7 +281,8 @@ type QueryOptions struct {
|
|||
// If set, used as prefix for resource list searches
|
||||
Prefix string
|
||||
|
||||
// AuthToken is secret portion of the ACL token used for the request
|
||||
// AuthToken is secret portion of the ACL token or workload identity used for
|
||||
// the request.
|
||||
AuthToken string
|
||||
|
||||
// Filter specifies the go-bexpr filter expression to be used for
|
||||
|
@ -480,8 +481,13 @@ func (w WriteRequest) GetIdentity() *AuthenticatedIdentity {
|
|||
// ACLToken makes the original of the credential clear to RPC handlers, who may
|
||||
// have different behavior for internal vs external origins.
|
||||
type AuthenticatedIdentity struct {
|
||||
// ACLToken authenticated. Claims will be nil if this is set.
|
||||
ACLToken *ACLToken
|
||||
Claims *IdentityClaims
|
||||
|
||||
// Claims authenticated by workload identity. ACLToken will be nil if this is
|
||||
// set.
|
||||
Claims *IdentityClaims
|
||||
|
||||
ClientID string
|
||||
TLSName string
|
||||
RemoteIP net.IP
|
||||
|
@ -517,6 +523,16 @@ func (ai *AuthenticatedIdentity) String() string {
|
|||
return fmt.Sprintf("%s:%s", ai.TLSName, ai.RemoteIP.String())
|
||||
}
|
||||
|
||||
func (ai *AuthenticatedIdentity) IsExpired(now time.Time) bool {
|
||||
// Only ACLTokens currently support expiry so return unexpired if there isn't
|
||||
// one.
|
||||
if ai.ACLToken == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return ai.ACLToken.IsExpired(now)
|
||||
}
|
||||
|
||||
type RequestWithIdentity interface {
|
||||
GetAuthToken() string
|
||||
SetIdentity(identity *AuthenticatedIdentity)
|
||||
|
@ -10598,10 +10614,10 @@ func (a *Allocation) ToIdentityClaims(job *Job) *IdentityClaims {
|
|||
JobID: a.JobID,
|
||||
AllocationID: a.ID,
|
||||
RegisteredClaims: jwt.RegisteredClaims{
|
||||
// TODO: in Nomad 1.5.0 we'll have a refresh loop to
|
||||
// prevent allocation identities from expiring before the
|
||||
// allocation is terminal. Once that's implemented, add an
|
||||
// ExpiresAt here ExpiresAt: &jwt.NumericDate{},
|
||||
// TODO: implement a refresh loop to prevent allocation identities from
|
||||
// expiring before the allocation is terminal. Once that's implemented,
|
||||
// add an ExpiresAt here ExpiresAt: &jwt.NumericDate{}
|
||||
// https://github.com/hashicorp/nomad/issues/16258
|
||||
NotBefore: now,
|
||||
IssuedAt: now,
|
||||
},
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testFn func() (bool, error)
|
||||
|
@ -256,7 +255,7 @@ func WaitForRunningWithToken(t testing.TB, rpc rpcFn, job *structs.Job, token st
|
|||
|
||||
if len(resp.Allocations) == 0 {
|
||||
evals := structs.JobEvaluationsResponse{}
|
||||
require.NoError(t, rpc("Job.Evaluations", args, &evals), "error looking up evals")
|
||||
must.NoError(t, rpc("Job.Evaluations", args, &evals), must.Sprintf("error looking up evals"))
|
||||
return false, fmt.Errorf("0 allocations; evals: %s", pretty.Sprint(evals.Evaluations))
|
||||
}
|
||||
|
||||
|
@ -269,7 +268,7 @@ func WaitForRunningWithToken(t testing.TB, rpc rpcFn, job *structs.Job, token st
|
|||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(t, err)
|
||||
must.NoError(t, err)
|
||||
})
|
||||
|
||||
return resp.Allocations
|
||||
|
@ -289,9 +288,10 @@ func WaitForJobAllocStatus(t testing.TB, rpc rpcFn, job *structs.Job, allocStatu
|
|||
|
||||
// WaitForJobAllocStatusWithToken behaves the same way as WaitForJobAllocStatus
|
||||
// but is used for clusters with ACL enabled.
|
||||
func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) {
|
||||
func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, allocStatus map[string]int, token string) []*structs.AllocListStub {
|
||||
t.Helper()
|
||||
|
||||
var allocs []*structs.AllocListStub
|
||||
WaitForResultRetries(2000*TestMultiplier(), func() (bool, error) {
|
||||
args := &structs.JobSpecificRequest{
|
||||
JobID: job.ID,
|
||||
|
@ -310,10 +310,12 @@ func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, a
|
|||
|
||||
if len(resp.Allocations) == 0 {
|
||||
evals := structs.JobEvaluationsResponse{}
|
||||
require.NoError(t, rpc("Job.Evaluations", args, &evals), "error looking up evals")
|
||||
must.NoError(t, rpc("Job.Evaluations", args, &evals), must.Sprintf("error looking up evals"))
|
||||
return false, fmt.Errorf("0 allocations; evals: %s", pretty.Sprint(evals.Evaluations))
|
||||
}
|
||||
|
||||
allocs = resp.Allocations
|
||||
|
||||
got := map[string]int{}
|
||||
for _, alloc := range resp.Allocations {
|
||||
got[alloc.ClientStatus]++
|
||||
|
@ -325,6 +327,8 @@ func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, a
|
|||
}, func(err error) {
|
||||
must.NoError(t, err)
|
||||
})
|
||||
|
||||
return allocs
|
||||
}
|
||||
|
||||
// WaitForFiles blocks until all the files in the slice are present
|
||||
|
|
Loading…
Reference in New Issue