Vault-1403 Switch Expiration Manager to use Fairsharing Backpressure (#1709) (#10932)

* basic pool and start testing

* refactor a bit for testing

* workFunc, start/stop safety, testing

* cleanup function for worker quit, more tests

* redo public/private members

* improve tests, export types, switch uuid package

* fix loop capture bug, cleanup

* cleanup tests

* update worker pool file name, other improvements

* add job manager prototype

* remove remnants

* add functions to wait for job manager and worker pool to stop, other fixes

* test job manager functionality, fix bugs

* encapsulate how jobs are distributed to workers

* make worker job channel read only

* add job interface, more testing, fixes

* set name for dispatcher

* fix test races

* wire up expiration manager most of the way

* dispatcher and job manager constructors don't return errors

* logger now dependency injected

* make some members private, add a test function to get worker pool size

* make GetNumWorkers public

* Update helper/fairshare/jobmanager_test.go

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>

* update fairsharing usage, add tests

* make workerpool private

* remove custom worker names

* concurrency improvements

* remove worker pool cleanup function

* remove cleanup func from job manager, remove non-blocking stop from fairshare

* update job manager for new constructor

* stop job manager when expiration manager stopped

* unset env var after test

* stop fairshare when started in tests

* stop leaking job manager goroutine

* prototype channel for waking up to assign work

* fix typo/bug and add tests

* improve job manager wake up, fix test typo

* put channel drain back

* better start/pause test for job manager

* comment cleanup

* downgrade a possibly noisy log

* remove closure, clean up context

* improve revocation context timer

* test: reduce number of revocation workers during many tests

* Update vault/expiration.go

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>

* feedback tweaks

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Author: swayne275 · 2021-02-17 15:30:27 -07:00 · committed by GitHub
Parent: 7f6ec265a1 · Commit: e4119a6a8a
4 changed files with 224 additions and 5 deletions
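The substance of the change is the new helper/fairshare package: each expiring lease is wrapped in a job and handed to a fair-sharing job manager keyed by mount accessor, so a flood of expirations on one backend can no longer starve revocation for the others. The package itself is not part of this diff; the sketch below reconstructs its surface from the methods revocationJob implements and the calls made in vault/expiration.go, with bodies stubbed out and anything not seen in the diff a guess:

```go
// Hedged sketch of helper/fairshare as implied by this commit's usage;
// not the package's verbatim source.
package fairshare

import log "github.com/hashicorp/go-hclog"

// Job is one unit of work. revocationJob in vault/expiration.go
// implements exactly these three methods.
type Job interface {
	GetID() string       // identifier, used for logging
	Execute() error      // do the work; an error is routed to OnFailure
	OnFailure(err error) // record the failure and decide whether to retry
}

// JobManager fair-shares a fixed pool of workers across queues; the
// expiration manager uses one queue per mount accessor.
type JobManager struct{} // internals elided

func NewJobManager(name string, numWorkers int, l log.Logger) *JobManager {
	return &JobManager{}
}

func (j *JobManager) Start() {}
func (j *JobManager) Stop()  {}

// AddJob enqueues job on the queue identified by queueID.
func (j *JobManager) AddJob(job Job, queueID string) {}

// GetNumWorkers exposes the pool size for tests
// (see TestExpiration_FairsharingEnvVar below).
func GetNumWorkers(j *JobManager) int { return 0 }
```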

vault/core.go

@@ -14,6 +14,7 @@ import (
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
"sync/atomic"
@@ -542,6 +543,9 @@ type Core struct {
// KeyRotateGracePeriod is how long we allow an upgrade path
// for standby instances before we delete the upgrade keys
keyRotateGracePeriod *int64
// number of workers to use for lease revocation in the expiration manager
numExpirationWorkers int
}
// CoreConfig is used to parameterize a core
@@ -644,6 +648,9 @@ type CoreConfig struct {
// Activity log controls
ActivityLogConfig ActivityLogCoreConfig
// number of workers to use for lease revocation in the expiration manager
NumExpirationWorkers int
}
// GetServiceRegistration returns the config's ServiceRegistration, or nil if it does
@@ -726,6 +733,10 @@ func NewCore(conf *CoreConfig) (*Core, error) {
clusterHeartbeatInterval = 5 * time.Second
}
if conf.NumExpirationWorkers == 0 {
conf.NumExpirationWorkers = numExpirationWorkersDefault
}
// Setup the core
c := &Core{
entCore: entCore{},
@@ -785,6 +796,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
clusterHeartbeatInterval: clusterHeartbeatInterval,
activityLogConfig: conf.ActivityLogConfig,
keyRotateGracePeriod: new(int64),
numExpirationWorkers: conf.NumExpirationWorkers,
}
c.standbyStopCh.Store(make(chan struct{}))
atomic.StoreUint32(c.sealed, 1)
@@ -1922,7 +1934,13 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
if err := c.startRollback(); err != nil {
return err
}
- if err := c.setupExpiration(expireLeaseStrategyRevoke); err != nil {
+ var expirationStrategy ExpireLeaseStrategy
+ if os.Getenv("VAULT_LEASE_USE_LEGACY_REVOCATION_STRATEGY") != "" {
+ expirationStrategy = expireLeaseStrategyRevoke
+ } else {
+ expirationStrategy = expireLeaseStrategyFairsharing
+ }
+ if err := c.setupExpiration(expirationStrategy); err != nil {
return err
}
if err := c.loadAudits(ctx); err != nil {
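Fairsharing is now the default strategy selected at unseal time; the pre-existing serial strategy stays reachable through VAULT_LEASE_USE_LEGACY_REVOCATION_STRATEGY. A minimal standalone illustration of the toggle (the real code wires a function value into setupExpiration; only the selection logic is mirrored here):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Mirrors the check in standardUnsealStrategy.unseal above:
	// any non-empty value opts back into the legacy strategy.
	strategy := "expireLeaseStrategyFairsharing (default)"
	if os.Getenv("VAULT_LEASE_USE_LEGACY_REVOCATION_STRATEGY") != "" {
		strategy = "expireLeaseStrategyRevoke (legacy)"
	}
	fmt.Println("selected lease revocation strategy:", strategy)
}
```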

vault/expiration.go

@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/errwrap"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/vault/helper/fairshare"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/framework"
@@ -55,12 +56,21 @@ const (
// maxLeaseThreshold is the maximum lease count before generating log warning
maxLeaseThreshold = 256000
// numExpirationWorkersDefault is the maximum number of workers working on lease expiration
numExpirationWorkersDefault = 200
// number of workers to use for general purpose testing
numExpirationWorkersTest = 10
fairshareWorkersOverrideVar = "VAULT_LEASE_REVOCATION_WORKERS"
)
type pendingInfo struct {
// A subset of the lease entry, cached in memory
- cachedLeaseInfo *leaseEntry
- timer *time.Timer
+ cachedLeaseInfo  *leaseEntry
+ timer            *time.Timer
+ revokesAttempted uint8
}
// ExpirationManager is used by the Core to manage leases. Secrets
@@ -114,10 +124,130 @@ type ExpirationManager struct {
// RegisterAuth to simulate a partial failure during a token creation
// request. This value should only be set by tests.
testRegisterAuthFailure uberAtomic.Bool
jobManager *fairshare.JobManager
}
type ExpireLeaseStrategy func(context.Context, *ExpirationManager, string, *namespace.Namespace)
// revocationJob should only be created through newRevocationJob()
type revocationJob struct {
leaseID string
nsID string
m *ExpirationManager
nsCtx context.Context
}
func newRevocationJob(nsCtx context.Context, leaseID, nsID string, m *ExpirationManager) (*revocationJob, error) {
if leaseID == "" {
return nil, fmt.Errorf("cannot have empty lease id")
}
if nsID == "" {
return nil, fmt.Errorf("cannot have empty namespace id")
}
if m == nil {
return nil, fmt.Errorf("cannot have nil expiration manager")
}
if nsCtx == nil {
return nil, fmt.Errorf("cannot have nil namespace context.Context")
}
return &revocationJob{
leaseID: leaseID,
nsID: nsID,
m: m,
nsCtx: nsCtx,
}, nil
}
func (r *revocationJob) GetID() string {
return r.leaseID
}
func (r *revocationJob) Execute() error {
metrics.IncrCounterWithLabels([]string{"expire", "lease_expiration"}, 1, []metrics.Label{{"namespace", r.nsID}})
// don't start the timer until the revocation is being executed
revokeCtx, cancel := context.WithTimeout(r.nsCtx, DefaultMaxRequestDuration)
defer cancel()
go func() {
select {
case <-r.m.quitCh:
cancel()
case <-revokeCtx.Done():
}
}()
select {
case <-r.m.quitCh:
r.m.logger.Error("shutting down, not attempting further revocation of lease", "lease_id", r.leaseID)
return nil
case <-r.m.quitContext.Done():
r.m.logger.Error("core context canceled, not attempting further revocation of lease", "lease_id", r.leaseID)
return nil
default:
}
r.m.coreStateLock.RLock()
err := r.m.Revoke(revokeCtx, r.leaseID)
r.m.coreStateLock.RUnlock()
return err
}
func (r *revocationJob) OnFailure(err error) {
// TODO vault 1.8 do another one of these metrics for the zombie lease count and such (see metrics section of VLT-145)
metrics.IncrCounterWithLabels([]string{"expire", "lease_expiration", "error"}, 1, []metrics.Label{{"namespace", r.nsID}})
r.m.logger.Error("failed to revoke lease", "lease_id", r.leaseID, "error", err)
r.m.pendingLock.Lock()
defer r.m.pendingLock.Unlock()
pendingRaw, ok := r.m.pending.Load(r.leaseID)
if !ok {
r.m.logger.Warn("failed to find lease in pending map for revocation retry", "lease_id", r.leaseID)
return
}
pending := pendingRaw.(pendingInfo)
pending.revokesAttempted++
if pending.revokesAttempted >= maxRevokeAttempts {
// TODO vault 1.8 mark this as a zombie
r.m.logger.Trace("lease has consumed all retry attempts", "lease_id", r.leaseID)
return
}
// TODO vault 1.8 we added an exponential backoff library, check to see if it would be useful here
pending.timer.Reset((1 << pending.revokesAttempted) * revokeRetryBase)
r.m.pending.Store(r.leaseID, pending)
}
func expireLeaseStrategyFairsharing(ctx context.Context, m *ExpirationManager, leaseID string, ns *namespace.Namespace) {
nsCtx := namespace.ContextWithNamespace(ctx, ns)
var mountAccessor string
m.coreStateLock.RLock()
mount := m.core.router.MatchingMountEntry(nsCtx, leaseID)
m.coreStateLock.RUnlock()
if mount == nil {
// figure out what this means - if we couldn't find the mount, can we automatically revoke
m.logger.Debug("could not find lease path", "lease_id", leaseID)
mountAccessor = "mount-accessor-not-found"
} else {
mountAccessor = mount.Accessor
}
job, err := newRevocationJob(nsCtx, leaseID, ns.ID, m)
if err != nil {
m.logger.Warn("error creating revocation job", "error", err)
return
}
m.jobManager.AddJob(job, mountAccessor)
}
// revokeIDFunc is invoked when a given ID is expired
func expireLeaseStrategyRevoke(ctx context.Context, m *ExpirationManager, leaseID string, ns *namespace.Namespace) {
for attempt := uint(0); attempt < maxRevokeAttempts; attempt++ {
@@ -174,6 +304,24 @@ func expireLeaseStrategyRevoke(ctx context.Context, m *ExpirationManager, leaseID string, ns *namespace.Namespace) {
m.logger.Error("maximum revoke attempts reached", "lease_id", leaseID)
}
func getNumExpirationWorkers(c *Core, l log.Logger) int {
numWorkers := c.numExpirationWorkers
workerOverride := os.Getenv(fairshareWorkersOverrideVar)
if workerOverride != "" {
i, err := strconv.Atoi(workerOverride)
if err != nil {
l.Warn("vault lease revocation workers override must be an integer", "value", workerOverride)
} else if i < 1 || i > 10000 {
l.Warn("vault lease revocation workers override out of range", "value", i)
} else {
numWorkers = i
}
}
return numWorkers
}
// NewExpirationManager creates a new ExpirationManager that is backed
// using a given view, and uses the provided router for revocation.
func NewExpirationManager(c *Core, view *BarrierView, e ExpireLeaseStrategy, logger log.Logger) *ExpirationManager {
@@ -189,6 +337,9 @@ func NewExpirationManager(c *Core, view *BarrierView, e ExpireLeaseStrategy, logger log.Logger) *ExpirationManager {
}
jobManager := fairshare.NewJobManager("expiration", getNumExpirationWorkers(c, logger), logger.Named("job-manager"))
jobManager.Start()
exp := &ExpirationManager{
core: c,
router: c.router,
@@ -217,6 +368,8 @@ func NewExpirationManager(c *Core, view *BarrierView, e ExpireLeaseStrategy, logger log.Logger) *ExpirationManager {
logLeaseExpirations: os.Getenv("VAULT_SKIP_LOGGING_LEASE_EXPIRATIONS") == "",
expireFunc: e,
revokePermitPool: permitPool,
jobManager: jobManager,
}
*exp.restoreMode = 1
@@ -647,6 +800,8 @@ func (m *ExpirationManager) Stop() error {
m.logger.Debug("stop triggered")
defer m.logger.Debug("finished stopping")
m.jobManager.Stop()
// Do this before stopping pending timers to avoid potential races with
// expiring timers
close(m.quitCh)
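Two details above are easy to miss. Execute ties each revocation to both DefaultMaxRequestDuration and the manager's quit channel, so in-flight revocations are cancelled on shutdown, and Stop halts the job manager before the pending timers are stopped. OnFailure reschedules a failed lease with a doubling delay, (1 << revokesAttempted) * revokeRetryBase. The revokeRetryBase and maxRevokeAttempts constants are defined elsewhere in vault/expiration.go and do not appear in this diff, so the values below are placeholders chosen only to show the shape of the schedule:

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder values; the real constants live elsewhere in
// vault/expiration.go and are not part of this diff.
const (
	revokeRetryBase   = 10 * time.Second
	maxRevokeAttempts = 6
)

func main() {
	// Same arithmetic as pending.timer.Reset((1 << pending.revokesAttempted) * revokeRetryBase)
	// in revocationJob.OnFailure: attempt n is retried after 2^n * base.
	for attempt := 1; attempt < maxRevokeAttempts; attempt++ {
		fmt.Printf("attempt %d: retry after %v\n", attempt, (1<<attempt)*revokeRetryBase)
	}
}
```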

vault/expiration_test.go

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"os"
"reflect"
"sort"
"strings"
@@ -16,6 +17,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/fairshare"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/framework"
@@ -1862,7 +1864,9 @@ func TestExpiration_renewEntry(t *testing.T) {
}
}
- func TestExpiration_revokeEntry_rejected(t *testing.T) {
+ func revokeEntryRejectedCore(t *testing.T, e ExpireLeaseStrategy) {
t.Helper()
core, _, _ := TestCoreUnsealed(t)
exp := core.expiration
@@ -1929,7 +1933,7 @@ func TestExpiration_revokeEntry_rejected(t *testing.T) {
t.Fatal(err)
}
- err = core.setupExpiration(expireLeaseStrategyRevoke)
+ err = core.setupExpiration(e)
if err != nil {
t.Fatal(err)
}
@@ -1954,6 +1958,14 @@
}
}
func TestExpiration_revokeEntry_rejected_revoke(t *testing.T) {
revokeEntryRejectedCore(t, expireLeaseStrategyRevoke)
}
func TestExpiration_revokeEntry_rejected_fairsharing(t *testing.T) {
revokeEntryRejectedCore(t, expireLeaseStrategyFairsharing)
}
func TestExpiration_renewAuthEntry(t *testing.T) {
exp := mockExpiration(t)
@@ -2449,3 +2461,33 @@ func TestExpiration_CachedPolicyIsShared(t *testing.T) {
}
}
}
func TestExpiration_FairsharingEnvVar(t *testing.T) {
testCases := []struct {
set string
expected int
}{
{
set: "15",
expected: 15,
},
{
set: "0",
expected: numExpirationWorkersTest,
},
{
set: "10001",
expected: numExpirationWorkersTest,
},
}
defer os.Unsetenv(fairshareWorkersOverrideVar)
for _, tc := range testCases {
os.Setenv(fairshareWorkersOverrideVar, tc.set)
exp := mockExpiration(t)
if fairshare.GetNumWorkers(exp.jobManager) != tc.expected {
t.Errorf("bad worker pool size. expected %d, got %d", tc.expected, fairshare.GetNumWorkers(exp.jobManager))
}
}
}
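The table above exercises getNumExpirationWorkers end to end: an override that fails to parse, or that falls outside [1, 10000], is logged and ignored, leaving the configured default in place (numExpirationWorkersTest, i.e. 10, under the test helpers). The clamping reads as a small pure function; effectiveWorkers below is an illustrative name, not one from the diff:

```go
package main

import (
	"fmt"
	"strconv"
)

// effectiveWorkers mirrors the logic of getNumExpirationWorkers: an
// override is honored only if it parses as an integer in [1, 10000].
func effectiveWorkers(configured int, override string) int {
	if override == "" {
		return configured
	}
	i, err := strconv.Atoi(override)
	if err != nil || i < 1 || i > 10000 {
		return configured // malformed or out of range: keep the default
	}
	return i
}

func main() {
	for _, override := range []string{"15", "0", "10001", "two hundred"} {
		fmt.Printf("override %q -> %d workers\n", override, effectiveWorkers(10, override))
	}
}
```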

vault/testing.go

@@ -157,6 +157,7 @@ func TestCoreWithSealAndUI(t testing.T, opts *CoreConfig) *Core {
conf.DisableKeyEncodingChecks = opts.DisableKeyEncodingChecks
conf.MetricsHelper = opts.MetricsHelper
conf.MetricSink = opts.MetricSink
conf.NumExpirationWorkers = numExpirationWorkersTest
if opts.Logger != nil {
conf.Logger = opts.Logger
@@ -395,6 +396,7 @@ func TestCoreUnsealedBackend(t testing.T, backend physical.Backend) (*Core, [][]
logger := logging.NewVaultLogger(log.Trace)
conf := testCoreConfig(t, backend, logger)
conf.Seal = NewTestSeal(t, nil)
conf.NumExpirationWorkers = numExpirationWorkersTest
core, err := NewCore(conf)
if err != nil {
@@ -1817,6 +1819,8 @@ func (testCluster *TestCluster) newCore(t testing.T, idx int, coreConfig *CoreCo
localConfig.MetricSink, localConfig.MetricsHelper = opts.CoreMetricSinkProvider(localConfig.ClusterName)
}
localConfig.NumExpirationWorkers = numExpirationWorkersTest
c, err := NewCore(&localConfig)
if err != nil {
t.Fatalf("err: %v", err)