OSS portions of raft non-voters (#7634)

* OSS portions of raft non-voters

* add file

* Update vault/raft.go

Co-Authored-By: Vishal Nayak <vishalnayak@users.noreply.github.com>
This commit is contained in:
Brian Kassouf 2019-10-11 11:56:59 -07:00 committed by GitHub
parent e6e20e9eb3
commit 024c29c36a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 80 additions and 35 deletions

View File

@ -20,6 +20,7 @@ type RaftJoinRequest struct {
LeaderClientCert string `json:"leader_client_cert"`
LeaderClientKey string `json:"leader_client_key"`
Retry bool `json:"retry"`
NonVoter bool `json:"non_voter"`
}
// RaftJoin adds the node from which this call is invoked to the raft

View File

@ -13,10 +13,11 @@ var _ cli.Command = (*OperatorRaftJoinCommand)(nil)
var _ cli.CommandAutocomplete = (*OperatorRaftJoinCommand)(nil)
type OperatorRaftJoinCommand struct {
flagRaftRetry bool
flagRetry bool
flagLeaderCACert string
flagLeaderClientCert string
flagLeaderClientKey string
flagNonVoter bool
*BaseCommand
}
@ -66,11 +67,18 @@ func (c *OperatorRaftJoinCommand) Flags() *FlagSets {
f.BoolVar(&BoolVar{
Name: "retry",
Target: &c.flagRaftRetry,
Target: &c.flagRetry,
Default: false,
Usage: "Continuously retry joining the raft cluster upon failures.",
})
f.BoolVar(&BoolVar{
Name: "non-voter",
Target: &c.flagNonVoter,
Default: false,
Usage: "(Enterprise-only) This flag is used to make the server not participate in the Raft quorum, and have it only receive the data replication stream. This can be used to add read scalability to a cluster in cases where a high volume of reads to servers are needed.",
})
return set
}
@ -117,7 +125,8 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int {
LeaderCACert: c.flagLeaderCACert,
LeaderClientCert: c.flagLeaderClientCert,
LeaderClientKey: c.flagLeaderClientKey,
Retry: c.flagRaftRetry,
Retry: c.flagRetry,
NonVoter: c.flagNonVoter,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error joining the node to the raft cluster: %s", err))

View File

@ -328,7 +328,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
{
core := cluster.Cores[1]
core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
if err != nil {
t.Fatal(err)
}
@ -340,7 +340,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
{
core := cluster.Cores[2]
core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
if err != nil {
t.Fatal(err)
}

View File

@ -3,6 +3,7 @@ package http
import (
"context"
"crypto/tls"
"errors"
"io"
"net/http"
@ -29,6 +30,10 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
return
}
// Reject non-voter joins when the build does not allow them (OSS builds set
// nonVotersAllowed = false). Must return here: respondError only writes the
// response, it does not stop the handler, and without the return the join
// would proceed anyway.
if req.NonVoter && !nonVotersAllowed {
	respondError(w, http.StatusBadRequest, errors.New("non-voting nodes not allowed"))
	return
}
var tlsConfig *tls.Config
var err error
if len(req.LeaderCACert) != 0 || len(req.LeaderClientCert) != 0 || len(req.LeaderClientKey) != 0 {
@ -39,7 +44,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
}
}
joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry)
joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry, req.NonVoter)
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
@ -61,4 +66,5 @@ type JoinRequest struct {
LeaderClientCert string `json:"leader_client_cert"`
LeaderClientKey string `json:"leader_client_key"`
Retry bool `json:"retry"`
NonVoter bool `json:"non_voter"`
}

View File

@ -19,4 +19,6 @@ var (
}
additionalRoutes = func(mux *http.ServeMux, core *vault.Core) {}
nonVotersAllowed = false
)

View File

@ -575,7 +575,6 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e
b.logger.Debug("adding raft peer", "node_id", peerID, "cluster_addr", clusterAddr)
future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0)
return future.Error()
}

View File

@ -0,0 +1,13 @@
// +build !enterprise
package raft
import (
"context"
"errors"
)
// AddNonVotingPeer adds a new server to the raft cluster as a non-voting
// peer. This is the stub for non-enterprise builds (see the !enterprise
// build tag at the top of this file): it always returns an error, since
// non-voting peers are an enterprise-only feature.
func (b *RaftBackend) AddNonVotingPeer(ctx context.Context, peerID, clusterAddr string) error {
return errors.New("not implemented")
}

View File

