Parallel retry join (#13606)
This commit is contained in:
parent
e2b17ca96b
commit
400996ef0d
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
storage/raft: When using retry_join stanzas, join against all of them in parallel.
|
||||
```
|
|
@ -655,7 +655,7 @@ func VerifyRaftPeers(t testing.T, client *api.Client, expected map[string]bool)
|
|||
// If the collection is non-empty, it means that the peer was not found in
|
||||
// the response.
|
||||
if len(expected) != 0 {
|
||||
t.Fatalf("failed to read configuration successfully, expected peers no found in configuration list: %v", expected)
|
||||
t.Fatalf("failed to read configuration successfully, expected peers not found in configuration list: %v", expected)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
|
|||
haStorage, haCleanup := teststorage.MakeReusableRaftHAStorage(t, logger, opts.NumCores, physBundle)
|
||||
defer haCleanup()
|
||||
|
||||
updateCLuster := func(t *testing.T) {
|
||||
updateCluster := func(t *testing.T) {
|
||||
t.Log("simulating cluster update with raft as HABackend")
|
||||
|
||||
opts.SkipInit = true
|
||||
|
@ -240,5 +240,5 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
updateCLuster(t)
|
||||
updateCluster(t)
|
||||
}
|
||||
|
|
370
vault/raft.go
370
vault/raft.go
|
@ -710,6 +710,104 @@ func (c *Core) InitiateRetryJoin(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// getRaftChallenge is a helper function used by the raft join process for adding a
|
||||
// node to a cluster: it contacts the given node and initiates the bootstrap
|
||||
// challenge, returning the result or an error.
|
||||
func (c *Core) getRaftChallenge(leaderInfo *raft.LeaderJoinInfo) (*raftInformation, error) {
|
||||
if leaderInfo == nil {
|
||||
return nil, errors.New("raft leader information is nil")
|
||||
}
|
||||
if len(leaderInfo.LeaderAPIAddr) == 0 {
|
||||
return nil, errors.New("raft leader address not provided")
|
||||
}
|
||||
|
||||
c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr)
|
||||
|
||||
// Create an API client to interact with the leader node
|
||||
transport := cleanhttp.DefaultPooledTransport()
|
||||
|
||||
var err error
|
||||
if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) {
|
||||
leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create TLS config: %w", err)
|
||||
}
|
||||
leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName
|
||||
}
|
||||
if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" {
|
||||
leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create TLS config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if leaderInfo.TLSConfig != nil {
|
||||
transport.TLSClientConfig = leaderInfo.TLSConfig.Clone()
|
||||
if err := http2.ConfigureTransport(transport); err != nil {
|
||||
return nil, fmt.Errorf("failed to configure TLS: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
config := api.DefaultConfig()
|
||||
if config.Error != nil {
|
||||
return nil, fmt.Errorf("failed to create api client: %w", config.Error)
|
||||
}
|
||||
|
||||
config.Address = leaderInfo.LeaderAPIAddr
|
||||
config.HttpClient = client
|
||||
config.MaxRetries = 0
|
||||
|
||||
apiClient, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create api client: %w", err)
|
||||
}
|
||||
|
||||
// Attempt to join the leader by requesting for the bootstrap challenge
|
||||
secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
|
||||
"server_id": c.getRaftBackend().NodeID(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error during raft bootstrap init call: %w", err)
|
||||
}
|
||||
if secret == nil {
|
||||
return nil, errors.New("could not retrieve raft bootstrap package")
|
||||
}
|
||||
|
||||
var sealConfig SealConfig
|
||||
err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sealConfig.Type != c.seal.BarrierType() {
|
||||
return nil, fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType())
|
||||
}
|
||||
|
||||
challengeB64, ok := secret.Data["challenge"]
|
||||
if !ok {
|
||||
return nil, errors.New("error during raft bootstrap call, no challenge given")
|
||||
}
|
||||
challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
||||
}
|
||||
|
||||
eBlob := &wrapping.EncryptedBlobInfo{}
|
||||
if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
|
||||
return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
||||
}
|
||||
|
||||
return &raftInformation{
|
||||
challenge: eBlob,
|
||||
leaderClient: apiClient,
|
||||
leaderBarrierConfig: &sealConfig,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) {
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
|
@ -790,121 +888,22 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
return false, fmt.Errorf("failed to create auto-join discovery: %w", err)
|
||||
}
|
||||
|
||||
join := func(retry bool) error {
|
||||
joinLeader := func(leaderInfo *raft.LeaderJoinInfo, leaderAddr string) error {
|
||||
if leaderInfo == nil {
|
||||
return errors.New("raft leader information is nil")
|
||||
}
|
||||
if len(leaderAddr) == 0 {
|
||||
return errors.New("raft leader address not provided")
|
||||
}
|
||||
|
||||
init, err := c.InitializedLocally(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if core is initialized: %w", err)
|
||||
}
|
||||
|
||||
if init && !isRaftHAOnly {
|
||||
c.logger.Info("returning from raft join as the node is initialized")
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderAddr)
|
||||
|
||||
// Create an API client to interact with the leader node
|
||||
transport := cleanhttp.DefaultPooledTransport()
|
||||
|
||||
if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) {
|
||||
leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create TLS config: %w", err)
|
||||
}
|
||||
leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName
|
||||
}
|
||||
if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" {
|
||||
leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create TLS config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if leaderInfo.TLSConfig != nil {
|
||||
transport.TLSClientConfig = leaderInfo.TLSConfig.Clone()
|
||||
if err := http2.ConfigureTransport(transport); err != nil {
|
||||
return fmt.Errorf("failed to configure TLS: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
config := api.DefaultConfig()
|
||||
if config.Error != nil {
|
||||
return fmt.Errorf("failed to create api client: %w", config.Error)
|
||||
}
|
||||
|
||||
config.Address = leaderAddr
|
||||
config.HttpClient = client
|
||||
config.MaxRetries = 0
|
||||
|
||||
apiClient, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create api client: %w", err)
|
||||
}
|
||||
|
||||
// Attempt to join the leader by requesting for the bootstrap challenge
|
||||
secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
|
||||
"server_id": raftBackend.NodeID(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error during raft bootstrap init call: %w", err)
|
||||
}
|
||||
if secret == nil {
|
||||
return errors.New("could not retrieve raft bootstrap package")
|
||||
}
|
||||
|
||||
var sealConfig SealConfig
|
||||
err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sealConfig.Type != c.seal.BarrierType() {
|
||||
return fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType())
|
||||
}
|
||||
|
||||
challengeB64, ok := secret.Data["challenge"]
|
||||
if !ok {
|
||||
return errors.New("error during raft bootstrap call, no challenge given")
|
||||
}
|
||||
challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
||||
}
|
||||
|
||||
eBlob := &wrapping.EncryptedBlobInfo{}
|
||||
if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
|
||||
return fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
||||
}
|
||||
|
||||
raftInfo := &raftInformation{
|
||||
challenge: eBlob,
|
||||
leaderClient: apiClient,
|
||||
leaderBarrierConfig: &sealConfig,
|
||||
nonVoter: nonVoter,
|
||||
}
|
||||
|
||||
retryFailures := leaderInfos[0].Retry
|
||||
// answerChallenge performs the second part of a raft join: after we've issued
|
||||
// the sys/storage/raft/bootstrap/challenge call to initiate the join, this
|
||||
// func uses the seal to compute an answer to the challenge and sends it
|
||||
// back to the server that provided the challenge.
|
||||
answerChallenge := func(ctx context.Context, raftInfo *raftInformation) error {
|
||||
// If we're using Shamir and using raft for both physical and HA, we
|
||||
// need to block until the node is unsealed, unless retry is set to
|
||||
// false.
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !c.isRaftHAOnly() {
|
||||
c.raftInfo = raftInfo
|
||||
if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil {
|
||||
if err := c.seal.SetBarrierConfig(ctx, raftInfo.leaderBarrierConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !retry {
|
||||
if !retryFailures {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -915,6 +914,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
}
|
||||
}
|
||||
|
||||
raftInfo.nonVoter = nonVoter
|
||||
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
|
||||
return fmt.Errorf("failed to send answer to raft leader node: %w", err)
|
||||
}
|
||||
|
@ -927,24 +927,109 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
close(c.raftJoinDoneCh)
|
||||
}
|
||||
|
||||
c.logger.Info("successfully joined the raft cluster", "leader_addr", leaderInfo.LeaderAPIAddr)
|
||||
c.logger.Info("successfully joined the raft cluster", "leader_addr", raftInfo.leaderClient.Address())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Each join try goes through all the possible leader nodes and attempts to join
|
||||
// them, until one of the attempt succeeds.
|
||||
// join attempts to join to any of the leaders defined in leaderInfos,
|
||||
// using the first one that returns a challenge to our request. If shamir
|
||||
// seal is in use, we must wait to get enough unseal keys to solve the
|
||||
// challenge. If we're unable to get a challenge from any leader, or if
|
||||
// we fail to answer the challenge successfully, or if ctx times out,
|
||||
// an error is returned.
|
||||
join := func() error {
|
||||
init, err := c.InitializedLocally(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if core is initialized: %w", err)
|
||||
}
|
||||
if init && !isRaftHAOnly {
|
||||
c.logger.Info("returning from raft join as the node is initialized")
|
||||
return nil
|
||||
}
|
||||
challengeCh := make(chan *raftInformation)
|
||||
var expandedJoinInfos []*raft.LeaderJoinInfo
|
||||
for _, leaderInfo := range leaderInfos {
|
||||
joinInfos, err := c.raftLeaderInfo(leaderInfo, disco)
|
||||
if err != nil {
|
||||
c.logger.Error("error in retry_join stanza, will not use it for raft join", "error", err,
|
||||
"leader_api_addr", leaderInfo.LeaderAPIAddr, "auto_join", leaderInfo.AutoJoin != "")
|
||||
continue
|
||||
}
|
||||
expandedJoinInfos = append(expandedJoinInfos, joinInfos...)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for i := range leaderInfos {
|
||||
wg.Add(1)
|
||||
go func(joinInfo *raft.LeaderJoinInfo) {
|
||||
defer wg.Done()
|
||||
raftInfo, err := c.getRaftChallenge(joinInfo)
|
||||
if err != nil {
|
||||
c.Logger().Trace("failed to get raft challenge", "leader_addr", joinInfo.LeaderAPIAddr, "error", err)
|
||||
return
|
||||
}
|
||||
challengeCh <- raftInfo
|
||||
}(leaderInfos[i])
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(challengeCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case raftInfo := <-challengeCh:
|
||||
if raftInfo != nil {
|
||||
err = answerChallenge(ctx, raftInfo)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timed out on raft join: %w", ctx.Err())
|
||||
}
|
||||
|
||||
switch retryFailures {
|
||||
case true:
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
err := join()
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
c.logger.Error("failed to retry join raft cluster", "retry", "2s")
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
// Backgrounded so return false
|
||||
return false, nil
|
||||
default:
|
||||
if err := join(); err != nil {
|
||||
c.logger.Error("failed to join raft cluster", "error", err)
|
||||
return false, fmt.Errorf("failed to join raft cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// raftLeaderInfo uses go-discover to expand leaderInfo to include any auto-join results
|
||||
func (c *Core) raftLeaderInfo(leaderInfo *raft.LeaderJoinInfo, disco *discover.Discover) ([]*raft.LeaderJoinInfo, error) {
|
||||
var ret []*raft.LeaderJoinInfo
|
||||
switch {
|
||||
case leaderInfo.LeaderAPIAddr != "" && leaderInfo.AutoJoin != "":
|
||||
c.logger.Error("join attempt failed", "error", errors.New("cannot provide both leader address and auto-join metadata"))
|
||||
return nil, errors.New("cannot provide both leader address and auto-join metadata")
|
||||
|
||||
case leaderInfo.LeaderAPIAddr != "":
|
||||
if err := joinLeader(leaderInfo, leaderInfo.LeaderAPIAddr); err != nil {
|
||||
c.logger.Warn("join attempt failed", "error", err)
|
||||
} else {
|
||||
// successfully joined leader
|
||||
return nil
|
||||
}
|
||||
ret = append(ret, leaderInfo)
|
||||
|
||||
case leaderInfo.AutoJoin != "":
|
||||
scheme := leaderInfo.AutoJoinScheme
|
||||
|
@ -957,10 +1042,10 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
// default to 8200 when no port is provided
|
||||
port = 8200
|
||||
}
|
||||
// Addrs returns either IPv4 or IPv6 address sans scheme or port
|
||||
// Addrs returns either IPv4 or IPv6 address, without scheme or port
|
||||
clusterIPs, err := disco.Addrs(leaderInfo.AutoJoin, c.logger.StandardLogger(nil))
|
||||
if err != nil {
|
||||
c.logger.Error("failed to parse addresses from auto-join metadata", "error", err)
|
||||
return nil, fmt.Errorf("failed to parse addresses from auto-join metadata: %w", err)
|
||||
}
|
||||
for _, ip := range clusterIPs {
|
||||
if strings.Count(ip, ":") >= 2 && !strings.HasPrefix(ip, "[") {
|
||||
|
@ -968,50 +1053,15 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
ip = fmt.Sprintf("[%s]", ip)
|
||||
}
|
||||
u := fmt.Sprintf("%s://%s:%d", scheme, ip, port)
|
||||
if err := joinLeader(leaderInfo, u); err != nil {
|
||||
c.logger.Warn("join attempt failed", "error", err)
|
||||
} else {
|
||||
// successfully joined leader
|
||||
return nil
|
||||
}
|
||||
info := *leaderInfo
|
||||
info.LeaderAPIAddr = u
|
||||
ret = append(ret, &info)
|
||||
}
|
||||
|
||||
default:
|
||||
c.logger.Error("join attempt failed", "error", errors.New("must provide leader address or auto-join metadata"))
|
||||
return nil, errors.New("must provide leader address or auto-join metadata")
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("failed to join any raft leader node")
|
||||
}
|
||||
|
||||
switch leaderInfos[0].Retry {
|
||||
case true:
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
err := join(true)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
c.logger.Error("failed to retry join raft cluster", "retry", "2s")
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
// Backgrounded so return false
|
||||
return false, nil
|
||||
default:
|
||||
if err := join(false); err != nil {
|
||||
c.logger.Error("failed to join raft cluster", "error", err)
|
||||
return false, fmt.Errorf("failed to join raft cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// getRaftBackend returns the RaftBackend from the HA or physical backend,
|
||||
|
|
Loading…
Reference in New Issue