node pools: replicate from authoritative region (#17456)

Upserts and deletes of node pools are forwarded to the authoritative region,
just as we do for namespaces, quotas, ACL policies, etc. This change replicates
node pools from the authoritative region into the non-authoritative regions.
Tim Gross 2023-06-12 13:24:24 -04:00 committed by GitHub
parent d45bb4bab9
commit e8a361310f
7 changed files with 279 additions and 1 deletion
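
The commit message refers to the existing write-forwarding behavior: a server outside the authoritative region never commits a node pool write itself; it forwards the RPC to the authoritative region and later receives the result through the replication loop added below. That forwarding code is not part of this diff, so the following is only a minimal, self-contained sketch of the pattern with hypothetical stand-in types (upsertRequest, forwarder, server), not Nomad's actual API.

package main

import "fmt"

// upsertRequest is a hypothetical stand-in for Nomad's NodePoolUpsertRequest.
type upsertRequest struct {
	Region    string
	PoolNames []string
}

// forwarder abstracts cross-region RPC forwarding.
type forwarder interface {
	Forward(region, method string, req *upsertRequest) error
}

type logForwarder struct{}

func (logForwarder) Forward(region, method string, req *upsertRequest) error {
	fmt.Printf("forwarding %s to region %s for pools %v\n", method, region, req.PoolNames)
	return nil
}

// server is a hypothetical stand-in for a regional server.
type server struct {
	region              string
	authoritativeRegion string
	fwd                 forwarder
}

// upsertNodePools applies the write locally only in the authoritative region;
// every other region forwards the request there and later picks up the result
// through replication.
func (s *server) upsertNodePools(req *upsertRequest) error {
	if s.region != s.authoritativeRegion {
		req.Region = s.authoritativeRegion
		return s.fwd.Forward(s.authoritativeRegion, "NodePool.UpsertNodePools", req)
	}
	fmt.Printf("region %s: committing upsert for pools %v via Raft\n", s.region, req.PoolNames)
	return nil
}

func main() {
	s := &server{region: "region2", authoritativeRegion: "region1", fwd: logForwarder{}}
	_ = s.upsertNodePools(&upsertRequest{PoolNames: []string{"gpu-pool"}})
}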

View File

@ -73,6 +73,7 @@ func TestHTTP_NodePool_Info(t *testing.T) {
// Verify expected pool is returned.
must.Eq(t, pool, obj.(*structs.NodePool), must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))
@ -143,6 +144,7 @@ func TestHTTP_NodePool_Create(t *testing.T) {
must.NoError(t, err)
must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))
@ -193,6 +195,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
must.NoError(t, err)
must.Eq(t, updated, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))
@ -239,6 +242,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
must.NoError(t, err)
must.Eq(t, updated, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))
@ -278,6 +282,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
must.NoError(t, err)
must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))

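The test hunks above add "Hash" to the ignored fields because the server now populates Hash via SetHash, while the pools constructed in the tests never set it, so a field-by-field comparison would always fail on Hash. Below is a standalone illustration of the go-cmp IgnoreFields pattern, using a hypothetical pool type in place of structs.NodePool.

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// pool is a hypothetical stand-in for structs.NodePool with only the fields
// relevant to the comparison.
type pool struct {
	Name        string
	Description string
	Hash        []byte
	CreateIndex uint64
	ModifyIndex uint64
}

func main() {
	// Test fixture: no hash or Raft indexes set.
	expected := pool{Name: "gpu", Description: "gpu nodes"}
	// What the server returns: Hash and the Raft indexes are populated.
	stored := pool{Name: "gpu", Description: "gpu nodes", Hash: []byte{0x1f}, CreateIndex: 10, ModifyIndex: 10}

	// Comparing every field reports a difference because of the server-set fields.
	fmt.Println(cmp.Diff(expected, stored) == "") // false

	// Ignoring Hash and the Raft indexes compares only the user-set fields.
	fmt.Println(cmp.Diff(expected, stored,
		cmpopts.IgnoreFields(pool{}, "Hash", "CreateIndex", "ModifyIndex")) == "") // true
}
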
View File

@ -429,6 +429,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
go s.replicateACLAuthMethods(stopCh)
go s.replicateACLBindingRules(stopCh)
go s.replicateNamespaces(stopCh)
go s.replicateNodePools(stopCh)
}
}
@ -600,6 +601,146 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru
return
}
// replicateNodePools is used to replicate node pools from the authoritative
// region to this region.
func (s *Server) replicateNodePools(stopCh chan struct{}) {
req := structs.NodePoolListRequest{
QueryOptions: structs.QueryOptions{
Region: s.config.AuthoritativeRegion,
AllowStale: true,
},
}
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
s.logger.Debug("starting node pool replication from authoritative region", "region", req.Region)
for {
select {
case <-stopCh:
return
default:
}
// Rate limit how often we attempt replication
limiter.Wait(context.Background())
if !ServersMeetMinimumVersion(
s.serf.Members(), s.Region(), minNodePoolsVersion, true) {
s.logger.Trace(
"all servers must be upgraded to 1.6.0 before Node Pools can be replicated")
if s.replicationBackoffContinue(stopCh) {
continue
} else {
return
}
}
var resp structs.NodePoolListResponse
req.AuthToken = s.ReplicationToken()
err := s.forwardRegion(s.config.AuthoritativeRegion, "NodePool.List", &req, &resp)
if err != nil {
s.logger.Error("failed to fetch node pools from authoritative region", "error", err)
if s.replicationBackoffContinue(stopCh) {
continue
} else {
return
}
}
// Perform a two-way diff
delete, update := diffNodePools(s.State(), req.MinQueryIndex, resp.NodePools)
// A significant amount of time could have passed since the last check on
// whether we should stop the replication process, so check again here
// before calling Raft.
select {
case <-stopCh:
return
default:
}
// Delete node pools that should not exist
if len(delete) > 0 {
args := &structs.NodePoolDeleteRequest{
Names: delete,
}
_, _, err := s.raftApply(structs.NodePoolDeleteRequestType, args)
if err != nil {
s.logger.Error("failed to delete node pools", "error", err)
if s.replicationBackoffContinue(stopCh) {
continue
} else {
return
}
}
}
// Update local node pools
if len(update) > 0 {
args := &structs.NodePoolUpsertRequest{
NodePools: update,
}
_, _, err := s.raftApply(structs.NodePoolUpsertRequestType, args)
if err != nil {
s.logger.Error("failed to update node pools", "error", err)
if s.replicationBackoffContinue(stopCh) {
continue
} else {
return
}
}
}
// Update the minimum query index so the next List call blocks until there is a change.
req.MinQueryIndex = resp.Index
}
}
// diffNodePools is used to perform a two-way diff between the local node pools
// and the remote node pools to determine which node pools need to be deleted or
// updated.
func diffNodePools(store *state.StateStore, minIndex uint64, remoteList []*structs.NodePool) (delete []string, update []*structs.NodePool) {
// Construct a set of the local and remote node pools
local := make(map[string][]byte)
remote := make(map[string]struct{})
// Add all the local node pools
iter, err := store.NodePools(nil, state.SortDefault)
if err != nil {
panic("failed to iterate local node pools")
}
for {
raw := iter.Next()
if raw == nil {
break
}
pool := raw.(*structs.NodePool)
local[pool.Name] = pool.Hash
}
for _, rnp := range remoteList {
remote[rnp.Name] = struct{}{}
if localHash, ok := local[rnp.Name]; !ok {
// Node pools that are missing locally should be added
update = append(update, rnp)
} else if rnp.ModifyIndex > minIndex && !bytes.Equal(localHash, rnp.Hash) {
// Node pools that have been added/updated more recently than the
// last index we saw, and have a hash mismatch with what we have
// locally, should be updated.
update = append(update, rnp)
}
}
// Node pools that don't exist on the remote should be deleted
for lnp := range local {
if _, ok := remote[lnp]; !ok {
delete = append(delete, lnp)
}
}
return
}
// restoreEvals is used to restore pending evaluations into the eval broker and
// blocked evaluations into the blocked eval tracker. The broker and blocked
eval tracker are maintained only by the leader, so they must be restored anytime

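The replication loop above leans on two pieces defined elsewhere in leader.go: a golang.org/x/time/rate limiter that caps how often replication runs, and replicationBackoffContinue, which pauses after an error while still reacting to leadership loss. Since replicationBackoffContinue is not shown in this diff, the following is only a sketch of what such a helper plausibly does, written as a standalone function that takes the backoff duration as a parameter instead of reading it from the server config.

package main

import (
	"fmt"
	"time"
)

// replicationBackoffContinue waits out the backoff period, but returns early
// (and false) if stopCh fires so the replication goroutine can exit promptly
// when leadership is lost or the server shuts down.
func replicationBackoffContinue(stopCh chan struct{}, backoff time.Duration) bool {
	timer := time.NewTimer(backoff)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // keep replicating
	case <-stopCh:
		return false // stop the replication loop
	}
}

func main() {
	stopCh := make(chan struct{})
	// Nothing closes stopCh here, so the timer fires and the caller would
	// `continue` its replication loop.
	fmt.Println(replicationBackoffContinue(stopCh, 10*time.Millisecond)) // true
}
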
View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
@ -1806,6 +1807,87 @@ func TestLeader_DiffNamespaces(t *testing.T) {
assert.Equal(t, []string{ns3.Name, ns4.Name}, update)
}
func TestLeader_ReplicateNodePools(t *testing.T) {
ci.Parallel(t)
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.Region = "region1"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
})
defer cleanupS1()
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
c.Region = "region2"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
c.ReplicationBackoff = 20 * time.Millisecond
c.ReplicationToken = root.SecretID
})
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// Write a node pool to the authoritative region
np1 := mock.NodePool()
must.NoError(t, s1.State().UpsertNodePools(
structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1}))
// Wait for the node pool to replicate
testutil.WaitForResult(func() (bool, error) {
store := s2.State()
out, err := store.NodePoolByName(nil, np1.Name)
return out != nil, err
}, func(err error) {
t.Fatalf("should replicate node pool")
})
// Delete the node pool at the authoritative region
must.NoError(t, s1.State().DeleteNodePools(structs.MsgTypeTestSetup, 200, []string{np1.Name}))
// Wait for the node pool deletion to replicate
testutil.WaitForResult(func() (bool, error) {
store := s2.State()
out, err := store.NodePoolByName(nil, np1.Name)
return out == nil, err
}, func(err error) {
t.Fatalf("should replicate node pool deletion")
})
}
func TestLeader_DiffNodePools(t *testing.T) {
ci.Parallel(t)
state := state.TestStateStore(t)
// Populate the local state
np1, np2, np3 := mock.NodePool(), mock.NodePool(), mock.NodePool()
must.NoError(t, state.UpsertNodePools(
structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1, np2, np3}))
// Simulate a remote list
rnp2 := np2.Copy()
rnp2.ModifyIndex = 50 // Ignored, same index
rnp3 := np3.Copy()
rnp3.ModifyIndex = 100 // Updated, higher index
rnp3.Description = "force a hash update"
rnp3.SetHash()
rnp4 := mock.NodePool()
remoteList := []*structs.NodePool{
rnp2,
rnp3,
rnp4,
}
delete, update := diffNodePools(state, 50, remoteList)
sort.Strings(delete)
// np1 and the built-in pools do not exist on the remote side, so they should be deleted
test.Eq(t, []string{structs.NodePoolAll, structs.NodePoolDefault, np1.Name}, delete)
// np2 is unmodified, so it is ignored; np3 is modified and np4 is new.
test.Eq(t, []*structs.NodePool{rnp3, rnp4}, update)
}
// waitForStableLeadership waits until a leader is elected and all servers
// get promoted as voting members, returns the leader
func waitForStableLeadership(t *testing.T, servers []*Server) *Server {

View File

@ -252,7 +252,7 @@ func Namespace() *structs.Namespace {
}
func NodePool() *structs.NodePool {
return &structs.NodePool{
pool := &structs.NodePool{
Name: fmt.Sprintf("pool-%s", uuid.Short()),
Description: "test node pool",
Meta: map[string]string{"team": "test"},
@ -260,6 +260,8 @@ func NodePool() *structs.NodePool {
SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
},
}
pool.SetHash()
return pool
}
// ServiceRegistrations generates an array containing two unique service

View File

@ -203,6 +203,8 @@ func (n *NodePool) UpsertNodePools(args *structs.NodePoolUpsertRequest, reply *s
if pool.IsBuiltIn() {
return structs.NewErrRPCCodedf(http.StatusBadRequest, "modifying node pool %q is not allowed", pool.Name)
}
pool.SetHash()
}
// Update via Raft.

View File

@ -730,6 +730,7 @@ func TestNodePoolEndpoint_UpsertNodePools(t *testing.T) {
must.NoError(t, err)
must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))
@ -865,6 +866,7 @@ func TestNodePoolEndpoint_UpsertNodePool_ACL(t *testing.T) {
must.NoError(t, err)
must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
structs.NodePool{},
"Hash",
"CreateIndex",
"ModifyIndex",
)))

View File

@ -6,8 +6,10 @@ package structs
import (
"fmt"
"regexp"
"sort"
"github.com/hashicorp/go-multierror"
"golang.org/x/crypto/blake2b"
"golang.org/x/exp/maps"
)
@ -55,6 +57,10 @@ type NodePool struct {
// node pool.
SchedulerConfiguration *NodePoolSchedulerConfiguration
// Hash is the hash of the node pool which is used to efficiently diff when
// we replicate pools across regions.
Hash []byte
// Raft indexes.
CreateIndex uint64
ModifyIndex uint64
@ -91,6 +97,9 @@ func (n *NodePool) Copy() *NodePool {
nc.Meta = maps.Clone(nc.Meta)
nc.SchedulerConfiguration = nc.SchedulerConfiguration.Copy()
nc.Hash = make([]byte, len(n.Hash))
copy(nc.Hash, n.Hash)
return nc
}
@ -107,6 +116,41 @@ func (n *NodePool) IsBuiltIn() bool {
}
}
// SetHash is used to compute and set the hash of the node pool.
func (n *NodePool) SetHash() []byte {
// Initialize a 256-bit Blake2b hash (32 bytes)
hash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}
// Write all the user set fields
_, _ = hash.Write([]byte(n.Name))
_, _ = hash.Write([]byte(n.Description))
if n.SchedulerConfiguration != nil {
_, _ = hash.Write([]byte(n.SchedulerConfiguration.SchedulerAlgorithm))
}
// Sort the meta keys so the hash is deterministic regardless of map iteration order
var keys []string
for k := range n.Meta {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
_, _ = hash.Write([]byte(k))
_, _ = hash.Write([]byte(n.Meta[k]))
}
// Finalize the hash
hashVal := hash.Sum(nil)
// Set and return the hash
n.Hash = hashVal
return hashVal
}
// NodePoolSchedulerConfiguration is the scheduler configuration applied to a
// node pool.
type NodePoolSchedulerConfiguration struct {
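
SetHash covers only the user-settable fields (Name, Description, the scheduler algorithm, and Meta), so two pools with identical contents always hash the same and the replication diff in leader.go can compare them with bytes.Equal. Sorting the Meta keys is what keeps the hash deterministic, since Go randomizes map iteration order. The snippet below is a standalone demonstration of that property for the meta portion of the hash; it is illustrative only, not Nomad code.

package main

import (
	"bytes"
	"fmt"
	"sort"

	"golang.org/x/crypto/blake2b"
)

// hashMeta mirrors the meta-hashing portion of SetHash: keys are sorted
// before being written so that the same contents always produce the same
// digest, regardless of map iteration order.
func hashMeta(meta map[string]string) []byte {
	h, err := blake2b.New256(nil)
	if err != nil {
		panic(err)
	}

	keys := make([]string, 0, len(meta))
	for k := range meta {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	for _, k := range keys {
		_, _ = h.Write([]byte(k))
		_, _ = h.Write([]byte(meta[k]))
	}
	return h.Sum(nil)
}

func main() {
	a := map[string]string{"team": "test", "env": "dev"}
	b := map[string]string{"env": "dev", "team": "test"} // same contents, declared in a different order

	// Equal hashes mean the replication diff treats the pools as unchanged.
	fmt.Println(bytes.Equal(hashMeta(a), hashMeta(b))) // true
}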