From 28f2585da37b76fef6e27d1b5bf4e755c9898c2c Mon Sep 17 00:00:00 2001 From: hc-github-team-secure-vault-core <82990506+hc-github-team-secure-vault-core@users.noreply.github.com> Date: Mon, 2 Oct 2023 12:49:05 -0400 Subject: [PATCH] backport of commit 0fa36a36ae1b4842d96623eef0d20af5dea557c0 (#23443) Co-authored-by: Paul Banks --- changelog/23013.txt | 7 + helper/testhelpers/consul/cluster_storage.go | 70 ++++ helper/testhelpers/consul/consulhelper.go | 90 ++++- .../testhelpers/teststorage/consul/consul.go | 92 ++++- physical/consul/consul.go | 358 +++++++++++++++--- physical/consul/consul_test.go | 49 ++- sdk/helper/testcluster/docker/environment.go | 111 +++++- sdk/helper/testcluster/types.go | 8 + sdk/helper/testcluster/util.go | 44 ++- sdk/physical/physical.go | 65 +++- sdk/physical/testing.go | 20 + .../consul_fencing_test.go | 295 +++++++++++++++ vault/ha.go | 27 +- 13 files changed, 1140 insertions(+), 96 deletions(-) create mode 100644 changelog/23013.txt create mode 100644 helper/testhelpers/consul/cluster_storage.go create mode 100644 vault/external_tests/consul_fencing_binary/consul_fencing_test.go diff --git a/changelog/23013.txt b/changelog/23013.txt new file mode 100644 index 000000000..78987e659 --- /dev/null +++ b/changelog/23013.txt @@ -0,0 +1,7 @@ +```release-note:bug +storage/consul: fix a bug where an active node in a specific sort of network +partition could continue to write data to Consul after a new leader is elected +potentially causing data loss or corruption for keys with many concurrent +writers. For Enterprise clusters this could cause corruption of the merkle trees +leading to failure to complete merkle sync without a full re-index. +``` diff --git a/helper/testhelpers/consul/cluster_storage.go b/helper/testhelpers/consul/cluster_storage.go new file mode 100644 index 000000000..9ca1080c6 --- /dev/null +++ b/helper/testhelpers/consul/cluster_storage.go @@ -0,0 +1,70 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "context" + "fmt" + + "github.com/hashicorp/vault/sdk/helper/testcluster" +) + +type ClusterStorage struct { + // Set these after calling `NewConsulClusterStorage` but before `Start` (or + // passing in to NewDockerCluster) to control Consul version specifically in + // your test. Leave empty for latest OSS (defined in consulhelper.go). 
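+	// For example (an illustrative sketch only; the fencing test later in this
+	// patch wires things up the same way, minus the version pin):
+	//
+	//	storage := consul.NewClusterStorage()
+	//	storage.ConsulVersion = "1.15.3" // pin a specific Consul build
+	//	opts := docker.DefaultOptions(t)
+	//	opts.Storage = storage
+	//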
+ ConsulVersion string + ConsulEnterprise bool + + cleanup func() + config *Config +} + +var _ testcluster.ClusterStorage = &ClusterStorage{} + +func NewClusterStorage() *ClusterStorage { + return &ClusterStorage{} +} + +func (s *ClusterStorage) Start(ctx context.Context, opts *testcluster.ClusterOptions) error { + prefix := "" + if opts != nil && opts.ClusterName != "" { + prefix = fmt.Sprintf("%s-", opts.ClusterName) + } + cleanup, config, err := RunContainer(ctx, prefix, s.ConsulVersion, s.ConsulEnterprise, true) + if err != nil { + return err + } + s.cleanup = cleanup + s.config = config + + return nil +} + +func (s *ClusterStorage) Cleanup() error { + if s.cleanup != nil { + s.cleanup() + s.cleanup = nil + } + return nil +} + +func (s *ClusterStorage) Opts() map[string]interface{} { + if s.config == nil { + return nil + } + return map[string]interface{}{ + "address": s.config.ContainerHTTPAddr, + "token": s.config.Token, + "max_parallel": "32", + } +} + +func (s *ClusterStorage) Type() string { + return "consul" +} + +func (s *ClusterStorage) Config() *Config { + return s.config +} diff --git a/helper/testhelpers/consul/consulhelper.go b/helper/testhelpers/consul/consulhelper.go index 4bca3f60b..9b3f48824 100644 --- a/helper/testhelpers/consul/consulhelper.go +++ b/helper/testhelpers/consul/consulhelper.go @@ -5,6 +5,7 @@ package consul import ( "context" + "fmt" "os" "strings" "testing" @@ -14,9 +15,16 @@ import ( "github.com/hashicorp/vault/sdk/helper/docker" ) +// LatestConsulVersion is the most recent version of Consul which is used unless +// another version is specified in the test config or environment. This will +// probably go stale as we don't always update it on every release but we rarely +// rely on specific new Consul functionality so that's probably not a problem. +const LatestConsulVersion = "1.15.3" + type Config struct { docker.ServiceHostPort - Token string + Token string + ContainerHTTPAddr string } func (c *Config) APIConfig() *consulapi.Config { @@ -26,19 +34,39 @@ func (c *Config) APIConfig() *consulapi.Config { return apiConfig } -// PrepareTestContainer creates a Consul docker container. If version is empty, -// the Consul version used will be given by the environment variable -// CONSUL_DOCKER_VERSION, or if that's empty, whatever we've hardcoded as the -// the latest Consul version. +// PrepareTestContainer is a test helper that creates a Consul docker container +// or fails the test if unsuccessful. See RunContainer for more details on the +// configuration. func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config) { t.Helper() + cleanup, config, err := RunContainer(context.Background(), "", version, isEnterprise, doBootstrapSetup) + if err != nil { + t.Fatalf("failed starting consul: %s", err) + } + return cleanup, config +} + +// RunContainer runs Consul in a Docker container unless CONSUL_HTTP_ADDR is +// already found in the environment. Consul version is determined by the version +// argument. If version is empty string, the CONSUL_DOCKER_VERSION environment +// variable is used and if that is empty too, LatestConsulVersion is used +// (defined above). If namePrefix is provided we assume you have chosen a unique +// enough prefix to avoid collision with other tests that may be running in +// parallel and so _do not_ add an additional unique ID suffix. We will also +// ensure previous instances are deleted and leave the container running for +// debugging. 
This is useful for using Consul as part of at testcluster (i.e. +// when Vault is in Docker too). If namePrefix is empty then a unique suffix is +// added since many older tests rely on a uniq instance of the container. This +// is used by `PrepareTestContainer` which is used typically in tests that rely +// on Consul but run tested code within the test process. +func RunContainer(ctx context.Context, namePrefix, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config, error) { if retAddress := os.Getenv("CONSUL_HTTP_ADDR"); retAddress != "" { shp, err := docker.NewServiceHostPortParse(retAddress) if err != nil { - t.Fatal(err) + return nil, nil, err } - return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")} + return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}, nil } config := `acl { enabled = true default_policy = "deny" }` @@ -47,7 +75,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo if consulVersion != "" { version = consulVersion } else { - version = "1.11.3" // Latest Consul version, update as new releases come out + version = LatestConsulVersion } } if strings.HasPrefix(version, "1.3") { @@ -66,15 +94,18 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo envVars = append(envVars, "CONSUL_LICENSE="+license) if !hasLicense { - t.Fatalf("Failed to find enterprise license") + return nil, nil, fmt.Errorf("Failed to find enterprise license") } } + if namePrefix != "" { + name = namePrefix + name + } if dockerRepo, hasEnvRepo := os.LookupEnv("CONSUL_DOCKER_REPO"); hasEnvRepo { repo = dockerRepo } - runner, err := docker.NewServiceRunner(docker.RunOptions{ + dockerOpts := docker.RunOptions{ ContainerName: name, ImageRepo: repo, ImageTag: version, @@ -83,12 +114,25 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo Ports: []string{"8500/tcp"}, AuthUsername: os.Getenv("CONSUL_DOCKER_USERNAME"), AuthPassword: os.Getenv("CONSUL_DOCKER_PASSWORD"), - }) - if err != nil { - t.Fatalf("Could not start docker Consul: %s", err) } - svc, err := runner.StartService(context.Background(), func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) { + // Add a unique suffix if there is no per-test prefix provided + addSuffix := true + if namePrefix != "" { + // Don't add a suffix if the caller already provided a prefix + addSuffix = false + // Also enable predelete and non-removal to make debugging easier for test + // cases with named containers). 
+ dockerOpts.PreDelete = true + dockerOpts.DoNotAutoRemove = true + } + + runner, err := docker.NewServiceRunner(dockerOpts) + if err != nil { + return nil, nil, fmt.Errorf("Could not start docker Consul: %s", err) + } + + svc, _, err := runner.StartNewService(ctx, addSuffix, false, func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) { shp := docker.NewServiceHostPort(host, port) apiConfig := consulapi.DefaultNonPooledConfig() apiConfig.Address = shp.Address() @@ -165,7 +209,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo } } - // Configure a namespace and parition if testing enterprise Consul + // Configure a namespace and partition if testing enterprise Consul if isEnterprise { // Namespaces require Consul 1.7 or newer namespaceVersion, _ := goversion.NewVersion("1.7") @@ -229,8 +273,20 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo }, nil }) if err != nil { - t.Fatalf("Could not start docker Consul: %s", err) + return nil, nil, err } - return svc.Cleanup, svc.Config.(*Config) + // Find the container network info. + if len(svc.Container.NetworkSettings.Networks) < 1 { + svc.Cleanup() + return nil, nil, fmt.Errorf("failed to find any network settings for container") + } + cfg := svc.Config.(*Config) + for _, eps := range svc.Container.NetworkSettings.Networks { + // Just pick the first network, we assume only one for now. + // Pull out the real container IP and set that up + cfg.ContainerHTTPAddr = fmt.Sprintf("http://%s:8500", eps.IPAddress) + break + } + return svc.Cleanup, cfg, nil } diff --git a/helper/testhelpers/teststorage/consul/consul.go b/helper/testhelpers/teststorage/consul/consul.go index bfea5ddbb..d16e4f5b7 100644 --- a/helper/testhelpers/teststorage/consul/consul.go +++ b/helper/testhelpers/teststorage/consul/consul.go @@ -4,11 +4,11 @@ package consul import ( + "sync" realtesting "testing" "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/helper/testhelpers/consul" - "github.com/hashicorp/vault/helper/testhelpers/teststorage" physConsul "github.com/hashicorp/vault/physical/consul" "github.com/hashicorp/vault/vault" "github.com/mitchellh/go-testing-interface" @@ -33,5 +33,93 @@ func MakeConsulBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendB } func ConsulBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { - opts.PhysicalFactory = teststorage.SharedPhysicalFactory(MakeConsulBackend) + m := &consulContainerManager{} + opts.PhysicalFactory = m.Backend +} + +// consulContainerManager exposes Backend which matches the PhysicalFactory func +// type. When called, it will ensure that a separate Consul container is started +// for each distinct vault cluster that calls it and ensures that each Vault +// core gets a separate Consul backend instance since that contains state +// related to lock sessions. The whole test framework doesn't have a concept of +// "cluster names" outside of the prefix attached to the logger and other +// backend factories, mostly via SharedPhysicalFactory currently implicitly rely +// on being called in a sequence of core 0, 1, 2,... on one cluster and then +// core 0, 1, 2... on the next and so on. 
Refactoring lots of things to make +// first-class cluster identifiers a thing seems like a heavy lift given that we +// already rely on sequence of calls everywhere else anyway so we do the same +// here - each time the Backend method is called with coreIdx == 0 we create a +// whole new Consul and assume subsequent non 0 index cores are in the same +// cluster. +type consulContainerManager struct { + mu sync.Mutex + current *consulContainerBackendFactory +} + +func (m *consulContainerManager) Backend(t testing.T, coreIdx int, + logger hclog.Logger, conf map[string]interface{}, +) *vault.PhysicalBackendBundle { + m.mu.Lock() + if coreIdx == 0 || m.current == nil { + // Create a new consul container factory + m.current = &consulContainerBackendFactory{} + } + f := m.current + m.mu.Unlock() + + return f.Backend(t, coreIdx, logger, conf) +} + +type consulContainerBackendFactory struct { + mu sync.Mutex + refCount int + cleanupFn func() + config map[string]string +} + +func (f *consulContainerBackendFactory) Backend(t testing.T, coreIdx int, + logger hclog.Logger, conf map[string]interface{}, +) *vault.PhysicalBackendBundle { + f.mu.Lock() + defer f.mu.Unlock() + + if f.refCount == 0 { + f.startContainerLocked(t) + logger.Debug("started consul container", "clusterID", conf["cluster_id"], + "address", f.config["address"]) + } + + f.refCount++ + consulBackend, err := physConsul.NewConsulBackend(f.config, logger.Named("consul")) + if err != nil { + t.Fatal(err) + } + return &vault.PhysicalBackendBundle{ + Backend: consulBackend, + Cleanup: f.cleanup, + } +} + +func (f *consulContainerBackendFactory) startContainerLocked(t testing.T) { + cleanup, config := consul.PrepareTestContainer(t.(*realtesting.T), "", false, true) + f.config = map[string]string{ + "address": config.Address(), + "token": config.Token, + "max_parallel": "32", + } + f.cleanupFn = cleanup +} + +func (f *consulContainerBackendFactory) cleanup() { + f.mu.Lock() + defer f.mu.Unlock() + + if f.refCount < 1 || f.cleanupFn == nil { + return + } + f.refCount-- + if f.refCount == 0 { + f.cleanupFn() + f.cleanupFn = nil + } } diff --git a/physical/consul/consul.go b/physical/consul/consul.go index b17dbc4c1..d26c729dd 100644 --- a/physical/consul/consul.go +++ b/physical/consul/consul.go @@ -10,6 +10,7 @@ import ( "net/http" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -40,10 +41,10 @@ const ( // Verify ConsulBackend satisfies the correct interfaces var ( - _ physical.Backend = (*ConsulBackend)(nil) - _ physical.HABackend = (*ConsulBackend)(nil) - _ physical.Lock = (*ConsulLock)(nil) - _ physical.Transactional = (*ConsulBackend)(nil) + _ physical.Backend = (*ConsulBackend)(nil) + _ physical.FencingHABackend = (*ConsulBackend)(nil) + _ physical.Lock = (*ConsulLock)(nil) + _ physical.Transactional = (*ConsulBackend)(nil) GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in consul backend") ) @@ -53,6 +54,7 @@ var ( // it allows Vault to run on multiple machines in a highly-available manner. // failGetInTxn is only used in tests. 
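+// With this change ConsulBackend also implements physical.FencingHABackend:
+// after the HA lock is acquired, RegisterActiveNodeLock stores the lock so
+// that every subsequent write transaction is prefixed with a check-session
+// operation against the lock's session (see writeTxnOps below), causing
+// writes from a deposed active node to fail rather than land in Consul.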
type ConsulBackend struct { + logger log.Logger client *api.Client path string kv *api.KV @@ -62,6 +64,7 @@ type ConsulBackend struct { sessionTTL string lockWaitTime time.Duration failGetInTxn *uint32 + activeNodeLock atomic.Pointer[ConsulLock] } // NewConsulBackend constructs a Consul backend using the given API client @@ -152,6 +155,7 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe // Set up the backend c := &ConsulBackend{ + logger: logger, path: path, client: client, kv: client.KV(), @@ -262,12 +266,53 @@ func (c *ConsulBackend) ExpandedCapabilitiesAvailable(ctx context.Context) bool return available } +func (c *ConsulBackend) writeTxnOps(ctx context.Context, len int) ([]*api.TxnOp, string) { + if len < 1 { + len = 1 + } + ops := make([]*api.TxnOp, 0, len+1) + + // If we don't have a lock yet, return a transaction with no session check. We + // need to do this to allow writes during cluster initialization before there + // is an active node. + lock := c.activeNodeLock.Load() + if lock == nil { + return ops, "" + } + + lockKey, lockSession := lock.Info() + if lockKey == "" || lockSession == "" { + return ops, "" + } + + // If the context used to write has been marked as a special case write that + // happens outside of a lock then don't add the session check. + if physical.IsUnfencedWrite(ctx) { + return ops, "" + } + + // Insert the session check operation at index 0. This will allow us later to + // work out easily if a write failure is because of the session check. + ops = append(ops, &api.TxnOp{ + KV: &api.KVTxnOp{ + Verb: api.KVCheckSession, + Key: lockKey, + Session: lockSession, + }, + }) + return ops, lockSession +} + // Transaction is used to run multiple entries via a transaction. func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { + return c.txnInternal(ctx, txns, "transaction") +} + +func (c *ConsulBackend) txnInternal(ctx context.Context, txns []*physical.TxnEntry, apiOpName string) error { if len(txns) == 0 { return nil } - defer metrics.MeasureSince([]string{"consul", "transaction"}, time.Now()) + defer metrics.MeasureSince([]string{"consul", apiOpName}, time.Now()) failGetInTxn := atomic.LoadUint32(c.failGetInTxn) for _, t := range txns { @@ -276,7 +321,7 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt } } - ops := make([]*api.TxnOp, 0, len(txns)) + ops, sessionID := c.writeTxnOps(ctx, len(txns)) for _, t := range txns { o, err := c.makeApiTxn(t) if err != nil { @@ -302,14 +347,15 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt } return err } - if ok && len(resp.Errors) == 0 { - // Loop over results and cache them in a map. Note that we're only caching the first time we see a key, - // which _should_ correspond to a Get operation, since we expect those come first in our txns slice. + // Loop over results and cache them in a map. Note that we're only caching + // the first time we see a key, which _should_ correspond to a Get + // operation, since we expect those come first in our txns slice (though + // after check-session). for _, txnr := range resp.Results { if len(txnr.KV.Value) > 0 { - // We need to trim the Consul kv path (typically "vault/") from the key otherwise it won't - // match the transaction entries we have. + // We need to trim the Consul kv path (typically "vault/") from the key + // otherwise it won't match the transaction entries we have. 
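+				// For example, a result key of "vault/foo/bar" becomes "foo/bar"
+				// when the backend is using the default "vault/" path.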
key := strings.TrimPrefix(txnr.KV.Key, c.path) if _, found := kvMap[key]; !found { kvMap[key] = txnr.KV.Value @@ -321,6 +367,31 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt if len(resp.Errors) > 0 { for _, res := range resp.Errors { retErr = multierror.Append(retErr, errors.New(res.What)) + if res.OpIndex == 0 && sessionID != "" { + // We added a session check (sessionID not empty) so an error at OpIndex + // 0 means that we failed that session check. We don't attempt to string + // match because Consul can return at least three different errors here + // with no common string. In all cases though failing this check means + // we no longer hold the lock because it was released, modified or + // deleted. Rather than just continuing to try writing until the + // blocking query manages to notice we're no longer the lock holder + // (which can take 10s of seconds even in good network conditions in my + // testing) we can now Unlock directly here. Our ConsulLock now has a + // shortcut that will cause the lock to close the leaderCh immediately + // when we call without waiting for the blocking query to return (unlike + // Consul's current Lock implementation). But before we unlock, we + // should re-load the lock and ensure it's still the same instance we + // just tried to write with in case this goroutine is somehow really + // delayed and we actually acquired a whole new lock in the meantime! + lock := c.activeNodeLock.Load() + if lock != nil { + _, lockSessionID := lock.Info() + if sessionID == lockSessionID { + c.logger.Warn("session check failed on write, we lost active node lock, stepping down", "err", res.What) + lock.Unlock() + } + } + } } } @@ -361,27 +432,13 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) { // Put is used to insert or update an entry func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error { - defer metrics.MeasureSince([]string{"consul", "put"}, time.Now()) - - c.permitPool.Acquire() - defer c.permitPool.Release() - - pair := &api.KVPair{ - Key: c.path + entry.Key, - Value: entry.Value, + txns := []*physical.TxnEntry{ + { + Operation: physical.PutOperation, + Entry: entry, + }, } - - writeOpts := &api.WriteOptions{} - writeOpts = writeOpts.WithContext(ctx) - - _, err := c.kv.Put(pair, writeOpts) - if err != nil { - if strings.Contains(err.Error(), "Value exceeds") { - return fmt.Errorf("%s: %w", physical.ErrValueTooLarge, err) - } - return err - } - return nil + return c.txnInternal(ctx, txns, "put") } // Get is used to fetch an entry @@ -414,16 +471,15 @@ func (c *ConsulBackend) Get(ctx context.Context, key string) (*physical.Entry, e // Delete is used to permanently delete an entry func (c *ConsulBackend) Delete(ctx context.Context, key string) error { - defer metrics.MeasureSince([]string{"consul", "delete"}, time.Now()) - - c.permitPool.Acquire() - defer c.permitPool.Release() - - writeOpts := &api.WriteOptions{} - writeOpts = writeOpts.WithContext(ctx) - - _, err := c.kv.Delete(c.path+key, writeOpts) - return err + txns := []*physical.TxnEntry{ + { + Operation: physical.DeleteOperation, + Entry: &physical.Entry{ + Key: key, + }, + }, + } + return c.txnInternal(ctx, txns, "delete") } // List is used to list all the keys under a given @@ -463,24 +519,14 @@ func (c *ConsulBackend) FailGetInTxn(fail bool) { // LockWith is used for mutual exclusion based on the given key. 
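+// A rough sketch of how the HA layer drives these locks (simplified from
+// vault/ha.go; error handling elided):
+//
+//	lock, _ := backend.LockWith(CoreLockPath, uuid)
+//	leaderCh, err := lock.Lock(stopCh) // blocks until held or stopCh closes
+//	if err == nil && leaderCh != nil {
+//		backend.RegisterActiveNodeLock(lock) // enables fencing of writes
+//		<-leaderCh                           // closed when leadership is lost
+//		lock.Unlock()
+//	}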
func (c *ConsulBackend) LockWith(key, value string) (physical.Lock, error) { - // Create the lock - opts := &api.LockOptions{ - Key: c.path + key, - Value: []byte(value), - SessionName: "Vault Lock", - MonitorRetries: 5, - SessionTTL: c.sessionTTL, - LockWaitTime: c.lockWaitTime, - } - lock, err := c.client.LockOpts(opts) - if err != nil { - return nil, fmt.Errorf("failed to create lock: %w", err) - } cl := &ConsulLock{ + logger: c.logger, client: c.client, key: c.path + key, - lock: lock, + value: value, consistencyMode: c.consistencyMode, + sessionTTL: c.sessionTTL, + lockWaitTime: c.lockWaitTime, } return cl, nil } @@ -505,20 +551,203 @@ func (c *ConsulBackend) DetectHostAddr() (string, error) { return addr, nil } -// ConsulLock is used to provide the Lock interface backed by Consul +// RegisterActiveNodeLock is called after active node lock is obtained to allow +// us to fence future writes. +func (c *ConsulBackend) RegisterActiveNodeLock(l physical.Lock) error { + cl, ok := l.(*ConsulLock) + if !ok { + return fmt.Errorf("invalid Lock type") + } + c.activeNodeLock.Store(cl) + key, sessionID := cl.Info() + c.logger.Info("registered active node lock", "key", key, "sessionID", sessionID) + return nil +} + +// ConsulLock is used to provide the Lock interface backed by Consul. We work +// around some limitations of Consuls api.Lock noted in +// https://github.com/hashicorp/consul/issues/18271 by creating and managing the +// session ourselves, while using Consul's Lock to do the heavy lifting. type ConsulLock struct { + logger log.Logger client *api.Client key string - lock *api.Lock + value string consistencyMode string + sessionTTL string + lockWaitTime time.Duration + + mu sync.Mutex // protects session state + session *lockSession + // sessionID is a copy of the value from session.id. We use a separate field + // because `Info` needs to keep returning the same sessionID after Unlock has + // cleaned up the session state so that we continue to fence any writes still + // in flight after the lock is Unlocked. It's easier to reason about that as a + // separate field rather than keeping an already-terminated session object + // around. Once Lock is called again this will be replaced (while mu is + // locked) with the new session ID. Must hold mu to read or write this. + sessionID string +} + +type lockSession struct { + // id is immutable after the session is created so does not need mu held + id string + + // mu protects the lock and unlockCh to ensure they are only cleaned up once + mu sync.Mutex + lock *api.Lock + unlockCh chan struct{} +} + +func (s *lockSession) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + s.mu.Lock() + defer s.mu.Unlock() + + lockHeld := false + defer func() { + if !lockHeld { + s.cleanupLocked() + } + }() + + consulLeaderCh, err := s.lock.Lock(stopCh) + if err != nil { + return nil, err + } + if consulLeaderCh == nil { + // If both leaderCh and err are nil from Consul's Lock then it means we + // waited for the lockWait without grabbing it. + return nil, nil + } + // We got the Lock, monitor it! + lockHeld = true + leaderCh := make(chan struct{}) + go s.monitorLock(leaderCh, s.unlockCh, consulLeaderCh) + return leaderCh, nil +} + +// monitorLock waits for either unlockCh or consulLeaderCh to close and then +// closes leaderCh. It's designed to be run in a separate goroutine. Note that +// we pass unlockCh rather than accessing it via the member variable because it +// is mutated under the lock during Unlock so reading it from c could be racy. 
+// We just need the chan created at the call site here so we pass it instead of +// locking and unlocking in here. +func (s *lockSession) monitorLock(leaderCh chan struct{}, unlockCh, consulLeaderCh <-chan struct{}) { + select { + case <-unlockCh: + case <-consulLeaderCh: + } + // We lost the lock. Close the leaderCh + close(leaderCh) + + // Whichever chan closed, cleanup to unwind all the state. If we were + // triggered by a cleanup call this will be a no-op, but if not it ensures all + // state is cleaned up correctly. + s.cleanup() +} + +func (s *lockSession) cleanup() { + s.mu.Lock() + defer s.mu.Unlock() + + s.cleanupLocked() +} + +func (s *lockSession) cleanupLocked() { + if s.lock != nil { + s.lock.Unlock() + s.lock = nil + } + if s.unlockCh != nil { + close(s.unlockCh) + s.unlockCh = nil + } + // Don't bother destroying sessions as they will be destroyed after TTL + // anyway. +} + +func (c *ConsulLock) createSession() (*lockSession, error) { + se := &api.SessionEntry{ + Name: "Vault Lock", + TTL: c.sessionTTL, + // We use Consul's default LockDelay of 15s by not specifying it + } + session, _, err := c.client.Session().Create(se, nil) + if err != nil { + return nil, err + } + + opts := &api.LockOptions{ + Key: c.key, + Value: []byte(c.value), + Session: session, + MonitorRetries: 5, + LockWaitTime: c.lockWaitTime, + SessionTTL: c.sessionTTL, + } + lock, err := c.client.LockOpts(opts) + if err != nil { + // Don't bother destroying sessions as they will be destroyed after TTL + // anyway. + return nil, fmt.Errorf("failed to create lock: %w", err) + } + + unlockCh := make(chan struct{}) + + s := &lockSession{ + id: session, + lock: lock, + unlockCh: unlockCh, + } + + // Start renewals of the session + go func() { + // Note we capture unlockCh here rather than s.unlockCh because s.unlockCh + // is mutated on cleanup which is racy since we don't hold a lock here. + // unlockCh will never be mutated though. + err := c.client.Session().RenewPeriodic(c.sessionTTL, session, nil, unlockCh) + if err != nil { + c.logger.Error("failed to renew consul session for more than the TTL, lock lost", "err", err) + } + // release other resources for this session only i.e. don't c.Unlock as that + // might now be locked under a different session). + s.cleanup() + }() + return s, nil } func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { - return c.lock.Lock(stopCh) + c.mu.Lock() + defer c.mu.Unlock() + + if c.session != nil { + return nil, fmt.Errorf("lock instance already locked") + } + + session, err := c.createSession() + if err != nil { + return nil, err + } + leaderCh, err := session.Lock(stopCh) + if leaderCh != nil && err == nil { + // We hold the lock, store the session + c.session = session + c.sessionID = session.id + } + return leaderCh, err } func (c *ConsulLock) Unlock() error { - return c.lock.Unlock() + c.mu.Lock() + defer c.mu.Unlock() + + if c.session != nil { + c.session.cleanup() + c.session = nil + // Don't clear c.sessionID since we rely on returning the same old ID after + // Unlock until the next Lock. 
+ } + return nil } func (c *ConsulLock) Value() (bool, string, error) { @@ -538,7 +767,18 @@ func (c *ConsulLock) Value() (bool, string, error) { if pair == nil { return false, "", nil } + // Note that held is expected to mean "does _any_ node hold the lock" not + // "does this current instance hold the lock" so although we know what our own + // session ID is, we don't check it matches here only that there is _some_ + // session in Consul holding the lock right now. held := pair.Session != "" value := string(pair.Value) return held, value, nil } + +func (c *ConsulLock) Info() (key, sessionid string) { + c.mu.Lock() + defer c.mu.Unlock() + + return c.key, c.sessionID +} diff --git a/physical/consul/consul_test.go b/physical/consul/consul_test.go index b0a16ce85..7de69b6c7 100644 --- a/physical/consul/consul_test.go +++ b/physical/consul/consul_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/vault/helper/testhelpers/consul" "github.com/hashicorp/vault/sdk/helper/logging" "github.com/hashicorp/vault/sdk/physical" + "github.com/stretchr/testify/require" ) func TestConsul_newConsulBackend(t *testing.T) { @@ -442,7 +443,9 @@ func TestConsulHABackend(t *testing.T) { t.Fatalf("err: %v", err) } - randPath := fmt.Sprintf("vault-%d/", time.Now().Unix()) + // We used to use a timestamp here but then if you run multiple instances in + // parallel with one Consul they end up conflicting. + randPath := fmt.Sprintf("vault-%d/", rand.Int()) defer func() { client.KV().DeleteTree(randPath, nil) }() @@ -453,6 +456,10 @@ func TestConsulHABackend(t *testing.T) { "token": config.Token, "path": randPath, "max_parallel": "-1", + // We have to wait this out as part of the test so shorten it a little from + // the default 15 seconds helps with test run times, especially when running + // this in a loop to detect flakes! + "lock_wait_time": "3s", } b, err := NewConsulBackend(backendConfig, logger) @@ -478,4 +485,44 @@ func TestConsulHABackend(t *testing.T) { if host == "" { t.Fatalf("bad addr: %v", host) } + + // Calling `Info` on a Lock that has been unlocked must still return the old + // sessionID (until it is locked again) otherwise we will fail to fence writes + // that are still in flight from before (e.g. queued WAL or Merkle flushes) as + // soon as the first one unlocks the session allowing corruption again. + l, err := b.(physical.HABackend).LockWith("test-lock-session-info", "bar") + require.NoError(t, err) + + expectKey := randPath + "test-lock-session-info" + + cl := l.(*ConsulLock) + + stopCh := make(chan struct{}) + time.AfterFunc(5*time.Second, func() { + close(stopCh) + }) + leaderCh, err := cl.Lock(stopCh) + require.NoError(t, err) + require.NotNil(t, leaderCh) + + key, sid := cl.Info() + require.Equal(t, expectKey, key) + require.NotEmpty(t, sid) + + // Now Unlock the lock, sessionID should be reset to empty string + err = cl.Unlock() + require.NoError(t, err) + key2, sid2 := cl.Info() + require.Equal(t, key, key2) + require.Equal(t, sid, sid2) + + // Lock it again, this should cause a new session to be created so SID should + // change. 
+ leaderCh, err = cl.Lock(stopCh) + require.NoError(t, err) + require.NotNil(t, leaderCh) + + key3, sid3 := cl.Info() + require.Equal(t, key, key3) + require.NotEqual(t, sid, sid3) } diff --git a/sdk/helper/testcluster/docker/environment.go b/sdk/helper/testcluster/docker/environment.go index b53de23a9..b0d72a00d 100644 --- a/sdk/helper/testcluster/docker/environment.go +++ b/sdk/helper/testcluster/docker/environment.go @@ -55,7 +55,6 @@ const MaxClusterNameLength = 52 type DockerCluster struct { ClusterName string - RaftStorage bool ClusterNodes []*DockerClusterNode // Certificate fields @@ -73,6 +72,8 @@ type DockerCluster struct { ID string Logger log.Logger builtTags map[string]struct{} + + storage testcluster.ClusterStorage } func (dc *DockerCluster) NamedLogger(s string) log.Logger { @@ -407,9 +408,6 @@ func NewTestDockerCluster(t *testing.T, opts *DockerClusterOptions) *DockerClust if opts.NetworkName == "" { opts.NetworkName = os.Getenv("TEST_DOCKER_NETWORK_NAME") } - if opts.VaultLicense == "" { - opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI) - } ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) t.Cleanup(cancel) @@ -434,14 +432,17 @@ func NewDockerCluster(ctx context.Context, opts *DockerClusterOptions) (*DockerC if opts.Logger == nil { opts.Logger = log.NewNullLogger() } + if opts.VaultLicense == "" { + opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI) + } dc := &DockerCluster{ DockerAPI: api, - RaftStorage: true, ClusterName: opts.ClusterName, Logger: opts.Logger, builtTags: map[string]struct{}{}, CA: opts.CA, + storage: opts.Storage, } if err := dc.setupDockerCluster(ctx, opts); err != nil { @@ -588,21 +589,31 @@ func (n *DockerClusterNode) Start(ctx context.Context, opts *DockerClusterOption vaultCfg["telemetry"] = map[string]interface{}{ "disable_hostname": true, } - raftOpts := map[string]interface{}{ + + // Setup storage. Default is raft. + storageType := "raft" + storageOpts := map[string]interface{}{ // TODO add options from vnc "path": "/vault/file", "node_id": n.NodeID, } - vaultCfg["storage"] = map[string]interface{}{ - "raft": raftOpts, + + if opts.Storage != nil { + storageType = opts.Storage.Type() + storageOpts = opts.Storage.Opts() } - if opts != nil && opts.VaultNodeConfig != nil && len(opts.VaultNodeConfig.StorageOptions) > 0 { + + if opts != nil && opts.VaultNodeConfig != nil { for k, v := range opts.VaultNodeConfig.StorageOptions { - if _, ok := raftOpts[k].(string); !ok { - raftOpts[k] = v + if _, ok := storageOpts[k].(string); !ok { + storageOpts[k] = v } } } + vaultCfg["storage"] = map[string]interface{}{ + storageType: storageOpts, + } + //// disable_mlock is required for working in the Docker environment with //// custom plugins vaultCfg["disable_mlock"] = true @@ -817,6 +828,72 @@ func (n *DockerClusterNode) AddNetworkDelay(ctx context.Context, delay time.Dura return nil } +// PartitionFromCluster will cause the node to be disconnected at the network +// level from the rest of the docker cluster. It does so in a way that the node +// will not see TCP RSTs and all packets it sends will be "black holed". It +// attempts to keep packets to and from the host intact which allows docker +// daemon to continue streaming logs and any test code to continue making +// requests from the host to the partitioned node. 
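+// Like UnpartitionFromCluster below, this currently assumes the default
+// "bridge" network with cluster traffic on eth0; the bridge gateway is read
+// from the container's default route and excluded from the DROP rules so
+// host-to-container traffic keeps working.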
+func (n *DockerClusterNode) PartitionFromCluster(ctx context.Context) error { + stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{ + "/bin/sh", + "-xec", strings.Join([]string{ + fmt.Sprintf("echo partitioning container from network"), + "apk add iproute2", + // Get the gateway address for the bridge so we can allow host to + // container traffic still. + "GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')", + // First delete the rules in case this is called twice otherwise we'll add + // multiple copies and only remove one in Unpartition (yay iptables). + // Ignore the error if it didn't exist. + "iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true", + "iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true", + // Add rules to drop all packets in and out of the docker network + // connection. + "iptables -I INPUT -i eth0 ! -s \"$GW\" -j DROP", + "iptables -I OUTPUT -o eth0 ! -d \"$GW\" -j DROP", + }, "; "), + }) + if err != nil { + return err + } + + n.Logger.Trace(string(stdout)) + n.Logger.Trace(string(stderr)) + if exitCode != 0 { + return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode) + } + return nil +} + +// UnpartitionFromCluster reverses a previous call to PartitionFromCluster and +// restores full connectivity. Currently assumes the default "bridge" network. +func (n *DockerClusterNode) UnpartitionFromCluster(ctx context.Context) error { + stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{ + "/bin/sh", + "-xec", strings.Join([]string{ + fmt.Sprintf("echo un-partitioning container from network"), + // Get the gateway address for the bridge so we can allow host to + // container traffic still. + "GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')", + // Remove the rules, ignore if they are not present or iptables wasn't + // installed yet (i.e. no-one called PartitionFromCluster yet). + "iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true", + "iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true", + }, "; "), + }) + if err != nil { + return err + } + + n.Logger.Trace(string(stdout)) + n.Logger.Trace(string(stderr)) + if exitCode != 0 { + return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode) + } + return nil +} + type LogConsumerWriter struct { consumer func(string) } @@ -844,6 +921,7 @@ type DockerClusterOptions struct { VaultBinary string Args []string StartProbe func(*api.Client) error + Storage testcluster.ClusterStorage } func ensureLeaderMatches(ctx context.Context, client *api.Client, ready func(response *api.LeaderResponse) error) error { @@ -904,6 +982,12 @@ func (dc *DockerCluster) setupDockerCluster(ctx context.Context, opts *DockerClu dc.RootCAs = x509.NewCertPool() dc.RootCAs.AddCert(dc.CA.CACert) + if dc.storage != nil { + if err := dc.storage.Start(ctx, &opts.ClusterOptions); err != nil { + return err + } + } + for i := 0; i < numCores; i++ { if err := dc.addNode(ctx, opts); err != nil { return err @@ -964,6 +1048,11 @@ func (dc *DockerCluster) addNode(ctx context.Context, opts *DockerClusterOptions } func (dc *DockerCluster) joinNode(ctx context.Context, nodeIdx int, leaderIdx int) error { + if dc.storage != nil && dc.storage.Type() != "raft" { + // Storage is not raft so nothing to do but unseal. 
+ return testcluster.UnsealNode(ctx, dc, nodeIdx) + } + leader := dc.ClusterNodes[leaderIdx] if nodeIdx >= len(dc.ClusterNodes) { diff --git a/sdk/helper/testcluster/types.go b/sdk/helper/testcluster/types.go index 16725157c..989908fb1 100644 --- a/sdk/helper/testcluster/types.go +++ b/sdk/helper/testcluster/types.go @@ -4,6 +4,7 @@ package testcluster import ( + "context" "crypto/ecdsa" "crypto/tls" "crypto/x509" @@ -110,3 +111,10 @@ type CA struct { CAKey *ecdsa.PrivateKey CAKeyPEM []byte } + +type ClusterStorage interface { + Start(context.Context, *ClusterOptions) error + Cleanup() error + Opts() map[string]interface{} + Type() string +} diff --git a/sdk/helper/testcluster/util.go b/sdk/helper/testcluster/util.go index 4ecf5f533..883cd6992 100644 --- a/sdk/helper/testcluster/util.go +++ b/sdk/helper/testcluster/util.go @@ -158,6 +158,20 @@ func NodeHealthy(ctx context.Context, cluster VaultCluster, nodeIdx int) error { } func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) { + // Be robust to multiple nodes thinking they are active. This is possible in + // certain network partition situations where the old leader has not + // discovered it's lost leadership yet. In tests this is only likely to come + // up when we are specifically provoking it, but it's possible it could happen + // at any point if leadership flaps of connectivity suffers transient errors + // etc. so be robust against it. The best solution would be to have some sort + // of epoch like the raft term that is guaranteed to be monotonically + // increasing through elections, however we don't have that abstraction for + // all HABackends in general. The best we have is the ActiveTime. In a + // distributed systems text book this would be bad to rely on due to clock + // sync issues etc. but for our tests it's likely fine because even if we are + // running separate Vault containers, they are all using the same hardware + // clock in the system. + leaderActiveTimes := make(map[int]time.Time) for i, node := range cluster.Nodes() { client := node.APIClient() ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) @@ -166,9 +180,23 @@ func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) { if err != nil || resp == nil || !resp.IsSelf { continue } - return i, nil + leaderActiveTimes[i] = resp.ActiveTime } - return -1, fmt.Errorf("no leader found") + if len(leaderActiveTimes) == 0 { + return -1, fmt.Errorf("no leader found") + } + // At least one node thinks it is active. If multiple, pick the one with the + // most recent ActiveTime. Note if there is only one then this just returns + // it. + var newestLeaderIdx int + var newestActiveTime time.Time + for i, at := range leaderActiveTimes { + if at.After(newestActiveTime) { + newestActiveTime = at + newestLeaderIdx = i + } + } + return newestLeaderIdx, nil } func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) { @@ -189,7 +217,8 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) // A sleep before calling WaitForActiveNodeAndPerfStandbys seems to sort // things out, but so apparently does this. We should be able to eliminate // this call to WaitForActiveNode by reworking the logic in this method. 
- if _, err := WaitForActiveNode(ctx, cluster); err != nil { + leaderIdx, err := WaitForActiveNode(ctx, cluster) + if err != nil { return err } @@ -203,7 +232,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) if err != nil { return err } - leaderClient := cluster.Nodes()[0].APIClient() + leaderClient := cluster.Nodes()[leaderIdx].APIClient() for ctx.Err() == nil { err = leaderClient.Sys().MountWithContext(ctx, mountPoint, &api.MountInput{ @@ -244,6 +273,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) var leader *api.LeaderResponse leader, err = client.Sys().LeaderWithContext(ctx) if err != nil { + logger.Trace("waiting for core", "core", coreNo, "err", err) continue } switch { @@ -261,6 +291,12 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) atomic.AddInt64(&standbys, 1) return } + default: + logger.Trace("waiting for core", "core", coreNo, + "ha_enabled", leader.HAEnabled, + "is_self", leader.IsSelf, + "perf_standby", leader.PerfStandby, + "perf_standby_remote_wal", leader.PerfStandbyLastRemoteWAL) } } }(i) diff --git a/sdk/physical/physical.go b/sdk/physical/physical.go index c0e7d2ef8..8a6e4883d 100644 --- a/sdk/physical/physical.go +++ b/sdk/physical/physical.go @@ -60,6 +60,69 @@ type HABackend interface { HAEnabled() bool } +// FencingHABackend is an HABackend which provides the additional guarantee that +// each Lock it returns from LockWith is also a FencingLock. A FencingLock +// provides a mechanism to retrieve a fencing token that can be included by +// future writes by the backend to ensure that it is still the current lock +// holder at the time the write commits. Without this timing might allow a lock +// holder not to notice it's no longer the active node for long enough for it to +// write data to storage even while a new active node is writing causing +// corruption. For Consul backend the fencing token is the session id which is +// submitted with `check-session` operation on each write to ensure the write +// only completes if the session is still holding the lock. For raft backend +// this isn't needed because our in-process raft library is unable to write if +// it's not the leader anyway. +// +// If you implement this, Vault will call RegisterActiveNodeLock with the Lock +// instance returned by LockWith after it successfully locks it. This keeps the +// backend oblivious to the specific key we use for active node locks and allows +// potential future usage of locks for other purposes in the future. +// +// Note that all implementations must support writing to storage before +// RegisterActiveNodeLock is called to support initialization of a new cluster. +// They must also skip fencing writes if the write's Context contains a special +// value. This is necessary to allow Vault to clear and re-initialise secondary +// clusters even though there is already an active node with a specific lock +// session since we clear the cluster while Vault is sealed and clearing the +// data might remove the lock in some storages (e.g. Consul). As noted above +// it's not generally safe to allow unfenced writes after a lock so instead we +// special case just a few types of writes that only happen rarely while the +// cluster is sealed. See the IsUnfencedWrite helper function. +type FencingHABackend interface { + HABackend + + RegisterActiveNodeLock(l Lock) error +} + +// unfencedWriteContextKeyType is a special type to identify context values to +// disable fencing. 
It's a separate type per the best-practice in Context.Value +// docs to avoid collisions even if the key might match. +type unfencedWriteContextKeyType string + +const ( + // unfencedWriteContextKey is the context key we pass the option to bypass + // fencing through to a FencingHABackend. Note that this is not an ideal use + // of context values and violates the "do not use it for optional arguments" + // guidance but has been agreed as a pragmatic option for this case rather + // than needing to specialize every physical.Backend to understand this + // option. + unfencedWriteContextKey unfencedWriteContextKeyType = "vault-disable-fencing" +) + +// UnfencedWriteCtx adds metadata to a ctx such that any writes performed +// directly on a FencingHABackend using that context will _not_ add a fencing +// token. +func UnfencedWriteCtx(ctx context.Context) context.Context { + return context.WithValue(ctx, unfencedWriteContextKey, true) +} + +// IsUnfencedWrite returns whether or not the context passed has the unfenced +// flag value set. +func IsUnfencedWrite(ctx context.Context) bool { + isUnfenced, ok := ctx.Value(unfencedWriteContextKey).(bool) + return ok && isUnfenced +} + // ToggleablePurgemonster is an interface for backends that can toggle on or // off special functionality and/or support purging. This is only used for the // cache, don't use it for other things. @@ -86,7 +149,7 @@ type Lock interface { // Unlock is used to release the lock Unlock() error - // Returns the value of the lock and if it is held + // Returns the value of the lock and if it is held by _any_ node Value() (bool, string, error) } diff --git a/sdk/physical/testing.go b/sdk/physical/testing.go index 0c6a021d3..b80f3697e 100644 --- a/sdk/physical/testing.go +++ b/sdk/physical/testing.go @@ -9,6 +9,8 @@ import ( "sort" "testing" "time" + + "github.com/stretchr/testify/require" ) func ExerciseBackend(t testing.TB, b Backend) { @@ -330,12 +332,25 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) { t.Errorf("expected value bar: %v", err) } + // Check if it's fencing that we can register the lock + if fba, ok := b.(FencingHABackend); ok { + require.NoError(t, fba.RegisterActiveNodeLock(lock)) + } + // Second acquisition should fail lock2, err := b2.LockWith("foo", "baz") if err != nil { t.Fatalf("lock 2: %v", err) } + // Checking the lock from b2 should discover that the lock is held since held + // implies only that there is _some_ leader not that b2 is leader (this was + // not clear before so we make it explicit with this assertion). + held2, val2, err := lock2.Value() + require.NoError(t, err) + require.Equal(t, "bar", val2) + require.True(t, held2) + // Cancel attempt in 50 msec stopCh := make(chan struct{}) time.AfterFunc(50*time.Millisecond, func() { @@ -363,6 +378,11 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) { t.Errorf("should get leaderCh") } + // Check if it's fencing that we can register the lock + if fba2, ok := b2.(FencingHABackend); ok { + require.NoError(t, fba2.RegisterActiveNodeLock(lock)) + } + // Check the value held, val, err = lock2.Value() if err != nil { diff --git a/vault/external_tests/consul_fencing_binary/consul_fencing_test.go b/vault/external_tests/consul_fencing_binary/consul_fencing_test.go new file mode 100644 index 000000000..bf69a8714 --- /dev/null +++ b/vault/external_tests/consul_fencing_binary/consul_fencing_test.go @@ -0,0 +1,295 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package consul_fencing + +import ( + "context" + "fmt" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/api" + "github.com/hashicorp/vault/helper/testhelpers/consul" + "github.com/hashicorp/vault/sdk/helper/testcluster" + "github.com/hashicorp/vault/sdk/helper/testcluster/docker" + "github.com/stretchr/testify/require" +) + +// TestConsulFencing_PartitionedLeaderCantWrite attempts to create an active +// node split-brain when using Consul storage to ensure the old leader doesn't +// continue to write data potentially corrupting storage. It is naturally +// non-deterministic since it relies heavily on timing of the different +// container processes, however it pretty reliably failed before the fencing fix +// (and Consul lock improvements) and should _never_ fail now we correctly fence +// writes. +func TestConsulFencing_PartitionedLeaderCantWrite(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + consulStorage := consul.NewClusterStorage() + + // Create cluster logger that will dump cluster logs to stdout for debugging. + logger := hclog.NewInterceptLogger(hclog.DefaultOptions) + logger.SetLevel(hclog.Trace) + + clusterOpts := docker.DefaultOptions(t) + clusterOpts.ImageRepo = "hashicorp/vault-enterprise" + clusterOpts.ClusterOptions.Logger = logger + + clusterOpts.Storage = consulStorage + + logger.Info("==> starting cluster") + c, err := docker.NewDockerCluster(ctx, clusterOpts) + require.NoError(t, err) + logger.Info(" ✅ done.", "root_token", c.GetRootToken(), + "consul_token", consulStorage.Config().Token) + + logger.Info("==> waiting for leader") + leaderIdx, err := testcluster.WaitForActiveNode(ctx, c) + require.NoError(t, err) + + leader := c.Nodes()[leaderIdx] + leaderClient := leader.APIClient() + + notLeader := c.Nodes()[1] // Assumes it's usually zero and correct below + if leaderIdx == 1 { + notLeader = c.Nodes()[0] + } + + // Mount a KV v2 backend + logger.Info("==> mounting KV") + err = leaderClient.Sys().Mount("/test", &api.MountInput{ + Type: "kv-v2", + }) + require.NoError(t, err) + + // Start two background workers that will cause writes to Consul in the + // background. KV v2 relies on a single active node for correctness. + // Specifically its patch operation does a read-modify-write under a + // key-specific lock which is correct for concurrent writes to one process, + // but which by nature of our storage API is not going to be atomic if another + // active node is also writing the same KV. It's made worse because the cache + // backend means the active node will not actually read from Consul after the + // initial read and will be modifying its own in-memory version and writing + // that back. So we should be able to detect multiple active nodes writing + // reliably with the following setup: + // 1. Two separate "client" goroutines each connected to different Vault + // servers. + // 2. Both write to the same kv-v2 key, each one modifies only its own set + // of subkeys c1-X or c2-X. + // 3. Each request adds the next sequential X value for that client. We use a + // Patch operation so we don't need to read the version or use CAS. On an + // error each client will retry the same key until it gets a success. + // 4. In a correct system with a single active node despite a partition, we + // expect a complete set of consecutive X values for both clients (i.e. + // no writes lost). 
If an old leader is still allowed to write to Consul + // then it will continue to patch against its own last-known version from + // cache and so will overwrite any concurrent updates from the other + // client and we should see that as lost updates at the end. + var wg sync.WaitGroup + errCh := make(chan error, 10) + var writeCount uint64 + + // Initialise the key once + kv := leaderClient.KVv2("/test") + _, err = kv.Put(ctx, "data", map[string]interface{}{ + "c0-00000000": 1, // value don't matter here only keys in this set. + "c1-00000000": 1, + }) + require.NoError(t, err) + + const interval = 500 * time.Millisecond + + runWriter := func(i int, targetServer testcluster.VaultClusterNode, ctr *uint64) { + wg.Add(1) + defer wg.Done() + kv := targetServer.APIClient().KVv2("/test") + + for { + key := fmt.Sprintf("c%d-%08d", i, atomic.LoadUint64(ctr)) + + // Use a short timeout. If we don't then the one goroutine writing to the + // partitioned active node can get stuck here until the 60 second request + // timeout kicks in without issuing another request. + reqCtx, cancel := context.WithTimeout(ctx, interval) + logger.Debug("sending patch", "client", i, "key", key) + _, err = kv.Patch(reqCtx, "data", map[string]interface{}{ + key: 1, + }) + cancel() + // Deliver errors to test, don't block if we get too many before context + // is cancelled otherwise client 0 can end up blocked as it has so many + // errors during the partition it doesn't actually start writing again + // ever and so the test never sees split-brain writes. + if err != nil { + select { + case <-ctx.Done(): + return + case errCh <- fmt.Errorf("client %d error: %w", i, err): + default: + // errCh is blocked, carry on anyway + } + } else { + // Only increment our set counter here now we've had an ack that the + // update was successful. + atomic.AddUint64(ctr, 1) + atomic.AddUint64(&writeCount, 1) + } + select { + case <-ctx.Done(): + return + case <-time.After(interval): + } + } + } + + logger.Info("==> starting writers") + client0Ctr, client1Ctr := uint64(1), uint64(1) + go runWriter(0, leader, &client0Ctr) + go runWriter(1, notLeader, &client1Ctr) + + // Wait for some writes to have started + var writesBeforePartition uint64 + logger.Info("==> waiting for writes") + for { + time.Sleep(1 * time.Millisecond) + writesBeforePartition = atomic.LoadUint64(&writeCount) + if writesBeforePartition >= 5 { + break + } + // Also check for any write errors + select { + case err := <-errCh: + require.NoError(t, err) + default: + } + require.NoError(t, ctx.Err()) + } + + val, err := kv.Get(ctx, "data") + require.NoError(t, err) + + logger.Info("==> partitioning leader") + // Now partition the leader from everything else (including Consul) + err = leader.(*docker.DockerClusterNode).PartitionFromCluster(ctx) + require.NoError(t, err) + + // Reload this incase more writes occurred before the partition completed. + writesBeforePartition = atomic.LoadUint64(&writeCount) + + // Wait for some more writes to have happened (the client writing to leader + // will probably have sent one and hung waiting for a response but the other + // one should eventually start committing again when new active node is + // elected). 
+ + logger.Info("==> waiting for writes to new leader") + for { + time.Sleep(1 * time.Millisecond) + writesAfterPartition := atomic.LoadUint64(&writeCount) + if (writesAfterPartition - writesBeforePartition) >= 20 { + break + } + // Also check for any write errors or timeouts + select { + case err := <-errCh: + // Don't fail here because we expect writes to the old leader to fail + // eventually or if they need a new connection etc. + logger.Info("failed write", "write_count", writesAfterPartition, "err", err) + default: + } + require.NoError(t, ctx.Err()) + } + + // Heal partition + logger.Info("==> healing partition") + err = leader.(*docker.DockerClusterNode).UnpartitionFromCluster(ctx) + require.NoError(t, err) + + // Wait for old leader to rejoin as a standby and get healthy. + logger.Info("==> wait for old leader to rejoin") + + require.NoError(t, waitUntilNotLeader(ctx, leaderClient, logger)) + + // Stop the writers and wait for them to shut down nicely + logger.Info("==> stopping writers") + cancel() + wg.Wait() + + // Now verify that all Consul data is consistent with only one leader writing. + // Use a new context since we just cancelled the general one + reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + val, err = kv.Get(reqCtx, "data") + require.NoError(t, err) + + // Ensure we have every consecutive key for both client + sets := [][]int{make([]int, 0, 128), make([]int, 0, 128)} + for k := range val.Data { + var cNum, x int + _, err := fmt.Sscanf(k, "c%d-%08d", &cNum, &x) + require.NoError(t, err) + sets[cNum] = append(sets[cNum], x) + } + + // Sort both sets + sort.Ints(sets[0]) + sort.Ints(sets[1]) + + // Ensure they are both complete by creating an expected set and comparing to + // get nice output to debug. Note that make set is an exclusive bound since we + // don't know it the current counter value write completed or not yet so we'll + // only create sets up to one less than that value that we know for sure + // should be present. + c0Writes := int(atomic.LoadUint64(&client0Ctr)) + c1Writes := int(atomic.LoadUint64(&client1Ctr)) + expect0 := makeSet(c0Writes) + expect1 := makeSet(c1Writes) + + // Trim the sets to only the writes we know completed since that's all the + // expected arrays contain. But only if they are longer so we don't change the + // slice length if they are shorter than the expected number. + if len(sets[0]) > c0Writes { + sets[0] = sets[0][0:c0Writes] + } + if len(sets[1]) > c1Writes { + sets[1] = sets[1][0:c1Writes] + } + require.Equal(t, expect0, sets[0], "Client 0 writes lost") + require.Equal(t, expect1, sets[1], "Client 1 writes lost") +} + +func makeSet(n int) []int { + a := make([]int, n) + for i := 0; i < n; i++ { + a[i] = i + } + return a +} + +func waitUntilNotLeader(ctx context.Context, oldLeaderClient *api.Client, logger hclog.Logger) error { + for { + // Wait for the original leader to acknowledge it's not active any more. + resp, err := oldLeaderClient.Sys().LeaderWithContext(ctx) + // Tolerate errors as the old leader is in a difficult place right now. + if err == nil { + if !resp.IsSelf { + // We are not leader! 
+ return nil + } + logger.Info("old leader not ready yet", "IsSelf", resp.IsSelf) + } else { + logger.Info("failed to read old leader status", "err", err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + // Loop again + } + } +} diff --git a/vault/ha.go b/vault/ha.go index 342849fcd..b18d33df0 100644 --- a/vault/ha.go +++ b/vault/ha.go @@ -520,6 +520,20 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop continue } + // If the backend is a FencingHABackend, register the lock with it so it can + // correctly fence all writes from now on (i.e. assert that we still hold + // the lock atomically with each write). + if fba, ok := c.ha.(physical.FencingHABackend); ok { + err := fba.RegisterActiveNodeLock(lock) + if err != nil { + // Can't register lock, bail out + c.heldHALock = nil + lock.Unlock() + c.logger.Error("failed registering lock with fencing backend, giving up active state") + continue + } + } + c.logger.Info("acquired lock, enabling active operation") // This is used later to log a metrics event; this can be helpful to @@ -825,7 +839,18 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct go func() { // Bind locally, as the race detector is tripping here lopCount := opCount - isLeader, _, newClusterAddr, _ := c.Leader() + isLeader, _, newClusterAddr, err := c.Leader() + if err != nil { + // This is debug level because it's not really something the user + // needs to see typically. This will only really fail if we are sealed + // or the HALock fails (e.g. can't connect to Consul or elect raft + // leader) and other things in logs should make those kinds of + // conditions obvious. However when debugging, it is useful to know + // for sure why a standby is not seeing the leadership update which + // could be due to errors being returned or could be due to some other + // bug. + c.logger.Debug("periodicLeaderRefresh fail to fetch leader info", "err", err) + } // If we are the leader reset the clusterAddr since the next // failover might go to the node that was previously active.