keyring: update handle to state inside replication loop (#15227)
* keyring: update handle to state inside replication loop When keyring replication starts, we take a handle to the state store. But whenever a snapshot is restored, this handle is invalidated and no longer points to a state store that is receiving new keys. This leaks a bunch of memory too! In addition to operator-initiated restores, when fresh servers are added to existing clusters with large-enough state, the keyring replication can get started quickly enough that it's running before the snapshot from the existing clusters have been restored. Fix this by updating the handle to the state store on each pass.
This commit is contained in:
parent
c94c231c08
commit
dd3a07302e
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
keyring: Fixed a bug where replication would stop after snapshot restores
|
||||||
|
```
|
|
@ -431,18 +431,13 @@ func (krr *KeyringReplicator) stop() {
|
||||||
krr.stopFn()
|
krr.stopFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
const keyringReplicationRate = 10
|
const keyringReplicationRate = 5
|
||||||
|
|
||||||
func (krr *KeyringReplicator) run(ctx context.Context) {
|
func (krr *KeyringReplicator) run(ctx context.Context) {
|
||||||
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
|
|
||||||
krr.logger.Debug("starting encryption key replication")
|
krr.logger.Debug("starting encryption key replication")
|
||||||
defer krr.logger.Debug("exiting key replication")
|
defer krr.logger.Debug("exiting key replication")
|
||||||
|
|
||||||
retryErrTimer, stop := helper.NewSafeTimer(time.Second * 1)
|
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
|
||||||
defer stop()
|
|
||||||
|
|
||||||
START:
|
|
||||||
store := krr.srv.fsm.State()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -451,19 +446,17 @@ START:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// Rate limit how often we attempt replication
|
|
||||||
err := limiter.Wait(ctx)
|
err := limiter.Wait(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
goto ERR_WAIT // rate limit exceeded
|
continue // rate limit exceeded
|
||||||
}
|
}
|
||||||
|
|
||||||
ws := store.NewWatchSet()
|
store := krr.srv.fsm.State()
|
||||||
iter, err := store.RootKeyMetas(ws)
|
iter, err := store.RootKeyMetas(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
krr.logger.Error("failed to fetch keyring", "error", err)
|
krr.logger.Error("failed to fetch keyring", "error", err)
|
||||||
goto ERR_WAIT
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
raw := iter.Next()
|
raw := iter.Next()
|
||||||
if raw == nil {
|
if raw == nil {
|
||||||
|
@ -488,16 +481,6 @@ START:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ERR_WAIT:
|
|
||||||
retryErrTimer.Reset(1 * time.Second)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-retryErrTimer.C:
|
|
||||||
goto START
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// replicateKey replicates a single key from peer servers that was present in
|
// replicateKey replicates a single key from peer servers that was present in
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package nomad
|
package nomad
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -8,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
"github.com/shoenig/test/must"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/nomad/ci"
|
"github.com/hashicorp/nomad/ci"
|
||||||
|
@ -300,6 +302,32 @@ func TestEncrypter_KeyringReplication(t *testing.T) {
|
||||||
time.Second*5, time.Second,
|
time.Second*5, time.Second,
|
||||||
"expected new servers to get replicated keys")
|
"expected new servers to get replicated keys")
|
||||||
|
|
||||||
|
// Scenario: reload a snapshot
|
||||||
|
|
||||||
|
t.Logf("taking snapshot of node5")
|
||||||
|
|
||||||
|
snapshot, err := srv5.fsm.Snapshot()
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
defer snapshot.Release()
|
||||||
|
|
||||||
|
// Persist so we can read it back
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
sink := &MockSink{buf, false}
|
||||||
|
must.NoError(t, snapshot.Persist(sink))
|
||||||
|
|
||||||
|
must.NoError(t, srv5.fsm.Restore(sink))
|
||||||
|
|
||||||
|
// rotate the key
|
||||||
|
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Keyring.Rotate", rotateReq, &rotateResp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
keyID4 := rotateResp.Key.KeyID
|
||||||
|
|
||||||
|
require.Eventually(t, checkReplicationFn(keyID4),
|
||||||
|
time.Second*5, time.Second,
|
||||||
|
"expected new servers to get replicated keys after snapshot restore")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEncrypter_EncryptDecrypt(t *testing.T) {
|
func TestEncrypter_EncryptDecrypt(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue