3e55e79a3f
* k8s doc: update for 0.9.1 and 0.8.0 releases (#10825) * k8s doc: update for 0.9.1 and 0.8.0 releases * Update website/content/docs/platform/k8s/helm/configuration.mdx Co-authored-by: Theron Voran <tvoran@users.noreply.github.com> Co-authored-by: Theron Voran <tvoran@users.noreply.github.com> * Autopilot initial commit * Move autopilot related backend implementations to its own file * Abstract promoter creation * Add nil check for health * Add server state oss no-ops * Config ext stub for oss * Make way for non-voters * s/health/state * s/ReadReplica/NonVoter * Add synopsis and description * Remove struct tags from AutopilotConfig * Use var for config storage path * Handle nin-config when reading * Enable testing autopilot by using inmem cluster * First passing test * Only report the server as known if it is present in raft config * Autopilot defaults to on for all existing and new clusters * Add locking to some functions * Persist initial config * Clarify the command usage doc * Add health metric for each node * Fix audit logging issue * Don't set DisablePerformanceStandby to true in test * Use node id label for health metric * Log updates to autopilot config * Less aggressively consume config loading failures * Return a mutable config * Return early from known servers if raft config is unable to be pulled * Update metrics name * Reduce log level for potentially noisy log * Add knob to disable autopilot * Don't persist if default config is in use * Autopilot: Dead server cleanup (#10857) * Dead server cleanup * Initialize channel in any case * Fix a bunch of tests * Fix panic * Add follower locking in heartbeat tracker * Add LastContactFailureThreshold to config * Add log when marking node as dead * Update follower state locking in heartbeat tracker * Avoid follower states being nil * Pull test to its own file * Add execution status to state response * Optionally enable autopilot in some tests * Updates * Added API function to fetch autopilot configuration * Add test for default autopilot configuration * Configuration tests * Add State API test * Update test * Added TestClusterOptions.PhysicalFactoryConfig * Update locking * Adjust locking in heartbeat tracker * s/last_contact_failure_threshold/left_server_last_contact_threshold * Add disabling autopilot as a core config option * Disable autopilot in some tests * s/left_server_last_contact_threshold/dead_server_last_contact_threshold * Set the lastheartbeat of followers to now when setting up active node * Don't use config defaults from CLI command * Remove config file support * Remove HCL test as well * Persist only supplied config; merge supplied config with default to operate * Use pointer to structs for storing follower information * Test update * Retrieve non voter status from configbucket and set it up when a node comes up * Manage desired suffrage * Consider bucket being created already * Move desired suffrage to its own entry * s/DesiredSuffrageKey/LocalNodeConfigKey * s/witnessSuffrage/recordSuffrage * Fix test compilation * Handle local node config post a snapshot install * Commit to storage first; then record suffrage in fsm * No need of local node config being nili case, post snapshot restore * Reconcile autopilot config when a new leader takes over duty * Grab fsm lock when recording suffrage * s/Suffrage/DesiredSuffrage in FollowerState * Instantiate autopilot only in leader * Default to old ways in more scenarios * Make API gracefully handle 404 * Address some feedback * Make IsDead an atomic.Value * Simplify follower hearbeat tracking * Use uber.atomic * Don't have multiple causes for having autopilot disabled * Don't remove node from follower states if we fail to remove the dead server * Autopilot server removals map (#11019) * Don't remove node from follower states if we fail to remove the dead server * Use map to track dead server removals * Use lock and map * Use delegate lock * Adjust when to remove entry from map * Only hold the lock while accessing map * Fix race * Don't set default min_quorum * Fix test * Ensure follower states is not nil before starting autopilot * Fix race Co-authored-by: Jason O'Donnell <2160810+jasonodonnell@users.noreply.github.com> Co-authored-by: Theron Voran <tvoran@users.noreply.github.com>
160 lines
4.3 KiB
Go
160 lines
4.3 KiB
Go
package vault
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/hashicorp/vault/helper/forwarding"
|
|
"github.com/hashicorp/vault/physical/raft"
|
|
"github.com/hashicorp/vault/sdk/helper/consts"
|
|
"github.com/hashicorp/vault/vault/replication"
|
|
)
|
|
|
|
type forwardedRequestRPCServer struct {
|
|
core *Core
|
|
handler http.Handler
|
|
perfStandbySlots chan struct{}
|
|
perfStandbyRepCluster *replication.Cluster
|
|
raftFollowerStates *raft.FollowerStates
|
|
}
|
|
|
|
func (s *forwardedRequestRPCServer) ForwardRequest(ctx context.Context, freq *forwarding.Request) (*forwarding.Response, error) {
|
|
// Parse an http.Request out of it
|
|
req, err := forwarding.ParseForwardedRequest(freq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// A very dummy response writer that doesn't follow normal semantics, just
|
|
// lets you write a status code (last written wins) and a body. But it
|
|
// meets the interface requirements.
|
|
w := forwarding.NewRPCResponseWriter()
|
|
|
|
resp := &forwarding.Response{}
|
|
|
|
runRequest := func() {
|
|
defer func() {
|
|
// Logic here comes mostly from the Go source code
|
|
if err := recover(); err != nil {
|
|
const size = 64 << 10
|
|
buf := make([]byte, size)
|
|
buf = buf[:runtime.Stack(buf, false)]
|
|
s.core.logger.Error("panic serving forwarded request", "path", req.URL.Path, "error", err, "stacktrace", string(buf))
|
|
}
|
|
}()
|
|
s.handler.ServeHTTP(w, req)
|
|
}
|
|
runRequest()
|
|
resp.StatusCode = uint32(w.StatusCode())
|
|
resp.Body = w.Body().Bytes()
|
|
|
|
header := w.Header()
|
|
if header != nil {
|
|
resp.HeaderEntries = make(map[string]*forwarding.HeaderEntry, len(header))
|
|
for k, v := range header {
|
|
resp.HeaderEntries[k] = &forwarding.HeaderEntry{
|
|
Values: v,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Performance standby nodes will use this value to do wait for WALs to ship
|
|
// in order to do a best-effort read after write guarantee
|
|
resp.LastRemoteWal = LastWAL(s.core)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (*EchoReply, error) {
|
|
if in.ClusterAddr != "" {
|
|
s.core.clusterPeerClusterAddrsCache.Set(in.ClusterAddr, nil, 0)
|
|
}
|
|
|
|
if in.RaftAppliedIndex > 0 && len(in.RaftNodeID) > 0 && s.raftFollowerStates != nil {
|
|
s.raftFollowerStates.Update(in.RaftNodeID, in.RaftAppliedIndex, in.RaftTerm, in.RaftDesiredSuffrage)
|
|
}
|
|
|
|
reply := &EchoReply{
|
|
Message: "pong",
|
|
ReplicationState: uint32(s.core.ReplicationState()),
|
|
}
|
|
|
|
if raftBackend := s.core.getRaftBackend(); raftBackend != nil {
|
|
if !s.core.isRaftHAOnly() {
|
|
reply.RaftAppliedIndex = raftBackend.AppliedIndex()
|
|
reply.RaftNodeID = raftBackend.NodeID()
|
|
}
|
|
}
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
type forwardingClient struct {
|
|
RequestForwardingClient
|
|
|
|
core *Core
|
|
|
|
echoTicker *time.Ticker
|
|
echoContext context.Context
|
|
}
|
|
|
|
// NOTE: we also take advantage of gRPC's keepalive bits, but as we send data
|
|
// with these requests it's useful to keep this as well
|
|
func (c *forwardingClient) startHeartbeat() {
|
|
go func() {
|
|
tick := func() {
|
|
clusterAddr := c.core.ClusterAddr()
|
|
|
|
req := &EchoRequest{
|
|
Message: "ping",
|
|
ClusterAddr: clusterAddr,
|
|
}
|
|
|
|
if raftBackend := c.core.getRaftBackend(); raftBackend != nil {
|
|
if !c.core.isRaftHAOnly() {
|
|
req.RaftAppliedIndex = raftBackend.AppliedIndex()
|
|
req.RaftNodeID = raftBackend.NodeID()
|
|
req.RaftTerm = raftBackend.Term()
|
|
req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
|
|
resp, err := c.RequestForwardingClient.Echo(ctx, req)
|
|
cancel()
|
|
if err != nil {
|
|
c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
|
|
return
|
|
}
|
|
if resp == nil {
|
|
c.core.logger.Debug("forwarding: empty echo response from active node")
|
|
return
|
|
}
|
|
if resp.Message != "pong" {
|
|
c.core.logger.Debug("forwarding: unexpected echo response from active node", "message", resp.Message)
|
|
return
|
|
}
|
|
// Store the active node's replication state to display in
|
|
// sys/health calls
|
|
atomic.StoreUint32(c.core.activeNodeReplicationState, resp.ReplicationState)
|
|
}
|
|
|
|
tick()
|
|
|
|
for {
|
|
select {
|
|
case <-c.echoContext.Done():
|
|
c.echoTicker.Stop()
|
|
c.core.logger.Debug("forwarding: stopping heartbeating")
|
|
atomic.StoreUint32(c.core.activeNodeReplicationState, uint32(consts.ReplicationUnknown))
|
|
return
|
|
case <-c.echoTicker.C:
|
|
tick()
|
|
}
|
|
}
|
|
}()
|
|
}
|