145 lines
3.3 KiB
Go
145 lines
3.3 KiB
Go
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
func (s *Server) startACLTokenReaping() {
|
|
s.aclTokenReapLock.Lock()
|
|
defer s.aclTokenReapLock.Unlock()
|
|
|
|
if s.aclTokenReapEnabled {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.aclTokenReapCancel = cancel
|
|
|
|
// Do a quick check for config settings that would imply the goroutine
|
|
// below will just spin forever.
|
|
//
|
|
// We can only check the config settings here that cannot change without a
|
|
// restart, so we omit the check for a non-empty replication token as that
|
|
// can be changed at runtime.
|
|
if !s.InACLDatacenter() && !s.config.ACLTokenReplication {
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
limiter := rate.NewLimiter(aclTokenReapingRateLimit, aclTokenReapingBurst)
|
|
|
|
for {
|
|
if err := limiter.Wait(ctx); err != nil {
|
|
return
|
|
}
|
|
|
|
if s.LocalTokensEnabled() {
|
|
if _, err := s.reapExpiredLocalACLTokens(); err != nil {
|
|
s.logger.Printf("[ERR] acl: error reaping expired local ACL tokens: %v", err)
|
|
}
|
|
}
|
|
if s.InACLDatacenter() {
|
|
if _, err := s.reapExpiredGlobalACLTokens(); err != nil {
|
|
s.logger.Printf("[ERR] acl: error reaping expired global ACL tokens: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
s.aclTokenReapEnabled = true
|
|
}
|
|
|
|
func (s *Server) stopACLTokenReaping() {
|
|
s.aclTokenReapLock.Lock()
|
|
defer s.aclTokenReapLock.Unlock()
|
|
|
|
if !s.aclTokenReapEnabled {
|
|
return
|
|
}
|
|
|
|
s.aclTokenReapCancel()
|
|
s.aclTokenReapCancel = nil
|
|
s.aclTokenReapEnabled = false
|
|
}
|
|
|
|
func (s *Server) reapExpiredGlobalACLTokens() (int, error) {
|
|
return s.reapExpiredACLTokens(false, true)
|
|
}
|
|
func (s *Server) reapExpiredLocalACLTokens() (int, error) {
|
|
return s.reapExpiredACLTokens(true, false)
|
|
}
|
|
func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
|
|
if !s.ACLsEnabled() {
|
|
return 0, nil
|
|
}
|
|
if s.UseLegacyACLs() {
|
|
return 0, nil
|
|
}
|
|
if local == global {
|
|
return 0, fmt.Errorf("cannot reap both local and global tokens in the same request")
|
|
}
|
|
|
|
locality := localityName(local)
|
|
|
|
minExpiredTime, err := s.fsm.State().ACLTokenMinExpirationTime(local)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
if minExpiredTime.After(now) {
|
|
return 0, nil // nothing to do
|
|
}
|
|
|
|
tokens, _, err := s.fsm.State().ACLTokenListExpired(local, now, aclBatchDeleteSize)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(tokens) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
var (
|
|
secretIDs []string
|
|
req structs.ACLTokenBatchDeleteRequest
|
|
)
|
|
for _, token := range tokens {
|
|
if token.Local != local {
|
|
return 0, fmt.Errorf("expired index for local=%v returned a mismatched token with local=%v: %s", local, token.Local, token.AccessorID)
|
|
}
|
|
req.TokenIDs = append(req.TokenIDs, token.AccessorID)
|
|
secretIDs = append(secretIDs, token.SecretID)
|
|
}
|
|
|
|
s.logger.Printf("[INFO] acl: deleting %d expired %s tokens", len(req.TokenIDs), locality)
|
|
resp, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err)
|
|
}
|
|
|
|
// Purge the identities from the cache
|
|
for _, secretID := range secretIDs {
|
|
s.acls.cache.RemoveIdentity(secretID)
|
|
}
|
|
|
|
if respErr, ok := resp.(error); ok {
|
|
return 0, respErr
|
|
}
|
|
|
|
return len(req.TokenIDs), nil
|
|
}
|
|
|
|
// localityName returns the human-readable label for a token locality:
// "local" when local is true, "global" otherwise.
func localityName(local bool) string {
	switch local {
	case true:
		return "local"
	default:
		return "global"
	}
}
|