Fix leaked Vault LifetimeRenewers (#12607)

* Fix leaked Vault LifetimeRenewers

When the Vault CA Provider is reconfigured we do not stop the
LifetimeRenewers which can cause them to leak until the Consul processes
recycles. On Configure execute stopWatcher if it exists and is not nil
before starting a new renewal

* Add jitter before restarting the LifetimeWatcher

If we fail to login to Vault or our token is no longer valid we can
overwhelm a Vault instance with many requests very quickly by restarting
the LifetimeWatcher. Before restarting the LifetimeWatcher provide a
backoff time of 1 second or less.

* Use a retry.Waiter instead of RandomStagger

* changelog

* gofmt'd

* Swap out bool for atomic.Unit32 in test

* Provide some extra clarification in comment and changelog
This commit is contained in:
Connor 2022-03-28 09:58:16 -05:00 committed by GitHub
parent 2942fb4a75
commit f8fc317731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 26 deletions

3
.changelog/12607.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect/ca: cancel old Vault renewal on CA configuration. Provide a 1 - 6 second backoff on repeated token renewal requests to prevent overwhelming Vault.
```

View File

@ -12,13 +12,14 @@ import (
"strings"
"time"
"github.com/hashicorp/consul/lib/decode"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/decode"
)
const (
@ -43,6 +44,10 @@ const (
VaultAuthMethodTypeUserpass = "userpass"
defaultK8SServiceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
retryMin = 1 * time.Second
retryMax = 5 * time.Second
retryJitter = 20
)
var ErrBackendNotMounted = fmt.Errorf("backend not mounted")
@ -52,7 +57,7 @@ type VaultProvider struct {
config *structs.VaultCAProviderConfig
client *vaultapi.Client
shutdown func()
stopWatcher func()
isPrimary bool
clusterID string
@ -63,7 +68,7 @@ type VaultProvider struct {
func NewVaultProvider(logger hclog.Logger) *VaultProvider {
return &VaultProvider{
shutdown: func() {},
stopWatcher: func() {},
logger: logger,
}
}
@ -153,7 +158,10 @@ func (v *VaultProvider) Configure(cfg ProviderConfig) error {
}
ctx, cancel := context.WithCancel(context.Background())
v.shutdown = cancel
if v.stopWatcher != nil {
v.stopWatcher()
}
v.stopWatcher = cancel
go v.renewToken(ctx, lifetimeWatcher)
}
@ -195,16 +203,33 @@ func (v *VaultProvider) renewToken(ctx context.Context, watcher *vaultapi.Lifeti
go watcher.Start()
defer watcher.Stop()
// TODO: Once we've upgraded to a later version of protobuf we can upgrade to github.com/hashicorp/vault/api@1.1.1
// or later and rip this out.
retrier := retry.Waiter{
MinFailures: 5,
MinWait: retryMin,
MaxWait: retryMax,
Jitter: retry.NewJitter(retryJitter),
}
for {
select {
case <-ctx.Done():
return
case err := <-watcher.DoneCh():
// In the event we fail to login to Vault or our token is no longer valid we can overwhelm a Vault instance
// with rate limit configured. We would make these requests to Vault as fast as we possibly could and start
// causing all client's to receive 429 response codes. To mitigate that we're sleeping 1 second or less
// before moving on to login again and restart the lifetime watcher. Once we can upgrade to
// github.com/hashicorp/vault/api@v1.1.1 or later the LifetimeWatcher _should_ perform that backoff for us.
if err != nil {
v.logger.Error("Error renewing token for Vault provider", "error", err)
}
// wait at least 1 second after returning from the lifetime watcher
retrier.Wait(ctx)
// If the watcher has exited and auth method is enabled,
// re-authenticate using the auth method and set up a new watcher.
if v.config.AuthMethod != nil {
@ -212,7 +237,7 @@ func (v *VaultProvider) renewToken(ctx context.Context, watcher *vaultapi.Lifeti
loginResp, err := vaultLogin(v.client, v.config.AuthMethod)
if err != nil {
v.logger.Error("Error login in to Vault with %q auth method", v.config.AuthMethod.Type)
// Restart the watcher.
// Restart the watcher
go watcher.Start()
continue
}
@ -232,11 +257,12 @@ func (v *VaultProvider) renewToken(ctx context.Context, watcher *vaultapi.Lifeti
continue
}
}
// Restart the watcher.
go watcher.Start()
case <-watcher.RenewCh():
retrier.Reset()
v.logger.Info("Successfully renewed token for Vault provider")
}
}
@ -677,7 +703,7 @@ func (v *VaultProvider) Cleanup(providerTypeChange bool, otherConfig map[string]
// Stop shuts down the token renew goroutine.
func (v *VaultProvider) Stop() {
v.shutdown()
v.stopWatcher()
}
func (v *VaultProvider) PrimaryUsesIntermediate() {}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync/atomic"
"testing"
"time"
@ -212,6 +213,52 @@ func TestVaultCAProvider_RenewToken(t *testing.T) {
})
}
func TestVaultCAProvider_RenewTokenStopWatcherOnConfigure(t *testing.T) {
SkipIfVaultNotPresent(t)
testVault, err := runTestVault(t)
require.NoError(t, err)
testVault.WaitUntilReady(t)
// Create a token with a short TTL to be renewed by the provider.
ttl := 1 * time.Second
tcr := &vaultapi.TokenCreateRequest{
TTL: ttl.String(),
}
secret, err := testVault.client.Auth().Token().Create(tcr)
require.NoError(t, err)
providerToken := secret.Auth.ClientToken
provider, err := createVaultProvider(t, true, testVault.Addr, providerToken, nil)
require.NoError(t, err)
var gotStopped = uint32(0)
provider.stopWatcher = func() {
atomic.StoreUint32(&gotStopped, 1)
}
// Check the last renewal time.
secret, err = testVault.client.Auth().Token().Lookup(providerToken)
require.NoError(t, err)
firstRenewal, err := secret.Data["last_renewal_time"].(json.Number).Int64()
require.NoError(t, err)
// Wait past the TTL and make sure the token has been renewed.
retry.Run(t, func(r *retry.R) {
secret, err = testVault.client.Auth().Token().Lookup(providerToken)
require.NoError(r, err)
lastRenewal, err := secret.Data["last_renewal_time"].(json.Number).Int64()
require.NoError(r, err)
require.Greater(r, lastRenewal, firstRenewal)
})
providerConfig := vaultProviderConfig(t, testVault.Addr, providerToken, nil)
require.NoError(t, provider.Configure(providerConfig))
require.Equal(t, uint32(1), atomic.LoadUint32(&gotStopped))
}
func TestVaultCAProvider_Bootstrap(t *testing.T) {
SkipIfVaultNotPresent(t)
@ -762,27 +809,10 @@ func testVaultProviderWithConfig(t *testing.T, isPrimary bool, rawConf map[strin
}
func createVaultProvider(t *testing.T, isPrimary bool, addr, token string, rawConf map[string]interface{}) (*VaultProvider, error) {
conf := map[string]interface{}{
"Address": addr,
"Token": token,
"RootPKIPath": "pki-root/",
"IntermediatePKIPath": "pki-intermediate/",
// Tests duration parsing after msgpack type mangling during raft apply.
"LeafCertTTL": []uint8("72h"),
}
for k, v := range rawConf {
conf[k] = v
}
cfg := vaultProviderConfig(t, addr, token, rawConf)
provider := NewVaultProvider(hclog.New(nil))
cfg := ProviderConfig{
ClusterID: connect.TestClusterID,
Datacenter: "dc1",
IsPrimary: true,
RawConfig: conf,
}
if !isPrimary {
cfg.IsPrimary = false
cfg.Datacenter = "dc2"
@ -799,3 +829,26 @@ func createVaultProvider(t *testing.T, isPrimary bool, addr, token string, rawCo
return provider, nil
}
func vaultProviderConfig(t *testing.T, addr, token string, rawConf map[string]interface{}) ProviderConfig {
conf := map[string]interface{}{
"Address": addr,
"Token": token,
"RootPKIPath": "pki-root/",
"IntermediatePKIPath": "pki-intermediate/",
// Tests duration parsing after msgpack type mangling during raft apply.
"LeafCertTTL": []uint8("72h"),
}
for k, v := range rawConf {
conf[k] = v
}
cfg := ProviderConfig{
ClusterID: connect.TestClusterID,
Datacenter: "dc1",
IsPrimary: true,
RawConfig: conf,
}
return cfg
}