deps: upgrade to hashicorp/golang-lru/v2 (#16085)
This commit is contained in:
parent
a4519c739d
commit
0e7bf87ee1
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
acl: refactor ACL cache based on golang-lru/v2
|
||||
```
|
|
@ -3,8 +3,7 @@ package client
|
|||
import (
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
@ -32,54 +31,25 @@ const (
|
|||
// of ACLs
|
||||
type clientACLResolver struct {
|
||||
// aclCache is used to maintain the parsed ACL objects
|
||||
aclCache *lru.TwoQueueCache
|
||||
aclCache *structs.ACLCache[*acl.ACL]
|
||||
|
||||
// policyCache is used to maintain the fetched policy objects
|
||||
policyCache *lru.TwoQueueCache
|
||||
policyCache *structs.ACLCache[*structs.ACLPolicy]
|
||||
|
||||
// tokenCache is used to maintain the fetched token objects
|
||||
tokenCache *lru.TwoQueueCache
|
||||
tokenCache *structs.ACLCache[*structs.ACLToken]
|
||||
|
||||
// roleCache is used to maintain a cache of the fetched ACL roles. Each
|
||||
// entry is keyed by the role ID.
|
||||
roleCache *lru.TwoQueueCache
|
||||
roleCache *structs.ACLCache[*structs.ACLRole]
|
||||
}
|
||||
|
||||
// init is used to setup the client resolver state
|
||||
func (c *clientACLResolver) init() error {
|
||||
// Create the ACL object cache
|
||||
var err error
|
||||
c.aclCache, err = lru.New2Q(aclCacheSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.policyCache, err = lru.New2Q(policyCacheSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.tokenCache, err = lru.New2Q(tokenCacheSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.roleCache, err = lru.New2Q(roleCacheSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cachedACLValue is used to manage ACL Token, Policy, or Role cache entries
|
||||
// and their TTLs.
|
||||
type cachedACLValue struct {
|
||||
Token *structs.ACLToken
|
||||
Policy *structs.ACLPolicy
|
||||
Role *structs.ACLRole
|
||||
CacheTime time.Time
|
||||
}
|
||||
|
||||
// Age is the time since the token was cached
|
||||
func (c *cachedACLValue) Age() time.Duration {
|
||||
return time.Since(c.CacheTime)
|
||||
func (c *clientACLResolver) init() {
|
||||
c.aclCache = structs.NewACLCache[*acl.ACL](aclCacheSize)
|
||||
c.policyCache = structs.NewACLCache[*structs.ACLPolicy](policyCacheSize)
|
||||
c.tokenCache = structs.NewACLCache[*structs.ACLToken](tokenCacheSize)
|
||||
c.roleCache = structs.NewACLCache[*structs.ACLRole](roleCacheSize)
|
||||
}
|
||||
|
||||
// ResolveToken is used to translate an ACL Token Secret ID into
|
||||
|
@ -154,12 +124,11 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
|
|||
return structs.AnonymousACLToken, nil
|
||||
}
|
||||
|
||||
// Lookup the token in the cache
|
||||
raw, ok := c.tokenCache.Get(secretID)
|
||||
// Lookup the token entry in the cache
|
||||
entry, ok := c.tokenCache.Get(secretID)
|
||||
if ok {
|
||||
cached := raw.(*cachedACLValue)
|
||||
if cached.Age() <= c.GetConfig().ACLTokenTTL {
|
||||
return cached.Token, nil
|
||||
if entry.Age() <= c.GetConfig().ACLTokenTTL {
|
||||
return entry.Get(), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,17 +145,13 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
|
|||
// If we encounter an error but have a cached value, mask the error and extend the cache
|
||||
if ok {
|
||||
c.logger.Warn("failed to resolve token, using expired cached value", "error", err)
|
||||
cached := raw.(*cachedACLValue)
|
||||
return cached.Token, nil
|
||||
return entry.Get(), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Cache the response (positive or negative)
|
||||
c.tokenCache.Add(secretID, &cachedACLValue{
|
||||
Token: resp.Token,
|
||||
CacheTime: time.Now(),
|
||||
})
|
||||
c.tokenCache.Add(secretID, resp.Token)
|
||||
return resp.Token, nil
|
||||
}
|
||||
|
||||
|
@ -202,18 +167,17 @@ func (c *Client) resolvePolicies(secretID string, policies []string) ([]*structs
|
|||
// Scan the cache for each policy
|
||||
for _, policyName := range policies {
|
||||
// Lookup the policy in the cache
|
||||
raw, ok := c.policyCache.Get(policyName)
|
||||
entry, ok := c.policyCache.Get(policyName)
|
||||
if !ok {
|
||||
missing = append(missing, policyName)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the cached value is valid or expired
|
||||
cached := raw.(*cachedACLValue)
|
||||
if cached.Age() <= c.GetConfig().ACLPolicyTTL {
|
||||
out = append(out, cached.Policy)
|
||||
if entry.Age() <= c.GetConfig().ACLPolicyTTL {
|
||||
out = append(out, entry.Get())
|
||||
} else {
|
||||
expired = append(expired, cached.Policy)
|
||||
expired = append(expired, entry.Get())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,10 +212,7 @@ func (c *Client) resolvePolicies(secretID string, policies []string) ([]*structs
|
|||
|
||||
// Handle each output
|
||||
for _, policy := range resp.Policies {
|
||||
c.policyCache.Add(policy.Name, &cachedACLValue{
|
||||
Policy: policy,
|
||||
CacheTime: time.Now(),
|
||||
})
|
||||
c.policyCache.Add(policy.Name, policy)
|
||||
out = append(out, policy)
|
||||
}
|
||||
|
||||
|
@ -290,7 +251,7 @@ func (c *Client) resolveTokenACLRoles(secretID string, roleLinks []*structs.ACLT
|
|||
// Look within the cache to see if the role is already present. If we
|
||||
// do not find it, add the ID to our tracking, so we look this up via
|
||||
// RPC.
|
||||
raw, ok := c.roleCache.Get(roleLink.ID)
|
||||
entry, ok := c.roleCache.Get(roleLink.ID)
|
||||
if !ok {
|
||||
missingRoleIDs = append(missingRoleIDs, roleLink.ID)
|
||||
continue
|
||||
|
@ -299,13 +260,12 @@ func (c *Client) resolveTokenACLRoles(secretID string, roleLinks []*structs.ACLT
|
|||
// If the cached value is expired, add the ID to our tracking, so we
|
||||
// look this up via RPC. Otherwise, iterate the policy links and add
|
||||
// each policy name to our return object tracking.
|
||||
cached := raw.(*cachedACLValue)
|
||||
if cached.Age() <= c.GetConfig().ACLRoleTTL {
|
||||
for _, policyLink := range cached.Role.Policies {
|
||||
if entry.Age() <= c.GetConfig().ACLRoleTTL {
|
||||
for _, policyLink := range entry.Get().Policies {
|
||||
policyNames = append(policyNames, policyLink.Name)
|
||||
}
|
||||
} else {
|
||||
expiredRoleIDs = append(expiredRoleIDs, cached.Role.ID)
|
||||
expiredRoleIDs = append(expiredRoleIDs, entry.Get().ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -354,13 +314,11 @@ func (c *Client) resolveTokenACLRoles(secretID string, roleLinks []*structs.ACLT
|
|||
// Generate a timestamp for the cache entry. We do not need to use a
|
||||
// timestamp per ACL role response integration.
|
||||
now := time.Now()
|
||||
|
||||
for _, aclRole := range roleByIDResp.ACLRoles {
|
||||
|
||||
// Add an entry to the cache using the generated timestamp for future
|
||||
// expiry calculations. Any existing, expired entry will be
|
||||
// overwritten.
|
||||
c.roleCache.Add(aclRole.ID, &cachedACLValue{Role: aclRole, CacheTime: now})
|
||||
c.roleCache.AddAtTime(aclRole.ID, aclRole, now)
|
||||
|
||||
// Iterate the role policy links, extracting the name and adding this
|
||||
// to our return response tracking.
|
||||
|
|
|
@ -17,8 +17,8 @@ import (
|
|||
)
|
||||
|
||||
func Test_clientACLResolver_init(t *testing.T) {
|
||||
resolver := &clientACLResolver{}
|
||||
must.NoError(t, resolver.init())
|
||||
resolver := new(clientACLResolver)
|
||||
resolver.init()
|
||||
must.NotNil(t, resolver.aclCache)
|
||||
must.NotNil(t, resolver.policyCache)
|
||||
must.NotNil(t, resolver.tokenCache)
|
||||
|
|
|
@ -441,9 +441,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
|
|||
c.setupClientRpc(rpcs)
|
||||
|
||||
// Initialize the ACL state
|
||||
if err := c.clientACLResolver.init(); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize ACL state: %v", err)
|
||||
}
|
||||
c.clientACLResolver.init()
|
||||
|
||||
// Setup the node
|
||||
if err := c.setupNode(); err != nil {
|
||||
|
|
4
go.mod
4
go.mod
|
@ -69,7 +69,7 @@ require (
|
|||
github.com/hashicorp/go-syslog v1.0.0
|
||||
github.com/hashicorp/go-uuid v1.0.3
|
||||
github.com/hashicorp/go-version v1.6.0
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.1
|
||||
github.com/hashicorp/hcl v1.0.1-vault-3
|
||||
github.com/hashicorp/hcl/v2 v2.9.2-0.20220525143345-ab3cae0737bc
|
||||
github.com/hashicorp/hil v0.0.0-20210521165536-27a72121fd40
|
||||
|
@ -214,7 +214,7 @@ require (
|
|||
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
|
||||
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 // indirect
|
||||
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.2 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/hashicorp/mdns v1.0.4 // indirect
|
||||
github.com/hashicorp/vault/api/auth/kubernetes v0.3.0 // indirect
|
||||
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -732,8 +732,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
|
|||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.0 h1:Lf+9eD8m5pncvHAOCQj49GSN6aQI8XGfI5OpXNkoWaA=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.0/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/hcl v1.0.1-0.20201016140508-a07e7d50bbee h1:8B4HqvMUtYSjsGkYjiQGStc9pXffY2J+Z2SPQAj+wMY=
|
||||
github.com/hashicorp/hcl v1.0.1-0.20201016140508-a07e7d50bbee/go.mod h1:gwlu9+/P9MmKtYrMsHeFRZPXj2CTPm11TDnMeaRHS7g=
|
||||
github.com/hashicorp/hcl/v2 v2.9.2-0.20220525143345-ab3cae0737bc h1:32lGaCPq5JPYNgFFTjl/cTIar9UWWxCbimCs5G2hMHg=
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
// Package lang provides some features that really 'ought to be part of the Go language
|
||||
package lang
|
|
@ -0,0 +1,7 @@
|
|||
package lang
|
||||
|
||||
// Pair associates two arbitrary types together.
|
||||
type Pair[T, U any] struct {
|
||||
First T
|
||||
Second U
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
|
@ -283,7 +282,7 @@ func (s *Server) ResolveClaims(claims *structs.IdentityClaims) (*acl.ACL, error)
|
|||
// resolveTokenFromSnapshotCache is used to resolve an ACL object from a
|
||||
// snapshot of state, using a cache to avoid parsing and ACL construction when
|
||||
// possible. It is split from resolveToken to simplify testing.
|
||||
func resolveTokenFromSnapshotCache(snap *state.StateSnapshot, cache *lru.TwoQueueCache, secretID string) (*acl.ACL, error) {
|
||||
func resolveTokenFromSnapshotCache(snap *state.StateSnapshot, cache *structs.ACLCache[*acl.ACL], secretID string) (*acl.ACL, error) {
|
||||
// Lookup the ACL Token
|
||||
var token *structs.ACLToken
|
||||
var err error
|
||||
|
@ -308,7 +307,7 @@ func resolveTokenFromSnapshotCache(snap *state.StateSnapshot, cache *lru.TwoQueu
|
|||
|
||||
}
|
||||
|
||||
func resolveACLFromToken(snap *state.StateSnapshot, cache *lru.TwoQueueCache, token *structs.ACLToken) (*acl.ACL, error) {
|
||||
func resolveACLFromToken(snap *state.StateSnapshot, cache *structs.ACLCache[*acl.ACL], token *structs.ACLToken) (*acl.ACL, error) {
|
||||
|
||||
// Check if this is a management token
|
||||
if token.Type == structs.ACLManagementToken {
|
||||
|
|
|
@ -4,9 +4,9 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
@ -36,7 +36,7 @@ func (n *NoopBadNodeTracker) Add(string) bool {
|
|||
// frequency and recency.
|
||||
type CachedBadNodeTracker struct {
|
||||
logger hclog.Logger
|
||||
cache *lru.TwoQueueCache
|
||||
cache *lru.TwoQueueCache[string, *badNodeStats]
|
||||
limiter *rate.Limiter
|
||||
window time.Duration
|
||||
threshold int
|
||||
|
@ -72,7 +72,7 @@ func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerCon
|
|||
With("window", config.Window).
|
||||
With("threshold", config.Threshold)
|
||||
|
||||
cache, err := lru.New2Q(config.CacheSize)
|
||||
cache, err := lru.New2Q[string, *badNodeStats](config.CacheSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create new bad node tracker: %v", err)
|
||||
}
|
||||
|
@ -93,12 +93,11 @@ func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerCon
|
|||
// cache. If the cache is full the least recently updated or accessed node is
|
||||
// evicted.
|
||||
func (c *CachedBadNodeTracker) Add(nodeID string) bool {
|
||||
value, ok := c.cache.Get(nodeID)
|
||||
stats, ok := c.cache.Get(nodeID)
|
||||
if !ok {
|
||||
value = newBadNodeStats(nodeID, c.window)
|
||||
c.cache.Add(nodeID, value)
|
||||
stats = newBadNodeStats(nodeID, c.window)
|
||||
c.cache.Add(nodeID, stats)
|
||||
}
|
||||
stats := value.(*badNodeStats)
|
||||
|
||||
now := time.Now()
|
||||
stats.record(now)
|
||||
|
@ -147,13 +146,12 @@ func (c *CachedBadNodeTracker) isBad(t time.Time, stats *badNodeStats) bool {
|
|||
|
||||
func (c *CachedBadNodeTracker) emitStats() {
|
||||
now := time.Now()
|
||||
for _, k := range c.cache.Keys() {
|
||||
value, _ := c.cache.Get(k)
|
||||
stats := value.(*badNodeStats)
|
||||
for _, nodeID := range c.cache.Keys() {
|
||||
stats, _ := c.cache.Get(nodeID)
|
||||
score := stats.score(now)
|
||||
|
||||
labels := []metrics.Label{
|
||||
{Name: "node_id", Value: k.(string)},
|
||||
{Name: "node_id", Value: nodeID},
|
||||
}
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "plan", "rejection_tracker", "node_score"}, float32(score), labels)
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TesCachedtBadNodeTracker(t *testing.T) {
|
||||
func TestCachedtBadNodeTracker(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
config := DefaultCachedBadNodeTrackerConfig()
|
||||
|
@ -74,11 +74,10 @@ func TestCachedBadNodeTracker_isBad(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Read value from cached.
|
||||
v, ok := tracker.cache.Get(tc.nodeID)
|
||||
stats, ok := tracker.cache.Get(tc.nodeID)
|
||||
require.True(t, ok)
|
||||
|
||||
// Check if it's bad.
|
||||
stats := v.(*badNodeStats)
|
||||
got := tracker.isBad(now, stats)
|
||||
require.Equal(t, tc.bad, got)
|
||||
})
|
||||
|
@ -88,10 +87,9 @@ func TestCachedBadNodeTracker_isBad(t *testing.T) {
|
|||
nodes := []string{"node-1", "node-2", "node-3"}
|
||||
for _, n := range nodes {
|
||||
t.Run(fmt.Sprintf("%s cache expires", n), func(t *testing.T) {
|
||||
v, ok := tracker.cache.Get(n)
|
||||
stats, ok := tracker.cache.Get(n)
|
||||
require.True(t, ok)
|
||||
|
||||
stats := v.(*badNodeStats)
|
||||
bad := tracker.isBad(future, stats)
|
||||
require.False(t, bad)
|
||||
})
|
||||
|
@ -115,11 +113,9 @@ func TesCachedtBadNodeTracker_rateLimit(t *testing.T) {
|
|||
tracker.Add("node-1")
|
||||
tracker.Add("node-1")
|
||||
|
||||
v, ok := tracker.cache.Get("node-1")
|
||||
stats, ok := tracker.cache.Get("node-1")
|
||||
require.True(t, ok)
|
||||
|
||||
stats := v.(*badNodeStats)
|
||||
|
||||
// Burst allows for max 3 operations.
|
||||
now := time.Now()
|
||||
require.True(t, tracker.isBad(now, stats))
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
consulapi "github.com/hashicorp/consul/api"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/raft"
|
||||
autopilot "github.com/hashicorp/raft-autopilot"
|
||||
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
||||
|
@ -254,7 +254,7 @@ type Server struct {
|
|||
workersEventCh chan interface{}
|
||||
|
||||
// aclCache is used to maintain the parsed ACL objects
|
||||
aclCache *lru.TwoQueueCache
|
||||
aclCache *structs.ACLCache[*acl.ACL]
|
||||
|
||||
// oidcProviderCache maintains a cache of OIDC providers. This is useful as
|
||||
// the provider performs background HTTP requests. When the Nomad server is
|
||||
|
@ -341,10 +341,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
|
|||
}
|
||||
|
||||
// Create the ACL object cache
|
||||
aclCache, err := lru.New2Q(aclCacheSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
aclCache := structs.NewACLCache[*acl.ACL](aclCacheSize)
|
||||
|
||||
// Create the logger
|
||||
logger := config.Logger.ResetNamedIntercept("nomad")
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
|
@ -42,7 +41,7 @@ type EventBroker struct {
|
|||
publishCh chan *structs.Events
|
||||
|
||||
aclDelegate ACLDelegate
|
||||
aclCache *lru.TwoQueueCache
|
||||
aclCache *structs.ACLCache[*acl.ACL]
|
||||
|
||||
aclCh chan structs.Event
|
||||
|
||||
|
@ -63,11 +62,6 @@ func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBroke
|
|||
cfg.EventBufferSize = 100
|
||||
}
|
||||
|
||||
aclCache, err := lru.New2Q(aclCacheSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buffer := newEventBuffer(cfg.EventBufferSize)
|
||||
e := &EventBroker{
|
||||
logger: cfg.Logger.Named("event_broker"),
|
||||
|
@ -75,7 +69,7 @@ func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBroke
|
|||
publishCh: make(chan *structs.Events, 64),
|
||||
aclCh: make(chan structs.Event, 10),
|
||||
aclDelegate: aclDelegate,
|
||||
aclCache: aclCache,
|
||||
aclCache: structs.NewACLCache[*acl.ACL](aclCacheSize),
|
||||
subscriptions: &subscriptions{
|
||||
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
||||
},
|
||||
|
@ -277,7 +271,7 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
|
|||
}
|
||||
|
||||
func aclObjFromSnapshotForTokenSecretID(
|
||||
aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (
|
||||
aclSnapshot ACLTokenProvider, aclCache *structs.ACLCache[*acl.ACL], tokenSecretID string) (
|
||||
*acl.ACL, *time.Time, error) {
|
||||
|
||||
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
|
||||
|
|
|
@ -11,11 +11,14 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-set"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/lib/lang"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
"golang.org/x/exp/slices"
|
||||
"oss.indeed.com/go/libtime"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -187,10 +190,48 @@ var (
|
|||
// ValidACLRoleName is used to validate an ACL role name.
|
||||
ValidACLRoleName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$")
|
||||
|
||||
// validACLAuthMethodName is used to validate an ACL auth method name.
|
||||
// ValidACLAuthMethod is used to validate an ACL auth method name.
|
||||
ValidACLAuthMethod = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$")
|
||||
)
|
||||
|
||||
type ACLCacheEntry[T any] lang.Pair[T, time.Time]
|
||||
|
||||
func (e ACLCacheEntry[T]) Age() time.Duration {
|
||||
return time.Since(e.Second)
|
||||
}
|
||||
|
||||
func (e ACLCacheEntry[T]) Get() T {
|
||||
return e.First
|
||||
}
|
||||
|
||||
// An ACLCache caches ACL tokens by their policy content.
|
||||
type ACLCache[T any] struct {
|
||||
*lru.TwoQueueCache[string, ACLCacheEntry[T]]
|
||||
clock libtime.Clock
|
||||
}
|
||||
|
||||
func (c *ACLCache[T]) Add(key string, item T) {
|
||||
c.AddAtTime(key, item, c.clock.Now())
|
||||
}
|
||||
|
||||
func (c *ACLCache[T]) AddAtTime(key string, item T, now time.Time) {
|
||||
c.TwoQueueCache.Add(key, ACLCacheEntry[T]{
|
||||
First: item,
|
||||
Second: now,
|
||||
})
|
||||
}
|
||||
|
||||
func NewACLCache[T any](size int) *ACLCache[T] {
|
||||
c, err := lru.New2Q[string, ACLCacheEntry[T]](size)
|
||||
if err != nil {
|
||||
panic(err) // not possible
|
||||
}
|
||||
return &ACLCache[T]{
|
||||
TwoQueueCache: c,
|
||||
clock: libtime.SystemClock(),
|
||||
}
|
||||
}
|
||||
|
||||
// ACLTokenRoleLink is used to link an ACL token to an ACL role. The ACL token
|
||||
// can therefore inherit all the ACL policy permissions that the ACL role
|
||||
// contains.
|
||||
|
|
|
@ -10,9 +10,8 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-set"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
)
|
||||
|
@ -436,7 +435,7 @@ func ACLPolicyListHash(policies []*ACLPolicy) string {
|
|||
}
|
||||
|
||||
// CompileACLObject compiles a set of ACL policies into an ACL object with a cache
|
||||
func CompileACLObject(cache *lru.TwoQueueCache, policies []*ACLPolicy) (*acl.ACL, error) {
|
||||
func CompileACLObject(cache *ACLCache[*acl.ACL], policies []*ACLPolicy) (*acl.ACL, error) {
|
||||
// Sort the policies to ensure consistent ordering
|
||||
sort.Slice(policies, func(i, j int) bool {
|
||||
return policies[i].Name < policies[j].Name
|
||||
|
@ -444,9 +443,9 @@ func CompileACLObject(cache *lru.TwoQueueCache, policies []*ACLPolicy) (*acl.ACL
|
|||
|
||||
// Determine the cache key
|
||||
cacheKey := ACLPolicyListHash(policies)
|
||||
aclRaw, ok := cache.Get(cacheKey)
|
||||
entry, ok := cache.Get(cacheKey)
|
||||
if ok {
|
||||
return aclRaw.(*acl.ACL), nil
|
||||
return entry.Get(), nil
|
||||
}
|
||||
|
||||
// Parse the policies
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -1007,8 +1007,7 @@ func TestCompileACLObject(t *testing.T) {
|
|||
p2.Name = fmt.Sprintf("policy-%s", uuid.Generate())
|
||||
|
||||
// Create a small cache
|
||||
cache, err := lru.New2Q(16)
|
||||
assert.Nil(t, err)
|
||||
cache := NewACLCache[*acl.ACL](10)
|
||||
|
||||
// Test compilation
|
||||
aclObj, err := CompileACLObject(cache, []*ACLPolicy{p1})
|
||||
|
|
|
@ -8,5 +8,5 @@ codecgen \
|
|||
-d 100 \
|
||||
-t codegen_generated \
|
||||
-o structs.generated.go \
|
||||
-nr="^IdentityClaims$" \
|
||||
-nr="(^ACLCache$)|(^IdentityClaims$)" \
|
||||
${FILES}
|
||||
|
|
Loading…
Reference in New Issue