keyring: fixes for keyring replication on cluster join (#14987)
* keyring: don't unblock early if rate limit burst exceeded The rate limiter returns an error and unblocks early if its burst limit is exceeded (unless the burst limit is Inf). Ensure we're not unblocking early, otherwise we'll only slow down the cases where we're already pausing to make external RPC requests. * keyring: set MinQueryIndex on stale queries When keyring replication makes a stale query to non-leader peers to find a key the leader doesn't have, we need to make sure the peer we're querying has had a chance to catch up to the most current index for that key. Otherwise it's possible for newly-added servers to query another newly-added server and get a non-error nil response for that key ID. Ensure that we're setting the correct reply index in the blocking query. Note that the "not found" case does not return an error, just an empty key. So as a belt-and-suspenders, update the handling of empty responses so that we don't break the loop early if we hit a server that doesn't have the key. * test for adding new servers to keyring * leader: initialize keyring after we have consistent reads Wait until we're sure the FSM is current before we try to initialize the keyring. Also, if a key is rotated immediately following a leader election, plans that are in-flight may get signed before the new leader has the key. Allow for a short timeout-and-retry to avoid rejecting plans
This commit is contained in:
parent
9cac60dbed
commit
3a811ac5e7
|
@ -155,12 +155,33 @@ const keyIDHeader = "kid"
|
|||
// SignClaims signs the identity claim for the task and returns an
|
||||
// encoded JWT with both the claim and its signature
|
||||
func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
|
||||
e.lock.RLock()
|
||||
defer e.lock.RUnlock()
|
||||
|
||||
keyset, err := e.activeKeySetLocked()
|
||||
getActiveKeyset := func() (*keyset, error) {
|
||||
e.lock.RLock()
|
||||
defer e.lock.RUnlock()
|
||||
keyset, err := e.activeKeySetLocked()
|
||||
return keyset, err
|
||||
}
|
||||
|
||||
// If a key is rotated immediately following a leader election, plans that
|
||||
// are in-flight may get signed before the new leader has the key. Allow for
|
||||
// a short timeout-and-retry to avoid rejecting plans
|
||||
keyset, err := getActiveKeyset()
|
||||
if err != nil {
|
||||
return "", err
|
||||
ctx, cancel := context.WithTimeout(e.srv.shutdownCtx, 5*time.Second)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", err
|
||||
default:
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
keyset, err = getActiveKeyset()
|
||||
if keyset != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
token := jwt.NewWithClaims(&jwt.SigningMethodEd25519{}, claim)
|
||||
|
@ -435,7 +456,10 @@ START:
|
|||
return
|
||||
default:
|
||||
// Rate limit how often we attempt replication
|
||||
limiter.Wait(ctx)
|
||||
err := limiter.Wait(ctx)
|
||||
if err != nil {
|
||||
goto ERR_WAIT // rate limit exceeded
|
||||
}
|
||||
|
||||
ws := store.NewWatchSet()
|
||||
iter, err := store.RootKeyMetas(ws)
|
||||
|
@ -461,7 +485,8 @@ START:
|
|||
getReq := &structs.KeyringGetRootKeyRequest{
|
||||
KeyID: keyID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: krr.srv.config.Region,
|
||||
Region: krr.srv.config.Region,
|
||||
MinQueryIndex: keyMeta.ModifyIndex - 1,
|
||||
},
|
||||
}
|
||||
getResp := &structs.KeyringGetRootKeyResponse{}
|
||||
|
@ -479,7 +504,7 @@ START:
|
|||
getReq.AllowStale = true
|
||||
for _, peer := range krr.getAllPeers() {
|
||||
err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp)
|
||||
if err == nil {
|
||||
if err == nil && getResp.Key != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,8 +123,8 @@ func TestEncrypter_Restore(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestKeyringReplicator exercises key replication between servers
|
||||
func TestKeyringReplicator(t *testing.T) {
|
||||
// TestEncrypter_KeyringReplication exercises key replication between servers
|
||||
func TestEncrypter_KeyringReplication(t *testing.T) {
|
||||
|
||||
ci.Parallel(t)
|
||||
|
||||
|
@ -267,6 +267,31 @@ func TestKeyringReplicator(t *testing.T) {
|
|||
require.Eventually(t, checkReplicationFn(keyID3),
|
||||
time.Second*5, time.Second,
|
||||
"expected keys to be replicated to followers after election")
|
||||
|
||||
// Scenario: new members join the cluster
|
||||
|
||||
srv4, cleanupSRV4 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 0
|
||||
c.NumSchedulers = 0
|
||||
})
|
||||
defer cleanupSRV4()
|
||||
srv5, cleanupSRV5 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 0
|
||||
c.NumSchedulers = 0
|
||||
})
|
||||
defer cleanupSRV5()
|
||||
|
||||
TestJoin(t, srv4, srv5)
|
||||
TestJoin(t, srv5, srv1)
|
||||
servers = []*Server{srv1, srv2, srv3, srv4, srv5}
|
||||
|
||||
testutil.WaitForLeader(t, srv4.RPC)
|
||||
testutil.WaitForLeader(t, srv5.RPC)
|
||||
|
||||
require.Eventually(t, checkReplicationFn(keyID3),
|
||||
time.Second*5, time.Second,
|
||||
"expected new servers to get replicated keys")
|
||||
|
||||
}
|
||||
|
||||
func TestEncrypter_EncryptDecrypt(t *testing.T) {
|
||||
|
|
|
@ -264,7 +264,20 @@ func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.Key
|
|||
Key: key,
|
||||
}
|
||||
reply.Key = rootKey
|
||||
reply.Index = keyMeta.ModifyIndex
|
||||
|
||||
// Use the last index that affected the policy table
|
||||
index, err := s.Index(state.TableRootKeyMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure we never set the index to zero, otherwise a blocking query
|
||||
// cannot be used. We floor the index at one, since realistically
|
||||
// the first write must have a higher index.
|
||||
if index == 0 {
|
||||
index = 1
|
||||
}
|
||||
reply.Index = index
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
|
|
@ -303,9 +303,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
|||
// Initialize scheduler configuration.
|
||||
schedulerConfig := s.getOrCreateSchedulerConfig()
|
||||
|
||||
// Create the first root key if it doesn't already exist
|
||||
go s.initializeKeyring(stopCh)
|
||||
|
||||
// Initialize the ClusterID
|
||||
_, _ = s.ClusterID()
|
||||
// todo: use cluster ID for stuff, later!
|
||||
|
@ -350,6 +347,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
|||
|
||||
// Further clean ups and follow up that don't block RPC consistency
|
||||
|
||||
// Create the first root key if it doesn't already exist
|
||||
go s.initializeKeyring(stopCh)
|
||||
|
||||
// Restore the periodic dispatcher state
|
||||
if err := s.restorePeriodicDispatcher(); err != nil {
|
||||
return err
|
||||
|
@ -2005,7 +2005,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
|
|||
break
|
||||
}
|
||||
}
|
||||
// we might have lost leadershuip during the version check
|
||||
// we might have lost leadership during the version check
|
||||
if !s.IsLeader() {
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue