Vault 2303: Count irrevocable leases in quotas/metrics and other improvements (#11542)

* shrink generic error message

* move zombie loading to updatePendingInternal from loadEntryInternal

* probably the right metric/lease behavior for irrevocable leases...

* comment improvements

* test total lease count with valid and irrevocable leases
This commit is contained in:
swayne275 2021-05-11 14:04:06 -06:00 committed by GitHub
parent 6a573b2d41
commit 0229787a26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 163 additions and 52 deletions

View File

@ -70,11 +70,9 @@ const (
// storage/memory
maxIrrevocableErrorLength = 240
genericIrrevocableErrorMessage = "no error message given"
)
genericIrrevocableErrorMessage = "unknown"
var (
errOutOfRetries = errors.New("out of retries")
outOfRetriesMessage = "out of retries"
)
type pendingInfo struct {
@ -239,7 +237,7 @@ func (r *revocationJob) OnFailure(err error) {
r.m.logger.Trace("marking lease as irrevocable", "lease_id", r.leaseID, "error", err)
if pending.revokesAttempted >= maxRevokeAttempts {
r.m.logger.Trace("lease has consumed all retry attempts", "lease_id", r.leaseID)
err = fmt.Errorf("%v: %w", errOutOfRetries.Error(), err)
err = fmt.Errorf("%v: %w", outOfRetriesMessage, err)
}
le, loadErr := r.m.loadEntry(r.nsCtx, r.leaseID)
@ -528,18 +526,28 @@ func (m *ExpirationManager) invalidate(key string) {
return
}
default:
// Handle lease update
// Update the lease in memory
m.updatePendingInternal(le)
}
default:
// There is no entry in the pending map and the invalidation
// resulted in a nil entry.
if le == nil {
// If in the nonexpiring map, remove there.
// There is no entry in the pending map and the invalidation
// resulted in a nil entry. Therefore we should clean up the
// other maps, and update metrics/quotas if appropriate.
m.nonexpiring.Delete(leaseID)
if _, ok := m.irrevocable.Load(leaseID); ok {
m.irrevocable.Delete(leaseID)
m.leaseCount--
if err := m.core.quotasHandleLeases(ctx, quotas.LeaseActionDeleted, []string{leaseID}); err != nil {
m.logger.Error("failed to update quota on lease invalidation", "error", err)
return
}
}
return
}
// Handle lease creation
// Handle lease update (if irrevocable) or creation (if pending)
m.updatePendingInternal(le)
}
}
@ -985,7 +993,7 @@ func (m *ExpirationManager) revokeCommon(ctx context.Context, leaseID string, fo
// Clear the expiration handler
m.pendingLock.Lock()
m.removeFromPending(ctx, leaseID)
m.removeFromPending(ctx, leaseID, true)
m.nonexpiring.Delete(leaseID)
m.irrevocable.Delete(leaseID)
m.pendingLock.Unlock()
@ -1715,12 +1723,8 @@ func (m *ExpirationManager) updatePending(le *leaseEntry) {
// updatePendingInternal is the locked version of updatePending; do not call
// this without a write lock on m.pending
func (m *ExpirationManager) updatePendingInternal(le *leaseEntry) {
if le.isIrrevocable() {
return
}
// Check for an existing timer
info, ok := m.pending.Load(le.LeaseID)
info, leaseInPending := m.pending.Load(le.LeaseID)
var pending pendingInfo
if le.ExpireTime.IsZero() {
@ -1735,7 +1739,7 @@ func (m *ExpirationManager) updatePendingInternal(le *leaseEntry) {
// if the timer happened to exist, stop the time and delete it from the
// pending timers.
if ok {
if leaseInPending {
info.(pendingInfo).timer.Stop()
m.pending.Delete(le.LeaseID)
m.leaseCount--
@ -1749,31 +1753,44 @@ func (m *ExpirationManager) updatePendingInternal(le *leaseEntry) {
leaseTotal := le.ExpireTime.Sub(time.Now())
leaseCreated := false
// Create entry if it does not exist or reset if it does
if ok {
pending = info.(pendingInfo)
pending.timer.Reset(leaseTotal)
// No change to lease count in this case
} else {
leaseID, namespace := le.LeaseID, le.namespace
// Extend the timer by the lease total
timer := time.AfterFunc(leaseTotal, func() {
m.expireFunc(m.quitContext, m, leaseID, namespace)
})
pending = pendingInfo{
timer: timer,
if le.isIrrevocable() {
// It's possible this function is being called to update the in-memory state
// for a lease from pending to irrevocable (we don't support the opposite).
// If this is the case, we need to know if the lease was previously counted
// so that we can maintain correct metric and quota lease counts.
_, leaseInIrrevocable := m.irrevocable.Load(le.LeaseID)
if !(leaseInPending || leaseInIrrevocable) {
leaseCreated = true
}
// new lease
m.leaseCount++
leaseCreated = true
m.removeFromPending(m.quitContext, le.LeaseID, false)
m.irrevocable.Store(le.LeaseID, m.inMemoryLeaseInfo(le))
} else {
// Create entry if it does not exist or reset if it does
if leaseInPending {
pending = info.(pendingInfo)
pending.timer.Reset(leaseTotal)
// No change to lease count in this case
} else {
leaseID, namespace := le.LeaseID, le.namespace
// Extend the timer by the lease total
timer := time.AfterFunc(leaseTotal, func() {
m.expireFunc(m.quitContext, m, leaseID, namespace)
})
pending = pendingInfo{
timer: timer,
}
leaseCreated = true
}
pending.cachedLeaseInfo = m.inMemoryLeaseInfo(le)
m.pending.Store(le.LeaseID, pending)
}
// Retain some information in-memory
pending.cachedLeaseInfo = m.inMemoryLeaseInfo(le)
m.pending.Store(le.LeaseID, pending)
if leaseCreated {
m.leaseCount++
if err := m.core.quotasHandleLeases(m.quitContext, quotas.LeaseActionCreated, []string{le.LeaseID}); err != nil {
m.logger.Error("failed to update quota on lease creation", "error", err)
return
@ -1913,11 +1930,6 @@ func (m *ExpirationManager) loadEntryInternal(ctx context.Context, leaseID strin
}
le.namespace = ns
if le.isIrrevocable() {
m.irrevocable.Store(le.LeaseID, m.inMemoryLeaseInfo(le))
return le, nil
}
if restoreMode {
if checkRestored {
// If we have already loaded this lease, we don't need to update on
@ -2351,19 +2363,24 @@ func (m *ExpirationManager) walkLeases(walkFn leaseWalkFunction) error {
}
// must be called with m.pendingLock held
func (m *ExpirationManager) removeFromPending(ctx context.Context, leaseID string) {
// set decrementCounters true to decrement the lease count metric and quota
func (m *ExpirationManager) removeFromPending(ctx context.Context, leaseID string, decrementCounters bool) {
if info, ok := m.pending.Load(leaseID); ok {
pending := info.(pendingInfo)
pending.timer.Stop()
m.pending.Delete(leaseID)
m.leaseCount--
// Log but do not fail; unit tests (and maybe Tidy on production systems)
if err := m.core.quotasHandleLeases(ctx, quotas.LeaseActionDeleted, []string{leaseID}); err != nil {
m.logger.Error("failed to update quota on revocation", "error", err)
if decrementCounters {
m.leaseCount--
// Log but do not fail; unit tests (and maybe Tidy on production systems)
if err := m.core.quotasHandleLeases(ctx, quotas.LeaseActionDeleted, []string{leaseID}); err != nil {
m.logger.Error("failed to update quota on revocation", "error", err)
}
}
}
}
// Marks a pending lease as irrevocable. Because the lease is being moved from
// pending to irrevocable, no total lease count metrics/quotas updates are needed
// note: must be called with pending lock held
func (m *ExpirationManager) markLeaseIrrevocable(ctx context.Context, le *leaseEntry, err error) {
if le == nil {
@ -2390,7 +2407,7 @@ func (m *ExpirationManager) markLeaseIrrevocable(ctx context.Context, le *leaseE
m.persistEntry(ctx, le)
m.irrevocable.Store(le.LeaseID, m.inMemoryLeaseInfo(le))
m.removeFromPending(ctx, le.LeaseID)
m.removeFromPending(ctx, le.LeaseID, false)
m.nonexpiring.Delete(le.LeaseID)
}

View File

@ -56,7 +56,7 @@ func TestExpiration_Metrics(t *testing.T) {
// Set up a count function to calculate number of leases
count := 0
countFunc := func(leaseID string) {
countFunc := func(_ string) {
count++
}
@ -177,7 +177,7 @@ func TestExpiration_Metrics(t *testing.T) {
}
if !foundLabelOne || !foundLabelTwo || !foundLabelThree {
t.Errorf("One of the labels is missing")
t.Errorf("One of the labels is missing. one: %t, two: %t, three: %t", foundLabelOne, foundLabelTwo, foundLabelThree)
}
// test the same leases while ignoring namespaces so the 2 different namespaces get aggregated
@ -219,6 +219,99 @@ func TestExpiration_Metrics(t *testing.T) {
}
}
func TestExpiration_TotalLeaseCount(t *testing.T) {
// Quotas and internal lease count tracker are coupled, so this is a proxy
// for testing the total lease count quota
c, _, _ := TestCoreUnsealed(t)
exp := c.expiration
expectedCount := 0
otherNS := &namespace.Namespace{
ID: "nsid",
Path: "foo/bar",
}
for i := 0; i < 50; i++ {
le := &leaseEntry{
LeaseID: "lease" + fmt.Sprintf("%d", i),
Path: "foo/bar/" + fmt.Sprintf("%d", i),
namespace: namespace.RootNamespace,
IssueTime: time.Now(),
ExpireTime: time.Now().Add(time.Hour),
}
otherNSle := &leaseEntry{
LeaseID: "lease" + fmt.Sprintf("%d", i) + "/blah.nsid",
Path: "foo/bar/" + fmt.Sprintf("%d", i) + "/blah.nsid",
namespace: otherNS,
IssueTime: time.Now(),
ExpireTime: time.Now().Add(time.Hour),
}
exp.pendingLock.Lock()
if err := exp.persistEntry(namespace.RootContext(nil), le); err != nil {
exp.pendingLock.Unlock()
t.Fatalf("error persisting irrevocable entry: %v", err)
}
exp.updatePendingInternal(le)
expectedCount++
if err := exp.persistEntry(namespace.RootContext(nil), otherNSle); err != nil {
exp.pendingLock.Unlock()
t.Fatalf("error persisting irrevocable entry: %v", err)
}
exp.updatePendingInternal(otherNSle)
expectedCount++
exp.pendingLock.Unlock()
}
// add some irrevocable leases to each count to ensure they are counted too
// note: irrevocable leases almost certainly have an expire time set in the
// past, but for this exercise it should be fine to set it to whatever
for i := 50; i < 60; i++ {
le := &leaseEntry{
LeaseID: "lease" + fmt.Sprintf("%d", i+1),
Path: "foo/bar/" + fmt.Sprintf("%d", i+1),
namespace: namespace.RootNamespace,
IssueTime: time.Now(),
ExpireTime: time.Now(),
RevokeErr: "some err message",
}
otherNSle := &leaseEntry{
LeaseID: "lease" + fmt.Sprintf("%d", i+1) + "/blah.nsid",
Path: "foo/bar/" + fmt.Sprintf("%d", i+1) + "/blah.nsid",
namespace: otherNS,
IssueTime: time.Now(),
ExpireTime: time.Now(),
RevokeErr: "some err message",
}
exp.pendingLock.Lock()
if err := exp.persistEntry(namespace.RootContext(nil), le); err != nil {
exp.pendingLock.Unlock()
t.Fatalf("error persisting irrevocable entry: %v", err)
}
exp.updatePendingInternal(le)
expectedCount++
if err := exp.persistEntry(namespace.RootContext(nil), otherNSle); err != nil {
exp.pendingLock.Unlock()
t.Fatalf("error persisting irrevocable entry: %v", err)
}
exp.updatePendingInternal(otherNSle)
expectedCount++
exp.pendingLock.Unlock()
}
exp.pendingLock.RLock()
count := exp.leaseCount
exp.pendingLock.RUnlock()
if count != expectedCount {
t.Errorf("bad lease count. expected %d, got %d", expectedCount, count)
}
}
func TestExpiration_Tidy(t *testing.T) {
var err error
@ -1145,7 +1238,7 @@ func TestExpiration_RevokeByToken(t *testing.T) {
defer noop.Unlock()
if len(noop.Requests) != 3 {
t.Fatalf("Bad: %v", noop.Requests)
t.Fatalf("Bad: %#v", noop.Requests)
}
for _, req := range noop.Requests {
if req.Operation != logical.RevokeOperation {
@ -2585,6 +2678,7 @@ func TestExpiration_MarkIrrevocable(t *testing.T) {
if err != nil {
t.Fatalf("error loading non irrevocable lease after restore: %v", err)
}
exp.updatePending(loadedLE)
if !loadedLE.isIrrevocable() {
t.Fatalf("irrevocable lease is not irrevocable and should be")