backport of commit 0fa36a36ae1b4842d96623eef0d20af5dea557c0 (#23443)

Co-authored-by: Paul Banks <pbanks@hashicorp.com>
hc-github-team-secure-vault-core 2023-10-02 12:49:05 -04:00 committed by GitHub
parent 15e85d26df
commit 28f2585da3
13 changed files with 1140 additions and 96 deletions

changelog/23013.txt Normal file

@ -0,0 +1,7 @@
```release-note:bug
storage/consul: fix a bug where an active node in a specific sort of network
partition could continue to write data to Consul after a new leader is elected,
potentially causing data loss or corruption for keys with many concurrent
writers. For Enterprise clusters this could cause corruption of the merkle trees,
leading to failure to complete merkle sync without a full re-index.
```
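For orientation before reading the diff: the fix works by prepending a Consul `check-session` operation to every storage transaction, so the whole write aborts if the active node's lock session has been lost. A minimal, hypothetical sketch of that transaction shape using the standard `github.com/hashicorp/consul/api` client (the wrapper function and key names are illustrative, not the backend's actual code):

```go
package consulfencing

import (
	"fmt"

	api "github.com/hashicorp/consul/api"
)

// fencedPut writes key/value only if lockSession still holds lockKey. If the
// session has been released, modified or deleted, the check-session op fails
// and the whole transaction is rejected, so no stale write reaches Consul.
func fencedPut(client *api.Client, lockKey, lockSession, key string, value []byte) error {
	ops := api.TxnOps{
		{KV: &api.KVTxnOp{
			Verb:    api.KVCheckSession,
			Key:     lockKey,
			Session: lockSession,
		}},
		{KV: &api.KVTxnOp{
			Verb:  api.KVSet,
			Key:   key,
			Value: value,
		}},
	}
	ok, resp, _, err := client.Txn().Txn(ops, nil)
	if err != nil {
		return err
	}
	if !ok {
		// An error at OpIndex 0 corresponds to the session check failing,
		// i.e. the caller is no longer the active node.
		return fmt.Errorf("write fenced or failed: %v", resp.Errors)
	}
	return nil
}
```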


@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"context"
"fmt"
"github.com/hashicorp/vault/sdk/helper/testcluster"
)
type ClusterStorage struct {
// Set these after calling `NewClusterStorage` but before `Start` (or before
// passing it in to NewDockerCluster) to control the Consul version used in
// your test. Leave empty for the latest OSS version (defined in consulhelper.go).
ConsulVersion string
ConsulEnterprise bool
cleanup func()
config *Config
}
var _ testcluster.ClusterStorage = &ClusterStorage{}
func NewClusterStorage() *ClusterStorage {
return &ClusterStorage{}
}
func (s *ClusterStorage) Start(ctx context.Context, opts *testcluster.ClusterOptions) error {
prefix := ""
if opts != nil && opts.ClusterName != "" {
prefix = fmt.Sprintf("%s-", opts.ClusterName)
}
cleanup, config, err := RunContainer(ctx, prefix, s.ConsulVersion, s.ConsulEnterprise, true)
if err != nil {
return err
}
s.cleanup = cleanup
s.config = config
return nil
}
func (s *ClusterStorage) Cleanup() error {
if s.cleanup != nil {
s.cleanup()
s.cleanup = nil
}
return nil
}
func (s *ClusterStorage) Opts() map[string]interface{} {
if s.config == nil {
return nil
}
return map[string]interface{}{
"address": s.config.ContainerHTTPAddr,
"token": s.config.Token,
"max_parallel": "32",
}
}
func (s *ClusterStorage) Type() string {
return "consul"
}
func (s *ClusterStorage) Config() *Config {
return s.config
}
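A condensed, hypothetical usage sketch of this new ClusterStorage helper, essentially what the consul_fencing test added later in this diff does, assuming the sdk testcluster docker helpers also touched below (the test name is made up):

```go
package consul_fencing_example

import (
	"context"
	"testing"
	"time"

	"github.com/hashicorp/vault/helper/testhelpers/consul"
	"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)

func TestClusterWithConsulStorage(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// Start a dedicated Consul container and point the Docker Vault cluster
	// at it instead of the default raft storage.
	storage := consul.NewClusterStorage()
	// Explicit cleanup here for clarity; the cluster helpers may also handle it.
	defer storage.Cleanup()

	opts := docker.DefaultOptions(t)
	opts.Storage = storage // Start() is invoked for us inside NewDockerCluster

	c, err := docker.NewDockerCluster(ctx, opts)
	if err != nil {
		t.Fatal(err)
	}

	// The Consul address/token are available for direct inspection if needed.
	t.Logf("consul token: %s", storage.Config().Token)
	_ = c // interact with c.Nodes() as in any other docker cluster test
}
```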


@ -5,6 +5,7 @@ package consul
import (
"context"
"fmt"
"os"
"strings"
"testing"
@ -14,9 +15,16 @@ import (
"github.com/hashicorp/vault/sdk/helper/docker"
)
// LatestConsulVersion is the most recent version of Consul which is used unless
// another version is specified in the test config or environment. This will
// probably go stale as we don't always update it on every release but we rarely
// rely on specific new Consul functionality so that's probably not a problem.
const LatestConsulVersion = "1.15.3"
type Config struct {
docker.ServiceHostPort
Token string
ContainerHTTPAddr string
}
func (c *Config) APIConfig() *consulapi.Config {
@ -26,19 +34,39 @@ func (c *Config) APIConfig() *consulapi.Config {
return apiConfig
}
// PrepareTestContainer creates a Consul docker container. If version is empty,
// the Consul version used will be given by the environment variable
// CONSUL_DOCKER_VERSION, or if that's empty, whatever we've hardcoded as
// the latest Consul version.
// PrepareTestContainer is a test helper that creates a Consul docker container
// or fails the test if unsuccessful. See RunContainer for more details on the
// configuration.
func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config) {
t.Helper()
cleanup, config, err := RunContainer(context.Background(), "", version, isEnterprise, doBootstrapSetup)
if err != nil {
t.Fatalf("failed starting consul: %s", err)
}
return cleanup, config
}
// RunContainer runs Consul in a Docker container unless CONSUL_HTTP_ADDR is
// already found in the environment. Consul version is determined by the version
// argument. If version is empty string, the CONSUL_DOCKER_VERSION environment
// variable is used and if that is empty too, LatestConsulVersion is used
// (defined above). If namePrefix is provided we assume you have chosen a prefix
// unique enough to avoid collisions with other tests that may be running in
// parallel, and so we _do not_ add an additional unique ID suffix. We will also
// ensure previous instances are deleted and leave the container running for
// debugging. This is useful when using Consul as part of a testcluster (i.e.
// when Vault is in Docker too). If namePrefix is empty then a unique suffix is
// added, since many older tests rely on a unique instance of the container;
// that mode is used by `PrepareTestContainer`, which is typically called from
// tests that rely on Consul but run the code under test within the test process.
func RunContainer(ctx context.Context, namePrefix, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config, error) {
if retAddress := os.Getenv("CONSUL_HTTP_ADDR"); retAddress != "" {
shp, err := docker.NewServiceHostPortParse(retAddress)
if err != nil {
t.Fatal(err)
return nil, nil, err
}
return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}
return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}, nil
}
config := `acl { enabled = true default_policy = "deny" }`
@ -47,7 +75,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
if consulVersion != "" {
version = consulVersion
} else {
version = "1.11.3" // Latest Consul version, update as new releases come out
version = LatestConsulVersion
}
}
if strings.HasPrefix(version, "1.3") {
@ -66,15 +94,18 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
envVars = append(envVars, "CONSUL_LICENSE="+license)
if !hasLicense {
t.Fatalf("Failed to find enterprise license")
return nil, nil, fmt.Errorf("Failed to find enterprise license")
}
}
if namePrefix != "" {
name = namePrefix + name
}
if dockerRepo, hasEnvRepo := os.LookupEnv("CONSUL_DOCKER_REPO"); hasEnvRepo {
repo = dockerRepo
}
runner, err := docker.NewServiceRunner(docker.RunOptions{
dockerOpts := docker.RunOptions{
ContainerName: name,
ImageRepo: repo,
ImageTag: version,
@ -83,12 +114,25 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
Ports: []string{"8500/tcp"},
AuthUsername: os.Getenv("CONSUL_DOCKER_USERNAME"),
AuthPassword: os.Getenv("CONSUL_DOCKER_PASSWORD"),
})
if err != nil {
t.Fatalf("Could not start docker Consul: %s", err)
}
svc, err := runner.StartService(context.Background(), func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
// Add a unique suffix if there is no per-test prefix provided
addSuffix := true
if namePrefix != "" {
// Don't add a suffix if the caller already provided a prefix
addSuffix = false
// Also enable predelete and non-removal to make debugging easier for test
// cases with named containers.
dockerOpts.PreDelete = true
dockerOpts.DoNotAutoRemove = true
}
runner, err := docker.NewServiceRunner(dockerOpts)
if err != nil {
return nil, nil, fmt.Errorf("Could not start docker Consul: %s", err)
}
svc, _, err := runner.StartNewService(ctx, addSuffix, false, func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
shp := docker.NewServiceHostPort(host, port)
apiConfig := consulapi.DefaultNonPooledConfig()
apiConfig.Address = shp.Address()
@ -165,7 +209,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
}
}
// Configure a namespace and parition if testing enterprise Consul
// Configure a namespace and partition if testing enterprise Consul
if isEnterprise {
// Namespaces require Consul 1.7 or newer
namespaceVersion, _ := goversion.NewVersion("1.7")
@ -229,8 +273,20 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
}, nil
})
if err != nil {
t.Fatalf("Could not start docker Consul: %s", err)
return nil, nil, err
}
return svc.Cleanup, svc.Config.(*Config)
// Find the container network info.
if len(svc.Container.NetworkSettings.Networks) < 1 {
svc.Cleanup()
return nil, nil, fmt.Errorf("failed to find any network settings for container")
}
cfg := svc.Config.(*Config)
for _, eps := range svc.Container.NetworkSettings.Networks {
// Just pick the first network, we assume only one for now.
// Pull out the real container IP and set that up
cfg.ContainerHTTPAddr = fmt.Sprintf("http://%s:8500", eps.IPAddress)
break
}
return svc.Cleanup, cfg, nil
}
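A minimal, hypothetical example of the unit-test path (`PrepareTestContainer`) described above: start a throwaway Consul, talk to it with a Consul API client built from the returned Config, and clean it up when done. The test name and smoke check are made up for illustration:

```go
package consul_test

import (
	"testing"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/vault/helper/testhelpers/consul"
)

func TestConsulContainerSmoke(t *testing.T) {
	// PrepareTestContainer fails the test itself if the container can't start.
	cleanup, cfg := consul.PrepareTestContainer(t, "", false, true)
	defer cleanup()

	client, err := consulapi.NewClient(cfg.APIConfig())
	if err != nil {
		t.Fatal(err)
	}
	// Prove the bootstrapped ACL token works by writing and reading back a key.
	if _, err := client.KV().Put(&consulapi.KVPair{Key: "smoke", Value: []byte("ok")}, nil); err != nil {
		t.Fatal(err)
	}
	pair, _, err := client.KV().Get("smoke", nil)
	if err != nil || pair == nil {
		t.Fatalf("failed to read back key: %v", err)
	}
}
```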


@ -4,11 +4,11 @@
package consul
import (
"sync"
realtesting "testing"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/helper/testhelpers/teststorage"
physConsul "github.com/hashicorp/vault/physical/consul"
"github.com/hashicorp/vault/vault"
"github.com/mitchellh/go-testing-interface"
@ -33,5 +33,93 @@ func MakeConsulBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendB
}
func ConsulBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
opts.PhysicalFactory = teststorage.SharedPhysicalFactory(MakeConsulBackend)
m := &consulContainerManager{}
opts.PhysicalFactory = m.Backend
}
// consulContainerManager exposes Backend which matches the PhysicalFactory func
// type. When called, it will ensure that a separate Consul container is started
// for each distinct vault cluster that calls it and ensures that each Vault
// core gets a separate Consul backend instance since that contains state
// related to lock sessions. The test framework doesn't have a concept of
// "cluster names" beyond the prefix attached to the logger, and other backend
// factories (mostly via SharedPhysicalFactory) currently rely implicitly on
// being called in a sequence of core 0, 1, 2, ... on one cluster and then
// core 0, 1, 2, ... on the next, and so on. Refactoring lots of things to make
// first-class cluster identifiers a thing seems like a heavy lift given that we
// already rely on the sequence of calls everywhere else anyway, so we do the
// same here: each time the Backend method is called with coreIdx == 0 we create
// a whole new Consul and assume subsequent non-zero index cores are in the
// same cluster.
type consulContainerManager struct {
mu sync.Mutex
current *consulContainerBackendFactory
}
func (m *consulContainerManager) Backend(t testing.T, coreIdx int,
logger hclog.Logger, conf map[string]interface{},
) *vault.PhysicalBackendBundle {
m.mu.Lock()
if coreIdx == 0 || m.current == nil {
// Create a new consul container factory
m.current = &consulContainerBackendFactory{}
}
f := m.current
m.mu.Unlock()
return f.Backend(t, coreIdx, logger, conf)
}
type consulContainerBackendFactory struct {
mu sync.Mutex
refCount int
cleanupFn func()
config map[string]string
}
func (f *consulContainerBackendFactory) Backend(t testing.T, coreIdx int,
logger hclog.Logger, conf map[string]interface{},
) *vault.PhysicalBackendBundle {
f.mu.Lock()
defer f.mu.Unlock()
if f.refCount == 0 {
f.startContainerLocked(t)
logger.Debug("started consul container", "clusterID", conf["cluster_id"],
"address", f.config["address"])
}
f.refCount++
consulBackend, err := physConsul.NewConsulBackend(f.config, logger.Named("consul"))
if err != nil {
t.Fatal(err)
}
return &vault.PhysicalBackendBundle{
Backend: consulBackend,
Cleanup: f.cleanup,
}
}
func (f *consulContainerBackendFactory) startContainerLocked(t testing.T) {
cleanup, config := consul.PrepareTestContainer(t.(*realtesting.T), "", false, true)
f.config = map[string]string{
"address": config.Address(),
"token": config.Token,
"max_parallel": "32",
}
f.cleanupFn = cleanup
}
func (f *consulContainerBackendFactory) cleanup() {
f.mu.Lock()
defer f.mu.Unlock()
if f.refCount < 1 || f.cleanupFn == nil {
return
}
f.refCount--
if f.refCount == 0 {
f.cleanupFn()
f.cleanupFn = nil
}
}


@ -10,6 +10,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -41,7 +42,7 @@ const (
// Verify ConsulBackend satisfies the correct interfaces
var (
_ physical.Backend = (*ConsulBackend)(nil)
_ physical.HABackend = (*ConsulBackend)(nil)
_ physical.FencingHABackend = (*ConsulBackend)(nil)
_ physical.Lock = (*ConsulLock)(nil)
_ physical.Transactional = (*ConsulBackend)(nil)
@ -53,6 +54,7 @@ var (
// it allows Vault to run on multiple machines in a highly-available manner.
// failGetInTxn is only used in tests.
type ConsulBackend struct {
logger log.Logger
client *api.Client
path string
kv *api.KV
@ -62,6 +64,7 @@ type ConsulBackend struct {
sessionTTL string
lockWaitTime time.Duration
failGetInTxn *uint32
activeNodeLock atomic.Pointer[ConsulLock]
}
// NewConsulBackend constructs a Consul backend using the given API client
@ -152,6 +155,7 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe
// Set up the backend
c := &ConsulBackend{
logger: logger,
path: path,
client: client,
kv: client.KV(),
@ -262,12 +266,53 @@ func (c *ConsulBackend) ExpandedCapabilitiesAvailable(ctx context.Context) bool
return available
}
func (c *ConsulBackend) writeTxnOps(ctx context.Context, len int) ([]*api.TxnOp, string) {
if len < 1 {
len = 1
}
ops := make([]*api.TxnOp, 0, len+1)
// If we don't have a lock yet, return a transaction with no session check. We
// need to do this to allow writes during cluster initialization before there
// is an active node.
lock := c.activeNodeLock.Load()
if lock == nil {
return ops, ""
}
lockKey, lockSession := lock.Info()
if lockKey == "" || lockSession == "" {
return ops, ""
}
// If the context used to write has been marked as a special case write that
// happens outside of a lock then don't add the session check.
if physical.IsUnfencedWrite(ctx) {
return ops, ""
}
// Insert the session check operation at index 0. This will allow us later to
// work out easily if a write failure is because of the session check.
ops = append(ops, &api.TxnOp{
KV: &api.KVTxnOp{
Verb: api.KVCheckSession,
Key: lockKey,
Session: lockSession,
},
})
return ops, lockSession
}
// Transaction is used to run multiple entries via a transaction.
func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
return c.txnInternal(ctx, txns, "transaction")
}
func (c *ConsulBackend) txnInternal(ctx context.Context, txns []*physical.TxnEntry, apiOpName string) error {
if len(txns) == 0 {
return nil
}
defer metrics.MeasureSince([]string{"consul", "transaction"}, time.Now())
defer metrics.MeasureSince([]string{"consul", apiOpName}, time.Now())
failGetInTxn := atomic.LoadUint32(c.failGetInTxn)
for _, t := range txns {
@ -276,7 +321,7 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt
}
}
ops := make([]*api.TxnOp, 0, len(txns))
ops, sessionID := c.writeTxnOps(ctx, len(txns))
for _, t := range txns {
o, err := c.makeApiTxn(t)
if err != nil {
@ -302,14 +347,15 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt
}
return err
}
if ok && len(resp.Errors) == 0 {
// Loop over results and cache them in a map. Note that we're only caching the first time we see a key,
// which _should_ correspond to a Get operation, since we expect those come first in our txns slice.
// Loop over results and cache them in a map. Note that we're only caching
// the first time we see a key, which _should_ correspond to a Get
// operation, since we expect those come first in our txns slice (though
// after check-session).
for _, txnr := range resp.Results {
if len(txnr.KV.Value) > 0 {
// We need to trim the Consul kv path (typically "vault/") from the key otherwise it won't
// match the transaction entries we have.
// We need to trim the Consul kv path (typically "vault/") from the key
// otherwise it won't match the transaction entries we have.
key := strings.TrimPrefix(txnr.KV.Key, c.path)
if _, found := kvMap[key]; !found {
kvMap[key] = txnr.KV.Value
@ -321,6 +367,31 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt
if len(resp.Errors) > 0 {
for _, res := range resp.Errors {
retErr = multierror.Append(retErr, errors.New(res.What))
if res.OpIndex == 0 && sessionID != "" {
// We added a session check (sessionID not empty) so an error at OpIndex
// 0 means that we failed that session check. We don't attempt to string
// match because Consul can return at least three different errors here
// with no common string. In all cases though, failing this check means
// we no longer hold the lock because it was released, modified or
// deleted. Rather than continuing to try to write until the blocking
// query manages to notice we're no longer the lock holder (which can
// take tens of seconds even in good network conditions in my testing),
// we can Unlock directly here. Our ConsulLock now has a shortcut that
// closes the leaderCh immediately when we call Unlock, without waiting
// for the blocking query to return (unlike Consul's current Lock
// implementation). But before we unlock, we should re-load the lock and
// ensure it's still the same instance we just tried to write with, in
// case this goroutine is somehow really delayed and we actually acquired
// a whole new lock in the meantime!
lock := c.activeNodeLock.Load()
if lock != nil {
_, lockSessionID := lock.Info()
if sessionID == lockSessionID {
c.logger.Warn("session check failed on write, we lost active node lock, stepping down", "err", res.What)
lock.Unlock()
}
}
}
}
}
@ -361,27 +432,13 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) {
// Put is used to insert or update an entry
func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"consul", "put"}, time.Now())
c.permitPool.Acquire()
defer c.permitPool.Release()
pair := &api.KVPair{
Key: c.path + entry.Key,
Value: entry.Value,
txns := []*physical.TxnEntry{
{
Operation: physical.PutOperation,
Entry: entry,
},
}
writeOpts := &api.WriteOptions{}
writeOpts = writeOpts.WithContext(ctx)
_, err := c.kv.Put(pair, writeOpts)
if err != nil {
if strings.Contains(err.Error(), "Value exceeds") {
return fmt.Errorf("%s: %w", physical.ErrValueTooLarge, err)
}
return err
}
return nil
return c.txnInternal(ctx, txns, "put")
}
// Get is used to fetch an entry
@ -414,16 +471,15 @@ func (c *ConsulBackend) Get(ctx context.Context, key string) (*physical.Entry, e
// Delete is used to permanently delete an entry
func (c *ConsulBackend) Delete(ctx context.Context, key string) error {
defer metrics.MeasureSince([]string{"consul", "delete"}, time.Now())
c.permitPool.Acquire()
defer c.permitPool.Release()
writeOpts := &api.WriteOptions{}
writeOpts = writeOpts.WithContext(ctx)
_, err := c.kv.Delete(c.path+key, writeOpts)
return err
txns := []*physical.TxnEntry{
{
Operation: physical.DeleteOperation,
Entry: &physical.Entry{
Key: key,
},
},
}
return c.txnInternal(ctx, txns, "delete")
}
// List is used to list all the keys under a given
@ -463,24 +519,14 @@ func (c *ConsulBackend) FailGetInTxn(fail bool) {
// LockWith is used for mutual exclusion based on the given key.
func (c *ConsulBackend) LockWith(key, value string) (physical.Lock, error) {
// Create the lock
opts := &api.LockOptions{
Key: c.path + key,
Value: []byte(value),
SessionName: "Vault Lock",
MonitorRetries: 5,
SessionTTL: c.sessionTTL,
LockWaitTime: c.lockWaitTime,
}
lock, err := c.client.LockOpts(opts)
if err != nil {
return nil, fmt.Errorf("failed to create lock: %w", err)
}
cl := &ConsulLock{
logger: c.logger,
client: c.client,
key: c.path + key,
lock: lock,
value: value,
consistencyMode: c.consistencyMode,
sessionTTL: c.sessionTTL,
lockWaitTime: c.lockWaitTime,
}
return cl, nil
}
@ -505,20 +551,203 @@ func (c *ConsulBackend) DetectHostAddr() (string, error) {
return addr, nil
}
// ConsulLock is used to provide the Lock interface backed by Consul
// RegisterActiveNodeLock is called after the active node lock is obtained to
// allow us to fence future writes.
func (c *ConsulBackend) RegisterActiveNodeLock(l physical.Lock) error {
cl, ok := l.(*ConsulLock)
if !ok {
return fmt.Errorf("invalid Lock type")
}
c.activeNodeLock.Store(cl)
key, sessionID := cl.Info()
c.logger.Info("registered active node lock", "key", key, "sessionID", sessionID)
return nil
}
// ConsulLock is used to provide the Lock interface backed by Consul. We work
// around some limitations of Consul's api.Lock noted in
// https://github.com/hashicorp/consul/issues/18271 by creating and managing the
// session ourselves, while using Consul's Lock to do the heavy lifting.
type ConsulLock struct {
logger log.Logger
client *api.Client
key string
lock *api.Lock
value string
consistencyMode string
sessionTTL string
lockWaitTime time.Duration
mu sync.Mutex // protects session state
session *lockSession
// sessionID is a copy of the value from session.id. We use a separate field
// because `Info` needs to keep returning the same sessionID after Unlock has
// cleaned up the session state so that we continue to fence any writes still
// in flight after the lock is Unlocked. It's easier to reason about that as a
// separate field rather than keeping an already-terminated session object
// around. Once Lock is called again this will be replaced (while mu is
// locked) with the new session ID. Must hold mu to read or write this.
sessionID string
}
type lockSession struct {
// id is immutable after the session is created so does not need mu held
id string
// mu protects the lock and unlockCh to ensure they are only cleaned up once
mu sync.Mutex
lock *api.Lock
unlockCh chan struct{}
}
func (s *lockSession) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
lockHeld := false
defer func() {
if !lockHeld {
s.cleanupLocked()
}
}()
consulLeaderCh, err := s.lock.Lock(stopCh)
if err != nil {
return nil, err
}
if consulLeaderCh == nil {
// If both leaderCh and err are nil from Consul's Lock then it means we
// waited for the lockWait without grabbing it.
return nil, nil
}
// We got the Lock, monitor it!
lockHeld = true
leaderCh := make(chan struct{})
go s.monitorLock(leaderCh, s.unlockCh, consulLeaderCh)
return leaderCh, nil
}
// monitorLock waits for either unlockCh or consulLeaderCh to close and then
// closes leaderCh. It's designed to be run in a separate goroutine. Note that
// we pass unlockCh rather than accessing it via the member variable because it
// is mutated under the lock during Unlock so reading it from s could be racy.
// We just need the chan created at the call site here so we pass it instead of
// locking and unlocking in here.
func (s *lockSession) monitorLock(leaderCh chan struct{}, unlockCh, consulLeaderCh <-chan struct{}) {
select {
case <-unlockCh:
case <-consulLeaderCh:
}
// We lost the lock. Close the leaderCh
close(leaderCh)
// Whichever chan closed, cleanup to unwind all the state. If we were
// triggered by a cleanup call this will be a no-op, but if not it ensures all
// state is cleaned up correctly.
s.cleanup()
}
func (s *lockSession) cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
s.cleanupLocked()
}
func (s *lockSession) cleanupLocked() {
if s.lock != nil {
s.lock.Unlock()
s.lock = nil
}
if s.unlockCh != nil {
close(s.unlockCh)
s.unlockCh = nil
}
// Don't bother destroying sessions as they will be destroyed after TTL
// anyway.
}
func (c *ConsulLock) createSession() (*lockSession, error) {
se := &api.SessionEntry{
Name: "Vault Lock",
TTL: c.sessionTTL,
// We use Consul's default LockDelay of 15s by not specifying it
}
session, _, err := c.client.Session().Create(se, nil)
if err != nil {
return nil, err
}
opts := &api.LockOptions{
Key: c.key,
Value: []byte(c.value),
Session: session,
MonitorRetries: 5,
LockWaitTime: c.lockWaitTime,
SessionTTL: c.sessionTTL,
}
lock, err := c.client.LockOpts(opts)
if err != nil {
// Don't bother destroying sessions as they will be destroyed after TTL
// anyway.
return nil, fmt.Errorf("failed to create lock: %w", err)
}
unlockCh := make(chan struct{})
s := &lockSession{
id: session,
lock: lock,
unlockCh: unlockCh,
}
// Start renewals of the session
go func() {
// Note we capture unlockCh here rather than s.unlockCh because s.unlockCh
// is mutated on cleanup which is racy since we don't hold a lock here.
// unlockCh will never be mutated though.
err := c.client.Session().RenewPeriodic(c.sessionTTL, session, nil, unlockCh)
if err != nil {
c.logger.Error("failed to renew consul session for more than the TTL, lock lost", "err", err)
}
// Release other resources for this session only, i.e. don't call c.Unlock as
// that might now be locked under a different session.
s.cleanup()
}()
return s, nil
}
func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
return c.lock.Lock(stopCh)
c.mu.Lock()
defer c.mu.Unlock()
if c.session != nil {
return nil, fmt.Errorf("lock instance already locked")
}
session, err := c.createSession()
if err != nil {
return nil, err
}
leaderCh, err := session.Lock(stopCh)
if leaderCh != nil && err == nil {
// We hold the lock, store the session
c.session = session
c.sessionID = session.id
}
return leaderCh, err
}
func (c *ConsulLock) Unlock() error {
return c.lock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
if c.session != nil {
c.session.cleanup()
c.session = nil
// Don't clear c.sessionID since we rely on returning the same old ID after
// Unlock until the next Lock.
}
return nil
}
func (c *ConsulLock) Value() (bool, string, error) {
@ -538,7 +767,18 @@ func (c *ConsulLock) Value() (bool, string, error) {
if pair == nil {
return false, "", nil
}
// Note that held is expected to mean "does _any_ node hold the lock", not
// "does this current instance hold the lock", so although we know our own
// session ID, we don't check that it matches here, only that there is _some_
// session in Consul holding the lock right now.
held := pair.Session != ""
value := string(pair.Value)
return held, value, nil
}
func (c *ConsulLock) Info() (key, sessionid string) {
c.mu.Lock()
defer c.mu.Unlock()
return c.key, c.sessionID
}
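For readers skimming the backend changes: a condensed, hypothetical sketch of how an HA consumer is expected to tie the new pieces together (the real wiring lives in vault/core.go, shown in the last hunk of this diff; the function and key name are illustrative only):

```go
package fencingsketch

import (
	"github.com/hashicorp/vault/sdk/physical"
)

// runActiveNode acquires the HA lock, registers it for fencing, and steps
// down when leadership is lost. After RegisterActiveNodeLock, every
// Put/Delete/Transaction issued through the backend carries a check-session
// op tied to this lock's session, so stale writes are rejected by Consul.
func runActiveNode(b physical.Backend, stopCh <-chan struct{}) error {
	ha, ok := b.(physical.FencingHABackend)
	if !ok {
		return nil // backend doesn't support fencing
	}

	lock, err := ha.LockWith("core/lock", "node-1")
	if err != nil {
		return err
	}
	leaderCh, err := lock.Lock(stopCh)
	if err != nil || leaderCh == nil {
		return err
	}
	if err := ha.RegisterActiveNodeLock(lock); err != nil {
		lock.Unlock()
		return err
	}

	<-leaderCh // closes when the session is lost or a fenced write fails
	return lock.Unlock()
}
```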


@ -19,6 +19,7 @@ import (
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/physical"
"github.com/stretchr/testify/require"
)
func TestConsul_newConsulBackend(t *testing.T) {
@ -442,7 +443,9 @@ func TestConsulHABackend(t *testing.T) {
t.Fatalf("err: %v", err)
}
randPath := fmt.Sprintf("vault-%d/", time.Now().Unix())
// We used to use a timestamp here but then if you run multiple instances in
// parallel with one Consul they end up conflicting.
randPath := fmt.Sprintf("vault-%d/", rand.Int())
defer func() {
client.KV().DeleteTree(randPath, nil)
}()
@ -453,6 +456,10 @@ func TestConsulHABackend(t *testing.T) {
"token": config.Token,
"path": randPath,
"max_parallel": "-1",
// We have to wait this out as part of the test, so shortening it a little
// from the default 15 seconds helps with test run times, especially when
// running this in a loop to detect flakes!
"lock_wait_time": "3s",
}
b, err := NewConsulBackend(backendConfig, logger)
@ -478,4 +485,44 @@ func TestConsulHABackend(t *testing.T) {
if host == "" {
t.Fatalf("bad addr: %v", host)
}
// Calling `Info` on a Lock that has been unlocked must still return the old
// sessionID (until it is locked again), otherwise we would stop fencing writes
// that are still in flight from before (e.g. queued WAL or Merkle flushes) as
// soon as the lock is unlocked, allowing corruption again.
l, err := b.(physical.HABackend).LockWith("test-lock-session-info", "bar")
require.NoError(t, err)
expectKey := randPath + "test-lock-session-info"
cl := l.(*ConsulLock)
stopCh := make(chan struct{})
time.AfterFunc(5*time.Second, func() {
close(stopCh)
})
leaderCh, err := cl.Lock(stopCh)
require.NoError(t, err)
require.NotNil(t, leaderCh)
key, sid := cl.Info()
require.Equal(t, expectKey, key)
require.NotEmpty(t, sid)
// Now Unlock the lock; Info should keep returning the same old sessionID
err = cl.Unlock()
require.NoError(t, err)
key2, sid2 := cl.Info()
require.Equal(t, key, key2)
require.Equal(t, sid, sid2)
// Lock it again, this should cause a new session to be created so SID should
// change.
leaderCh, err = cl.Lock(stopCh)
require.NoError(t, err)
require.NotNil(t, leaderCh)
key3, sid3 := cl.Info()
require.Equal(t, key, key3)
require.NotEqual(t, sid, sid3)
}


@ -55,7 +55,6 @@ const MaxClusterNameLength = 52
type DockerCluster struct {
ClusterName string
RaftStorage bool
ClusterNodes []*DockerClusterNode
// Certificate fields
@ -73,6 +72,8 @@ type DockerCluster struct {
ID string
Logger log.Logger
builtTags map[string]struct{}
storage testcluster.ClusterStorage
}
func (dc *DockerCluster) NamedLogger(s string) log.Logger {
@ -407,9 +408,6 @@ func NewTestDockerCluster(t *testing.T, opts *DockerClusterOptions) *DockerClust
if opts.NetworkName == "" {
opts.NetworkName = os.Getenv("TEST_DOCKER_NETWORK_NAME")
}
if opts.VaultLicense == "" {
opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI)
}
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
t.Cleanup(cancel)
@ -434,14 +432,17 @@ func NewDockerCluster(ctx context.Context, opts *DockerClusterOptions) (*DockerC
if opts.Logger == nil {
opts.Logger = log.NewNullLogger()
}
if opts.VaultLicense == "" {
opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI)
}
dc := &DockerCluster{
DockerAPI: api,
RaftStorage: true,
ClusterName: opts.ClusterName,
Logger: opts.Logger,
builtTags: map[string]struct{}{},
CA: opts.CA,
storage: opts.Storage,
}
if err := dc.setupDockerCluster(ctx, opts); err != nil {
@ -588,21 +589,31 @@ func (n *DockerClusterNode) Start(ctx context.Context, opts *DockerClusterOption
vaultCfg["telemetry"] = map[string]interface{}{
"disable_hostname": true,
}
raftOpts := map[string]interface{}{
// Setup storage. Default is raft.
storageType := "raft"
storageOpts := map[string]interface{}{
// TODO add options from vnc
"path": "/vault/file",
"node_id": n.NodeID,
}
vaultCfg["storage"] = map[string]interface{}{
"raft": raftOpts,
if opts.Storage != nil {
storageType = opts.Storage.Type()
storageOpts = opts.Storage.Opts()
}
if opts != nil && opts.VaultNodeConfig != nil && len(opts.VaultNodeConfig.StorageOptions) > 0 {
if opts != nil && opts.VaultNodeConfig != nil {
for k, v := range opts.VaultNodeConfig.StorageOptions {
if _, ok := raftOpts[k].(string); !ok {
raftOpts[k] = v
if _, ok := storageOpts[k].(string); !ok {
storageOpts[k] = v
}
}
}
vaultCfg["storage"] = map[string]interface{}{
storageType: storageOpts,
}
//// disable_mlock is required for working in the Docker environment with
//// custom plugins
vaultCfg["disable_mlock"] = true
@ -817,6 +828,72 @@ func (n *DockerClusterNode) AddNetworkDelay(ctx context.Context, delay time.Dura
return nil
}
// PartitionFromCluster will cause the node to be disconnected at the network
// level from the rest of the docker cluster. It does so in a way that the node
// will not see TCP RSTs and all packets it sends will be "black holed". It
// attempts to keep packets to and from the host intact, which allows the
// Docker daemon to continue streaming logs and any test code to continue
// making requests from the host to the partitioned node.
func (n *DockerClusterNode) PartitionFromCluster(ctx context.Context) error {
stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{
"/bin/sh",
"-xec", strings.Join([]string{
fmt.Sprintf("echo partitioning container from network"),
"apk add iproute2",
// Get the gateway address for the bridge so we can allow host to
// container traffic still.
"GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')",
// First delete the rules in case this is called twice otherwise we'll add
// multiple copies and only remove one in Unpartition (yay iptables).
// Ignore the error if it didn't exist.
"iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true",
"iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true",
// Add rules to drop all packets in and out of the docker network
// connection.
"iptables -I INPUT -i eth0 ! -s \"$GW\" -j DROP",
"iptables -I OUTPUT -o eth0 ! -d \"$GW\" -j DROP",
}, "; "),
})
if err != nil {
return err
}
n.Logger.Trace(string(stdout))
n.Logger.Trace(string(stderr))
if exitCode != 0 {
return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode)
}
return nil
}
// UnpartitionFromCluster reverses a previous call to PartitionFromCluster and
// restores full connectivity. Currently assumes the default "bridge" network.
func (n *DockerClusterNode) UnpartitionFromCluster(ctx context.Context) error {
stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{
"/bin/sh",
"-xec", strings.Join([]string{
fmt.Sprintf("echo un-partitioning container from network"),
// Get the gateway address for the bridge so we can allow host to
// container traffic still.
"GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')",
// Remove the rules, ignore if they are not present or iptables wasn't
// installed yet (i.e. no-one called PartitionFromCluster yet).
"iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true",
"iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true",
}, "; "),
})
if err != nil {
return err
}
n.Logger.Trace(string(stdout))
n.Logger.Trace(string(stderr))
if exitCode != 0 {
return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode)
}
return nil
}
type LogConsumerWriter struct {
consumer func(string)
}
@ -844,6 +921,7 @@ type DockerClusterOptions struct {
VaultBinary string
Args []string
StartProbe func(*api.Client) error
Storage testcluster.ClusterStorage
}
func ensureLeaderMatches(ctx context.Context, client *api.Client, ready func(response *api.LeaderResponse) error) error {
@ -904,6 +982,12 @@ func (dc *DockerCluster) setupDockerCluster(ctx context.Context, opts *DockerClu
dc.RootCAs = x509.NewCertPool()
dc.RootCAs.AddCert(dc.CA.CACert)
if dc.storage != nil {
if err := dc.storage.Start(ctx, &opts.ClusterOptions); err != nil {
return err
}
}
for i := 0; i < numCores; i++ {
if err := dc.addNode(ctx, opts); err != nil {
return err
@ -964,6 +1048,11 @@ func (dc *DockerCluster) addNode(ctx context.Context, opts *DockerClusterOptions
}
func (dc *DockerCluster) joinNode(ctx context.Context, nodeIdx int, leaderIdx int) error {
if dc.storage != nil && dc.storage.Type() != "raft" {
// Storage is not raft so nothing to do but unseal.
return testcluster.UnsealNode(ctx, dc, nodeIdx)
}
leader := dc.ClusterNodes[leaderIdx]
if nodeIdx >= len(dc.ClusterNodes) {


@ -4,6 +4,7 @@
package testcluster
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"crypto/x509"
@ -110,3 +111,10 @@ type CA struct {
CAKey *ecdsa.PrivateKey
CAKeyPEM []byte
}
type ClusterStorage interface {
Start(context.Context, *ClusterOptions) error
Cleanup() error
Opts() map[string]interface{}
Type() string
}


@ -158,6 +158,20 @@ func NodeHealthy(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
}
func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
// Be robust to multiple nodes thinking they are active. This is possible in
// certain network partition situations where the old leader has not
// discovered it's lost leadership yet. In tests this is only likely to come
// up when we are specifically provoking it, but it's possible it could happen
// at any point if leadership flaps or connectivity suffers transient errors,
// etc., so be robust against it. The best solution would be to have some sort
// of epoch like the raft term that is guaranteed to be monotonically
// increasing through elections, however we don't have that abstraction for
// all HABackends in general. The best we have is the ActiveTime. In a
// distributed systems textbook this would be bad to rely on due to clock
// sync issues etc., but for our tests it's likely fine because even if we are
// running separate Vault containers, they are all using the same hardware
// clock in the system.
leaderActiveTimes := make(map[int]time.Time)
for i, node := range cluster.Nodes() {
client := node.APIClient()
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@ -166,10 +180,24 @@ func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
if err != nil || resp == nil || !resp.IsSelf {
continue
}
return i, nil
leaderActiveTimes[i] = resp.ActiveTime
}
if len(leaderActiveTimes) == 0 {
return -1, fmt.Errorf("no leader found")
}
// At least one node thinks it is active. If multiple, pick the one with the
// most recent ActiveTime. Note if there is only one then this just returns
// it.
var newestLeaderIdx int
var newestActiveTime time.Time
for i, at := range leaderActiveTimes {
if at.After(newestActiveTime) {
newestActiveTime = at
newestLeaderIdx = i
}
}
return newestLeaderIdx, nil
}
func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) {
for ctx.Err() == nil {
@ -189,7 +217,8 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
// A sleep before calling WaitForActiveNodeAndPerfStandbys seems to sort
// things out, but so apparently does this. We should be able to eliminate
// this call to WaitForActiveNode by reworking the logic in this method.
if _, err := WaitForActiveNode(ctx, cluster); err != nil {
leaderIdx, err := WaitForActiveNode(ctx, cluster)
if err != nil {
return err
}
@ -203,7 +232,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
if err != nil {
return err
}
leaderClient := cluster.Nodes()[0].APIClient()
leaderClient := cluster.Nodes()[leaderIdx].APIClient()
for ctx.Err() == nil {
err = leaderClient.Sys().MountWithContext(ctx, mountPoint, &api.MountInput{
@ -244,6 +273,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
var leader *api.LeaderResponse
leader, err = client.Sys().LeaderWithContext(ctx)
if err != nil {
logger.Trace("waiting for core", "core", coreNo, "err", err)
continue
}
switch {
@ -261,6 +291,12 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
atomic.AddInt64(&standbys, 1)
return
}
default:
logger.Trace("waiting for core", "core", coreNo,
"ha_enabled", leader.HAEnabled,
"is_self", leader.IsSelf,
"perf_standby", leader.PerfStandby,
"perf_standby_remote_wal", leader.PerfStandbyLastRemoteWAL)
}
}
}(i)


@ -60,6 +60,69 @@ type HABackend interface {
HAEnabled() bool
}
// FencingHABackend is an HABackend which provides the additional guarantee that
// each Lock it returns from LockWith is also a FencingLock. A FencingLock
// provides a mechanism to retrieve a fencing token that can be included with
// future writes by the backend to ensure that it is still the current lock
// holder at the time the write commits. Without this, timing might allow a
// lock holder that hasn't yet noticed it's no longer the active node to write
// data to storage even while a new active node is writing, causing corruption.
// For the Consul backend the fencing token is the session ID, which is
// submitted with a `check-session` operation on each write to ensure the write
// only completes if the session still holds the lock. For the raft backend
// this isn't needed because our in-process raft library is unable to write if
// it's not the leader anyway.
//
// If you implement this, Vault will call RegisterActiveNodeLock with the Lock
// instance returned by LockWith after it successfully locks it. This keeps the
// backend oblivious to the specific key we use for active node locks and
// leaves room for potential usage of locks for other purposes in the future.
//
// Note that all implementations must support writing to storage before
// RegisterActiveNodeLock is called to support initialization of a new cluster.
// They must also skip fencing for writes whose Context contains a special
// value. This is necessary to allow Vault to clear and re-initialise secondary
// clusters even though there is already an active node with a specific lock
// session, since we clear the cluster while Vault is sealed and clearing the
// data might remove the lock in some storages (e.g. Consul). As noted above,
// it's not generally safe to allow unfenced writes after a lock, so instead we
// special case just a few types of writes that only happen rarely while the
// cluster is sealed. See the IsUnfencedWrite helper function.
type FencingHABackend interface {
HABackend
RegisterActiveNodeLock(l Lock) error
}
// unfencedWriteContextKeyType is a special type to identify context values to
// disable fencing. It's a separate type per the best-practice in Context.Value
// docs to avoid collisions even if the key might match.
type unfencedWriteContextKeyType string
const (
// unfencedWriteContextKey is the context key we pass the option to bypass
// fencing through to a FencingHABackend. Note that this is not an ideal use
// of context values and violates the "do not use it for optional arguments"
// guidance but has been agreed as a pragmatic option for this case rather
// than needing to specialize every physical.Backend to understand this
// option.
unfencedWriteContextKey unfencedWriteContextKeyType = "vault-disable-fencing"
)
// UnfencedWriteCtx adds metadata to a ctx such that any writes performed
// directly on a FencingHABackend using that context will _not_ add a fencing
// token.
func UnfencedWriteCtx(ctx context.Context) context.Context {
return context.WithValue(ctx, unfencedWriteContextKey, true)
}
// IsUnfencedWrite returns whether or not the context passed has the unfenced
// flag value set.
func IsUnfencedWrite(ctx context.Context) bool {
isUnfenced, ok := ctx.Value(unfencedWriteContextKey).(bool)
return ok && isUnfenced
}
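A brief, hypothetical usage sketch of the escape hatch defined above: a caller that must write outside of the normal fenced path (e.g. clearing a secondary cluster while an active-node lock is still registered, as described in the FencingHABackend comment) marks its context so the backend skips the session check. The helper name is made up:

```go
package fencingsketch

import (
	"context"

	"github.com/hashicorp/vault/sdk/physical"
)

// unfencedDelete deletes a key even though the caller does not hold the
// registered active-node lock. Without UnfencedWriteCtx a FencingHABackend
// would attach a check-session op for the registered lock and the write
// could be rejected.
func unfencedDelete(ctx context.Context, b physical.Backend, key string) error {
	return b.Delete(physical.UnfencedWriteCtx(ctx), key)
}
```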
// ToggleablePurgemonster is an interface for backends that can toggle on or
// off special functionality and/or support purging. This is only used for the
// cache, don't use it for other things.
@ -86,7 +149,7 @@ type Lock interface {
// Unlock is used to release the lock
Unlock() error
// Returns the value of the lock and if it is held
// Returns the value of the lock and if it is held by _any_ node
Value() (bool, string, error)
}


@ -9,6 +9,8 @@ import (
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func ExerciseBackend(t testing.TB, b Backend) {
@ -330,12 +332,25 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) {
t.Errorf("expected value bar: %v", err)
}
// If the backend supports fencing, check that we can register the lock
if fba, ok := b.(FencingHABackend); ok {
require.NoError(t, fba.RegisterActiveNodeLock(lock))
}
// Second acquisition should fail
lock2, err := b2.LockWith("foo", "baz")
if err != nil {
t.Fatalf("lock 2: %v", err)
}
// Checking the lock from b2 should report that the lock is held, since held
// implies only that there is _some_ leader, not that b2 is the leader (this
// was not clear before so we make it explicit with this assertion).
held2, val2, err := lock2.Value()
require.NoError(t, err)
require.Equal(t, "bar", val2)
require.True(t, held2)
// Cancel attempt in 50 msec
stopCh := make(chan struct{})
time.AfterFunc(50*time.Millisecond, func() {
@ -363,6 +378,11 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) {
t.Errorf("should get leaderCh")
}
// If the backend supports fencing, check that we can register the lock
if fba2, ok := b2.(FencingHABackend); ok {
require.NoError(t, fba2.RegisterActiveNodeLock(lock))
}
// Check the value
held, val, err = lock2.Value()
if err != nil {


@ -0,0 +1,295 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul_fencing
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/testcluster"
"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
"github.com/stretchr/testify/require"
)
// TestConsulFencing_PartitionedLeaderCantWrite attempts to create an active
// node split-brain when using Consul storage to ensure the old leader doesn't
// continue to write data, potentially corrupting storage. It is naturally
// non-deterministic since it relies heavily on the timing of the different
// container processes; however, it pretty reliably failed before the fencing
// fix (and Consul lock improvements) and should _never_ fail now that we
// correctly fence writes.
func TestConsulFencing_PartitionedLeaderCantWrite(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
consulStorage := consul.NewClusterStorage()
// Create cluster logger that will dump cluster logs to stdout for debugging.
logger := hclog.NewInterceptLogger(hclog.DefaultOptions)
logger.SetLevel(hclog.Trace)
clusterOpts := docker.DefaultOptions(t)
clusterOpts.ImageRepo = "hashicorp/vault-enterprise"
clusterOpts.ClusterOptions.Logger = logger
clusterOpts.Storage = consulStorage
logger.Info("==> starting cluster")
c, err := docker.NewDockerCluster(ctx, clusterOpts)
require.NoError(t, err)
logger.Info(" ✅ done.", "root_token", c.GetRootToken(),
"consul_token", consulStorage.Config().Token)
logger.Info("==> waiting for leader")
leaderIdx, err := testcluster.WaitForActiveNode(ctx, c)
require.NoError(t, err)
leader := c.Nodes()[leaderIdx]
leaderClient := leader.APIClient()
notLeader := c.Nodes()[1] // Assume the leader is node 0; corrected below if not
if leaderIdx == 1 {
notLeader = c.Nodes()[0]
}
// Mount a KV v2 backend
logger.Info("==> mounting KV")
err = leaderClient.Sys().Mount("/test", &api.MountInput{
Type: "kv-v2",
})
require.NoError(t, err)
// Start two background workers that will cause writes to Consul in the
// background. KV v2 relies on a single active node for correctness.
// Specifically its patch operation does a read-modify-write under a
// key-specific lock which is correct for concurrent writes to one process,
// but which by nature of our storage API is not going to be atomic if another
// active node is also writing the same KV. It's made worse because the cache
// backend means the active node will not actually read from Consul after the
// initial read and will be modifying its own in-memory version and writing
// that back. So we should be able to detect multiple active nodes writing
// reliably with the following setup:
// 1. Two separate "client" goroutines each connected to different Vault
// servers.
// 2. Both write to the same kv-v2 key, each one modifies only its own set
// of subkeys c1-X or c2-X.
// 3. Each request adds the next sequential X value for that client. We use a
// Patch operation so we don't need to read the version or use CAS. On an
// error each client will retry the same key until it gets a success.
// 4. In a correct system with a single active node despite a partition, we
// expect a complete set of consecutive X values for both clients (i.e.
// no writes lost). If an old leader is still allowed to write to Consul
// then it will continue to patch against its own last-known version from
// cache and so will overwrite any concurrent updates from the other
// client and we should see that as lost updates at the end.
var wg sync.WaitGroup
errCh := make(chan error, 10)
var writeCount uint64
// Initialise the key once
kv := leaderClient.KVv2("/test")
_, err = kv.Put(ctx, "data", map[string]interface{}{
"c0-00000000": 1, // value don't matter here only keys in this set.
"c1-00000000": 1,
})
require.NoError(t, err)
const interval = 500 * time.Millisecond
runWriter := func(i int, targetServer testcluster.VaultClusterNode, ctr *uint64) {
wg.Add(1)
defer wg.Done()
kv := targetServer.APIClient().KVv2("/test")
for {
key := fmt.Sprintf("c%d-%08d", i, atomic.LoadUint64(ctr))
// Use a short timeout. If we don't then the one goroutine writing to the
// partitioned active node can get stuck here until the 60 second request
// timeout kicks in without issuing another request.
reqCtx, cancel := context.WithTimeout(ctx, interval)
logger.Debug("sending patch", "client", i, "key", key)
_, err = kv.Patch(reqCtx, "data", map[string]interface{}{
key: 1,
})
cancel()
// Deliver errors to the test, but don't block if we get too many before the
// context is cancelled, otherwise client 0 can end up blocked: it gets so
// many errors during the partition that it never actually starts writing
// again and so the test never sees split-brain writes.
if err != nil {
select {
case <-ctx.Done():
return
case errCh <- fmt.Errorf("client %d error: %w", i, err):
default:
// errCh is blocked, carry on anyway
}
} else {
// Only increment our set counter now that we've had an ack that the
// update was successful.
atomic.AddUint64(ctr, 1)
atomic.AddUint64(&writeCount, 1)
}
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}
logger.Info("==> starting writers")
client0Ctr, client1Ctr := uint64(1), uint64(1)
go runWriter(0, leader, &client0Ctr)
go runWriter(1, notLeader, &client1Ctr)
// Wait for some writes to have started
var writesBeforePartition uint64
logger.Info("==> waiting for writes")
for {
time.Sleep(1 * time.Millisecond)
writesBeforePartition = atomic.LoadUint64(&writeCount)
if writesBeforePartition >= 5 {
break
}
// Also check for any write errors
select {
case err := <-errCh:
require.NoError(t, err)
default:
}
require.NoError(t, ctx.Err())
}
val, err := kv.Get(ctx, "data")
require.NoError(t, err)
logger.Info("==> partitioning leader")
// Now partition the leader from everything else (including Consul)
err = leader.(*docker.DockerClusterNode).PartitionFromCluster(ctx)
require.NoError(t, err)
// Reload this in case more writes occurred before the partition completed.
writesBeforePartition = atomic.LoadUint64(&writeCount)
// Wait for some more writes to have happened (the client writing to leader
// will probably have sent one and hung waiting for a response but the other
// one should eventually start committing again when a new active node is
// elected).
logger.Info("==> waiting for writes to new leader")
for {
time.Sleep(1 * time.Millisecond)
writesAfterPartition := atomic.LoadUint64(&writeCount)
if (writesAfterPartition - writesBeforePartition) >= 20 {
break
}
// Also check for any write errors or timeouts
select {
case err := <-errCh:
// Don't fail here because we expect writes to the old leader to fail
// eventually or if they need a new connection etc.
logger.Info("failed write", "write_count", writesAfterPartition, "err", err)
default:
}
require.NoError(t, ctx.Err())
}
// Heal partition
logger.Info("==> healing partition")
err = leader.(*docker.DockerClusterNode).UnpartitionFromCluster(ctx)
require.NoError(t, err)
// Wait for old leader to rejoin as a standby and get healthy.
logger.Info("==> wait for old leader to rejoin")
require.NoError(t, waitUntilNotLeader(ctx, leaderClient, logger))
// Stop the writers and wait for them to shut down nicely
logger.Info("==> stopping writers")
cancel()
wg.Wait()
// Now verify that all Consul data is consistent with only one leader writing.
// Use a new context since we just cancelled the general one
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
val, err = kv.Get(reqCtx, "data")
require.NoError(t, err)
// Ensure we have every consecutive key for both clients
sets := [][]int{make([]int, 0, 128), make([]int, 0, 128)}
for k := range val.Data {
var cNum, x int
_, err := fmt.Sscanf(k, "c%d-%08d", &cNum, &x)
require.NoError(t, err)
sets[cNum] = append(sets[cNum], x)
}
// Sort both sets
sort.Ints(sets[0])
sort.Ints(sets[1])
// Ensure they are both complete by creating an expected set and comparing, to
// get nice output to debug. Note that makeSet's bound is exclusive: since we
// don't know whether the write for the current counter value completed or not
// yet, we only create sets up to one less than that value, which we know for
// sure should be present.
c0Writes := int(atomic.LoadUint64(&client0Ctr))
c1Writes := int(atomic.LoadUint64(&client1Ctr))
expect0 := makeSet(c0Writes)
expect1 := makeSet(c1Writes)
// Trim the sets to only the writes we know completed since that's all the
// expected arrays contain. But only if they are longer so we don't change the
// slice length if they are shorter than the expected number.
if len(sets[0]) > c0Writes {
sets[0] = sets[0][0:c0Writes]
}
if len(sets[1]) > c1Writes {
sets[1] = sets[1][0:c1Writes]
}
require.Equal(t, expect0, sets[0], "Client 0 writes lost")
require.Equal(t, expect1, sets[1], "Client 1 writes lost")
}
func makeSet(n int) []int {
a := make([]int, n)
for i := 0; i < n; i++ {
a[i] = i
}
return a
}
func waitUntilNotLeader(ctx context.Context, oldLeaderClient *api.Client, logger hclog.Logger) error {
for {
// Wait for the original leader to acknowledge it's not active any more.
resp, err := oldLeaderClient.Sys().LeaderWithContext(ctx)
// Tolerate errors as the old leader is in a difficult place right now.
if err == nil {
if !resp.IsSelf {
// We are not leader!
return nil
}
logger.Info("old leader not ready yet", "IsSelf", resp.IsSelf)
} else {
logger.Info("failed to read old leader status", "err", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
// Loop again
}
}
}


@ -520,6 +520,20 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
continue
}
// If the backend is a FencingHABackend, register the lock with it so it can
// correctly fence all writes from now on (i.e. assert that we still hold
// the lock atomically with each write).
if fba, ok := c.ha.(physical.FencingHABackend); ok {
err := fba.RegisterActiveNodeLock(lock)
if err != nil {
// Can't register lock, bail out
c.heldHALock = nil
lock.Unlock()
c.logger.Error("failed registering lock with fencing backend, giving up active state")
continue
}
}
c.logger.Info("acquired lock, enabling active operation")
// This is used later to log a metrics event; this can be helpful to
@ -825,7 +839,18 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct
go func() {
// Bind locally, as the race detector is tripping here
lopCount := opCount
isLeader, _, newClusterAddr, _ := c.Leader()
isLeader, _, newClusterAddr, err := c.Leader()
if err != nil {
// This is debug level because it's not really something the user
// needs to see typically. This will only really fail if we are sealed
// or the HALock fails (e.g. can't connect to Consul or elect raft
// leader) and other things in logs should make those kinds of
// conditions obvious. However when debugging, it is useful to know
// for sure why a standby is not seeing the leadership update which
// could be due to errors being returned or could be due to some other
// bug.
c.logger.Debug("periodicLeaderRefresh fail to fetch leader info", "err", err)
}
// If we are the leader reset the clusterAddr since the next
// failover might go to the node that was previously active.