Merge pull request #8553 from hashicorp/b-vault-revoke-batch
vault: expired tokens count toward batch limit
This commit is contained in:
commit
947491c255
|
@ -242,6 +242,10 @@ type vaultClient struct {
|
|||
// setConfigLock serializes access to the SetConfig method
|
||||
setConfigLock sync.Mutex
|
||||
|
||||
// consts as struct fields for overriding in tests
|
||||
maxRevokeBatchSize int
|
||||
revocationIntv time.Duration
|
||||
|
||||
entHandler taskClientHandler
|
||||
}
|
||||
|
||||
|
@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
|
|||
}
|
||||
|
||||
v := &vaultClient{
|
||||
config: c,
|
||||
logger: logger.Named("vault"),
|
||||
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
|
||||
revoking: make(map[*structs.VaultAccessor]time.Time),
|
||||
purgeFn: purgeFn,
|
||||
tomb: &tomb.Tomb{},
|
||||
entHandler: delegate,
|
||||
config: c,
|
||||
logger: logger.Named("vault"),
|
||||
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
|
||||
revoking: make(map[*structs.VaultAccessor]time.Time),
|
||||
purgeFn: purgeFn,
|
||||
tomb: &tomb.Tomb{},
|
||||
maxRevokeBatchSize: maxVaultRevokeBatchSize,
|
||||
revocationIntv: vaultRevocationIntv,
|
||||
entHandler: delegate,
|
||||
}
|
||||
|
||||
if v.config.IsEnabled() {
|
||||
|
@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
|
|||
}
|
||||
|
||||
// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke
|
||||
// at any given time.
|
||||
// and purge at any given time.
|
||||
//
|
||||
// Limiting the revocation batch size is beneficial for few reasons:
|
||||
// * A single revocation failure of any entry in batch result into retrying the whole batch;
|
||||
// the larger the batch is the higher likelihood of such failure
|
||||
// * Smaller batch sizes result into more co-operativeness: provides hooks for
|
||||
// reconsidering token TTL and leadership steps down.
|
||||
// * Batches limit the size of the Raft message purging tokens. Due to bugs
|
||||
// pre-0.11.3, expired tokens were not properly purged, so users upgrading from
|
||||
// older versions may have huge numbers (millions) of expired tokens to purge.
|
||||
const maxVaultRevokeBatchSize = 1000
|
||||
|
||||
// revokeDaemon should be called in a goroutine and is used to periodically
|
||||
// revoke Vault accessors that failed the original revocation
|
||||
func (v *vaultClient) revokeDaemon() {
|
||||
ticker := time.NewTicker(vaultRevocationIntv)
|
||||
ticker := time.NewTicker(v.revocationIntv)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
|
@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() {
|
|||
|
||||
// Build the list of accessors that need to be revoked while pruning any TTL'd checks
|
||||
toRevoke := len(v.revoking)
|
||||
if toRevoke > maxVaultRevokeBatchSize {
|
||||
toRevoke = maxVaultRevokeBatchSize
|
||||
if toRevoke > v.maxRevokeBatchSize {
|
||||
toRevoke = v.maxRevokeBatchSize
|
||||
}
|
||||
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
|
||||
ttlExpired := []*structs.VaultAccessor{}
|
||||
|
@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() {
|
|||
revoking = append(revoking, va)
|
||||
}
|
||||
|
||||
if len(revoking) >= toRevoke {
|
||||
// Batches should consider tokens to be revoked
|
||||
// as well as expired tokens to ensure the Raft
|
||||
// message is reasonably sized.
|
||||
if len(revoking)+len(ttlExpired) >= toRevoke {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestVaultClient_RevokeTokens_failures_TTL asserts that
|
||||
// TestVaultClient_RevokeTokens_Failures_TTL asserts that
|
||||
// the registered TTL doesn't get extended on retries
|
||||
func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) {
|
|||
require.Errorf(t, err, "failed to purge token: %v", s)
|
||||
}
|
||||
|
||||
// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation
|
||||
// batches are bounded in size.
|
||||
func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) {
|
||||
t.Parallel()
|
||||
v := testutil.NewTestVault(t)
|
||||
defer v.Stop()
|
||||
|
||||
// Set the configs token in a new test role
|
||||
v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5)
|
||||
|
||||
// Disable client until we can change settings for testing
|
||||
conf := v.Config.Copy()
|
||||
conf.Enabled = helper.BoolToPtr(false)
|
||||
|
||||
const (
|
||||
batchSize = 100
|
||||
batches = 3
|
||||
)
|
||||
resultCh := make(chan error, batches)
|
||||
var totalPurges int64
|
||||
|
||||
// Purge function asserts batches are always < batchSize
|
||||
purge := func(vas []*structs.VaultAccessor) error {
|
||||
if len(vas) > batchSize {
|
||||
resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize)
|
||||
} else {
|
||||
resultCh <- nil
|
||||
}
|
||||
atomic.AddInt64(&totalPurges, int64(len(vas)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := testlog.HCLogger(t)
|
||||
client, err := NewVaultClient(conf, logger, purge, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Override settings for testing and then enable client
|
||||
client.maxRevokeBatchSize = batchSize
|
||||
client.revocationIntv = 3 * time.Millisecond
|
||||
conf = v.Config.Copy()
|
||||
conf.Enabled = helper.BoolToPtr(true)
|
||||
require.NoError(t, client.SetConfig(conf))
|
||||
|
||||
client.SetActive(true)
|
||||
defer client.Stop()
|
||||
|
||||
waitForConnection(client, t)
|
||||
|
||||
// Create more tokens in Nomad than can fit in a batch; they don't need
|
||||
// to exist in Vault.
|
||||
accessors := make([]*structs.VaultAccessor, batchSize*batches)
|
||||
for i := 0; i < len(accessors); i++ {
|
||||
accessors[i] = &structs.VaultAccessor{Accessor: "abcd"}
|
||||
}
|
||||
|
||||
// Mark for revocation
|
||||
require.NoError(t, client.MarkForRevocation(accessors))
|
||||
|
||||
// Wait for tokens to be revoked
|
||||
for i := 0; i < batches; i++ {
|
||||
select {
|
||||
case err := <-resultCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(10 * time.Second):
|
||||
// 10 seconds should be plenty long to process 3
|
||||
// batches at a 3ms tick interval!
|
||||
t.Errorf("timed out processing %d batches. %d/%d complete in 10s",
|
||||
batches, i, batches)
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges))
|
||||
}
|
||||
|
||||
func waitForConnection(v *vaultClient, t *testing.T) {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return v.ConnectionEstablished()
|
||||
|
|
Loading…
Reference in a new issue