Raft retry join (#7856)
* Raft retry join
* update
* Make retry join work with shamir seal
* Return upon context completion
* Update vault/raft.go

Co-Authored-By: Brian Kassouf <briankassouf@users.noreply.github.com>

* Address some review comments
* send leader information slice as a parameter
* Make retry join work properly with Shamir case. This commit has a blocking issue
* Fix join goroutine exiting before the job is done
* Polishing changes
* Don't return after a successful join during unseal
* Added config parsing test
* Add test and fix bugs
* minor changes
* Address review comments
* Fix build error

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
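At a glance, the API change running through this diff: JoinRaftCluster no longer takes a single leader address, TLS config, and retry flag, but a slice of candidate leaders. A minimal sketch of the new call shape, with illustrative values (both signatures are taken from the diff below):

    // Old shape, removed by this commit:
    //   core.JoinRaftCluster(ctx, leaderAddr, tlsConfig, retry, nonVoter)

    // New shape: each candidate leader carries its own address, TLS
    // settings, and retry preference.
    leaderInfos := []*raft.LeaderJoinInfo{
        {
            LeaderAPIAddr: "https://vault-0.example.com:8200", // illustrative address
            Retry:         true,
        },
    }
    joined, err := core.JoinRaftCluster(ctx, leaderInfos, false)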
Parent: 02c9a45c40
Commit: 8891f2ba88
@@ -1505,6 +1505,15 @@ CLUSTER_SYNTHESIS_COMPLETE:
         }()
     }
 
+    // When the underlying storage is raft, kick off retry join if it was specified
+    // in the configuration
+    if config.Storage.Type == "raft" {
+        if err := core.InitiateRetryJoin(context.Background()); err != nil {
+            c.UI.Error(fmt.Sprintf("Failed to initiate raft retry join, %q", err.Error()))
+            return 1
+        }
+    }
+
     // Perform service discovery registrations and initialization of
     // HTTP server after the verifyOnly check.
@@ -3,6 +3,7 @@ package server
 import (
     "errors"
     "fmt"
+    "github.com/hashicorp/vault/sdk/helper/jsonutil"
     "io"
     "io/ioutil"
     "os"
@@ -730,11 +731,25 @@ func ParseStorage(result *Config, list *ast.ObjectList, name string) error {
         key = item.Keys[0].Token.Value().(string)
     }
 
-    var m map[string]string
-    if err := hcl.DecodeObject(&m, item.Val); err != nil {
+    var config map[string]interface{}
+    if err := hcl.DecodeObject(&config, item.Val); err != nil {
         return multierror.Prefix(err, fmt.Sprintf("%s.%s:", name, key))
     }
 
+    m := make(map[string]string)
+    for key, val := range config {
+        valStr, ok := val.(string)
+        if ok {
+            m[key] = valStr
+            continue
+        }
+        valBytes, err := jsonutil.EncodeJSON(val)
+        if err != nil {
+            return err
+        }
+        m[key] = string(valBytes)
+    }
+
     // Pull out the redirect address since it's common to all backends
     var redirectAddr string
     if v, ok := m["redirect_addr"]; ok {
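The practical effect of this ParseStorage change: storage stanza values that are not plain strings (such as the retry_join list of objects) are JSON-encoded into the string-valued config map instead of failing to decode. A small self-contained sketch, assuming the sdk helper behaves as in this repository (jsonutil.EncodeJSON wraps json.Encoder, which appends a trailing newline, which is why the expected value in the config test below ends with `+ "\n"`):

    package main

    import (
        "fmt"

        "github.com/hashicorp/vault/sdk/helper/jsonutil"
    )

    func main() {
        // What HCL hands ParseStorage for a retry_join block.
        retryJoin := []map[string]interface{}{
            {"leader_api_addr": "http://127.0.0.1:8200"},
        }
        b, err := jsonutil.EncodeJSON(retryJoin)
        if err != nil {
            panic(err)
        }
        // Prints "[{\"leader_api_addr\":\"http://127.0.0.1:8200\"}]\n",
        // note the trailing newline from json.Encoder.
        fmt.Printf("%q\n", string(b))
    }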
@@ -37,3 +37,7 @@ func TestParseListeners(t *testing.T) {
 func TestParseEntropy(t *testing.T) {
     testParseEntropy(t, true)
 }
+
+func TestConfigRaftRetryJoin(t *testing.T) {
+    testConfigRaftRetryJoin(t)
+}
@@ -12,6 +12,38 @@ import (
     "github.com/hashicorp/hcl/hcl/ast"
 )
 
+func testConfigRaftRetryJoin(t *testing.T) {
+    config, err := LoadConfigFile("./test-fixtures/raft_retry_join.hcl")
+    if err != nil {
+        t.Fatal(err)
+    }
+    retryJoinConfig := `[{"leader_api_addr":"http://127.0.0.1:8200"},{"leader_api_addr":"http://127.0.0.2:8200"},{"leader_api_addr":"http://127.0.0.3:8200"}]` + "\n"
+    expected := &Config{
+        Listeners: []*Listener{
+            {
+                Type: "tcp",
+                Config: map[string]interface{}{
+                    "address": "127.0.0.1:8200",
+                },
+            },
+        },
+
+        Storage: &Storage{
+            Type: "raft",
+            Config: map[string]string{
+                "path":       "/storage/path/raft",
+                "node_id":    "raft1",
+                "retry_join": retryJoinConfig,
+            },
+        },
+        DisableMlock:    true,
+        DisableMlockRaw: true,
+    }
+    if !reflect.DeepEqual(config, expected) {
+        t.Fatalf("\nexpected: %#v\n actual:%#v\n", config, expected)
+    }
+}
+
 func testLoadConfigFile_topLevel(t *testing.T, entropy *Entropy) {
     config, err := LoadConfigFile("./test-fixtures/config2.hcl")
     if err != nil {
test-fixtures/raft_retry_join.hcl (new file; path as referenced by the config test above)
@@ -0,0 +1,19 @@
+storage "raft" {
+    path = "/storage/path/raft"
+    node_id = "raft1"
+    retry_join = [
+        {
+            "leader_api_addr" = "http://127.0.0.1:8200"
+        },
+        {
+            "leader_api_addr" = "http://127.0.0.2:8200"
+        },
+        {
+            "leader_api_addr" = "http://127.0.0.3:8200"
+        }
+    ]
+}
+listener "tcp" {
+    address = "127.0.0.1:8200"
+}
+disable_mlock = true
go.mod (3 additions)
@@ -28,6 +28,7 @@ require (
     github.com/cockroachdb/apd v1.1.0 // indirect
     github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c
     github.com/coreos/go-semver v0.2.0
+    github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d // indirect
     github.com/denisenkom/go-mssqldb v0.0.0-20190412130859-3b1d194e553a
     github.com/dnaeon/go-vcr v1.0.1 // indirect
     github.com/dsnet/compress v0.0.1 // indirect
@@ -47,6 +48,7 @@ require (
     github.com/golang/protobuf v1.3.2
     github.com/google/go-github v17.0.0+incompatible
     github.com/google/go-metrics-stackdriver v0.0.0-20190816035513-b52628e82e2a
+    github.com/google/go-querystring v1.0.0 // indirect
     github.com/grpc-ecosystem/grpc-gateway v1.8.5 // indirect
     github.com/hashicorp/consul-template v0.22.0
     github.com/hashicorp/consul/api v1.1.0
@@ -95,6 +97,7 @@ require (
     github.com/joyent/triton-go v0.0.0-20190112182421-51ffac552869
     github.com/keybase/go-crypto v0.0.0-20190403132359-d65b6b94177f
     github.com/kr/pretty v0.1.0
+    github.com/kr/pty v1.1.3 // indirect
     github.com/kr/text v0.1.0
     github.com/lib/pq v1.2.0
     github.com/mattn/go-colorable v0.1.4
@@ -378,11 +378,19 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
         vault.TestWaitActive(t, leaderCore.Core)
     }
 
+    leaderInfo := &raft.LeaderJoinInfo{
+        LeaderAPIAddr: leaderAPI,
+        TLSConfig:     leaderCore.TLSConfig,
+    }
+
     // Join core1
     {
         core := cluster.Cores[1]
         core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
+        leaderInfos := []*raft.LeaderJoinInfo{
+            leaderInfo,
+        }
+        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderInfos, false)
         if err != nil {
             t.Fatal(err)
         }
@@ -394,7 +402,10 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
     {
         core := cluster.Cores[2]
         core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
+        leaderInfos := []*raft.LeaderJoinInfo{
+            leaderInfo,
+        }
+        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderInfos, false)
         if err != nil {
             t.Fatal(err)
         }
@@ -4,6 +4,7 @@ import (
     "context"
     "crypto/tls"
     "errors"
+    "github.com/hashicorp/vault/physical/raft"
     "io"
     "net/http"
@@ -44,7 +45,14 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
         }
     }
 
-    joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry, req.NonVoter)
+    leaderInfos := []*raft.LeaderJoinInfo{
+        {
+            LeaderAPIAddr: req.LeaderAPIAddr,
+            TLSConfig:     tlsConfig,
+            Retry:         req.Retry,
+        },
+    }
+    joined, err := core.JoinRaftCluster(context.Background(), leaderInfos, req.NonVoter)
     if err != nil {
         respondError(w, http.StatusInternalServerError, err)
         return
@@ -2,8 +2,11 @@ package raft
 
 import (
     "context"
+    "crypto/tls"
     "errors"
     "fmt"
+    "github.com/hashicorp/vault/sdk/helper/jsonutil"
+    "github.com/hashicorp/vault/sdk/helper/tlsutil"
     "io"
     "io/ioutil"
     "os"
@@ -107,6 +110,64 @@ type RaftBackend struct {
     permitPool *physical.PermitPool
 }
 
+// LeaderJoinInfo contains information required by a node to join itself as a
+// follower to an existing raft cluster
+type LeaderJoinInfo struct {
+    // LeaderAPIAddr is the address of the leader node to connect to
+    LeaderAPIAddr string `json:"leader_api_addr"`
+
+    // LeaderCACert is the CA cert of the leader node
+    LeaderCACert string `json:"leader_ca_cert"`
+
+    // LeaderClientCert is the client certificate for the follower node to establish
+    // client authentication during TLS
+    LeaderClientCert string `json:"leader_client_cert"`
+
+    // LeaderClientKey is the client key for the follower node to establish client
+    // authentication during TLS
+    LeaderClientKey string `json:"leader_client_key"`
+
+    // Retry indicates if the join process should automatically be retried
+    Retry bool `json:"-"`
+
+    // TLSConfig for the API client to use when communicating with the leader node
+    TLSConfig *tls.Config `json:"-"`
+}
+
+// JoinConfig returns a list of information about possible leader nodes that
+// this node can join as a follower
+func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) {
+    config := b.conf["retry_join"]
+    if config == "" {
+        return nil, nil
+    }
+
+    var leaderInfos []*LeaderJoinInfo
+    err := jsonutil.DecodeJSON([]byte(config), &leaderInfos)
+    if err != nil {
+        return nil, errwrap.Wrapf("failed to decode retry_join config: {{err}}", err)
+    }
+
+    if len(leaderInfos) == 0 {
+        return nil, errors.New("invalid retry_join config")
+    }
+
+    for _, info := range leaderInfos {
+        info.Retry = true
+        var tlsConfig *tls.Config
+        var err error
+        if len(info.LeaderCACert) != 0 || len(info.LeaderClientCert) != 0 || len(info.LeaderClientKey) != 0 {
+            tlsConfig, err = tlsutil.ClientTLSConfig([]byte(info.LeaderCACert), []byte(info.LeaderClientCert), []byte(info.LeaderClientKey))
+            if err != nil {
+                return nil, errwrap.Wrapf(fmt.Sprintf("failed to create tls config to communicate with leader node %q: {{err}}", info.LeaderAPIAddr), err)
+            }
+        }
+        info.TLSConfig = tlsConfig
+    }
+
+    return leaderInfos, nil
+}
+
 // EnsurePath is used to make sure a path exists
 func EnsurePath(path string, dir bool) error {
     if !dir {
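For context, JoinConfig reads the already-JSON-encoded retry_join value straight out of the backend's config map, and it forces Retry to true on every decoded entry, so config-driven joins always retry. A runnable sketch of the shape it decodes, with an illustrative address and truncated PEM contents (LeaderJoinInfo is exported, so this works from outside the package):

    package main

    import (
        "fmt"

        "github.com/hashicorp/vault/physical/raft"
        "github.com/hashicorp/vault/sdk/helper/jsonutil"
    )

    func main() {
        // Mimics the value stored under b.conf["retry_join"]; the address
        // and truncated certificate are illustrative.
        raw := `[{"leader_api_addr":"https://vault-0.example.com:8200","leader_ca_cert":"-----BEGIN CERTIFICATE-----\n..."}]`

        var leaderInfos []*raft.LeaderJoinInfo
        if err := jsonutil.DecodeJSON([]byte(raw), &leaderInfos); err != nil {
            panic(err) // a malformed retry_join stanza surfaces here
        }
        fmt.Println(leaderInfos[0].LeaderAPIAddr)
    }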
@@ -18,7 +18,7 @@ import (
     "sync/atomic"
     "time"
 
-    metrics "github.com/armon/go-metrics"
+    "github.com/armon/go-metrics"
     "github.com/hashicorp/errwrap"
     log "github.com/hashicorp/go-hclog"
     wrapping "github.com/hashicorp/go-kms-wrapping"
@@ -162,6 +162,7 @@ type raftInformation struct {
     leaderClient        *api.Client
     leaderBarrierConfig *SealConfig
     nonVoter            bool
+    joinInProgress      bool
 }
 
 // Core is used as the central manager of Vault activity. It is the primary point of
@@ -204,6 +205,15 @@ type Core struct {
     // seal is our seal, for seal configuration information
     seal Seal
 
+    // raftJoinDoneCh is used by the raft retry join routine to inform unseal process
+    // that the join is complete
+    raftJoinDoneCh chan struct{}
+
+    // postUnsealStarted informs the raft retry join routine that unseal key
+    // validation is completed and post unseal has started so that it can complete
+    // the join process when Shamir seal is in use
+    postUnsealStarted *uint32
+
     // raftInfo will contain information required for this node to join as a
     // peer to an existing raft cluster
     raftInfo *raftInformation
@@ -721,7 +731,9 @@ func NewCore(conf *CoreConfig) (*Core, error) {
             requests:     new(uint64),
             syncInterval: syncInterval,
         },
-        recoveryMode: conf.RecoveryMode,
+        recoveryMode:      conf.RecoveryMode,
+        postUnsealStarted: new(uint32),
+        raftJoinDoneCh:    make(chan struct{}),
     }
 
     atomic.StoreUint32(c.sealed, 1)
@@ -1032,13 +1044,26 @@ func (c *Core) unseal(key []byte, useRecoveryKeys bool) (bool, error) {
         return c.unsealInternal(ctx, masterKey)
     }
 
-    // If we are in the middle of a raft join send the answer and wait for
-    // data to start streaming in.
-    if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil {
-        return false, err
+    switch c.raftInfo.joinInProgress {
+    case true:
+        // JoinRaftCluster is already trying to perform a join based on retry_join configuration.
+        // Inform that routine that unseal key validation is complete so that it can continue to
+        // try and join possible leader nodes, and wait for it to complete.
+
+        atomic.StoreUint32(c.postUnsealStarted, 1)
+
+        c.logger.Info("waiting for raft retry join process to complete")
+        <-c.raftJoinDoneCh
+
+    default:
+        // This is the case for manual raft join. Send the answer to the leader node and
+        // wait for data to start streaming in.
+        if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil {
+            return false, err
+        }
+        // Reset the state
+        c.raftInfo = nil
    }
-    // Reset the state
-    c.raftInfo = nil
 
     go func() {
         keyringFound := false
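The unseal-side handshake above pairs with the retry join goroutine in vault/raft.go further down. Reduced to its essentials, the coordination is an atomic flag plus a done channel; a minimal runnable sketch (the names mirror the new Core fields, everything else is illustrative):

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        postUnsealStarted := new(uint32)
        raftJoinDoneCh := make(chan struct{})

        // Stand-in for the retry join goroutine: it cannot answer the
        // bootstrap challenge until unseal key validation has happened.
        go func() {
            for atomic.LoadUint32(postUnsealStarted) != 1 {
                // mirrors "waiting for unseal keys to be supplied"
                time.Sleep(10 * time.Millisecond)
            }
            // ... joinRaftSendAnswer would run here ...
            close(raftJoinDoneCh)
        }()

        // Stand-in for unseal(): keys validated, unblock the joiner and
        // wait for it to finish.
        atomic.StoreUint32(postUnsealStarted, 1)
        <-raftJoinDoneCh
        fmt.Println("raft retry join process complete")
    }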
@@ -2,8 +2,10 @@ package rafttests
 
 import (
     "bytes"
+    "context"
     "crypto/md5"
     "fmt"
+    "github.com/hashicorp/vault/helper/namespace"
     "io/ioutil"
     "net/http"
     "strings"
@@ -32,6 +34,85 @@ func raftCluster(t testing.TB) *vault.TestCluster {
     return cluster
 }
 
+func TestRaft_Retry_Join(t *testing.T) {
+    var conf vault.CoreConfig
+    var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
+    teststorage.RaftBackendSetup(&conf, &opts)
+    opts.SetupFunc = nil
+    cluster := vault.NewTestCluster(t, &conf, &opts)
+    cluster.Start()
+    defer cluster.Cleanup()
+
+    addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
+
+    leaderCore := cluster.Cores[0]
+    leaderAPI := leaderCore.Client.Address()
+    atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
+
+    {
+        testhelpers.EnsureCoreSealed(t, leaderCore)
+        leaderCore.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+        cluster.UnsealCore(t, leaderCore)
+        vault.TestWaitActive(t, leaderCore.Core)
+    }
+
+    leaderInfos := []*raft.LeaderJoinInfo{
+        &raft.LeaderJoinInfo{
+            LeaderAPIAddr: leaderAPI,
+            TLSConfig:     leaderCore.TLSConfig,
+            Retry:         true,
+        },
+    }
+
+    {
+        core := cluster.Cores[1]
+        core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderInfos, false)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        time.Sleep(2 * time.Second)
+
+        cluster.UnsealCore(t, core)
+    }
+
+    {
+        core := cluster.Cores[2]
+        core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+        _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderInfos, false)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        time.Sleep(2 * time.Second)
+
+        cluster.UnsealCore(t, core)
+    }
+
+    checkConfigFunc := func(expected map[string]bool) {
+        secret, err := cluster.Cores[0].Client.Logical().Read("sys/storage/raft/configuration")
+        if err != nil {
+            t.Fatal(err)
+        }
+        servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{})
+
+        for _, s := range servers {
+            server := s.(map[string]interface{})
+            delete(expected, server["node_id"].(string))
+        }
+        if len(expected) != 0 {
+            t.Fatalf("failed to read configuration successfully")
+        }
+    }
+
+    checkConfigFunc(map[string]bool{
+        "core-0": true,
+        "core-1": true,
+        "core-2": true,
+    })
+}
+
 func TestRaft_Join(t *testing.T) {
     var conf vault.CoreConfig
     var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
@@ -176,23 +176,24 @@ func (b *SystemBackend) handleRaftRemovePeerUpdate() framework.OperationFunc {
 
 func (b *SystemBackend) handleRaftBootstrapChallengeWrite() framework.OperationFunc {
     return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
         _, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
         if !ok {
             return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
         }
 
         serverID := d.Get("server_id").(string)
         if len(serverID) == 0 {
             return logical.ErrorResponse("no server id provided"), logical.ErrInvalidRequest
         }
 
-        uuid, err := uuid.GenerateRandomBytes(16)
-        if err != nil {
-            return nil, err
+        answer, ok := b.Core.pendingRaftPeers[serverID]
+        if !ok {
+            var err error
+            answer, err = uuid.GenerateRandomBytes(16)
+            if err != nil {
+                return nil, err
+            }
+            b.Core.pendingRaftPeers[serverID] = answer
         }
 
         sealAccess := b.Core.seal.GetAccess()
-        eBlob, err := sealAccess.Encrypt(ctx, uuid, nil)
+
+        eBlob, err := sealAccess.Encrypt(ctx, answer, nil)
         if err != nil {
             return nil, err
         }
@@ -201,7 +202,6 @@ func (b *SystemBackend) handleRaftBootstrapChallengeWrite() framework.OperationF
             return nil, err
         }
 
-        b.Core.pendingRaftPeers[serverID] = uuid
         sealConfig, err := b.Core.seal.BarrierConfig(ctx)
         if err != nil {
             return nil, err
@@ -285,6 +285,8 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
         return nil, err
     }
 
+    b.logger.Info("follower node answered the raft bootstrap challenge", "follower_server_id", serverID)
+
     return &logical.Response{
         Data: map[string]interface{}{
             "peers": peers,
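The change to handleRaftBootstrapChallengeWrite makes challenge generation idempotent per server ID: a follower that retries the join gets the same answer bytes back instead of a fresh challenge, so an earlier attempt is never invalidated. The pattern in isolation, as a runnable sketch (crypto/rand stands in for the uuid helper used in the real code):

    package main

    import (
        "bytes"
        "crypto/rand"
        "fmt"
    )

    var pendingRaftPeers = map[string][]byte{}

    func challengeFor(serverID string) ([]byte, error) {
        if answer, ok := pendingRaftPeers[serverID]; ok {
            return answer, nil // retried request: same answer as before
        }
        answer := make([]byte, 16)
        if _, err := rand.Read(answer); err != nil {
            return nil, err
        }
        pendingRaftPeers[serverID] = answer
        return answer, nil
    }

    func main() {
        a, _ := challengeFor("raft1")
        b, _ := challengeFor("raft1")
        fmt.Println(bytes.Equal(a, b)) // true
    }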
vault/raft.go (251 changed lines)
@@ -2,10 +2,10 @@ package vault
 
 import (
     "context"
-    "crypto/tls"
     "encoding/base64"
     "errors"
     "fmt"
+    "github.com/hashicorp/vault/sdk/helper/tlsutil"
     "math"
     "net/http"
     "net/url"
@@ -14,8 +14,9 @@ import (
     "sync/atomic"
     "time"
 
-    proto "github.com/golang/protobuf/proto"
+    "github.com/golang/protobuf/proto"
     "github.com/hashicorp/errwrap"
+
     cleanhttp "github.com/hashicorp/go-cleanhttp"
     wrapping "github.com/hashicorp/go-kms-wrapping"
     uuid "github.com/hashicorp/go-uuid"
@@ -525,18 +526,43 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co
     }
 }
 
-func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry, nonVoter bool) (bool, error) {
-    if len(leaderAddr) == 0 {
-        return false, errors.New("No leader address provided")
+func (c *Core) InitiateRetryJoin(ctx context.Context) error {
+    raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
+    if !ok {
+        return errors.New("raft storage not configured")
+    }
+
+    if raftStorage.Initialized() {
+        return nil
+    }
+
+    leaderInfos, err := raftStorage.JoinConfig()
+    if err != nil {
+        return err
+    }
+
+    // Nothing to do if config wasn't supplied
+    if len(leaderInfos) == 0 {
+        return nil
+    }
+
+    c.logger.Info("raft retry join initiated")
+
+    if _, err = c.JoinRaftCluster(ctx, leaderInfos, false); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) {
     raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
     if !ok {
         return false, errors.New("raft storage not configured")
     }
 
     if raftStorage.Initialized() {
-        return false, errors.New("raft is already initialized")
+        return false, errors.New("raft storage is already initialized")
     }
 
     init, err := c.Initialized(ctx)
@@ -544,103 +570,174 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig
         return false, errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
     }
     if init {
-        return false, errwrap.Wrapf("join can't be invoked on an initialized cluster: {{err}}", ErrAlreadyInit)
+        return true, nil
     }
 
-    transport := cleanhttp.DefaultPooledTransport()
-    if tlsConfig != nil {
-        transport.TLSClientConfig = tlsConfig.Clone()
-        if err := http2.ConfigureTransport(transport); err != nil {
-            return false, errwrap.Wrapf("failed to configure TLS: {{err}}", err)
-        }
-    }
-    client := &http.Client{
-        Transport: transport,
-    }
-    config := api.DefaultConfig()
-    if config.Error != nil {
-        return false, errwrap.Wrapf("failed to create api client: {{err}}", config.Error)
-    }
-    config.Address = leaderAddr
-    config.HttpClient = client
-    config.MaxRetries = 0
-    apiClient, err := api.NewClient(config)
-    if err != nil {
-        return false, errwrap.Wrapf("failed to create api client: {{err}}", err)
-    }
+    join := func(retry bool) error {
+        joinLeader := func(leaderInfo *raft.LeaderJoinInfo) error {
+            if leaderInfo == nil {
+                return errors.New("raft leader information is nil")
+            }
+            if len(leaderInfo.LeaderAPIAddr) == 0 {
+                return errors.New("raft leader address not provided")
+            }
 
-    join := func() error {
-        // Unwrap the token
-        secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
-            "server_id": raftStorage.NodeID(),
-        })
-        if err != nil {
-            return errwrap.Wrapf("error during bootstrap init call: {{err}}", err)
-        }
-        if secret == nil {
-            return errors.New("could not retrieve bootstrap package")
-        }
+            init, err := c.Initialized(ctx)
+            if err != nil {
+                return errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
+            }
+            if init {
+                c.logger.Info("returning from raft join as the node is initialized")
+                return nil
+            }
+            if !c.Sealed() {
+                c.logger.Info("returning from raft join as the node is unsealed")
+                return nil
+            }
 
-        var sealConfig SealConfig
-        err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig)
-        if err != nil {
-            return err
-        }
+            c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr)
 
-        if sealConfig.Type != c.seal.BarrierType() {
-            return fmt.Errorf("mismatching seal types between leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType())
-        }
+            // Create an API client to interact with the leader node
+            transport := cleanhttp.DefaultPooledTransport()
 
-        challengeB64, ok := secret.Data["challenge"]
-        if !ok {
-            return errors.New("error during raft bootstrap call, no challenge given")
-        }
-        challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string))
-        if err != nil {
-            return errwrap.Wrapf("error decoding challenge: {{err}}", err)
-        }
+            if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) {
+                leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
+                if err != nil {
+                    return errwrap.Wrapf("failed to create TLS config: {{err}}", err)
+                }
+            }
 
-        eBlob := &wrapping.EncryptedBlobInfo{}
-        if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
-            return errwrap.Wrapf("error decoding challenge: {{err}}", err)
-        }
-        raftInfo := &raftInformation{
-            challenge:           eBlob,
-            leaderClient:        apiClient,
-            leaderBarrierConfig: &sealConfig,
-            nonVoter:            nonVoter,
-        }
-        if c.seal.BarrierType() == wrapping.Shamir {
-            c.raftInfo = raftInfo
-            c.seal.SetBarrierConfig(ctx, &sealConfig)
+            if leaderInfo.TLSConfig != nil {
+                transport.TLSClientConfig = leaderInfo.TLSConfig.Clone()
+                if err := http2.ConfigureTransport(transport); err != nil {
+                    return errwrap.Wrapf("failed to configure TLS: {{err}}", err)
+                }
+            }
+
+            client := &http.Client{
+                Transport: transport,
+            }
+            config := api.DefaultConfig()
+            if config.Error != nil {
+                return errwrap.Wrapf("failed to create api client: {{err}}", config.Error)
+            }
+            config.Address = leaderInfo.LeaderAPIAddr
+            config.HttpClient = client
+            config.MaxRetries = 0
+            apiClient, err := api.NewClient(config)
+            if err != nil {
+                return errwrap.Wrapf("failed to create api client: {{err}}", err)
+            }
+
+            // Attempt to join the leader by requesting for the bootstrap challenge
+            secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
+                "server_id": raftStorage.NodeID(),
+            })
+            if err != nil {
+                return errwrap.Wrapf("error during raft bootstrap init call: {{err}}", err)
+            }
+            if secret == nil {
+                return errors.New("could not retrieve raft bootstrap package")
+            }
+
+            var sealConfig SealConfig
+            err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig)
+            if err != nil {
+                return err
+            }
+
+            if sealConfig.Type != c.seal.BarrierType() {
+                return fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType())
+            }
+
+            challengeB64, ok := secret.Data["challenge"]
+            if !ok {
+                return errors.New("error during raft bootstrap call, no challenge given")
+            }
+            challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string))
+            if err != nil {
+                return errwrap.Wrapf("error decoding raft bootstrap challenge: {{err}}", err)
+            }
+
+            eBlob := &wrapping.EncryptedBlobInfo{}
+            if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
+                return errwrap.Wrapf("error decoding raft bootstrap challenge: {{err}}", err)
+            }
+
+            raftInfo := &raftInformation{
+                challenge:           eBlob,
+                leaderClient:        apiClient,
+                leaderBarrierConfig: &sealConfig,
+                nonVoter:            nonVoter,
+            }
+
+            if c.seal.BarrierType() == wrapping.Shamir {
+                c.raftInfo = raftInfo
+                if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil {
+                    return err
+                }
+
+                if !retry {
+                    return nil
+                }
+
+                // Wait until unseal keys are supplied
+                c.raftInfo.joinInProgress = true
+                if atomic.LoadUint32(c.postUnsealStarted) != 1 {
+                    return errors.New("waiting for unseal keys to be supplied")
+                }
+            }
+
+            if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
+                return errwrap.Wrapf("failed to send answer to raft leader node: {{err}}", err)
+            }
+
+            if c.seal.BarrierType() == wrapping.Shamir {
+                // Reset the state
+                c.raftInfo = nil
+
+                // In case of Shamir unsealing, inform the unseal process that raft join is completed
+                close(c.raftJoinDoneCh)
+            }
+
+            c.logger.Info("successfully joined the raft cluster", "leader_addr", leaderInfo.LeaderAPIAddr)
             return nil
         }
 
-        if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
-            return errwrap.Wrapf("failed to send answer to leader node: {{err}}", err)
+        // Each join try goes through all the possible leader nodes and attempts to join
+        // them, until one of the attempt succeeds.
+        for _, leaderInfo := range leaderInfos {
+            err = joinLeader(leaderInfo)
+            if err == nil {
+                return nil
+            }
+            c.logger.Info("join attempt failed", "error", err)
         }
 
-        return nil
+        return errors.New("failed to join any raft leader node")
     }
 
-    switch retry {
+    switch leaderInfos[0].Retry {
     case true:
        go func() {
            for {
-                // TODO add a way to shut this down
-                err := join()
+                select {
+                case <-ctx.Done():
+                    return
+                default:
+                }
+                err := join(true)
                if err == nil {
                    return
                }
-                c.logger.Error("failed to join raft cluster", "error", err)
-                time.Sleep(time.Second * 2)
+                c.logger.Error("failed to retry join raft cluster", "retry", "2s")
+                time.Sleep(2 * time.Second)
            }
        }()
 
        // Backgrounded so return false
        return false, nil
    default:
-        if err := join(); err != nil {
+        if err := join(false); err != nil {
            c.logger.Error("failed to join raft cluster", "error", err)
            return false, errwrap.Wrapf("failed to join raft cluster: {{err}}", err)
        }
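Two details of the rewritten join flow are worth calling out. First, the retry goroutine now checks the context between attempts, which is what replaced the old "TODO add a way to shut this down". The loop in isolation, as a sketch (retryUntilDone and the simulated failure are illustrative, not part of the commit):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    func retryUntilDone(ctx context.Context, attempt func() error) {
        for {
            // Return upon context completion, as the commit message puts it.
            select {
            case <-ctx.Done():
                return
            default:
            }
            if err := attempt(); err == nil {
                return
            }
            time.Sleep(2 * time.Second)
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        tries := 0
        retryUntilDone(ctx, func() error {
            tries++
            if tries < 3 {
                return errors.New("leader not ready") // illustrative failure
            }
            return nil
        })
        fmt.Println("joined after", tries, "attempts")
    }

Second, in the Shamir case a retried join deliberately fails with "waiting for unseal keys to be supplied" until unseal() sets postUnsealStarted; the next attempt then sends the bootstrap answer and close(c.raftJoinDoneCh) releases the waiting unseal.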