open-vault/vault/request_forwarding_rpc.go
Vishal Nayak 3e55e79a3f
Autopilot: Server Stabilization, State and Dead Server Cleanup (#10856)

package vault

import (
	"context"
	"net/http"
	"runtime"
	"sync/atomic"
	"time"

	"github.com/hashicorp/vault/helper/forwarding"
	"github.com/hashicorp/vault/physical/raft"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/vault/replication"
)
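
// forwardedRequestRPCServer is the gRPC handler that runs on the active node.
// It services requests forwarded from standby nodes and answers their
// periodic Echo heartbeats.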
type forwardedRequestRPCServer struct {
	core                  *Core
	handler               http.Handler
	perfStandbySlots      chan struct{}
	perfStandbyRepCluster *replication.Cluster
	raftFollowerStates    *raft.FollowerStates
}
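
// ForwardRequest reconstructs the forwarded HTTP request, runs it through the
// active node's normal request handler, and packages the resulting status
// code, body, and headers into a forwarding.Response for the standby to relay
// back to the client.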
func (s *forwardedRequestRPCServer) ForwardRequest(ctx context.Context, freq *forwarding.Request) (*forwarding.Response, error) {
	// Parse an http.Request out of it
	req, err := forwarding.ParseForwardedRequest(freq)
	if err != nil {
		return nil, err
	}

	// A very dummy response writer that doesn't follow normal semantics, just
	// lets you write a status code (last written wins) and a body. But it
	// meets the interface requirements.
	w := forwarding.NewRPCResponseWriter()

	resp := &forwarding.Response{}

	runRequest := func() {
		defer func() {
			// Logic here comes mostly from the Go source code
			if err := recover(); err != nil {
				const size = 64 << 10
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				s.core.logger.Error("panic serving forwarded request", "path", req.URL.Path, "error", err, "stacktrace", string(buf))
			}
		}()
		s.handler.ServeHTTP(w, req)
	}
	runRequest()
	resp.StatusCode = uint32(w.StatusCode())
	resp.Body = w.Body().Bytes()

	header := w.Header()
	if header != nil {
		resp.HeaderEntries = make(map[string]*forwarding.HeaderEntry, len(header))
		for k, v := range header {
			resp.HeaderEntries[k] = &forwarding.HeaderEntry{
				Values: v,
			}
		}
	}
	// Performance standby nodes will use this value to wait for WALs to ship
	// in order to provide a best-effort read-after-write guarantee
	resp.LastRemoteWal = LastWAL(s.core)

	return resp, nil
}
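
// Echo answers the periodic heartbeat sent by standby nodes. It records the
// caller's cluster address and, when raft is in use, updates the follower
// state tracked on the active node.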
func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (*EchoReply, error) {
	if in.ClusterAddr != "" {
		s.core.clusterPeerClusterAddrsCache.Set(in.ClusterAddr, nil, 0)
	}
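
	// Record the applied index, term, and desired suffrage reported by this
	// follower; raft autopilot consumes these follower states for server
	// stabilization and dead server cleanup.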
	if in.RaftAppliedIndex > 0 && len(in.RaftNodeID) > 0 && s.raftFollowerStates != nil {
		s.raftFollowerStates.Update(in.RaftNodeID, in.RaftAppliedIndex, in.RaftTerm, in.RaftDesiredSuffrage)
	}

	reply := &EchoReply{
		Message:          "pong",
		ReplicationState: uint32(s.core.ReplicationState()),
	}

	if raftBackend := s.core.getRaftBackend(); raftBackend != nil {
		if !s.core.isRaftHAOnly() {
			reply.RaftAppliedIndex = raftBackend.AppliedIndex()
			reply.RaftNodeID = raftBackend.NodeID()
		}
	}

	return reply, nil
}
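
// forwardingClient wraps the generated RequestForwardingClient used by
// standby nodes and carries the ticker and context that drive the heartbeat
// loop below.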
type forwardingClient struct {
	RequestForwardingClient

	core *Core

	echoTicker  *time.Ticker
	echoContext context.Context
}
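
// startHeartbeat launches a goroutine on the standby that periodically sends
// an Echo request to the active node, advertising this node's cluster address
// and raft progress and recording the active node's replication state.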
// NOTE: we also take advantage of gRPC's keepalive bits, but as we send data
// with these requests it's useful to keep this as well
func (c *forwardingClient) startHeartbeat() {
	go func() {
		tick := func() {
			clusterAddr := c.core.ClusterAddr()

			req := &EchoRequest{
				Message:     "ping",
				ClusterAddr: clusterAddr,
			}

			if raftBackend := c.core.getRaftBackend(); raftBackend != nil {
				if !c.core.isRaftHAOnly() {
					req.RaftAppliedIndex = raftBackend.AppliedIndex()
					req.RaftNodeID = raftBackend.NodeID()
					req.RaftTerm = raftBackend.Term()
					req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
				}
			}

			ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
			resp, err := c.RequestForwardingClient.Echo(ctx, req)
			cancel()
			if err != nil {
				c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
				return
			}
			if resp == nil {
				c.core.logger.Debug("forwarding: empty echo response from active node")
				return
			}
			if resp.Message != "pong" {
				c.core.logger.Debug("forwarding: unexpected echo response from active node", "message", resp.Message)
				return
			}
			// Store the active node's replication state to display in
			// sys/health calls
			atomic.StoreUint32(c.core.activeNodeReplicationState, resp.ReplicationState)
		}

		tick()

		for {
			select {
			case <-c.echoContext.Done():
				c.echoTicker.Stop()
				c.core.logger.Debug("forwarding: stopping heartbeating")
				atomic.StoreUint32(c.core.activeNodeReplicationState, uint32(consts.ReplicationUnknown))
				return
			case <-c.echoTicker.C:
				tick()
			}
		}
	}()
}