@ -152,6 +152,13 @@ type unlockInformation struct {
Nonce string
}
// raftInformation holds the state required for this node to join an existing
// raft cluster as a peer; it is populated by JoinRaftCluster and consumed by
// joinRaftSendAnswer.
type raftInformation struct {
// challenge is the encrypted blob received from the leader; it is decrypted
// with the seal access and sent back as the bootstrap answer.
challenge *physical.EncryptedBlobInfo
// leaderClient is the API client used to send the bootstrap answer request
// to the leader node.
leaderClient *api.Client
// leaderBarrierConfig is the leader's barrier seal configuration, used in
// place of this follower's own seal config while unsealing during the join.
leaderBarrierConfig *SealConfig
// nonVoter indicates this node should join as a non-voting peer
// (enterprise-only); it is forwarded in the bootstrap answer request.
nonVoter bool
}
// Core is used as the central manager of Vault activity. It is the primary point of
// interface for API handlers and is responsible for managing the logical and physical
// backends, router, security barrier, and audit trails.
@ -189,13 +196,9 @@ type Core struct {
// seal is our seal, for seal configuration information
seal Seal
raftUnseal bool
raftChallenge *physical.EncryptedBlobInfo
raftLeaderClient *api.Client
raftLeaderBarrierConfig *SealConfig
// raftInfo will contain information required for this node to join as a
// peer to an existing raft cluster
raftInfo *raftInformation
// migrationSeal is the seal to use during a migration operation. It is the
// seal we're migrating *from*.
@ -923,14 +926,11 @@ func (c *Core) unseal(key []byte, useRecoveryKeys bool) (bool, error) {
// If we are in the middle of a raft join send the answer and wait for
// data to start streaming in.
if err := c.joinRaftSendAnswer(ctx, c.raftLeaderClient, c.raftChallenge, c.seal.GetAccess()); err != nil {
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil {
return false, err
}
// Reset the state
c.raftUnseal = false
c.raftChallenge = nil
c.raftLeaderBarrierConfig = nil
c.raftLeaderClient = nil
c.raftInfo = nil
go func() {
keyringFound := false
@ -1002,7 +1002,7 @@ func (c *Core) unsealPart(ctx context.Context, seal Seal, key []byte, useRecover
case c.isRaftUnseal():
// Ignore follower's seal config and refer to leader's barrier
// configuration.
config = c.raftLeaderBarrierConfig
config = c.raftInfo.leaderBarrierConfig
default:
config, err = seal.BarrierConfig(ctx)
}

View File

@ -1,4 +1,4 @@
package vault
package rafttests
import (
"bytes"

View File

@ -33,6 +33,9 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path {
"cluster_addr": {
Type: framework.TypeString,
},
"non_voter": {
Type: framework.TypeBool,
},
},
Operations: map[logical.Operation]framework.OperationHandler{
@ -233,6 +236,8 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
return logical.ErrorResponse("no cluster_addr provided"), logical.ErrInvalidRequest
}
nonVoter := d.Get("non_voter").(bool)
answer, err := base64.StdEncoding.DecodeString(answerRaw)
if err != nil {
return logical.ErrorResponse("could not base64 decode answer"), logical.ErrInvalidRequest
@ -261,9 +266,16 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
return nil, errors.New("could not decode raft TLS configuration")
}
if err := raftStorage.AddPeer(ctx, serverID, clusterAddr); err != nil {
switch nonVoter {
case true:
err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr)
default:
err = raftStorage.AddPeer(ctx, serverID, clusterAddr)
}
if err != nil {
return nil, err
}
if b.Core.raftFollowerStates != nil {
b.Core.raftFollowerStates.update(serverID, 0)
}

View File

@ -524,7 +524,7 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co
}
}
func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry bool) (bool, error) {
func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry, nonVoter bool) (bool, error) {
if len(leaderAddr) == 0 {
return false, errors.New("No leader address provided")
}
@ -603,17 +603,19 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig
if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
return errwrap.Wrapf("error decoding challenge: {{err}}", err)
}
raftInfo := &raftInformation{
challenge: eBlob,
leaderClient: apiClient,
leaderBarrierConfig: &sealConfig,
nonVoter: nonVoter,
}
if c.seal.BarrierType() == seal.Shamir {
c.raftUnseal = true
c.raftChallenge = eBlob
c.raftLeaderClient = apiClient
c.raftLeaderBarrierConfig = &sealConfig
c.raftInfo = raftInfo
c.seal.SetBarrierConfig(ctx, &sealConfig)
return nil
}
if err := c.joinRaftSendAnswer(ctx, apiClient, eBlob, c.seal.GetAccess()); err != nil {
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
return errwrap.Wrapf("failed to send answer to leader node: {{err}}", err)
}
@ -649,8 +651,8 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig
// This is used in tests to override the cluster address
var UpdateClusterAddrForTests uint32
func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, challenge *physical.EncryptedBlobInfo, sealAccess seal.Access) error {
if challenge == nil {
func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, raftInfo *raftInformation) error {
if raftInfo.challenge == nil {
return errors.New("raft challenge is nil")
}
@ -663,7 +665,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
return errors.New("raft is already initialized")
}
plaintext, err := sealAccess.Decrypt(ctx, challenge)
plaintext, err := sealAccess.Decrypt(ctx, raftInfo.challenge)
if err != nil {
return errwrap.Wrapf("error decrypting challenge: {{err}}", err)
}
@ -683,16 +685,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
}
}
answerReq := leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
if err := answerReq.SetJSONBody(map[string]interface{}{
"answer": base64.StdEncoding.EncodeToString(plaintext),
"cluster_addr": clusterAddr,
"server_id": raftStorage.NodeID(),
"non_voter": raftInfo.nonVoter,
}); err != nil {
return err
}
answerRespJson, err := leaderClient.RawRequestWithContext(ctx, answerReq)
answerRespJson, err := raftInfo.leaderClient.RawRequestWithContext(ctx, answerReq)
if answerRespJson != nil {
defer answerRespJson.Body.Close()
}
@ -725,7 +728,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
}
func (c *Core) isRaftUnseal() bool {
return c.raftUnseal
return c.raftInfo != nil
}
type answerRespData struct {