296 lines
9.9 KiB
Go
296 lines
9.9 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package consul_fencing
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/vault/api"
|
|
"github.com/hashicorp/vault/helper/testhelpers/consul"
|
|
"github.com/hashicorp/vault/sdk/helper/testcluster"
|
|
"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestConsulFencing_PartitionedLeaderCantWrite attempts to create an active
|
|
// node split-brain when using Consul storage to ensure the old leader doesn't
|
|
// continue to write data potentially corrupting storage. It is naturally
|
|
// non-deterministic since it relies heavily on timing of the different
|
|
// container processes, however it pretty reliably failed before the fencing fix
|
|
// (and Consul lock improvements) and should _never_ fail now we correctly fence
|
|
// writes.
|
|
func TestConsulFencing_PartitionedLeaderCantWrite(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
|
|
consulStorage := consul.NewClusterStorage()
|
|
|
|
// Create cluster logger that will dump cluster logs to stdout for debugging.
|
|
logger := hclog.NewInterceptLogger(hclog.DefaultOptions)
|
|
logger.SetLevel(hclog.Trace)
|
|
|
|
clusterOpts := docker.DefaultOptions(t)
|
|
clusterOpts.ImageRepo = "hashicorp/vault-enterprise"
|
|
clusterOpts.ClusterOptions.Logger = logger
|
|
|
|
clusterOpts.Storage = consulStorage
|
|
|
|
logger.Info("==> starting cluster")
|
|
c, err := docker.NewDockerCluster(ctx, clusterOpts)
|
|
require.NoError(t, err)
|
|
logger.Info(" ✅ done.", "root_token", c.GetRootToken(),
|
|
"consul_token", consulStorage.Config().Token)
|
|
|
|
logger.Info("==> waiting for leader")
|
|
leaderIdx, err := testcluster.WaitForActiveNode(ctx, c)
|
|
require.NoError(t, err)
|
|
|
|
leader := c.Nodes()[leaderIdx]
|
|
leaderClient := leader.APIClient()
|
|
|
|
notLeader := c.Nodes()[1] // Assumes it's usually zero and correct below
|
|
if leaderIdx == 1 {
|
|
notLeader = c.Nodes()[0]
|
|
}
|
|
|
|
// Mount a KV v2 backend
|
|
logger.Info("==> mounting KV")
|
|
err = leaderClient.Sys().Mount("/test", &api.MountInput{
|
|
Type: "kv-v2",
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Start two background workers that will cause writes to Consul in the
|
|
// background. KV v2 relies on a single active node for correctness.
|
|
// Specifically its patch operation does a read-modify-write under a
|
|
// key-specific lock which is correct for concurrent writes to one process,
|
|
// but which by nature of our storage API is not going to be atomic if another
|
|
// active node is also writing the same KV. It's made worse because the cache
|
|
// backend means the active node will not actually read from Consul after the
|
|
// initial read and will be modifying its own in-memory version and writing
|
|
// that back. So we should be able to detect multiple active nodes writing
|
|
// reliably with the following setup:
|
|
// 1. Two separate "client" goroutines each connected to different Vault
|
|
// servers.
|
|
// 2. Both write to the same kv-v2 key, each one modifies only its own set
|
|
// of subkeys c1-X or c2-X.
|
|
// 3. Each request adds the next sequential X value for that client. We use a
|
|
// Patch operation so we don't need to read the version or use CAS. On an
|
|
// error each client will retry the same key until it gets a success.
|
|
// 4. In a correct system with a single active node despite a partition, we
|
|
// expect a complete set of consecutive X values for both clients (i.e.
|
|
// no writes lost). If an old leader is still allowed to write to Consul
|
|
// then it will continue to patch against its own last-known version from
|
|
// cache and so will overwrite any concurrent updates from the other
|
|
// client and we should see that as lost updates at the end.
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan error, 10)
|
|
var writeCount uint64
|
|
|
|
// Initialise the key once
|
|
kv := leaderClient.KVv2("/test")
|
|
_, err = kv.Put(ctx, "data", map[string]interface{}{
|
|
"c0-00000000": 1, // value don't matter here only keys in this set.
|
|
"c1-00000000": 1,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
const interval = 500 * time.Millisecond
|
|
|
|
runWriter := func(i int, targetServer testcluster.VaultClusterNode, ctr *uint64) {
|
|
wg.Add(1)
|
|
defer wg.Done()
|
|
kv := targetServer.APIClient().KVv2("/test")
|
|
|
|
for {
|
|
key := fmt.Sprintf("c%d-%08d", i, atomic.LoadUint64(ctr))
|
|
|
|
// Use a short timeout. If we don't then the one goroutine writing to the
|
|
// partitioned active node can get stuck here until the 60 second request
|
|
// timeout kicks in without issuing another request.
|
|
reqCtx, cancel := context.WithTimeout(ctx, interval)
|
|
logger.Debug("sending patch", "client", i, "key", key)
|
|
_, err = kv.Patch(reqCtx, "data", map[string]interface{}{
|
|
key: 1,
|
|
})
|
|
cancel()
|
|
// Deliver errors to test, don't block if we get too many before context
|
|
// is cancelled otherwise client 0 can end up blocked as it has so many
|
|
// errors during the partition it doesn't actually start writing again
|
|
// ever and so the test never sees split-brain writes.
|
|
if err != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case errCh <- fmt.Errorf("client %d error: %w", i, err):
|
|
default:
|
|
// errCh is blocked, carry on anyway
|
|
}
|
|
} else {
|
|
// Only increment our set counter here now we've had an ack that the
|
|
// update was successful.
|
|
atomic.AddUint64(ctr, 1)
|
|
atomic.AddUint64(&writeCount, 1)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(interval):
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.Info("==> starting writers")
|
|
client0Ctr, client1Ctr := uint64(1), uint64(1)
|
|
go runWriter(0, leader, &client0Ctr)
|
|
go runWriter(1, notLeader, &client1Ctr)
|
|
|
|
// Wait for some writes to have started
|
|
var writesBeforePartition uint64
|
|
logger.Info("==> waiting for writes")
|
|
for {
|
|
time.Sleep(1 * time.Millisecond)
|
|
writesBeforePartition = atomic.LoadUint64(&writeCount)
|
|
if writesBeforePartition >= 5 {
|
|
break
|
|
}
|
|
// Also check for any write errors
|
|
select {
|
|
case err := <-errCh:
|
|
require.NoError(t, err)
|
|
default:
|
|
}
|
|
require.NoError(t, ctx.Err())
|
|
}
|
|
|
|
val, err := kv.Get(ctx, "data")
|
|
require.NoError(t, err)
|
|
|
|
logger.Info("==> partitioning leader")
|
|
// Now partition the leader from everything else (including Consul)
|
|
err = leader.(*docker.DockerClusterNode).PartitionFromCluster(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Reload this incase more writes occurred before the partition completed.
|
|
writesBeforePartition = atomic.LoadUint64(&writeCount)
|
|
|
|
// Wait for some more writes to have happened (the client writing to leader
|
|
// will probably have sent one and hung waiting for a response but the other
|
|
// one should eventually start committing again when new active node is
|
|
// elected).
|
|
|
|
logger.Info("==> waiting for writes to new leader")
|
|
for {
|
|
time.Sleep(1 * time.Millisecond)
|
|
writesAfterPartition := atomic.LoadUint64(&writeCount)
|
|
if (writesAfterPartition - writesBeforePartition) >= 20 {
|
|
break
|
|
}
|
|
// Also check for any write errors or timeouts
|
|
select {
|
|
case err := <-errCh:
|
|
// Don't fail here because we expect writes to the old leader to fail
|
|
// eventually or if they need a new connection etc.
|
|
logger.Info("failed write", "write_count", writesAfterPartition, "err", err)
|
|
default:
|
|
}
|
|
require.NoError(t, ctx.Err())
|
|
}
|
|
|
|
// Heal partition
|
|
logger.Info("==> healing partition")
|
|
err = leader.(*docker.DockerClusterNode).UnpartitionFromCluster(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Wait for old leader to rejoin as a standby and get healthy.
|
|
logger.Info("==> wait for old leader to rejoin")
|
|
|
|
require.NoError(t, waitUntilNotLeader(ctx, leaderClient, logger))
|
|
|
|
// Stop the writers and wait for them to shut down nicely
|
|
logger.Info("==> stopping writers")
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
// Now verify that all Consul data is consistent with only one leader writing.
|
|
// Use a new context since we just cancelled the general one
|
|
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
val, err = kv.Get(reqCtx, "data")
|
|
require.NoError(t, err)
|
|
|
|
// Ensure we have every consecutive key for both client
|
|
sets := [][]int{make([]int, 0, 128), make([]int, 0, 128)}
|
|
for k := range val.Data {
|
|
var cNum, x int
|
|
_, err := fmt.Sscanf(k, "c%d-%08d", &cNum, &x)
|
|
require.NoError(t, err)
|
|
sets[cNum] = append(sets[cNum], x)
|
|
}
|
|
|
|
// Sort both sets
|
|
sort.Ints(sets[0])
|
|
sort.Ints(sets[1])
|
|
|
|
// Ensure they are both complete by creating an expected set and comparing to
|
|
// get nice output to debug. Note that make set is an exclusive bound since we
|
|
// don't know it the current counter value write completed or not yet so we'll
|
|
// only create sets up to one less than that value that we know for sure
|
|
// should be present.
|
|
c0Writes := int(atomic.LoadUint64(&client0Ctr))
|
|
c1Writes := int(atomic.LoadUint64(&client1Ctr))
|
|
expect0 := makeSet(c0Writes)
|
|
expect1 := makeSet(c1Writes)
|
|
|
|
// Trim the sets to only the writes we know completed since that's all the
|
|
// expected arrays contain. But only if they are longer so we don't change the
|
|
// slice length if they are shorter than the expected number.
|
|
if len(sets[0]) > c0Writes {
|
|
sets[0] = sets[0][0:c0Writes]
|
|
}
|
|
if len(sets[1]) > c1Writes {
|
|
sets[1] = sets[1][0:c1Writes]
|
|
}
|
|
require.Equal(t, expect0, sets[0], "Client 0 writes lost")
|
|
require.Equal(t, expect1, sets[1], "Client 1 writes lost")
|
|
}
|
|
|
|
func makeSet(n int) []int {
|
|
a := make([]int, n)
|
|
for i := 0; i < n; i++ {
|
|
a[i] = i
|
|
}
|
|
return a
|
|
}
|
|
|
|
func waitUntilNotLeader(ctx context.Context, oldLeaderClient *api.Client, logger hclog.Logger) error {
|
|
for {
|
|
// Wait for the original leader to acknowledge it's not active any more.
|
|
resp, err := oldLeaderClient.Sys().LeaderWithContext(ctx)
|
|
// Tolerate errors as the old leader is in a difficult place right now.
|
|
if err == nil {
|
|
if !resp.IsSelf {
|
|
// We are not leader!
|
|
return nil
|
|
}
|
|
logger.Info("old leader not ready yet", "IsSelf", resp.IsSelf)
|
|
} else {
|
|
logger.Info("failed to read old leader status", "err", err)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(time.Second):
|
|
// Loop again
|
|
}
|
|
}
|
|
}
|