storage/raft: Make raftInfo atomic (#16565)
* storage/raft: Make raftInfo atomic This fixes some racy behavior discovered in parallel testing. Change the core struct member to an atomic and update references throughout.
This commit is contained in:
parent
61262ad98e
commit
42900b554b
|
@ -260,8 +260,9 @@ type Core struct {
|
|||
postUnsealStarted *uint32
|
||||
|
||||
// raftInfo will contain information required for this node to join as a
|
||||
// peer to an existing raft cluster
|
||||
raftInfo *raftInformation
|
||||
// peer to an existing raft cluster. This is marked atomic to prevent data
|
||||
// races and casted to raftInformation wherever it is used.
|
||||
raftInfo *atomic.Value
|
||||
|
||||
// migrationInfo is used during (and possibly after) a seal migration.
|
||||
// This contains information about the seal we are migrating *from*. Even
|
||||
|
@ -886,6 +887,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
|
|||
rawConfig: new(atomic.Value),
|
||||
recoveryMode: conf.RecoveryMode,
|
||||
postUnsealStarted: new(uint32),
|
||||
raftInfo: new(atomic.Value),
|
||||
raftJoinDoneCh: make(chan struct{}),
|
||||
clusterHeartbeatInterval: clusterHeartbeatInterval,
|
||||
activityLogConfig: conf.ActivityLogConfig,
|
||||
|
@ -925,6 +927,8 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
|
|||
c.activeContextCancelFunc.Store((context.CancelFunc)(nil))
|
||||
atomic.StoreInt64(c.keyRotateGracePeriod, int64(2*time.Minute))
|
||||
|
||||
c.raftInfo.Store((*raftInformation)(nil))
|
||||
|
||||
switch conf.ClusterCipherSuites {
|
||||
case "tls13", "tls12":
|
||||
// Do nothing, let Go use the default
|
||||
|
@ -1373,7 +1377,9 @@ func (c *Core) unsealWithRaft(combinedKey []byte) error {
|
|||
}
|
||||
}
|
||||
|
||||
switch c.raftInfo.joinInProgress {
|
||||
raftInfo := c.raftInfo.Load().(*raftInformation)
|
||||
|
||||
switch raftInfo.joinInProgress {
|
||||
case true:
|
||||
// JoinRaftCluster is already trying to perform a join based on retry_join configuration.
|
||||
// Inform that routine that unseal key validation is complete so that it can continue to
|
||||
|
@ -1387,11 +1393,11 @@ func (c *Core) unsealWithRaft(combinedKey []byte) error {
|
|||
default:
|
||||
// This is the case for manual raft join. Send the answer to the leader node and
|
||||
// wait for data to start streaming in.
|
||||
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil {
|
||||
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
// Reset the state
|
||||
c.raftInfo = nil
|
||||
c.raftInfo.Store((*raftInformation)(nil))
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
@ -1465,13 +1471,15 @@ func (c *Core) getUnsealKey(ctx context.Context, seal Seal) ([]byte, error) {
|
|||
var config *SealConfig
|
||||
var err error
|
||||
|
||||
raftInfo := c.raftInfo.Load().(*raftInformation)
|
||||
|
||||
switch {
|
||||
case seal.RecoveryKeySupported():
|
||||
config, err = seal.RecoveryConfig(ctx)
|
||||
case c.isRaftUnseal():
|
||||
// Ignore follower's seal config and refer to leader's barrier
|
||||
// configuration.
|
||||
config = c.raftInfo.leaderBarrierConfig
|
||||
config = raftInfo.leaderBarrierConfig
|
||||
default:
|
||||
config, err = seal.BarrierConfig(ctx)
|
||||
}
|
||||
|
|
|
@ -914,7 +914,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
// need to block until the node is unsealed, unless retry is set to
|
||||
// false.
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !c.isRaftHAOnly() {
|
||||
c.raftInfo = raftInfo
|
||||
c.raftInfo.Store(raftInfo)
|
||||
if err := c.seal.SetBarrierConfig(ctx, raftInfo.leaderBarrierConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -924,7 +924,8 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
}
|
||||
|
||||
// Wait until unseal keys are supplied
|
||||
c.raftInfo.joinInProgress = true
|
||||
raftInfo.joinInProgress = true
|
||||
c.raftInfo.Store(raftInfo)
|
||||
if atomic.LoadUint32(c.postUnsealStarted) != 1 {
|
||||
return errors.New("waiting for unseal keys to be supplied")
|
||||
}
|
||||
|
@ -937,7 +938,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
||||
// Reset the state
|
||||
c.raftInfo = nil
|
||||
c.raftInfo.Store((*raftInformation)(nil))
|
||||
|
||||
// In case of Shamir unsealing, inform the unseal process that raft join is completed
|
||||
close(c.raftJoinDoneCh)
|
||||
|
@ -1268,7 +1269,7 @@ func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error {
|
|||
}
|
||||
|
||||
func (c *Core) isRaftUnseal() bool {
|
||||
return c.raftInfo != nil
|
||||
return c.raftInfo.Load().(*raftInformation) != nil
|
||||
}
|
||||
|
||||
type answerRespData struct {
|
||||
|
|
Loading…
Reference in New Issue