65c7e149d3
The eval broker's `Cancelable` method used by the cancelable eval reaper mutates the slice of cancelable evals by removing a batch at a time from the slice. But this method unsafely uses a read lock despite this mutation. Under normal workloads this is likely to be safe but when the eval broker is under the heavy load this feature is intended to fix, we're likely to have a race condition. Switch this to a write lock, like the other locks that mutate the eval broker state. This changeset also adjusts the timeout to allow poorly-sized Actions runners more time to schedule the appropriate goroutines. The test has also been updated to use `shoenig/test/wait` so we can have sensible reporting of the results rather than just a timeout error when things go wrong.
2589 lines
81 KiB
Go
2589 lines
81 KiB
Go
package nomad
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/go-version"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/raft"
|
|
"github.com/hashicorp/serf/serf"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
const (
|
|
// failedEvalUnblockInterval is the interval at which failed evaluations are
|
|
// unblocked to re-enter the scheduler. A failed evaluation occurs under
|
|
// high contention when the schedulers plan does not make progress.
|
|
failedEvalUnblockInterval = 1 * time.Minute
|
|
|
|
// replicationRateLimit is used to rate limit how often data is replicated
|
|
// between the authoritative region and the local region
|
|
replicationRateLimit rate.Limit = 10.0
|
|
|
|
// barrierWriteTimeout is used to give Raft a chance to process a
|
|
// possible loss of leadership event if we are unable to get a barrier
|
|
// while leader.
|
|
barrierWriteTimeout = 2 * time.Minute
|
|
)
|
|
|
|
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
|
|
|
var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0"))
|
|
|
|
var minClusterIDVersion = version.Must(version.NewVersion("0.10.4"))
|
|
|
|
var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1"))
|
|
|
|
var minOneTimeAuthenticationTokenVersion = version.Must(version.NewVersion("1.1.0"))
|
|
|
|
// minACLRoleVersion is the Nomad version at which the ACL role table was
|
|
// introduced. It forms the minimum version all federated servers must meet
|
|
// before the feature can be used.
|
|
var minACLRoleVersion = version.Must(version.NewVersion("1.4.0"))
|
|
|
|
// minACLAuthMethodVersion is the Nomad version at which the ACL auth methods
|
|
// table was introduced. It forms the minimum version all federated servers must
|
|
// meet before the feature can be used.
|
|
var minACLAuthMethodVersion = version.Must(version.NewVersion("1.5.0-beta.1"))
|
|
|
|
// minACLBindingRuleVersion is the Nomad version at which the ACL binding rules
|
|
// table was introduced. It forms the minimum version all federated servers
|
|
// must meet before the feature can be used.
|
|
var minACLBindingRuleVersion = version.Must(version.NewVersion("1.5.0-beta.1"))
|
|
|
|
// minNomadServiceRegistrationVersion is the Nomad version at which the service
|
|
// registrations table was introduced. It forms the minimum version all local
|
|
// servers must meet before the feature can be used.
|
|
var minNomadServiceRegistrationVersion = version.Must(version.NewVersion("1.3.0"))
|
|
|
|
// monitorLeadership is used to monitor if we acquire or lose our role
|
|
// as the leader in the Raft cluster. There is some work the leader is
|
|
// expected to do, so we must react to changes
|
|
func (s *Server) monitorLeadership() {
|
|
var weAreLeaderCh chan struct{}
|
|
var leaderLoop sync.WaitGroup
|
|
|
|
leaderCh := s.raft.LeaderCh()
|
|
|
|
leaderStep := func(isLeader bool) {
|
|
if isLeader {
|
|
if weAreLeaderCh != nil {
|
|
s.logger.Error("attempted to start the leader loop while running")
|
|
return
|
|
}
|
|
|
|
weAreLeaderCh = make(chan struct{})
|
|
leaderLoop.Add(1)
|
|
go func(ch chan struct{}) {
|
|
defer leaderLoop.Done()
|
|
s.leaderLoop(ch)
|
|
}(weAreLeaderCh)
|
|
s.logger.Info("cluster leadership acquired")
|
|
return
|
|
}
|
|
|
|
if weAreLeaderCh == nil {
|
|
s.logger.Error("attempted to stop the leader loop while not running")
|
|
return
|
|
}
|
|
|
|
s.logger.Debug("shutting down leader loop")
|
|
close(weAreLeaderCh)
|
|
leaderLoop.Wait()
|
|
weAreLeaderCh = nil
|
|
s.logger.Info("cluster leadership lost")
|
|
}
|
|
|
|
wasLeader := false
|
|
for {
|
|
select {
|
|
case isLeader := <-leaderCh:
|
|
if wasLeader != isLeader {
|
|
wasLeader = isLeader
|
|
// normal case where we went through a transition
|
|
leaderStep(isLeader)
|
|
} else if wasLeader && isLeader {
|
|
// Server lost but then gained leadership immediately.
|
|
// During this time, this server may have received
|
|
// Raft transitions that haven't been applied to the FSM
|
|
// yet.
|
|
// Ensure that that FSM caught up and eval queues are refreshed
|
|
s.logger.Warn("cluster leadership lost and gained leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
|
|
|
|
leaderStep(false)
|
|
leaderStep(true)
|
|
} else {
|
|
// Server gained but lost leadership immediately
|
|
// before it reacted; nothing to do, move on
|
|
s.logger.Warn("cluster leadership gained and lost leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
|
|
}
|
|
case <-s.shutdownCh:
|
|
if weAreLeaderCh != nil {
|
|
leaderStep(false)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) leadershipTransfer() error {
|
|
retryCount := 3
|
|
for i := 0; i < retryCount; i++ {
|
|
err := s.raft.LeadershipTransfer().Error()
|
|
if err == nil {
|
|
s.logger.Info("successfully transferred leadership")
|
|
return nil
|
|
}
|
|
|
|
// Don't retry if the Raft version doesn't support leadership transfer
|
|
// since this will never succeed.
|
|
if err == raft.ErrUnsupportedProtocol {
|
|
return fmt.Errorf("leadership transfer not supported with Raft version lower than 3")
|
|
}
|
|
|
|
s.logger.Error("failed to transfer leadership attempt, will retry",
|
|
"attempt", i,
|
|
"retry_limit", retryCount,
|
|
"error", err,
|
|
)
|
|
}
|
|
return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
|
|
}
|
|
|
|
// leaderLoop runs as long as we are the leader to run various
|
|
// maintenance activities
|
|
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
|
var reconcileCh chan serf.Member
|
|
establishedLeader := false
|
|
|
|
RECONCILE:
|
|
// Setup a reconciliation timer
|
|
reconcileCh = nil
|
|
interval := time.After(s.config.ReconcileInterval)
|
|
|
|
// Apply a raft barrier to ensure our FSM is caught up
|
|
start := time.Now()
|
|
barrier := s.raft.Barrier(barrierWriteTimeout)
|
|
if err := barrier.Error(); err != nil {
|
|
s.logger.Error("failed to wait for barrier", "error", err)
|
|
goto WAIT
|
|
}
|
|
metrics.MeasureSince([]string{"nomad", "leader", "barrier"}, start)
|
|
|
|
// Check if we need to handle initial leadership actions
|
|
if !establishedLeader {
|
|
if err := s.establishLeadership(stopCh); err != nil {
|
|
s.logger.Error("failed to establish leadership", "error", err)
|
|
|
|
// Immediately revoke leadership since we didn't successfully
|
|
// establish leadership.
|
|
if err := s.revokeLeadership(); err != nil {
|
|
s.logger.Error("failed to revoke leadership", "error", err)
|
|
}
|
|
|
|
// Attempt to transfer leadership. If successful, leave the
|
|
// leaderLoop since this node is no longer the leader. Otherwise
|
|
// try to establish leadership again after 5 seconds.
|
|
if err := s.leadershipTransfer(); err != nil {
|
|
s.logger.Error("failed to transfer leadership", "error", err)
|
|
interval = time.After(5 * time.Second)
|
|
goto WAIT
|
|
}
|
|
return
|
|
}
|
|
|
|
establishedLeader = true
|
|
defer func() {
|
|
if err := s.revokeLeadership(); err != nil {
|
|
s.logger.Error("failed to revoke leadership", "error", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Reconcile any missing data
|
|
if err := s.reconcile(); err != nil {
|
|
s.logger.Error("failed to reconcile", "error", err)
|
|
goto WAIT
|
|
}
|
|
|
|
// Initial reconcile worked, now we can process the channel
|
|
// updates
|
|
reconcileCh = s.reconcileCh
|
|
|
|
// Poll the stop channel to give it priority so we don't waste time
|
|
// trying to perform the other operations if we have been asked to shut
|
|
// down.
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
WAIT:
|
|
// Wait until leadership is lost or periodically reconcile as long as we
|
|
// are the leader, or when Serf events arrive.
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
// Lost leadership.
|
|
return
|
|
case <-s.shutdownCh:
|
|
return
|
|
case <-interval:
|
|
goto RECONCILE
|
|
case member := <-reconcileCh:
|
|
s.reconcileMember(member)
|
|
case errCh := <-s.reassertLeaderCh:
|
|
// Recompute leader state, by asserting leadership and
|
|
// repopulating leader states.
|
|
|
|
// Check first if we are indeed the leaders first. We
|
|
// can get into this state when the initial
|
|
// establishLeadership has failed.
|
|
// Afterwards we will be waiting for the interval to
|
|
// trigger a reconciliation and can potentially end up
|
|
// here. There is no point to reassert because this
|
|
// agent was never leader in the first place.
|
|
if !establishedLeader {
|
|
errCh <- fmt.Errorf("leadership has not been established")
|
|
continue
|
|
}
|
|
|
|
// refresh leadership state
|
|
s.revokeLeadership()
|
|
err := s.establishLeadership(stopCh)
|
|
errCh <- err
|
|
|
|
// In case establishLeadership fails, try to transfer leadership.
|
|
// At this point Raft thinks we are the leader, but Nomad did not
|
|
// complete the required steps to act as the leader.
|
|
if err != nil {
|
|
if err := s.leadershipTransfer(); err != nil {
|
|
// establishedLeader was true before, but it no longer is
|
|
// since we revoked leadership and leadershipTransfer also
|
|
// failed.
|
|
// Stay in the leaderLoop with establishedLeader set to
|
|
// false so we try to establish leadership again in the
|
|
// next loop.
|
|
establishedLeader = false
|
|
interval = time.After(5 * time.Second)
|
|
goto WAIT
|
|
}
|
|
|
|
// leadershipTransfer was successful and it is
|
|
// time to leave the leaderLoop.
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// establishLeadership is invoked once we become leader and are able
|
|
// to invoke an initial barrier. The barrier is used to ensure any
|
|
// previously inflight transactions have been committed and that our
|
|
// state is up-to-date.
|
|
func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
|
defer metrics.MeasureSince([]string{"nomad", "leader", "establish_leadership"}, time.Now())
|
|
|
|
// Generate a leader ACL token. This will allow the leader to issue work
|
|
// that requires a valid ACL token.
|
|
s.setLeaderAcl(uuid.Generate())
|
|
|
|
// Disable workers to free half the cores for use in the plan queue and
|
|
// evaluation broker
|
|
s.handlePausableWorkers(true)
|
|
|
|
// Initialize and start the autopilot routine
|
|
s.getOrCreateAutopilotConfig()
|
|
s.autopilot.Start(s.shutdownCtx)
|
|
|
|
// Initialize scheduler configuration.
|
|
schedulerConfig := s.getOrCreateSchedulerConfig()
|
|
|
|
// Initialize the ClusterID
|
|
_, _ = s.ClusterID()
|
|
// todo: use cluster ID for stuff, later!
|
|
|
|
// Enable the plan queue, since we are now the leader
|
|
s.planQueue.SetEnabled(true)
|
|
|
|
// Start the plan evaluator
|
|
go s.planApply()
|
|
|
|
// Start the eval broker and blocked eval broker if these are not paused by
|
|
// the operator.
|
|
restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig)
|
|
|
|
// Enable the deployment watcher, since we are now the leader
|
|
s.deploymentWatcher.SetEnabled(true, s.State())
|
|
|
|
// Enable the NodeDrainer
|
|
s.nodeDrainer.SetEnabled(true, s.State())
|
|
|
|
// Enable the volume watcher, since we are now the leader
|
|
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())
|
|
|
|
// Restore the eval broker state and blocked eval state. If these are
|
|
// currently paused, we do not need to do this.
|
|
if restoreEvals {
|
|
if err := s.restoreEvals(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Activate the vault client
|
|
s.vault.SetActive(true)
|
|
|
|
// Enable the periodic dispatcher, since we are now the leader.
|
|
s.periodicDispatcher.SetEnabled(true)
|
|
|
|
// Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success)
|
|
// and all leader related components (e.g. broker queue) are enabled.
|
|
// Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks can start after)
|
|
s.setConsistentReadReady()
|
|
|
|
// Further clean ups and follow up that don't block RPC consistency
|
|
|
|
// Create the first root key if it doesn't already exist
|
|
go s.initializeKeyring(stopCh)
|
|
|
|
// Restore the periodic dispatcher state
|
|
if err := s.restorePeriodicDispatcher(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Schedule periodic jobs which include expired local ACL token garbage
|
|
// collection.
|
|
go s.schedulePeriodic(stopCh)
|
|
|
|
// Reap any failed evaluations
|
|
go s.reapFailedEvaluations(stopCh)
|
|
|
|
// Reap any duplicate blocked evaluations
|
|
go s.reapDupBlockedEvaluations(stopCh)
|
|
|
|
// Reap any cancelable evaluations
|
|
s.reapCancelableEvalsCh = s.reapCancelableEvaluations(stopCh)
|
|
|
|
// Periodically unblock failed allocations
|
|
go s.periodicUnblockFailedEvals(stopCh)
|
|
|
|
// Periodically publish job summary metrics
|
|
go s.publishJobSummaryMetrics(stopCh)
|
|
|
|
// Periodically publish job status metrics
|
|
go s.publishJobStatusMetrics(stopCh)
|
|
|
|
// Setup the heartbeat timers. This is done both when starting up or when
|
|
// a leader fail over happens. Since the timers are maintained by the leader
|
|
// node, effectively this means all the timers are renewed at the time of failover.
|
|
// The TTL contract is that the session will not be expired before the TTL,
|
|
// so expiring it later is allowable.
|
|
//
|
|
// This MUST be done after the initial barrier to ensure the latest Nodes
|
|
// are available to be initialized. Otherwise initialization may use stale
|
|
// data.
|
|
if err := s.initializeHeartbeatTimers(); err != nil {
|
|
s.logger.Error("heartbeat timer setup failed", "error", err)
|
|
return err
|
|
}
|
|
|
|
// If ACLs are enabled, the leader needs to start a number of long-lived
|
|
// routines. Exactly which routines, depends on whether this leader is
|
|
// running within the authoritative region or not.
|
|
if s.config.ACLEnabled {
|
|
|
|
// The authoritative region is responsible for garbage collecting
|
|
// expired global tokens. Otherwise, non-authoritative regions need to
|
|
// replicate policies, tokens, and namespaces.
|
|
switch s.config.AuthoritativeRegion {
|
|
case s.config.Region:
|
|
go s.schedulePeriodicAuthoritative(stopCh)
|
|
default:
|
|
go s.replicateACLPolicies(stopCh)
|
|
go s.replicateACLTokens(stopCh)
|
|
go s.replicateACLRoles(stopCh)
|
|
go s.replicateACLAuthMethods(stopCh)
|
|
go s.replicateACLBindingRules(stopCh)
|
|
go s.replicateNamespaces(stopCh)
|
|
}
|
|
}
|
|
|
|
// Setup any enterprise systems required.
|
|
if err := s.establishEnterpriseLeadership(stopCh); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Cleanup orphaned Vault token accessors
|
|
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Cleanup orphaned Service Identity token accessors
|
|
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// replicateNamespaces is used to replicate namespaces from the authoritative
|
|
// region to this region.
|
|
func (s *Server) replicateNamespaces(stopCh chan struct{}) {
|
|
req := structs.NamespaceListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AllowStale: true,
|
|
},
|
|
}
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting namespace replication from authoritative region", "region", req.Region)
|
|
|
|
START:
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Rate limit how often we attempt replication
|
|
limiter.Wait(context.Background())
|
|
|
|
// Fetch the list of namespaces
|
|
var resp structs.NamespaceListResponse
|
|
req.AuthToken = s.ReplicationToken()
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.ListNamespaces", &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch namespaces from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
|
|
// Perform a two-way diff
|
|
delete, update := diffNamespaces(s.State(), req.MinQueryIndex, resp.Namespaces)
|
|
|
|
// Delete namespaces that should not exist
|
|
if len(delete) > 0 {
|
|
args := &structs.NamespaceDeleteRequest{
|
|
Namespaces: delete,
|
|
}
|
|
_, _, err := s.raftApply(structs.NamespaceDeleteRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to delete namespaces", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated namespaces
|
|
var fetched []*structs.Namespace
|
|
if len(update) > 0 {
|
|
req := structs.NamespaceSetRequest{
|
|
Namespaces: update,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.NamespaceSetResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.GetNamespaces", &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch namespaces from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
for _, namespace := range reply.Namespaces {
|
|
fetched = append(fetched, namespace)
|
|
}
|
|
}
|
|
|
|
// Update local namespaces
|
|
if len(fetched) > 0 {
|
|
args := &structs.NamespaceUpsertRequest{
|
|
Namespaces: fetched,
|
|
}
|
|
_, _, err := s.raftApply(structs.NamespaceUpsertRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update namespaces", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
|
|
ERR_WAIT:
|
|
select {
|
|
case <-time.After(s.config.ReplicationBackoff):
|
|
goto START
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
func (s *Server) handlePausableWorkers(isLeader bool) {
|
|
for _, w := range s.pausableWorkers() {
|
|
if isLeader {
|
|
w.Pause()
|
|
} else {
|
|
w.Resume()
|
|
}
|
|
}
|
|
}
|
|
|
|
// diffNamespaces is used to perform a two-way diff between the local namespaces
|
|
// and the remote namespaces to determine which namespaces need to be deleted or
|
|
// updated.
|
|
func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*structs.Namespace) (delete []string, update []string) {
|
|
// Construct a set of the local and remote namespaces
|
|
local := make(map[string][]byte)
|
|
remote := make(map[string]struct{})
|
|
|
|
// Add all the local namespaces
|
|
iter, err := state.Namespaces(nil)
|
|
if err != nil {
|
|
panic("failed to iterate local namespaces")
|
|
}
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
namespace := raw.(*structs.Namespace)
|
|
local[namespace.Name] = namespace.Hash
|
|
}
|
|
|
|
// Iterate over the remote namespaces
|
|
for _, rns := range remoteList {
|
|
remote[rns.Name] = struct{}{}
|
|
|
|
// Check if the namespace is missing locally
|
|
if localHash, ok := local[rns.Name]; !ok {
|
|
update = append(update, rns.Name)
|
|
|
|
// Check if the namespace is newer remotely and there is a hash
|
|
// mis-match.
|
|
} else if rns.ModifyIndex > minIndex && !bytes.Equal(localHash, rns.Hash) {
|
|
update = append(update, rns.Name)
|
|
}
|
|
}
|
|
|
|
// Check if namespaces should be deleted
|
|
for lns := range local {
|
|
if _, ok := remote[lns]; !ok {
|
|
delete = append(delete, lns)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// restoreEvals is used to restore pending evaluations into the eval broker and
|
|
// blocked evaluations into the blocked eval tracker. The broker and blocked
|
|
// eval tracker is maintained only by the leader, so it must be restored anytime
|
|
// a leadership transition takes place.
|
|
func (s *Server) restoreEvals() error {
|
|
// Get an iterator over every evaluation
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := s.fsm.State().Evals(ws, false)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get evaluations: %v", err)
|
|
}
|
|
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
eval := raw.(*structs.Evaluation)
|
|
|
|
if eval.ShouldEnqueue() {
|
|
s.evalBroker.Enqueue(eval)
|
|
} else if eval.ShouldBlock() {
|
|
s.blockedEvals.Block(eval)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// revokeVaultAccessorsOnRestore is used to restore Vault accessors that should be
|
|
// revoked.
|
|
func (s *Server) revokeVaultAccessorsOnRestore() error {
|
|
// An accessor should be revoked if its allocation or node is terminal
|
|
ws := memdb.NewWatchSet()
|
|
state := s.fsm.State()
|
|
iter, err := state.VaultAccessors(ws)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get vault accessors: %v", err)
|
|
}
|
|
|
|
var revoke []*structs.VaultAccessor
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
|
|
va := raw.(*structs.VaultAccessor)
|
|
|
|
// Check the allocation
|
|
alloc, err := state.AllocByID(ws, va.AllocID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup allocation %q: %v", va.AllocID, err)
|
|
}
|
|
if alloc == nil || alloc.Terminated() {
|
|
// No longer running and should be revoked
|
|
revoke = append(revoke, va)
|
|
continue
|
|
}
|
|
|
|
// Check the node
|
|
node, err := state.NodeByID(ws, va.NodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup node %q: %v", va.NodeID, err)
|
|
}
|
|
if node == nil || node.TerminalStatus() {
|
|
// Node is terminal so any accessor from it should be revoked
|
|
revoke = append(revoke, va)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(revoke) != 0 {
|
|
s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke))
|
|
|
|
if err := s.vault.MarkForRevocation(revoke); err != nil {
|
|
return fmt.Errorf("failed to revoke tokens: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// revokeSITokenAccessorsOnRestore is used to revoke Service Identity token
|
|
// accessors on behalf of allocs that are now gone / terminal.
|
|
func (s *Server) revokeSITokenAccessorsOnRestore() error {
|
|
ws := memdb.NewWatchSet()
|
|
fsmState := s.fsm.State()
|
|
iter, err := fsmState.SITokenAccessors(ws)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get SI token accessors: %w", err)
|
|
}
|
|
|
|
var toRevoke []*structs.SITokenAccessor
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
accessor := raw.(*structs.SITokenAccessor)
|
|
|
|
// Check the allocation
|
|
alloc, err := fsmState.AllocByID(ws, accessor.AllocID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup alloc %q: %w", accessor.AllocID, err)
|
|
}
|
|
if alloc == nil || alloc.Terminated() {
|
|
// no longer running and associated accessors should be revoked
|
|
toRevoke = append(toRevoke, accessor)
|
|
continue
|
|
}
|
|
|
|
// Check the node
|
|
node, err := fsmState.NodeByID(ws, accessor.NodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup node %q: %w", accessor.NodeID, err)
|
|
}
|
|
if node == nil || node.TerminalStatus() {
|
|
// node is terminal and associated accessors should be revoked
|
|
toRevoke = append(toRevoke, accessor)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(toRevoke) > 0 {
|
|
s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke))
|
|
s.consulACLs.MarkForRevocation(toRevoke)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// restorePeriodicDispatcher is used to restore all periodic jobs into the
|
|
// periodic dispatcher. It also determines if a periodic job should have been
|
|
// created during the leadership transition and force runs them. The periodic
|
|
// dispatcher is maintained only by the leader, so it must be restored anytime a
|
|
// leadership transition takes place.
|
|
func (s *Server) restorePeriodicDispatcher() error {
|
|
logger := s.logger.Named("periodic")
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := s.fsm.State().JobsByPeriodic(ws, true)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get periodic jobs: %v", err)
|
|
}
|
|
|
|
now := time.Now()
|
|
for i := iter.Next(); i != nil; i = iter.Next() {
|
|
job := i.(*structs.Job)
|
|
|
|
// We skip adding parameterized jobs because they themselves aren't
|
|
// tracked, only the dispatched children are.
|
|
if job.IsParameterized() {
|
|
continue
|
|
}
|
|
|
|
if err := s.periodicDispatcher.Add(job); err != nil {
|
|
logger.Error("failed to add job to periodic dispatcher", "error", err)
|
|
continue
|
|
}
|
|
|
|
// We do not need to force run the job since it isn't active.
|
|
if !job.IsPeriodicActive() {
|
|
continue
|
|
}
|
|
|
|
// If the periodic job has never been launched before, launch will hold
|
|
// the time the periodic job was added. Otherwise it has the last launch
|
|
// time of the periodic job.
|
|
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get periodic launch time: %v", err)
|
|
}
|
|
if launch == nil {
|
|
return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q",
|
|
job.ID, job.Namespace)
|
|
}
|
|
|
|
// nextLaunch is the next launch that should occur.
|
|
nextLaunch, err := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
|
|
if err != nil {
|
|
logger.Error("failed to determine next periodic launch for job", "job", job.NamespacedID(), "error", err)
|
|
continue
|
|
}
|
|
|
|
// We skip force launching the job if there should be no next launch
|
|
// (the zero case) or if the next launch time is in the future. If it is
|
|
// in the future, it will be handled by the periodic dispatcher.
|
|
if nextLaunch.IsZero() || !nextLaunch.Before(now) {
|
|
continue
|
|
}
|
|
|
|
if _, err := s.periodicDispatcher.ForceRun(job.Namespace, job.ID); err != nil {
|
|
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
|
|
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
|
|
}
|
|
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// schedulePeriodic is used to do periodic job dispatch while we are leader
|
|
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
|
evalGC := time.NewTicker(s.config.EvalGCInterval)
|
|
defer evalGC.Stop()
|
|
nodeGC := time.NewTicker(s.config.NodeGCInterval)
|
|
defer nodeGC.Stop()
|
|
jobGC := time.NewTicker(s.config.JobGCInterval)
|
|
defer jobGC.Stop()
|
|
deploymentGC := time.NewTicker(s.config.DeploymentGCInterval)
|
|
defer deploymentGC.Stop()
|
|
csiPluginGC := time.NewTicker(s.config.CSIPluginGCInterval)
|
|
defer csiPluginGC.Stop()
|
|
csiVolumeClaimGC := time.NewTicker(s.config.CSIVolumeClaimGCInterval)
|
|
defer csiVolumeClaimGC.Stop()
|
|
oneTimeTokenGC := time.NewTicker(s.config.OneTimeTokenGCInterval)
|
|
defer oneTimeTokenGC.Stop()
|
|
rootKeyGC := time.NewTicker(s.config.RootKeyGCInterval)
|
|
defer rootKeyGC.Stop()
|
|
variablesRekey := time.NewTicker(s.config.VariablesRekeyInterval)
|
|
defer variablesRekey.Stop()
|
|
|
|
// Set up the expired ACL local token garbage collection timer.
|
|
localTokenExpiredGC, localTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval)
|
|
defer localTokenExpiredGCStop()
|
|
|
|
for {
|
|
|
|
select {
|
|
case <-evalGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
|
|
}
|
|
case <-nodeGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
|
|
}
|
|
case <-jobGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
|
|
}
|
|
case <-deploymentGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobDeploymentGC, index))
|
|
}
|
|
case <-csiPluginGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIPluginGC, index))
|
|
}
|
|
case <-csiVolumeClaimGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index))
|
|
}
|
|
case <-oneTimeTokenGC.C:
|
|
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minOneTimeAuthenticationTokenVersion, false) {
|
|
continue
|
|
}
|
|
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobOneTimeTokenGC, index))
|
|
}
|
|
case <-localTokenExpiredGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobLocalTokenExpiredGC, index))
|
|
}
|
|
localTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval)
|
|
case <-rootKeyGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobRootKeyRotateOrGC, index))
|
|
}
|
|
case <-variablesRekey.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobVariablesRekey, index))
|
|
}
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// schedulePeriodicAuthoritative is a long-lived routine intended for use on
|
|
// the leader within the authoritative region only. It periodically queues work
|
|
// onto the _core scheduler for ACL based activities such as removing expired
|
|
// global ACL tokens.
|
|
func (s *Server) schedulePeriodicAuthoritative(stopCh chan struct{}) {
|
|
|
|
// Set up the expired ACL global token garbage collection timer.
|
|
globalTokenExpiredGC, globalTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval)
|
|
defer globalTokenExpiredGCStop()
|
|
|
|
for {
|
|
select {
|
|
case <-globalTokenExpiredGC.C:
|
|
if index, ok := s.getLatestIndex(); ok {
|
|
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobGlobalTokenExpiredGC, index))
|
|
}
|
|
globalTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval)
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// getLatestIndex is a helper function which returns the latest index from the
|
|
// state store. The boolean return indicates whether the call has been
|
|
// successful or not.
|
|
func (s *Server) getLatestIndex() (uint64, bool) {
|
|
snapshotIndex, err := s.fsm.State().LatestIndex()
|
|
if err != nil {
|
|
s.logger.Error("failed to determine state store's index", "error", err)
|
|
return 0, false
|
|
}
|
|
return snapshotIndex, true
|
|
}
|
|
|
|
// coreJobEval returns an evaluation for a core job
|
|
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
|
|
return &structs.Evaluation{
|
|
ID: uuid.Generate(),
|
|
Namespace: "-",
|
|
Priority: structs.CoreJobPriority,
|
|
Type: structs.JobTypeCore,
|
|
TriggeredBy: structs.EvalTriggerScheduled,
|
|
JobID: job,
|
|
LeaderACL: s.getLeaderAcl(),
|
|
Status: structs.EvalStatusPending,
|
|
ModifyIndex: modifyIndex,
|
|
}
|
|
}
|
|
|
|
// reapFailedEvaluations is used to reap evaluations that
|
|
// have reached their delivery limit and should be failed
|
|
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
// Scan for a failed evaluation
|
|
eval, token, err := s.evalBroker.Dequeue([]string{failedQueue}, time.Second)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if eval == nil {
|
|
continue
|
|
}
|
|
|
|
// Update the status to failed
|
|
updateEval := eval.Copy()
|
|
updateEval.Status = structs.EvalStatusFailed
|
|
updateEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
|
|
s.logger.Warn("eval reached delivery limit, marking as failed",
|
|
"eval", hclog.Fmt("%#v", updateEval))
|
|
|
|
// Core job evals that fail or span leader elections will never
|
|
// succeed because the follow-up doesn't have the leader ACL. We
|
|
// rely on the leader to schedule new core jobs periodically
|
|
// instead.
|
|
if eval.Type != structs.JobTypeCore {
|
|
|
|
// Create a follow-up evaluation that will be used to retry the
|
|
// scheduling for the job after the cluster is hopefully more stable
|
|
// due to the fairly large backoff.
|
|
followupEvalWait := s.config.EvalFailedFollowupBaselineDelay +
|
|
time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange)))
|
|
|
|
followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)
|
|
updateEval.NextEval = followupEval.ID
|
|
updateEval.UpdateModifyTime()
|
|
|
|
// Update via Raft
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{updateEval, followupEval},
|
|
}
|
|
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
|
|
s.logger.Error("failed to update failed eval and create a follow-up",
|
|
"eval", hclog.Fmt("%#v", updateEval), "error", err)
|
|
continue
|
|
}
|
|
}
|
|
// Ack completion
|
|
s.evalBroker.Ack(eval.ID, token)
|
|
}
|
|
}
|
|
}
|
|
|
|
// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and
|
|
// should be cancelled.
|
|
func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
// Scan for duplicate blocked evals.
|
|
dups := s.blockedEvals.GetDuplicates(time.Second)
|
|
if dups == nil {
|
|
continue
|
|
}
|
|
|
|
cancel := make([]*structs.Evaluation, len(dups))
|
|
for i, dup := range dups {
|
|
// Update the status to cancelled
|
|
newEval := dup.Copy()
|
|
newEval.Status = structs.EvalStatusCancelled
|
|
newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID)
|
|
newEval.UpdateModifyTime()
|
|
cancel[i] = newEval
|
|
}
|
|
|
|
// Update via Raft
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: cancel,
|
|
}
|
|
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
|
|
s.logger.Error("failed to update duplicate evals", "evals", hclog.Fmt("%#v", cancel), "error", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reapCancelableEvaluations is used to reap evaluations that were marked
|
|
// cancelable by the eval broker and should be canceled. These get swept up
|
|
// whenever an eval Acks, but this ensures that we don't have a straggling batch
|
|
// when the cluster doesn't have any more work to do. Returns a wake-up channel
|
|
// that can be used to trigger a new reap without waiting for the timer
|
|
func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
|
|
|
|
wakeCh := make(chan struct{}, 1)
|
|
go func() {
|
|
|
|
timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval)
|
|
defer cancel()
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-wakeCh:
|
|
cancelCancelableEvals(s)
|
|
case <-timer.C:
|
|
cancelCancelableEvals(s)
|
|
timer.Reset(s.config.EvalReapCancelableInterval)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return wakeCh
|
|
}
|
|
|
|
const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10
|
|
|
|
// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
|
|
// broker and updates their status to canceled.
|
|
func cancelCancelableEvals(srv *Server) error {
|
|
|
|
const cancelDesc = "canceled after more recent eval was processed"
|
|
|
|
// We *can* send larger raft logs but rough benchmarks show that a smaller
|
|
// page size strikes a balance between throughput and time we block the FSM
|
|
// apply for other operations
|
|
cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize)
|
|
if len(cancelable) > 0 {
|
|
for i, eval := range cancelable {
|
|
eval = eval.Copy()
|
|
eval.Status = structs.EvalStatusCancelled
|
|
eval.StatusDescription = cancelDesc
|
|
eval.UpdateModifyTime()
|
|
cancelable[i] = eval
|
|
}
|
|
|
|
update := &structs.EvalUpdateRequest{
|
|
Evals: cancelable,
|
|
WriteRequest: structs.WriteRequest{Region: srv.Region()},
|
|
}
|
|
_, _, err := srv.raftApply(structs.EvalUpdateRequestType, update)
|
|
if err != nil {
|
|
srv.logger.Warn("eval cancel failed", "error", err, "method", "ack")
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
|
|
func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
|
|
ticker := time.NewTicker(failedEvalUnblockInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
// Unblock the failed allocations
|
|
s.blockedEvals.UnblockFailed()
|
|
}
|
|
}
|
|
}
|
|
|
|
// publishJobSummaryMetrics publishes the job summaries as metrics
|
|
func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
|
|
timer := time.NewTimer(0)
|
|
defer timer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-timer.C:
|
|
timer.Reset(s.config.StatsCollectionInterval)
|
|
state, err := s.State().Snapshot()
|
|
if err != nil {
|
|
s.logger.Error("failed to get state", "error", err)
|
|
continue
|
|
}
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := state.JobSummaries(ws)
|
|
if err != nil {
|
|
s.logger.Error("failed to get job summaries", "error", err)
|
|
continue
|
|
}
|
|
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
summary := raw.(*structs.JobSummary)
|
|
if s.config.DisableDispatchedJobSummaryMetrics {
|
|
job, err := state.JobByID(ws, summary.Namespace, summary.JobID)
|
|
if err != nil {
|
|
s.logger.Error("error getting job for summary", "error", err)
|
|
continue
|
|
}
|
|
if job.Dispatched {
|
|
continue
|
|
}
|
|
}
|
|
s.iterateJobSummaryMetrics(summary)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
|
|
for name, tgSummary := range summary.Summary {
|
|
labels := []metrics.Label{
|
|
{
|
|
Name: "job",
|
|
Value: summary.JobID,
|
|
},
|
|
{
|
|
Name: "task_group",
|
|
Value: name,
|
|
},
|
|
{
|
|
Name: "namespace",
|
|
Value: summary.Namespace,
|
|
},
|
|
}
|
|
|
|
if strings.Contains(summary.JobID, "/dispatch-") {
|
|
jobInfo := strings.Split(summary.JobID, "/dispatch-")
|
|
labels = append(labels, metrics.Label{
|
|
Name: "parent_id",
|
|
Value: jobInfo[0],
|
|
}, metrics.Label{
|
|
Name: "dispatch_id",
|
|
Value: jobInfo[1],
|
|
})
|
|
}
|
|
|
|
if strings.Contains(summary.JobID, "/periodic-") {
|
|
jobInfo := strings.Split(summary.JobID, "/periodic-")
|
|
labels = append(labels, metrics.Label{
|
|
Name: "parent_id",
|
|
Value: jobInfo[0],
|
|
}, metrics.Label{
|
|
Name: "periodic_id",
|
|
Value: jobInfo[1],
|
|
})
|
|
}
|
|
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
|
|
float32(tgSummary.Queued), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
|
|
float32(tgSummary.Complete), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
|
|
float32(tgSummary.Failed), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
|
|
float32(tgSummary.Running), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
|
|
float32(tgSummary.Starting), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
|
|
float32(tgSummary.Lost), labels)
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "unknown"},
|
|
float32(tgSummary.Unknown), labels)
|
|
}
|
|
}
|
|
|
|
// publishJobStatusMetrics publishes the job statuses as metrics
|
|
func (s *Server) publishJobStatusMetrics(stopCh chan struct{}) {
|
|
timer := time.NewTimer(0)
|
|
defer timer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-timer.C:
|
|
timer.Reset(s.config.StatsCollectionInterval)
|
|
state, err := s.State().Snapshot()
|
|
if err != nil {
|
|
s.logger.Error("failed to get state", "error", err)
|
|
continue
|
|
}
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := state.Jobs(ws)
|
|
if err != nil {
|
|
s.logger.Error("failed to get job statuses", "error", err)
|
|
continue
|
|
}
|
|
|
|
s.iterateJobStatusMetrics(&iter)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) iterateJobStatusMetrics(jobs *memdb.ResultIterator) {
|
|
var pending int64 // Sum of all jobs in 'pending' state
|
|
var running int64 // Sum of all jobs in 'running' state
|
|
var dead int64 // Sum of all jobs in 'dead' state
|
|
|
|
for {
|
|
raw := (*jobs).Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
|
|
job := raw.(*structs.Job)
|
|
|
|
switch job.Status {
|
|
case structs.JobStatusPending:
|
|
pending++
|
|
case structs.JobStatusRunning:
|
|
running++
|
|
case structs.JobStatusDead:
|
|
dead++
|
|
}
|
|
}
|
|
|
|
metrics.SetGauge([]string{"nomad", "job_status", "pending"}, float32(pending))
|
|
metrics.SetGauge([]string{"nomad", "job_status", "running"}, float32(running))
|
|
metrics.SetGauge([]string{"nomad", "job_status", "dead"}, float32(dead))
|
|
}
|
|
|
|
// revokeLeadership is invoked once we step down as leader.
|
|
// This is used to cleanup any state that may be specific to a leader.
|
|
func (s *Server) revokeLeadership() error {
|
|
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())
|
|
|
|
s.resetConsistentReadReady()
|
|
|
|
// Clear the leader token since we are no longer the leader.
|
|
s.setLeaderAcl("")
|
|
|
|
// Disable autopilot
|
|
s.autopilot.Stop()
|
|
|
|
// Disable the plan queue, since we are no longer leader
|
|
s.planQueue.SetEnabled(false)
|
|
|
|
// Disable the eval broker and blocked evals. We do not need to check the
|
|
// scheduler configuration paused eval broker value, as the brokers should
|
|
// always be paused on the non-leader.
|
|
s.brokerLock.Lock()
|
|
s.evalBroker.SetEnabled(false)
|
|
s.blockedEvals.SetEnabled(false)
|
|
s.brokerLock.Unlock()
|
|
|
|
// Disable the periodic dispatcher, since it is only useful as a leader
|
|
s.periodicDispatcher.SetEnabled(false)
|
|
|
|
// Disable the Vault client as it is only useful as a leader.
|
|
s.vault.SetActive(false)
|
|
|
|
// Disable the deployment watcher as it is only useful as a leader.
|
|
s.deploymentWatcher.SetEnabled(false, nil)
|
|
|
|
// Disable the node drainer
|
|
s.nodeDrainer.SetEnabled(false, nil)
|
|
|
|
// Disable the volume watcher
|
|
s.volumeWatcher.SetEnabled(false, nil, "")
|
|
|
|
// Disable any enterprise systems required.
|
|
if err := s.revokeEnterpriseLeadership(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Clear the heartbeat timers on either shutdown or step down,
|
|
// since we are no longer responsible for TTL expirations.
|
|
if err := s.clearAllHeartbeatTimers(); err != nil {
|
|
s.logger.Error("clearing heartbeat timers failed", "error", err)
|
|
return err
|
|
}
|
|
|
|
// Unpause our worker if we paused previously
|
|
s.handlePausableWorkers(false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// pausableWorkers returns a slice of the workers
|
|
// to pause on leader transitions.
|
|
//
|
|
// Upon leadership establishment, pause workers to free half
|
|
// the cores for use in the plan queue and evaluation broker
|
|
func (s *Server) pausableWorkers() []*Worker {
|
|
n := len(s.workers)
|
|
if n <= 1 {
|
|
return []*Worker{}
|
|
}
|
|
|
|
// Disabling 3/4 of the workers frees CPU for raft and the
|
|
// plan applier which uses 1/2 the cores.
|
|
return s.workers[:3*n/4]
|
|
}
|
|
|
|
// reconcile is used to reconcile the differences between Serf
|
|
// membership and what is reflected in our strongly consistent store.
|
|
func (s *Server) reconcile() error {
|
|
defer metrics.MeasureSince([]string{"nomad", "leader", "reconcile"}, time.Now())
|
|
members := s.serf.Members()
|
|
for _, member := range members {
|
|
if err := s.reconcileMember(member); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reconcileMember is used to do an async reconcile of a single serf member
|
|
func (s *Server) reconcileMember(member serf.Member) error {
|
|
// Check if this is a member we should handle
|
|
valid, parts := isNomadServer(member)
|
|
if !valid || parts.Region != s.config.Region {
|
|
return nil
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "leader", "reconcileMember"}, time.Now())
|
|
|
|
var err error
|
|
switch member.Status {
|
|
case serf.StatusAlive:
|
|
err = s.addRaftPeer(member, parts)
|
|
case serf.StatusLeft, StatusReap:
|
|
err = s.removeRaftPeer(member, parts)
|
|
}
|
|
if err != nil {
|
|
s.logger.Error("failed to reconcile member", "member", member, "error", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
|
|
func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
|
|
// Check for possibility of multiple bootstrap nodes
|
|
members := s.serf.Members()
|
|
if parts.Bootstrap {
|
|
for _, member := range members {
|
|
valid, p := isNomadServer(member)
|
|
if valid && member.Name != m.Name && p.Bootstrap {
|
|
s.logger.Error("skipping adding Raft peer because an existing peer is in bootstrap mode and only one server should be in bootstrap mode",
|
|
"existing_peer", member.Name, "joining_peer", m.Name)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Processing ourselves could result in trying to remove ourselves to
|
|
// fix up our address, which would make us step down. This is only
|
|
// safe to attempt if there are multiple servers available.
|
|
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
|
configFuture := s.raft.GetConfiguration()
|
|
if err := configFuture.Error(); err != nil {
|
|
s.logger.Error("failed to get raft configuration", "error", err)
|
|
return err
|
|
}
|
|
|
|
if m.Name == s.config.NodeName {
|
|
if l := len(configFuture.Configuration().Servers); l < 3 {
|
|
s.logger.Debug("skipping self join check for peer since the cluster is too small", "peer", m.Name)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// See if it's already in the configuration. It's harmless to re-add it
|
|
// but we want to avoid doing that if possible to prevent useless Raft
|
|
// log entries. If the address is the same but the ID changed, remove the
|
|
// old server before adding the new one.
|
|
minRaftProtocol, err := s.MinRaftProtocol()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, server := range configFuture.Configuration().Servers {
|
|
// No-op if the raft version is too low
|
|
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
|
|
return nil
|
|
}
|
|
|
|
// If the address or ID matches an existing server, see if we need to remove the old one first
|
|
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
|
|
// Exit with no-op if this is being called on an existing server and both the ID and address match
|
|
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
|
|
return nil
|
|
}
|
|
future := s.raft.RemoveServer(server.ID, 0, 0)
|
|
if server.Address == raft.ServerAddress(addr) {
|
|
if err := future.Error(); err != nil {
|
|
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
|
|
}
|
|
s.logger.Info("removed server with duplicate address", "address", server.Address)
|
|
} else {
|
|
if err := future.Error(); err != nil {
|
|
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
|
|
}
|
|
s.logger.Info("removed server with duplicate ID", "id", server.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Attempt to add as a peer
|
|
switch {
|
|
case minRaftProtocol >= 3:
|
|
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Error("failed to add raft peer", "error", err)
|
|
return err
|
|
}
|
|
case minRaftProtocol == 2 && parts.RaftVersion >= 3:
|
|
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Error("failed to add raft peer", "error", err)
|
|
return err
|
|
}
|
|
default:
|
|
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Error("failed to add raft peer", "error", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
|
|
// or is reaped
|
|
func (s *Server) removeRaftPeer(m serf.Member, parts *serverParts) error {
|
|
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
|
|
|
// See if it's already in the configuration. It's harmless to re-remove it
|
|
// but we want to avoid doing that if possible to prevent useless Raft
|
|
// log entries.
|
|
configFuture := s.raft.GetConfiguration()
|
|
if err := configFuture.Error(); err != nil {
|
|
s.logger.Error("failed to get raft configuration", "error", err)
|
|
return err
|
|
}
|
|
|
|
minRaftProtocol, err := s.MinRaftProtocol()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Pick which remove API to use based on how the server was added.
|
|
for _, server := range configFuture.Configuration().Servers {
|
|
// Check if this is the server to remove based on how it was registered.
|
|
// Raft v2 servers are registered by address.
|
|
// Raft v3 servers are registered by ID.
|
|
if server.ID == raft.ServerID(parts.ID) || server.Address == raft.ServerAddress(addr) {
|
|
// Use the new add/remove APIs if we understand them.
|
|
if minRaftProtocol >= 2 {
|
|
s.logger.Info("removing server by ID", "id", server.ID)
|
|
future := s.raft.RemoveServer(server.ID, 0, 0)
|
|
if err := future.Error(); err != nil {
|
|
s.logger.Error("failed to remove raft peer", "id", server.ID, "error", err)
|
|
return err
|
|
}
|
|
} else {
|
|
// If not, use the old remove API
|
|
s.logger.Info("removing server by address", "address", server.Address)
|
|
future := s.raft.RemovePeer(raft.ServerAddress(addr))
|
|
if err := future.Error(); err != nil {
|
|
s.logger.Error("failed to remove raft peer", "address", addr, "error", err)
|
|
return err
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// replicateACLPolicies is used to replicate ACL policies from
|
|
// the authoritative region to this region.
|
|
func (s *Server) replicateACLPolicies(stopCh chan struct{}) {
|
|
req := structs.ACLPolicyListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AllowStale: true,
|
|
},
|
|
}
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting ACL policy replication from authoritative region", "authoritative_region", req.Region)
|
|
|
|
START:
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
// Rate limit how often we attempt replication
|
|
limiter.Wait(context.Background())
|
|
|
|
// Fetch the list of policies
|
|
var resp structs.ACLPolicyListResponse
|
|
req.AuthToken = s.ReplicationToken()
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion,
|
|
"ACL.ListPolicies", &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch policies from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
|
|
// Perform a two-way diff
|
|
delete, update := diffACLPolicies(s.State(), req.MinQueryIndex, resp.Policies)
|
|
|
|
// Delete policies that should not exist
|
|
if len(delete) > 0 {
|
|
args := &structs.ACLPolicyDeleteRequest{
|
|
Names: delete,
|
|
}
|
|
_, _, err := s.raftApply(structs.ACLPolicyDeleteRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to delete policies", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated policies
|
|
var fetched []*structs.ACLPolicy
|
|
if len(update) > 0 {
|
|
req := structs.ACLPolicySetRequest{
|
|
Names: update,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.ACLPolicySetResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion,
|
|
"ACL.GetPolicies", &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch policies from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
for _, policy := range reply.Policies {
|
|
fetched = append(fetched, policy)
|
|
}
|
|
}
|
|
|
|
// Update local policies
|
|
if len(fetched) > 0 {
|
|
args := &structs.ACLPolicyUpsertRequest{
|
|
Policies: fetched,
|
|
}
|
|
_, _, err := s.raftApply(structs.ACLPolicyUpsertRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update policies", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there
|
|
// is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
}
|
|
|
|
ERR_WAIT:
|
|
select {
|
|
case <-time.After(s.config.ReplicationBackoff):
|
|
goto START
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
// diffACLPolicies is used to perform a two-way diff between the local
|
|
// policies and the remote policies to determine which policies need to
|
|
// be deleted or updated.
|
|
func diffACLPolicies(state *state.StateStore, minIndex uint64, remoteList []*structs.ACLPolicyListStub) (delete []string, update []string) {
|
|
// Construct a set of the local and remote policies
|
|
local := make(map[string][]byte)
|
|
remote := make(map[string]struct{})
|
|
|
|
// Add all the local policies
|
|
iter, err := state.ACLPolicies(nil)
|
|
if err != nil {
|
|
panic("failed to iterate local policies")
|
|
}
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
policy := raw.(*structs.ACLPolicy)
|
|
local[policy.Name] = policy.Hash
|
|
}
|
|
|
|
// Iterate over the remote policies
|
|
for _, rp := range remoteList {
|
|
remote[rp.Name] = struct{}{}
|
|
|
|
// Check if the policy is missing locally
|
|
if localHash, ok := local[rp.Name]; !ok {
|
|
update = append(update, rp.Name)
|
|
|
|
// Check if policy is newer remotely and there is a hash mis-match.
|
|
} else if rp.ModifyIndex > minIndex && !bytes.Equal(localHash, rp.Hash) {
|
|
update = append(update, rp.Name)
|
|
}
|
|
}
|
|
|
|
// Check if policy should be deleted
|
|
for lp := range local {
|
|
if _, ok := remote[lp]; !ok {
|
|
delete = append(delete, lp)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicateACLTokens is used to replicate global ACL tokens from
|
|
// the authoritative region to this region.
|
|
func (s *Server) replicateACLTokens(stopCh chan struct{}) {
|
|
req := structs.ACLTokenListRequest{
|
|
GlobalOnly: true,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AllowStale: true,
|
|
},
|
|
}
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting ACL token replication from authoritative region", "authoritative_region", req.Region)
|
|
|
|
START:
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
// Rate limit how often we attempt replication
|
|
limiter.Wait(context.Background())
|
|
|
|
// Fetch the list of tokens
|
|
var resp structs.ACLTokenListResponse
|
|
req.AuthToken = s.ReplicationToken()
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion,
|
|
"ACL.ListTokens", &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch tokens from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
|
|
// Perform a two-way diff
|
|
delete, update := diffACLTokens(s.State(), req.MinQueryIndex, resp.Tokens)
|
|
|
|
// Delete tokens that should not exist
|
|
if len(delete) > 0 {
|
|
args := &structs.ACLTokenDeleteRequest{
|
|
AccessorIDs: delete,
|
|
}
|
|
_, _, err := s.raftApply(structs.ACLTokenDeleteRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to delete tokens", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated policies.
|
|
var fetched []*structs.ACLToken
|
|
if len(update) > 0 {
|
|
req := structs.ACLTokenSetRequest{
|
|
AccessorIDS: update,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.ACLTokenSetResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion,
|
|
"ACL.GetTokens", &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch tokens from authoritative region", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
for _, token := range reply.Tokens {
|
|
fetched = append(fetched, token)
|
|
}
|
|
}
|
|
|
|
// Update local tokens
|
|
if len(fetched) > 0 {
|
|
args := &structs.ACLTokenUpsertRequest{
|
|
Tokens: fetched,
|
|
}
|
|
_, _, err := s.raftApply(structs.ACLTokenUpsertRequestType, args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update tokens", "error", err)
|
|
goto ERR_WAIT
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there
|
|
// is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
}
|
|
|
|
ERR_WAIT:
|
|
select {
|
|
case <-time.After(s.config.ReplicationBackoff):
|
|
goto START
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
// diffACLTokens is used to perform a two-way diff between the local
|
|
// tokens and the remote tokens to determine which tokens need to
|
|
// be deleted or updated.
|
|
func diffACLTokens(store *state.StateStore, minIndex uint64, remoteList []*structs.ACLTokenListStub) (delete []string, update []string) {
|
|
// Construct a set of the local and remote policies
|
|
local := make(map[string][]byte)
|
|
remote := make(map[string]struct{})
|
|
|
|
// Add all the local global tokens
|
|
iter, err := store.ACLTokensByGlobal(nil, true, state.SortDefault)
|
|
if err != nil {
|
|
panic("failed to iterate local tokens")
|
|
}
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
token := raw.(*structs.ACLToken)
|
|
local[token.AccessorID] = token.Hash
|
|
}
|
|
|
|
// Iterate over the remote tokens
|
|
for _, rp := range remoteList {
|
|
remote[rp.AccessorID] = struct{}{}
|
|
|
|
// Check if the token is missing locally
|
|
if localHash, ok := local[rp.AccessorID]; !ok {
|
|
update = append(update, rp.AccessorID)
|
|
|
|
// Check if policy is newer remotely and there is a hash mis-match.
|
|
} else if rp.ModifyIndex > minIndex && !bytes.Equal(localHash, rp.Hash) {
|
|
update = append(update, rp.AccessorID)
|
|
}
|
|
}
|
|
|
|
// Check if local token should be deleted
|
|
for lp := range local {
|
|
if _, ok := remote[lp]; !ok {
|
|
delete = append(delete, lp)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicateACLRoles is used to replicate ACL Roles from the authoritative
|
|
// region to this region. The loop should only be run on the leader within the
|
|
// federated region.
|
|
func (s *Server) replicateACLRoles(stopCh chan struct{}) {
|
|
|
|
// Generate our request object. We only need to do this once and reuse it
|
|
// for every RPC request. The MinQueryIndex is updated after every
|
|
// successful replication loop, so the next query acts as a blocking query
|
|
// and only returns upon a change in the authoritative region.
|
|
req := structs.ACLRolesListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Region: s.config.AuthoritativeRegion,
|
|
},
|
|
}
|
|
|
|
// Create our replication rate limiter for ACL roles and log a lovely
|
|
// message to indicate the process is starting.
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting ACL Role replication from authoritative region",
|
|
"authoritative_region", req.Region)
|
|
|
|
// Enter the main ACL Role replication loop that will only exit when the
|
|
// stopCh is closed.
|
|
//
|
|
// Any error encountered will use the replicationBackoffContinue function
|
|
// which handles replication backoff and shutdown coordination in the event
|
|
// of an error inside the loop.
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
|
|
// Rate limit how often we attempt replication. It is OK to ignore
|
|
// the error as the context will never be cancelled and the limit
|
|
// parameters are controlled internally.
|
|
_ = limiter.Wait(context.Background())
|
|
|
|
// Set the replication token on each replication iteration so that
|
|
// it is always current and can handle agent SIGHUP reloads.
|
|
req.AuthToken = s.ReplicationToken()
|
|
|
|
var resp structs.ACLRolesListResponse
|
|
|
|
// Make the list RPC request to the authoritative region, so we
|
|
// capture the latest ACL role listing.
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLListRolesRPCMethod, &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch ACL Roles from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
|
|
// Perform a two-way diff on the ACL roles.
|
|
toDelete, toUpdate := diffACLRoles(s.State(), req.MinQueryIndex, resp.ACLRoles)
|
|
|
|
// A significant amount of time could pass between the last check
|
|
// on whether we should stop the replication process. Therefore, do
|
|
// a check here, before calling Raft.
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// If we have ACL roles to delete, make this call directly to Raft.
|
|
if len(toDelete) > 0 {
|
|
args := structs.ACLRolesDeleteByIDRequest{ACLRoleIDs: toDelete}
|
|
_, _, err := s.raftApply(structs.ACLRolesDeleteByIDRequestType, &args)
|
|
|
|
// If the error was because we lost leadership while calling
|
|
// Raft, avoid logging as this can be confusing to operators.
|
|
if err != nil {
|
|
if err != raft.ErrLeadershipLost {
|
|
s.logger.Error("failed to delete ACL roles", "error", err)
|
|
}
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated policies.
|
|
var fetched []*structs.ACLRole
|
|
if len(toUpdate) > 0 {
|
|
req := structs.ACLRolesByIDRequest{
|
|
ACLRoleIDs: toUpdate,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.ACLRolesByIDResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLGetRolesByIDRPCMethod, &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch ACL Roles from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
for _, aclRole := range reply.ACLRoles {
|
|
fetched = append(fetched, aclRole)
|
|
}
|
|
}
|
|
|
|
// Update local tokens
|
|
if len(fetched) > 0 {
|
|
|
|
// The replication of ACL roles and policies are independent,
|
|
// therefore we cannot ensure the policies linked within the
|
|
// role are present. We must set allow missing to true.
|
|
args := structs.ACLRolesUpsertRequest{
|
|
ACLRoles: fetched,
|
|
AllowMissingPolicies: true,
|
|
}
|
|
|
|
// Perform the upsert directly via Raft.
|
|
_, _, err := s.raftApply(structs.ACLRolesUpsertRequestType, &args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update ACL roles", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
}
|
|
}
|
|
|
|
// diffACLRoles is used to perform a two-way diff between the local ACL Roles
|
|
// and the remote Roles to determine which tokens need to be deleted or
|
|
// updated. The returned array's contain ACL Role IDs.
|
|
func diffACLRoles(
|
|
store *state.StateStore, minIndex uint64, remoteList []*structs.ACLRoleListStub) (
|
|
delete []string, update []string) {
|
|
|
|
// The local ACL role tracking is keyed by the role ID and the value is the
|
|
// hash of the role.
|
|
local := make(map[string][]byte)
|
|
|
|
// The remote ACL role tracking is keyed by the role ID; the value is an
|
|
// empty struct as we already have the full object.
|
|
remote := make(map[string]struct{})
|
|
|
|
// Read all the ACL role currently held within our local state. This panic
|
|
// will only happen as a developer making a mistake with naming the index
|
|
// to use.
|
|
iter, err := store.GetACLRoles(nil)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to iterate local ACL roles: %v", err))
|
|
}
|
|
|
|
// Iterate the local ACL roles and add them to our tracking of local roles.
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
aclRole := raw.(*structs.ACLRole)
|
|
local[aclRole.ID] = aclRole.Hash
|
|
}
|
|
|
|
// Iterate over the remote ACL roles.
|
|
for _, remoteACLRole := range remoteList {
|
|
remote[remoteACLRole.ID] = struct{}{}
|
|
|
|
// Identify whether the ACL role is within the local state. If it is
|
|
// not, add this to our update list.
|
|
if localHash, ok := local[remoteACLRole.ID]; !ok {
|
|
update = append(update, remoteACLRole.ID)
|
|
|
|
// Check if ACL role is newer remotely and there is a hash
|
|
// mismatch.
|
|
} else if remoteACLRole.ModifyIndex > minIndex && !bytes.Equal(localHash, remoteACLRole.Hash) {
|
|
update = append(update, remoteACLRole.ID)
|
|
}
|
|
}
|
|
|
|
// If we have ACL roles within state which are no longer present in the
|
|
// authoritative region we should delete them.
|
|
for localACLRole := range local {
|
|
if _, ok := remote[localACLRole]; !ok {
|
|
delete = append(delete, localACLRole)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicateACLAuthMethods is used to replicate ACL Authentication Methods from
|
|
// the authoritative region to this region. The loop should only be run on the
|
|
// leader within the federated region.
|
|
func (s *Server) replicateACLAuthMethods(stopCh chan struct{}) {
|
|
|
|
// Generate our request object. We only need to do this once and reuse it
|
|
// for every RPC request. The MinQueryIndex is updated after every
|
|
// successful replication loop, so the next query acts as a blocking query
|
|
// and only returns upon a change in the authoritative region.
|
|
req := structs.ACLAuthMethodListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Region: s.config.AuthoritativeRegion,
|
|
},
|
|
}
|
|
|
|
// Create our replication rate limiter for ACL auth-methods and log a
|
|
// lovely message to indicate the process is starting.
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting ACL Auth-Methods replication from authoritative region",
|
|
"authoritative_region", req.Region)
|
|
|
|
// Enter the main ACL auth-methods replication loop that will only exit
|
|
// when the stopCh is closed.
|
|
//
|
|
// Any error encountered will use the replicationBackoffContinue function
|
|
// which handles replication backoff and shutdown coordination in the event
|
|
// of an error inside the loop.
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
|
|
// Rate limit how often we attempt replication. It is OK to ignore
|
|
// the error as the context will never be cancelled and the limit
|
|
// parameters are controlled internally.
|
|
_ = limiter.Wait(context.Background())
|
|
|
|
// Set the replication token on each replication iteration so that
|
|
// it is always current and can handle agent SIGHUP reloads.
|
|
req.AuthToken = s.ReplicationToken()
|
|
|
|
var resp structs.ACLAuthMethodListResponse
|
|
|
|
// Make the list RPC request to the authoritative region, so we
|
|
// capture the latest ACL auth-method listing.
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLListAuthMethodsRPCMethod, &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch ACL auth-methods from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
|
|
// Perform a two-way diff on the ACL auth-methods.
|
|
toDelete, toUpdate := diffACLAuthMethods(s.State(), req.MinQueryIndex, resp.AuthMethods)
|
|
|
|
// A significant amount of time could pass between the last check
|
|
// on whether we should stop the replication process. Therefore, do
|
|
// a check here, before calling Raft.
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// If we have ACL auth-methods to delete, make this call directly
|
|
// to Raft.
|
|
if len(toDelete) > 0 {
|
|
args := structs.ACLAuthMethodDeleteRequest{Names: toDelete}
|
|
_, _, err := s.raftApply(structs.ACLAuthMethodsDeleteRequestType, &args)
|
|
|
|
// If the error was because we lost leadership while calling
|
|
// Raft, avoid logging as this can be confusing to operators.
|
|
if err != nil {
|
|
if err != raft.ErrLeadershipLost {
|
|
s.logger.Error("failed to delete ACL auth-methods", "error", err)
|
|
}
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated auth-methods.
|
|
var fetched []*structs.ACLAuthMethod
|
|
if len(toUpdate) > 0 {
|
|
req := structs.ACLAuthMethodsGetRequest{
|
|
Names: toUpdate,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.ACLAuthMethodsGetResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLGetAuthMethodsRPCMethod, &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch ACL auth-methods from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
for _, aclAuthMethod := range reply.AuthMethods {
|
|
fetched = append(fetched, aclAuthMethod)
|
|
}
|
|
}
|
|
|
|
// Update local auth-methods.
|
|
if len(fetched) > 0 {
|
|
args := structs.ACLAuthMethodUpsertRequest{
|
|
AuthMethods: fetched,
|
|
}
|
|
|
|
// Perform the upsert directly via Raft.
|
|
_, _, err := s.raftApply(structs.ACLAuthMethodsUpsertRequestType, &args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update ACL auth-methods", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
}
|
|
}
|
|
|
|
// diffACLAuthMethods is used to perform a two-way diff between the local ACL
|
|
// auth-methods and the remote auth-methods to determine which ones need to be
|
|
// deleted or updated. The returned array's contain ACL auth-method names.
|
|
func diffACLAuthMethods(
|
|
store *state.StateStore, minIndex uint64, remoteList []*structs.ACLAuthMethodStub) (
|
|
delete []string, update []string) {
|
|
|
|
// The local ACL auth-method tracking is keyed by the name and the value is
|
|
// the hash of the auth-method.
|
|
local := make(map[string][]byte)
|
|
|
|
// The remote ACL auth-method tracking is keyed by the name; the value is
|
|
// an empty struct as we already have the full object.
|
|
remote := make(map[string]struct{})
|
|
|
|
// Read all the ACL auth-methods currently held within our local state.
|
|
// This panic will only happen as a developer making a mistake with naming
|
|
// the index to use.
|
|
iter, err := store.GetACLAuthMethods(nil)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to iterate local ACL roles: %v", err))
|
|
}
|
|
|
|
// Iterate the local ACL auth-methods and add them to our tracking of
|
|
// local auth-methods
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
aclAuthMethod := raw.(*structs.ACLAuthMethod)
|
|
local[aclAuthMethod.Name] = aclAuthMethod.Hash
|
|
}
|
|
|
|
// Iterate over the remote ACL auth-methods.
|
|
for _, remoteACLAuthMethod := range remoteList {
|
|
remote[remoteACLAuthMethod.Name] = struct{}{}
|
|
|
|
// Identify whether the ACL auth-method is within the local state. If
|
|
// it is not, add this to our update list.
|
|
if localHash, ok := local[remoteACLAuthMethod.Name]; !ok {
|
|
update = append(update, remoteACLAuthMethod.Name)
|
|
|
|
// Check if ACL auth-method is newer remotely and there is a hash
|
|
// mismatch.
|
|
} else if remoteACLAuthMethod.ModifyIndex > minIndex && !bytes.Equal(localHash, remoteACLAuthMethod.Hash) {
|
|
update = append(update, remoteACLAuthMethod.Name)
|
|
}
|
|
}
|
|
|
|
// If we have ACL auth-methods within state which are no longer present in
|
|
// the authoritative region we should delete them.
|
|
for localACLAuthMethod := range local {
|
|
if _, ok := remote[localACLAuthMethod]; !ok {
|
|
delete = append(delete, localACLAuthMethod)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicateACLBindingRules is used to replicate ACL binding rules from the
|
|
// authoritative region to this region. The loop should only be run on the
|
|
// leader within the federated region.
|
|
func (s *Server) replicateACLBindingRules(stopCh chan struct{}) {
|
|
|
|
// Generate our request object. We only need to do this once and reuse it
|
|
// for every RPC request. The MinQueryIndex is updated after every
|
|
// successful replication loop, so the next query acts as a blocking query
|
|
// and only returns upon a change in the authoritative region.
|
|
req := structs.ACLBindingRulesListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Region: s.config.AuthoritativeRegion,
|
|
},
|
|
}
|
|
|
|
// Create our replication rate limiter for ACL binding rules and log a
|
|
// lovely message to indicate the process is starting.
|
|
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
|
|
s.logger.Debug("starting ACL Binding Rules replication from authoritative region",
|
|
"authoritative_region", req.Region)
|
|
|
|
// Enter the main ACL binding rules replication loop that will only exit
|
|
// when the stopCh is closed.
|
|
//
|
|
// Any error encountered will use the replicationBackoffContinue function
|
|
// which handles replication backoff and shutdown coordination in the event
|
|
// of an error inside the loop.
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
|
|
// Rate limit how often we attempt replication. It is OK to ignore
|
|
// the error as the context will never be cancelled and the limit
|
|
// parameters are controlled internally.
|
|
_ = limiter.Wait(context.Background())
|
|
|
|
// Set the replication token on each replication iteration so that
|
|
// it is always current and can handle agent SIGHUP reloads.
|
|
req.AuthToken = s.ReplicationToken()
|
|
|
|
var resp structs.ACLBindingRulesListResponse
|
|
|
|
// Make the list RPC request to the authoritative region, so we
|
|
// capture the latest ACL binding rules listing.
|
|
err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLListBindingRulesRPCMethod, &req, &resp)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch ACL binding rules from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
|
|
// Perform a two-way diff on the ACL binding rules.
|
|
toDelete, toUpdate := diffACLBindingRules(s.State(), req.MinQueryIndex, resp.ACLBindingRules)
|
|
|
|
// A significant amount of time could pass between the last check
|
|
// on whether we should stop the replication process. Therefore, do
|
|
// a check here, before calling Raft.
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// If we have ACL binding rules to delete, make this call directly
|
|
// to Raft.
|
|
if len(toDelete) > 0 {
|
|
args := structs.ACLBindingRulesDeleteRequest{ACLBindingRuleIDs: toDelete}
|
|
_, _, err := s.raftApply(structs.ACLBindingRulesDeleteRequestType, &args)
|
|
|
|
// If the error was because we lost leadership while calling
|
|
// Raft, avoid logging as this can be confusing to operators.
|
|
if err != nil {
|
|
if err != raft.ErrLeadershipLost {
|
|
s.logger.Error("failed to delete ACL binding rules", "error", err)
|
|
}
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fetch any outdated binding rules.
|
|
var fetched []*structs.ACLBindingRule
|
|
if len(toUpdate) > 0 {
|
|
req := structs.ACLBindingRulesRequest{
|
|
ACLBindingRuleIDs: toUpdate,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: s.config.AuthoritativeRegion,
|
|
AuthToken: s.ReplicationToken(),
|
|
AllowStale: true,
|
|
MinQueryIndex: resp.Index - 1,
|
|
},
|
|
}
|
|
var reply structs.ACLBindingRulesResponse
|
|
if err := s.forwardRegion(s.config.AuthoritativeRegion, structs.ACLGetBindingRulesRPCMethod, &req, &reply); err != nil {
|
|
s.logger.Error("failed to fetch ACL binding rules from authoritative region", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
for _, aclBindingRule := range reply.ACLBindingRules {
|
|
fetched = append(fetched, aclBindingRule)
|
|
}
|
|
}
|
|
|
|
// Update local binding rules.
|
|
if len(fetched) > 0 {
|
|
args := structs.ACLBindingRulesUpsertRequest{
|
|
ACLBindingRules: fetched,
|
|
AllowMissingAuthMethods: true,
|
|
}
|
|
|
|
// Perform the upsert directly via Raft.
|
|
_, _, err := s.raftApply(structs.ACLBindingRulesUpsertRequestType, &args)
|
|
if err != nil {
|
|
s.logger.Error("failed to update ACL binding rules", "error", err)
|
|
if s.replicationBackoffContinue(stopCh) {
|
|
continue
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update the minimum query index, blocks until there is a change.
|
|
req.MinQueryIndex = resp.Index
|
|
}
|
|
}
|
|
}
|
|
|
|
// diffACLBindingRules is used to perform a two-way diff between the local ACL
|
|
// binding rules and the remote binding rules to determine which ones need to be
|
|
// deleted or updated. The returned array's contain ACL binding rule names.
|
|
func diffACLBindingRules(
|
|
store *state.StateStore, minIndex uint64, remoteList []*structs.ACLBindingRuleListStub) (
|
|
delete []string, update []string) {
|
|
|
|
// The local ACL binding rules tracking is keyed by the name and the value
|
|
// is the hash of the auth-method.
|
|
local := make(map[string][]byte)
|
|
|
|
// The remote ACL binding rules tracking is keyed by the name; the value is
|
|
// an empty struct as we already have the full object.
|
|
remote := make(map[string]struct{})
|
|
|
|
// Read all the ACL binding rules currently held within our local state.
|
|
// This panic will only happen as a developer making a mistake with naming
|
|
// the index to use.
|
|
iter, err := store.GetACLBindingRules(nil)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to iterate local ACL binding rules: %v", err))
|
|
}
|
|
|
|
// Iterate the local ACL binding rules and add them to our tracking of
|
|
// local binding rules.
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
aclBindingRule := raw.(*structs.ACLBindingRule)
|
|
local[aclBindingRule.ID] = aclBindingRule.Hash
|
|
}
|
|
|
|
// Iterate over the remote ACL binding rules.
|
|
for _, remoteACLBindingRule := range remoteList {
|
|
remote[remoteACLBindingRule.ID] = struct{}{}
|
|
|
|
// Identify whether the ACL auth-method is within the local state. If
|
|
// it is not, add this to our update list.
|
|
if localHash, ok := local[remoteACLBindingRule.ID]; !ok {
|
|
update = append(update, remoteACLBindingRule.ID)
|
|
|
|
// Check if the ACL binding rule is newer remotely and there is a
|
|
// hash mismatch.
|
|
} else if remoteACLBindingRule.ModifyIndex > minIndex && !bytes.Equal(localHash, remoteACLBindingRule.Hash) {
|
|
update = append(update, remoteACLBindingRule.ID)
|
|
}
|
|
}
|
|
|
|
// If we have ACL binding rules within state which are no longer present in
|
|
// the authoritative region we should delete them.
|
|
for localACLBindingRules := range local {
|
|
if _, ok := remote[localACLBindingRules]; !ok {
|
|
delete = append(delete, localACLBindingRules)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// replicationBackoffContinue should be used when a replication loop encounters
|
|
// an error and wants to wait until either the backoff time has been met, or
|
|
// the stopCh has been closed. The boolean indicates whether the replication
|
|
// process should continue.
|
|
//
|
|
// Typical use:
|
|
//
|
|
// if s.replicationBackoffContinue(stopCh) {
|
|
// continue
|
|
// } else {
|
|
// return
|
|
// }
|
|
func (s *Server) replicationBackoffContinue(stopCh chan struct{}) bool {
|
|
|
|
timer, timerStopFn := helper.NewSafeTimer(s.config.ReplicationBackoff)
|
|
defer timerStopFn()
|
|
|
|
select {
|
|
case <-timer.C:
|
|
return true
|
|
case <-stopCh:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
|
func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
|
|
state := s.fsm.State()
|
|
_, config, err := state.AutopilotConfig()
|
|
if err != nil {
|
|
s.logger.Named("autopilot").Error("failed to get autopilot config", "error", err)
|
|
return nil
|
|
}
|
|
if config != nil {
|
|
return config
|
|
}
|
|
|
|
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minAutopilotVersion, false) {
|
|
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
|
|
return nil
|
|
}
|
|
|
|
config = s.config.AutopilotConfig
|
|
req := structs.AutopilotSetConfigRequest{Config: *config}
|
|
if _, _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
|
|
s.logger.Named("autopilot").Error("failed to initialize config", "error", err)
|
|
return nil
|
|
}
|
|
|
|
return config
|
|
}
|
|
|
|
// getOrCreateSchedulerConfig is used to get the scheduler config. We create a default
|
|
// config if it doesn't already exist for bootstrapping an empty cluster
|
|
func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
|
|
state := s.fsm.State()
|
|
_, config, err := state.SchedulerConfig()
|
|
if err != nil {
|
|
s.logger.Named("core").Error("failed to get scheduler config", "error", err)
|
|
return nil
|
|
}
|
|
if config != nil {
|
|
return config
|
|
}
|
|
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minSchedulerConfigVersion, false) {
|
|
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
|
|
return nil
|
|
}
|
|
|
|
req := structs.SchedulerSetConfigRequest{Config: s.config.DefaultSchedulerConfig}
|
|
if _, _, err = s.raftApply(structs.SchedulerConfigRequestType, req); err != nil {
|
|
s.logger.Named("core").Error("failed to initialize config", "error", err)
|
|
return nil
|
|
}
|
|
|
|
return config
|
|
}
|
|
|
|
var minVersionKeyring = version.Must(version.NewVersion("1.4.0"))
|
|
|
|
// initializeKeyring creates the first root key if the leader doesn't
|
|
// already have one. The metadata will be replicated via raft and then
|
|
// the followers will get the key material from their own key
|
|
// replication.
|
|
func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
|
|
|
|
logger := s.logger.Named("keyring")
|
|
|
|
store := s.fsm.State()
|
|
keyMeta, err := store.GetActiveRootKeyMeta(nil)
|
|
if err != nil {
|
|
logger.Error("failed to get active key: %v", err)
|
|
return
|
|
}
|
|
if keyMeta != nil {
|
|
return
|
|
}
|
|
|
|
logger.Trace("verifying cluster is ready to initialize keyring")
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
if ServersMeetMinimumVersion(s.serf.Members(), s.Region(), minVersionKeyring, true) {
|
|
break
|
|
}
|
|
}
|
|
// we might have lost leadership during the version check
|
|
if !s.IsLeader() {
|
|
return
|
|
}
|
|
|
|
logger.Trace("initializing keyring")
|
|
|
|
rootKey, err := structs.NewRootKey(structs.EncryptionAlgorithmAES256GCM)
|
|
rootKey.Meta.SetActive()
|
|
if err != nil {
|
|
logger.Error("could not initialize keyring: %v", err)
|
|
return
|
|
}
|
|
|
|
err = s.encrypter.AddKey(rootKey)
|
|
if err != nil {
|
|
logger.Error("could not add initial key to keyring: %v", err)
|
|
return
|
|
}
|
|
|
|
if _, _, err = s.raftApply(structs.RootKeyMetaUpsertRequestType,
|
|
structs.KeyringUpdateRootKeyMetaRequest{
|
|
RootKeyMeta: rootKey.Meta,
|
|
}); err != nil {
|
|
logger.Error("could not initialize keyring: %v", err)
|
|
return
|
|
}
|
|
|
|
logger.Info("initialized keyring", "id", rootKey.Meta.KeyID)
|
|
}
|
|
|
|
func (s *Server) generateClusterID() (string, error) {
|
|
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
|
|
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
|
|
return "", fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
|
|
}
|
|
|
|
newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()}
|
|
if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
|
|
s.logger.Named("core").Error("failed to create cluster ID", "error", err)
|
|
return "", fmt.Errorf("failed to create cluster ID: %w", err)
|
|
}
|
|
|
|
s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
|
|
return newMeta.ClusterID, nil
|
|
}
|
|
|
|
// handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals
|
|
// enabled status based on the passed scheduler configuration. The boolean
|
|
// response indicates whether the caller needs to call restoreEvals() due to
|
|
// the brokers being enabled. It is for use when the change must take the
|
|
// scheduler configuration into account. This is not needed when calling
|
|
// revokeLeadership, as the configuration doesn't matter, and we need to ensure
|
|
// the brokers are stopped.
|
|
//
|
|
// The function checks the server is the leader and uses a mutex to avoid any
|
|
// potential timings problems. Consider the following timings:
|
|
// - operator updates the configuration via the API
|
|
// - the RPC handler applies the change via Raft
|
|
// - leadership transitions with write barrier
|
|
// - the RPC handler call this function to enact the change
|
|
//
|
|
// The mutex also protects against a situation where leadership is revoked
|
|
// while this function is being called. Ensuring the correct series of actions
|
|
// occurs so that state stays consistent.
|
|
func (s *Server) handleEvalBrokerStateChange(schedConfig *structs.SchedulerConfiguration) bool {
|
|
|
|
// Grab the lock first. Once we have this we can be sure to run everything
|
|
// needed before any leader transition can attempt to modify the state.
|
|
s.brokerLock.Lock()
|
|
defer s.brokerLock.Unlock()
|
|
|
|
// If we are no longer the leader, exit early.
|
|
if !s.IsLeader() {
|
|
return false
|
|
}
|
|
|
|
// enableEvalBroker tracks whether the evalBroker and blockedEvals
|
|
// processes should be enabled or not. It allows us to answer this question
|
|
// whether using a persisted Raft configuration, or the default bootstrap
|
|
// config.
|
|
var enableBrokers, restoreEvals bool
|
|
|
|
// The scheduler config can only be persisted to Raft once quorum has been
|
|
// established. If this is a fresh cluster, we need to use the default
|
|
// scheduler config, otherwise we can use the persisted object.
|
|
switch schedConfig {
|
|
case nil:
|
|
enableBrokers = !s.config.DefaultSchedulerConfig.PauseEvalBroker
|
|
default:
|
|
enableBrokers = !schedConfig.PauseEvalBroker
|
|
}
|
|
|
|
// If the evalBroker status is changing, set the new state.
|
|
if enableBrokers != s.evalBroker.Enabled() {
|
|
s.logger.Info("eval broker status modified", "paused", !enableBrokers)
|
|
s.evalBroker.SetEnabled(enableBrokers)
|
|
restoreEvals = enableBrokers
|
|
}
|
|
|
|
// If the blockedEvals status is changing, set the new state.
|
|
if enableBrokers != s.blockedEvals.Enabled() {
|
|
s.logger.Info("blocked evals status modified", "paused", !enableBrokers)
|
|
s.blockedEvals.SetEnabled(enableBrokers)
|
|
restoreEvals = enableBrokers
|
|
|
|
if enableBrokers {
|
|
s.blockedEvals.SetTimetable(s.fsm.TimeTable())
|
|
}
|
|
}
|
|
|
|
return restoreEvals
|
|
}
|