keyring: safely handle missing keys and restore GC (#15092)
When replication of a single key fails, the replication loop breaks early and therefore keys that fall later in the sorting order will never get replicated. This is particularly a problem for clusters impacted by the bug that caused #14981 and that were later upgraded; the keys that were never replicated can now never be replicated, and so we need to handle them safely. Included in the replication fix: * Refactor the replication loop so that each key replicated in a function call that returns an error, to make the workflow more clear and reduce nesting. Log the error and continue. * Improve stability of keyring replication tests. We no longer block leadership on initializing the keyring, so there's a race condition in the keyring tests where we can test for the existence of the root key before the keyring has been initialize. Change this to an "eventually" test. But these fixes aren't enough to fix #14981 because they'll end up seeing an error once a second complaining about the missing key, so we also need to fix keyring GC so the keys can be removed from the state store. Now we'll store the key ID used to sign a workload identity in the Allocation, and we'll index the Allocation table on that so we can track whether any live Allocation was signed with a particular key ID.
This commit is contained in:
parent
acc94d523f
commit
903b5baaa4
|
@ -0,0 +1,7 @@
|
|||
```release-note:bug
|
||||
keyring: Fixed a bug where a missing key would prevent any further replication.
|
||||
```
|
||||
|
||||
```release-note:bug
|
||||
keyring: Re-enabled keyring garbage collection after fixing a bug where keys would be garbage collected even if they were used to sign a live allocation's workload identity.
|
||||
```
|
|
@ -60,7 +60,7 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
|
|||
case structs.CoreJobGlobalTokenExpiredGC:
|
||||
return c.expiredACLTokenGC(eval, true)
|
||||
case structs.CoreJobRootKeyRotateOrGC:
|
||||
return c.rootKeyRotate(eval)
|
||||
return c.rootKeyRotateOrGC(eval)
|
||||
case structs.CoreJobVariablesRekey:
|
||||
return c.variablesRekey(eval)
|
||||
case structs.CoreJobForceGC:
|
||||
|
@ -96,7 +96,9 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
|
|||
if err := c.expiredACLTokenGC(eval, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.rootKeyGC(eval); err != nil {
|
||||
return err
|
||||
}
|
||||
// Node GC must occur after the others to ensure the allocations are
|
||||
// cleared.
|
||||
return c.nodeGC(eval)
|
||||
|
@ -893,9 +895,74 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool)
|
|||
return c.srv.RPC(structs.ACLDeleteTokensRPCMethod, req, &structs.GenericResponse{})
|
||||
}
|
||||
|
||||
// rootKeyRotateOrGC is used to rotate or garbage collect root keys
|
||||
func (c *CoreScheduler) rootKeyRotateOrGC(eval *structs.Evaluation) error {
|
||||
|
||||
// a rotation will be sent to the leader so our view of state
|
||||
// is no longer valid. we ack this core job and will pick up
|
||||
// the GC work on the next interval
|
||||
wasRotated, err := c.rootKeyRotate(eval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if wasRotated {
|
||||
return nil
|
||||
}
|
||||
return c.rootKeyGC(eval)
|
||||
}
|
||||
|
||||
func (c *CoreScheduler) rootKeyGC(eval *structs.Evaluation) error {
|
||||
|
||||
oldThreshold := c.getThreshold(eval, "root key",
|
||||
"root_key_gc_threshold", c.srv.config.RootKeyGCThreshold)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
iter, err := c.snap.RootKeyMetas(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
keyMeta := raw.(*structs.RootKeyMeta)
|
||||
if keyMeta.Active() || keyMeta.Rekeying() {
|
||||
continue // never GC the active key or one we're rekeying
|
||||
}
|
||||
if keyMeta.CreateIndex > oldThreshold {
|
||||
continue // don't GC recent keys
|
||||
}
|
||||
|
||||
inUse, err := c.snap.IsRootKeyMetaInUse(keyMeta.KeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if inUse {
|
||||
continue
|
||||
}
|
||||
|
||||
req := &structs.KeyringDeleteRootKeyRequest{
|
||||
KeyID: keyMeta.KeyID,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
AuthToken: eval.LeaderACL,
|
||||
},
|
||||
}
|
||||
if err := c.srv.RPC("Keyring.Delete",
|
||||
req, &structs.KeyringDeleteRootKeyResponse{}); err != nil {
|
||||
c.logger.Error("root key delete failed", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// rootKeyRotate checks if the active key is old enough that we need
|
||||
// to kick off a rotation.
|
||||
func (c *CoreScheduler) rootKeyRotate(eval *structs.Evaluation) error {
|
||||
func (c *CoreScheduler) rootKeyRotate(eval *structs.Evaluation) (bool, error) {
|
||||
|
||||
rotationThreshold := c.getThreshold(eval, "root key",
|
||||
"root_key_rotation_threshold", c.srv.config.RootKeyRotationThreshold)
|
||||
|
@ -903,13 +970,13 @@ func (c *CoreScheduler) rootKeyRotate(eval *structs.Evaluation) error {
|
|||
ws := memdb.NewWatchSet()
|
||||
activeKey, err := c.snap.GetActiveRootKeyMeta(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
if activeKey == nil {
|
||||
return nil // no active key
|
||||
return false, nil // no active key
|
||||
}
|
||||
if activeKey.CreateIndex >= rotationThreshold {
|
||||
return nil // key is too new
|
||||
return false, nil // key is too new
|
||||
}
|
||||
|
||||
req := &structs.KeyringRotateRootKeyRequest{
|
||||
|
@ -921,10 +988,10 @@ func (c *CoreScheduler) rootKeyRotate(eval *structs.Evaluation) error {
|
|||
if err := c.srv.RPC("Keyring.Rotate",
|
||||
req, &structs.KeyringRotateRootKeyResponse{}); err != nil {
|
||||
c.logger.Error("root key rotation failed", "error", err)
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// variablesReKey is optionally run after rotating the active
|
||||
|
@ -958,29 +1025,6 @@ func (c *CoreScheduler) variablesRekey(eval *structs.Evaluation) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// we've now rotated all this key's variables, so set its state
|
||||
keyMeta = keyMeta.Copy()
|
||||
keyMeta.SetDeprecated()
|
||||
|
||||
key, err := c.srv.encrypter.GetKey(keyMeta.KeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req := &structs.KeyringUpdateRootKeyRequest{
|
||||
RootKey: &structs.RootKey{
|
||||
Meta: keyMeta,
|
||||
Key: key,
|
||||
},
|
||||
Rekey: false,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
AuthToken: eval.LeaderACL},
|
||||
}
|
||||
if err := c.srv.RPC("Keyring.Update",
|
||||
req, &structs.KeyringUpdateRootKeyResponse{}); err != nil {
|
||||
c.logger.Error("root key update failed", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -1101,24 +1145,3 @@ func (c *CoreScheduler) getThreshold(eval *structs.Evaluation, objectName, confi
|
|||
}
|
||||
return oldThreshold
|
||||
}
|
||||
|
||||
// getOldestAllocationIndex returns the CreateIndex of the oldest
|
||||
// non-terminal allocation in the state store
|
||||
func (c *CoreScheduler) getOldestAllocationIndex() (uint64, error) {
|
||||
ws := memdb.NewWatchSet()
|
||||
allocs, err := c.snap.Allocs(ws, state.SortDefault)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
for {
|
||||
raw := allocs.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
alloc := raw.(*structs.Allocation)
|
||||
if !alloc.TerminalStatus() {
|
||||
return alloc.CreateIndex, nil
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
|
|
@ -2466,38 +2466,98 @@ func TestCoreScheduler_RootKeyGC(t *testing.T) {
|
|||
// reset the time table
|
||||
srv.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
||||
// active key, will never be GC'd
|
||||
store := srv.fsm.State()
|
||||
key0, err := store.GetActiveRootKeyMeta(nil)
|
||||
require.NotNil(t, key0, "expected keyring to be bootstapped")
|
||||
require.NoError(t, err)
|
||||
|
||||
// insert an "old" inactive key
|
||||
key1 := structs.NewRootKeyMeta()
|
||||
key1.SetInactive()
|
||||
require.NoError(t, store.UpsertRootKeyMeta(600, key1, false))
|
||||
|
||||
// insert an "old" and inactive key with a variable that's using it
|
||||
key2 := structs.NewRootKeyMeta()
|
||||
key2.SetInactive()
|
||||
require.NoError(t, store.UpsertRootKeyMeta(700, key2, false))
|
||||
|
||||
variable := mock.VariableEncrypted()
|
||||
variable.KeyID = key2.KeyID
|
||||
|
||||
setResp := store.VarSet(601, &structs.VarApplyStateRequest{
|
||||
Op: structs.VarOpSet,
|
||||
Var: variable,
|
||||
})
|
||||
require.NoError(t, setResp.Error)
|
||||
|
||||
// insert an "old" key that's inactive but being used by an alloc
|
||||
key3 := structs.NewRootKeyMeta()
|
||||
key3.SetInactive()
|
||||
require.NoError(t, store.UpsertRootKeyMeta(800, key3, false))
|
||||
|
||||
// insert the allocation using key3
|
||||
alloc := mock.Alloc()
|
||||
alloc.ClientStatus = structs.AllocClientStatusRunning
|
||||
alloc.SigningKeyID = key3.KeyID
|
||||
require.NoError(t, store.UpsertAllocs(
|
||||
structs.MsgTypeTestSetup, 850, []*structs.Allocation{alloc}))
|
||||
|
||||
// insert an "old" key that's inactive but being used by an alloc
|
||||
key4 := structs.NewRootKeyMeta()
|
||||
key4.SetInactive()
|
||||
require.NoError(t, store.UpsertRootKeyMeta(900, key4, false))
|
||||
|
||||
// insert the dead allocation using key4
|
||||
alloc2 := mock.Alloc()
|
||||
alloc2.ClientStatus = structs.AllocClientStatusFailed
|
||||
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
alloc2.SigningKeyID = key4.KeyID
|
||||
require.NoError(t, store.UpsertAllocs(
|
||||
structs.MsgTypeTestSetup, 950, []*structs.Allocation{alloc2}))
|
||||
|
||||
// insert a time table index before the last key
|
||||
tt := srv.fsm.TimeTable()
|
||||
tt.Witness(1000, time.Now().UTC().Add(-1*srv.config.RootKeyGCThreshold))
|
||||
|
||||
// insert a "new" but inactive key
|
||||
key5 := structs.NewRootKeyMeta()
|
||||
key5.SetInactive()
|
||||
require.NoError(t, store.UpsertRootKeyMeta(1500, key5, false))
|
||||
|
||||
// run the core job
|
||||
snap, err := store.Snapshot()
|
||||
require.NoError(t, err)
|
||||
core := NewCoreScheduler(srv, snap)
|
||||
eval := srv.coreJobEval(structs.CoreJobRootKeyRotateOrGC, 2000)
|
||||
c := core.(*CoreScheduler)
|
||||
require.NoError(t, c.rootKeyRotate(eval))
|
||||
require.NoError(t, c.rootKeyRotateOrGC(eval))
|
||||
|
||||
got, err := store.GetActiveRootKeyMeta(nil)
|
||||
require.NotNil(t, got, "expected keyring to have an active key")
|
||||
require.Equal(t, got.KeyID, key0.KeyID)
|
||||
|
||||
// insert a time table index after the key
|
||||
tt := srv.fsm.TimeTable()
|
||||
tt.Witness(3000, time.Now().UTC().Add(-1*srv.config.RootKeyRotationThreshold))
|
||||
|
||||
// re-run the core job
|
||||
snap, err = store.Snapshot()
|
||||
ws := memdb.NewWatchSet()
|
||||
key, err := store.RootKeyMetaByID(ws, key0.KeyID)
|
||||
require.NoError(t, err)
|
||||
core = NewCoreScheduler(srv, snap)
|
||||
eval = srv.coreJobEval(structs.CoreJobRootKeyRotateOrGC, 4000)
|
||||
c = core.(*CoreScheduler)
|
||||
require.NoError(t, c.rootKeyRotate(eval))
|
||||
require.NotNil(t, key, "active key should not have been GCd")
|
||||
|
||||
key, err = store.RootKeyMetaByID(ws, key1.KeyID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, key, "old and unused inactive key should have been GCd")
|
||||
|
||||
key, err = store.RootKeyMetaByID(ws, key2.KeyID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, key, "old key should not have been GCd if still in use")
|
||||
|
||||
key, err = store.RootKeyMetaByID(ws, key3.KeyID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, key, "old key used to sign a live alloc should not have been GCd")
|
||||
|
||||
key, err = store.RootKeyMetaByID(ws, key4.KeyID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, key, "old key used to sign a terminal alloc should have been GCd")
|
||||
|
||||
key, err = store.RootKeyMetaByID(ws, key5.KeyID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, key, "new key should not have been GCd")
|
||||
|
||||
got, err = store.GetActiveRootKeyMeta(nil)
|
||||
require.NotNil(t, got, "expected keyring to have an active key")
|
||||
require.NotEqual(t, got.KeyID, key0.KeyID)
|
||||
}
|
||||
|
||||
// TestCoreScheduler_VariablesRekey exercises variables rekeying
|
||||
|
@ -2563,18 +2623,6 @@ func TestCoreScheduler_VariablesRekey(t *testing.T) {
|
|||
}, time.Second*5, 100*time.Millisecond,
|
||||
"variable rekey should be complete")
|
||||
|
||||
iter, err := store.RootKeyMetas(memdb.NewWatchSet())
|
||||
require.NoError(t, err)
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
keyMeta := raw.(*structs.RootKeyMeta)
|
||||
if keyMeta.KeyID != newKeyID {
|
||||
require.True(t, keyMeta.Deprecated())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoreScheduler_FailLoop(t *testing.T) {
|
||||
|
|
|
@ -150,9 +150,10 @@ func (e *Encrypter) Decrypt(ciphertext []byte, keyID string) ([]byte, error) {
|
|||
// header name.
|
||||
const keyIDHeader = "kid"
|
||||
|
||||
// SignClaims signs the identity claim for the task and returns an
|
||||
// encoded JWT with both the claim and its signature
|
||||
func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
|
||||
// SignClaims signs the identity claim for the task and returns an encoded JWT
|
||||
// (including both the claim and its signature), the key ID of the key used to
|
||||
// sign it, and any error.
|
||||
func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, string, error) {
|
||||
|
||||
// If a key is rotated immediately following a leader election, plans that
|
||||
// are in-flight may get signed before the new leader has the key. Allow for
|
||||
|
@ -164,7 +165,7 @@ func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", err
|
||||
return "", "", err
|
||||
default:
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
keyset, err = e.activeKeySet()
|
||||
|
@ -180,10 +181,10 @@ func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
|
|||
|
||||
tokenString, err := token.SignedString(keyset.privateKey)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
return tokenString, nil
|
||||
return tokenString, keyset.rootKey.Meta.KeyID, nil
|
||||
}
|
||||
|
||||
// VerifyClaim accepts a previously-signed encoded claim and validates
|
||||
|
@ -430,8 +431,10 @@ func (krr *KeyringReplicator) stop() {
|
|||
krr.stopFn()
|
||||
}
|
||||
|
||||
const keyringReplicationRate = 10
|
||||
|
||||
func (krr *KeyringReplicator) run(ctx context.Context) {
|
||||
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
||||
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
|
||||
krr.logger.Debug("starting encryption key replication")
|
||||
defer krr.logger.Debug("exiting key replication")
|
||||
|
||||
|
@ -460,59 +463,27 @@ START:
|
|||
krr.logger.Error("failed to fetch keyring", "error", err)
|
||||
goto ERR_WAIT
|
||||
}
|
||||
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
keyMeta := raw.(*structs.RootKeyMeta)
|
||||
keyID := keyMeta.KeyID
|
||||
if _, err := krr.encrypter.GetKey(keyID); err == nil {
|
||||
if key, err := krr.encrypter.GetKey(keyMeta.KeyID); err == nil && len(key) > 0 {
|
||||
// the key material is immutable so if we've already got it
|
||||
// we can safely return early
|
||||
// we can move on to the next key
|
||||
continue
|
||||
}
|
||||
|
||||
krr.logger.Trace("replicating new key", "id", keyID)
|
||||
|
||||
getReq := &structs.KeyringGetRootKeyRequest{
|
||||
KeyID: keyID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: krr.srv.config.Region,
|
||||
MinQueryIndex: keyMeta.ModifyIndex - 1,
|
||||
},
|
||||
}
|
||||
getResp := &structs.KeyringGetRootKeyResponse{}
|
||||
err := krr.srv.RPC("Keyring.Get", getReq, getResp)
|
||||
|
||||
if err != nil || getResp.Key == nil {
|
||||
// Key replication needs to tolerate leadership
|
||||
// flapping. If a key is rotated during a
|
||||
// leadership transition, it's possible that the
|
||||
// new leader has not yet replicated the key from
|
||||
// the old leader before the transition. Ask all
|
||||
// the other servers if they have it.
|
||||
krr.logger.Warn("failed to fetch key from current leader, trying peers",
|
||||
"key", keyID, "error", err)
|
||||
getReq.AllowStale = true
|
||||
for _, peer := range krr.getAllPeers() {
|
||||
err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp)
|
||||
if err == nil && getResp.Key != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if getResp.Key == nil {
|
||||
krr.logger.Error("failed to fetch key from any peer",
|
||||
"key", keyID, "error", err)
|
||||
goto ERR_WAIT
|
||||
}
|
||||
}
|
||||
err = krr.encrypter.AddKey(getResp.Key)
|
||||
err := krr.replicateKey(ctx, keyMeta)
|
||||
if err != nil {
|
||||
krr.logger.Error("failed to add key", "key", keyID, "error", err)
|
||||
goto ERR_WAIT
|
||||
// don't break the loop on an error, as we want to make sure
|
||||
// we've replicated any keys we can. the rate limiter will
|
||||
// prevent this case from sending excessive RPCs
|
||||
krr.logger.Error(err.Error(), "key", keyMeta.KeyID)
|
||||
}
|
||||
krr.logger.Info("added key", "key", keyID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -529,6 +500,53 @@ ERR_WAIT:
|
|||
|
||||
}
|
||||
|
||||
// replicateKey replicates a single key from peer servers that was present in
|
||||
// the state store but missing from the keyring. Returns an error only if no
|
||||
// peers have this key.
|
||||
func (krr *KeyringReplicator) replicateKey(ctx context.Context, keyMeta *structs.RootKeyMeta) error {
|
||||
keyID := keyMeta.KeyID
|
||||
krr.logger.Debug("replicating new key", "id", keyID)
|
||||
|
||||
getReq := &structs.KeyringGetRootKeyRequest{
|
||||
KeyID: keyID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: krr.srv.config.Region,
|
||||
MinQueryIndex: keyMeta.ModifyIndex - 1,
|
||||
},
|
||||
}
|
||||
getResp := &structs.KeyringGetRootKeyResponse{}
|
||||
err := krr.srv.RPC("Keyring.Get", getReq, getResp)
|
||||
|
||||
if err != nil || getResp.Key == nil {
|
||||
// Key replication needs to tolerate leadership flapping. If a key is
|
||||
// rotated during a leadership transition, it's possible that the new
|
||||
// leader has not yet replicated the key from the old leader before the
|
||||
// transition. Ask all the other servers if they have it.
|
||||
krr.logger.Warn("failed to fetch key from current leader, trying peers",
|
||||
"key", keyID, "error", err)
|
||||
getReq.AllowStale = true
|
||||
for _, peer := range krr.getAllPeers() {
|
||||
err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp)
|
||||
if err == nil && getResp.Key != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if getResp.Key == nil {
|
||||
krr.logger.Error("failed to fetch key from any peer",
|
||||
"key", keyID, "error", err)
|
||||
return fmt.Errorf("failed to fetch key from any peer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = krr.encrypter.AddKey(getResp.Key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add key to keyring: %v", err)
|
||||
}
|
||||
|
||||
krr.logger.Debug("added key", "key", keyID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: move this method into Server?
|
||||
func (krr *KeyringReplicator) getAllPeers() []*serverParts {
|
||||
krr.srv.peerLock.RLock()
|
||||
|
|
|
@ -70,8 +70,11 @@ func TestEncrypter_Restore(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var listResp structs.KeyringListRootKeyMetaResponse
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
msgpackrpc.CallWithCodec(codec, "Keyring.List", listReq, &listResp)
|
||||
require.Len(t, listResp.Keys, 1)
|
||||
return len(listResp.Keys) == 1
|
||||
}, time.Second*5, time.Second, "expected keyring to be initialized")
|
||||
|
||||
// Send a few key rotations to add keys
|
||||
|
||||
|
@ -102,9 +105,10 @@ func TestEncrypter_Restore(t *testing.T) {
|
|||
|
||||
// Verify we've restored all the keys from the old keystore
|
||||
|
||||
err := msgpackrpc.CallWithCodec(codec, "Keyring.List", listReq, &listResp)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, listResp.Keys, 5) // 4 new + the bootstrap key
|
||||
require.Eventually(t, func() bool {
|
||||
msgpackrpc.CallWithCodec(codec, "Keyring.List", listReq, &listResp)
|
||||
return len(listResp.Keys) == 5 // 4 new + the bootstrap key
|
||||
}, time.Second*5, time.Second, "expected keyring to be restored")
|
||||
|
||||
for _, keyMeta := range listResp.Keys {
|
||||
|
||||
|
@ -115,7 +119,7 @@ func TestEncrypter_Restore(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var getResp structs.KeyringGetRootKeyResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Keyring.Get", getReq, &getResp)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Keyring.Get", getReq, &getResp)
|
||||
require.NoError(t, err)
|
||||
|
||||
gotKey := getResp.Key
|
||||
|
@ -174,8 +178,12 @@ func TestEncrypter_KeyringReplication(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var listResp structs.KeyringListRootKeyMetaResponse
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
msgpackrpc.CallWithCodec(codec, "Keyring.List", listReq, &listResp)
|
||||
require.Len(t, listResp.Keys, 1)
|
||||
return len(listResp.Keys) == 1
|
||||
}, time.Second*5, time.Second, "expected keyring to be initialized")
|
||||
|
||||
keyID1 := listResp.Keys[0].KeyID
|
||||
|
||||
keyPath := filepath.Join(leader.GetConfig().DataDir, "keystore",
|
||||
|
@ -326,7 +334,7 @@ func TestEncrypter_SignVerify(t *testing.T) {
|
|||
claim := alloc.ToTaskIdentityClaims(nil, "web")
|
||||
e := srv.encrypter
|
||||
|
||||
out, err := e.SignClaims(claim)
|
||||
out, _, err := e.SignClaims(claim)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := e.VerifyClaim(out)
|
||||
|
|
|
@ -415,11 +415,12 @@ func (p *planner) signAllocIdentities(job *structs.Job, allocations []*structs.A
|
|||
tg := job.LookupTaskGroup(alloc.TaskGroup)
|
||||
for _, task := range tg.Tasks {
|
||||
claims := alloc.ToTaskIdentityClaims(job, task.Name)
|
||||
token, err := encrypter.SignClaims(claims)
|
||||
token, keyID, err := encrypter.SignClaims(claims)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
alloc.SignedIdentities[task.Name] = token
|
||||
alloc.SigningKeyID = keyID
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -18,6 +18,7 @@ const (
|
|||
TableVariablesQuotas = "variables_quota"
|
||||
TableRootKeyMeta = "root_key_meta"
|
||||
TableACLRoles = "acl_roles"
|
||||
TableAllocs = "allocs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -31,6 +32,7 @@ const (
|
|||
indexKeyID = "key_id"
|
||||
indexPath = "path"
|
||||
indexName = "name"
|
||||
indexSigningKey = "signing_key"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -686,6 +688,35 @@ func allocTableSchema() *memdb.TableSchema {
|
|||
Field: "DeploymentID",
|
||||
},
|
||||
},
|
||||
|
||||
// signing_key index is used to lookup live allocations by signing
|
||||
// key ID
|
||||
indexSigningKey: {
|
||||
Name: indexSigningKey,
|
||||
AllowMissing: true, // terminal allocations won't be indexed
|
||||
Unique: false,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "SigningKeyID",
|
||||
},
|
||||
&memdb.ConditionalIndex{
|
||||
Conditional: func(obj interface{}) (bool, error) {
|
||||
alloc, ok := obj.(*structs.Allocation)
|
||||
if !ok {
|
||||
return false, fmt.Errorf(
|
||||
"wrong type, got %t should be Allocation", obj)
|
||||
}
|
||||
// note: this isn't alloc.TerminalStatus(),
|
||||
// because we only want to consider the key
|
||||
// unused if the allocation is terminal on both
|
||||
// server and client
|
||||
return !(alloc.ClientTerminalStatus() && alloc.ServerTerminalStatus()), nil
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6959,3 +6959,29 @@ func (s *StateStore) GetActiveRootKeyMeta(ws memdb.WatchSet) (*structs.RootKeyMe
|
|||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// IsRootKeyMetaInUse determines whether a key has been used to sign a workload
|
||||
// identity for a live allocation or encrypt any variables
|
||||
func (s *StateStore) IsRootKeyMetaInUse(keyID string) (bool, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get(TableAllocs, indexSigningKey, keyID, true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
alloc := iter.Next()
|
||||
if alloc != nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
iter, err = txn.Get(TableVariables, indexKeyID, keyID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
variable := iter.Next()
|
||||
if variable != nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -54,6 +54,10 @@ const (
|
|||
RootKeyStateInactive RootKeyState = "inactive"
|
||||
RootKeyStateActive = "active"
|
||||
RootKeyStateRekeying = "rekeying"
|
||||
|
||||
// RootKeyStateDeprecated is, itself, deprecated and is no longer in
|
||||
// use. For backwards compatibility, any existing keys with this state will
|
||||
// be treated as RootKeyStateInactive
|
||||
RootKeyStateDeprecated = "deprecated"
|
||||
)
|
||||
|
||||
|
@ -103,14 +107,10 @@ func (rkm *RootKeyMeta) SetInactive() {
|
|||
rkm.State = RootKeyStateInactive
|
||||
}
|
||||
|
||||
// Deprecated indicates that variables encrypted with this key
|
||||
// have been rekeyed
|
||||
func (rkm *RootKeyMeta) Deprecated() bool {
|
||||
return rkm.State == RootKeyStateDeprecated
|
||||
}
|
||||
|
||||
func (rkm *RootKeyMeta) SetDeprecated() {
|
||||
rkm.State = RootKeyStateDeprecated
|
||||
// Inactive indicates that this key is no longer being used to encrypt new
|
||||
// variables or workload identities.
|
||||
func (rkm *RootKeyMeta) Inactive() bool {
|
||||
return rkm.State == RootKeyStateInactive || rkm.State == RootKeyStateDeprecated
|
||||
}
|
||||
|
||||
func (rkm *RootKeyMeta) Stub() *RootKeyMetaStub {
|
||||
|
|
|
@ -9717,11 +9717,14 @@ type Allocation struct {
|
|||
// to stop running because it got preempted
|
||||
PreemptedByAllocation string
|
||||
|
||||
// SignedIdentities is a map of task names to signed
|
||||
// identity/capability claim tokens for those tasks. If needed, it
|
||||
// is populated in the plan applier
|
||||
// SignedIdentities is a map of task names to signed identity/capability
|
||||
// claim tokens for those tasks. If needed, it is populated in the plan
|
||||
// applier.
|
||||
SignedIdentities map[string]string `json:"-"`
|
||||
|
||||
// SigningKeyID is the key used to sign the SignedIdentities field.
|
||||
SigningKeyID string
|
||||
|
||||
// Raft Indexes
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
|
|
|
@ -61,15 +61,15 @@ func TestVariablesEndpoint_auth(t *testing.T) {
|
|||
structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc1, alloc2, alloc3, alloc4}))
|
||||
|
||||
claims1 := alloc1.ToTaskIdentityClaims(nil, "web")
|
||||
idToken, err := srv.encrypter.SignClaims(claims1)
|
||||
idToken, _, err := srv.encrypter.SignClaims(claims1)
|
||||
must.NoError(t, err)
|
||||
|
||||
claims2 := alloc2.ToTaskIdentityClaims(nil, "web")
|
||||
noPermissionsToken, err := srv.encrypter.SignClaims(claims2)
|
||||
noPermissionsToken, _, err := srv.encrypter.SignClaims(claims2)
|
||||
must.NoError(t, err)
|
||||
|
||||
claims3 := alloc3.ToTaskIdentityClaims(alloc3.Job, "web")
|
||||
idDispatchToken, err := srv.encrypter.SignClaims(claims3)
|
||||
idDispatchToken, _, err := srv.encrypter.SignClaims(claims3)
|
||||
must.NoError(t, err)
|
||||
|
||||
// corrupt the signature of the token
|
||||
|
@ -83,7 +83,7 @@ func TestVariablesEndpoint_auth(t *testing.T) {
|
|||
invalidIDToken := strings.Join(idTokenParts, ".")
|
||||
|
||||
claims4 := alloc4.ToTaskIdentityClaims(alloc4.Job, "web")
|
||||
wiOnlyToken, err := srv.encrypter.SignClaims(claims4)
|
||||
wiOnlyToken, _, err := srv.encrypter.SignClaims(claims4)
|
||||
must.NoError(t, err)
|
||||
|
||||
policy := mock.ACLPolicy()
|
||||
|
@ -570,7 +570,7 @@ func TestVariablesEndpoint_ListFiltering(t *testing.T) {
|
|||
structs.MsgTypeTestSetup, idx, []*structs.Allocation{alloc}))
|
||||
|
||||
claims := alloc.ToTaskIdentityClaims(alloc.Job, "web")
|
||||
token, err := srv.encrypter.SignClaims(claims)
|
||||
token, _, err := srv.encrypter.SignClaims(claims)
|
||||
must.NoError(t, err)
|
||||
|
||||
writeVar := func(ns, path string) {
|
||||
|
|
|
@ -200,11 +200,15 @@ server {
|
|||
rejoin the cluster.
|
||||
|
||||
- `root_key_gc_interval` `(string: "10m")` - Specifies the interval between
|
||||
checks to rotate the root [encryption key][].
|
||||
[encryption key][] metadata garbage collections.
|
||||
|
||||
- `root_key_gc_threshold` `(string: "1h")` - Specifies the minimum time that an
|
||||
[encryption key][] must exist before it can be eligible for garbage
|
||||
collection.
|
||||
|
||||
- `root_key_rotation_threshold` `(string: "720h")` - Specifies the minimum time
|
||||
that an [encryption key][] must exist before it is automatically rotated on
|
||||
the next `root_key_gc_interval`.
|
||||
the next garbage collection interval.
|
||||
|
||||
- `server_join` <code>([server_join][server-join]: nil)</code> - Specifies
|
||||
how the Nomad server will connect to other Nomad servers. The `retry_join`
|
||||
|
|
Loading…
Reference in New Issue