open-nomad/nomad/fsm.go

2586 lines
76 KiB
Go
Raw Normal View History

2015-06-01 15:49:10 +00:00
package nomad
import (
"fmt"
"io"
2017-01-11 21:18:36 +00:00
"reflect"
"sync"
2015-06-01 15:49:10 +00:00
"time"
2022-04-02 00:24:02 +00:00
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
2015-06-01 15:49:10 +00:00
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
2015-06-01 15:49:10 +00:00
"github.com/hashicorp/raft"
)
2015-08-16 00:38:13 +00:00
const (
// timeTableGranularity is the granularity of index to time tracking
timeTableGranularity = 5 * time.Minute
// timeTableLimit is the maximum limit of our tracking
2015-09-07 18:01:29 +00:00
timeTableLimit = 72 * time.Hour
2015-08-16 00:38:13 +00:00
)
// SnapshotType is prefixed to a record in the FSM snapshot
// so that we can determine the type for restore
type SnapshotType byte
const (
2020-10-21 04:16:25 +00:00
NodeSnapshot SnapshotType = 0
JobSnapshot SnapshotType = 1
IndexSnapshot SnapshotType = 2
EvalSnapshot SnapshotType = 3
AllocSnapshot SnapshotType = 4
TimeTableSnapshot SnapshotType = 5
PeriodicLaunchSnapshot SnapshotType = 6
JobSummarySnapshot SnapshotType = 7
VaultAccessorSnapshot SnapshotType = 8
JobVersionSnapshot SnapshotType = 9
DeploymentSnapshot SnapshotType = 10
ACLPolicySnapshot SnapshotType = 11
ACLTokenSnapshot SnapshotType = 12
SchedulerConfigSnapshot SnapshotType = 13
ClusterMetadataSnapshot SnapshotType = 14
ServiceIdentityTokenAccessorSnapshot SnapshotType = 15
ScalingPolicySnapshot SnapshotType = 16
CSIPluginSnapshot SnapshotType = 17
CSIVolumeSnapshot SnapshotType = 18
ScalingEventsSnapshot SnapshotType = 19
EventSinkSnapshot SnapshotType = 20
ServiceRegistrationSnapshot SnapshotType = 21
2020-10-21 04:16:25 +00:00
// Namespace appliers were moved from enterprise and therefore start at 64
NamespaceSnapshot SnapshotType = 64
)
2017-09-07 23:56:15 +00:00
// LogApplier is the definition of a function that can apply a Raft log
type LogApplier func(buf []byte, index uint64) interface{}
// LogAppliers is a mapping of the Raft MessageType to the appropriate log
// applier
type LogAppliers map[structs.MessageType]LogApplier
// SnapshotRestorer is the definition of a function that can apply a Raft log
type SnapshotRestorer func(restore *state.StateRestore, dec *codec.Decoder) error
// SnapshotRestorers is a mapping of the SnapshotType to the appropriate
// snapshot restorer.
type SnapshotRestorers map[SnapshotType]SnapshotRestorer
2015-06-01 15:49:10 +00:00
// nomadFSM implements a finite state machine that is used
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
type nomadFSM struct {
evalBroker *EvalBroker
2016-01-29 23:31:32 +00:00
blockedEvals *BlockedEvals
periodicDispatcher *PeriodicDispatch
2022-04-02 00:24:02 +00:00
logger hclog.Logger
state *state.StateStore
timetable *TimeTable
2017-10-13 21:36:02 +00:00
// config is the FSM config
config *FSMConfig
2017-09-07 23:56:15 +00:00
// enterpriseAppliers holds the set of enterprise only LogAppliers
enterpriseAppliers LogAppliers
// enterpriseRestorers holds the set of enterprise only snapshot restorers
enterpriseRestorers SnapshotRestorers
// stateLock is only used to protect outside callers to State() from
// racing with Restore(), which is called by Raft (it puts in a totally
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
2015-06-01 15:49:10 +00:00
}
// nomadSnapshot is used to provide a snapshot of the current
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type nomadSnapshot struct {
snap *state.StateSnapshot
timetable *TimeTable
}
// snapshotHeader is the first entry in our snapshot
type snapshotHeader struct {
2015-06-01 15:49:10 +00:00
}
2017-10-13 21:36:02 +00:00
// FSMConfig is used to configure the FSM
type FSMConfig struct {
// EvalBroker is the evaluation broker evaluations should be added to
EvalBroker *EvalBroker
// Periodic is the periodic job dispatcher that periodic jobs should be
// added/removed from
Periodic *PeriodicDispatch
2018-03-11 18:01:35 +00:00
// BlockedEvals is the blocked eval tracker that blocked evaluations should
2017-10-13 21:36:02 +00:00
// be added to.
Blocked *BlockedEvals
2018-09-15 23:23:13 +00:00
// Logger is the logger used by the FSM
2022-04-02 00:24:02 +00:00
Logger hclog.Logger
2017-10-13 21:36:02 +00:00
// Region is the region of the server embedding the FSM
Region string
// EnableEventBroker specifies if the FSMs state store should enable
// it's event publisher.
EnableEventBroker bool
2020-10-05 23:40:06 +00:00
// EventBufferSize is the amount of messages to hold in memory
EventBufferSize int64
2017-10-13 21:36:02 +00:00
}
// NewFSM is used to construct a new FSM with a blank state.
2017-10-13 21:36:02 +00:00
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
2015-06-01 15:49:10 +00:00
// Create a state store
2017-10-13 21:36:02 +00:00
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventBroker,
EventBufferSize: config.EventBufferSize,
2017-10-13 21:36:02 +00:00
}
state, err := state.NewStateStore(sconfig)
2015-06-01 15:49:10 +00:00
if err != nil {
return nil, err
}
fsm := &nomadFSM{
2017-10-13 21:36:02 +00:00
evalBroker: config.EvalBroker,
periodicDispatcher: config.Periodic,
blockedEvals: config.Blocked,
2018-09-15 23:23:13 +00:00
logger: config.Logger.Named("fsm"),
2017-10-13 21:36:02 +00:00
config: config,
2017-09-07 23:56:15 +00:00
state: state,
timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
enterpriseAppliers: make(map[structs.MessageType]LogApplier, 8),
enterpriseRestorers: make(map[SnapshotType]SnapshotRestorer, 8),
2015-06-01 15:49:10 +00:00
}
2017-09-07 23:56:15 +00:00
// Register all the log applier functions
fsm.registerLogAppliers()
// Register all the snapshot restorer functions
fsm.registerSnapshotRestorers()
2015-06-01 15:49:10 +00:00
return fsm, nil
}
// Close is used to cleanup resources associated with the FSM
func (n *nomadFSM) Close() error {
n.state.StopEventBroker()
return nil
2015-06-01 15:49:10 +00:00
}
// State is used to return a handle to the current state
func (n *nomadFSM) State() *state.StateStore {
n.stateLock.RLock()
defer n.stateLock.RUnlock()
2015-06-01 15:49:10 +00:00
return n.state
}
2015-08-16 00:38:13 +00:00
// TimeTable returns the time table of transactions
func (n *nomadFSM) TimeTable() *TimeTable {
return n.timetable
}
2015-06-01 15:49:10 +00:00
func (n *nomadFSM) Apply(log *raft.Log) interface{} {
buf := log.Data
msgType := structs.MessageType(buf[0])
2015-08-16 00:38:13 +00:00
// Witness this write
n.timetable.Witness(log.Index, time.Now().UTC())
2015-06-01 15:49:10 +00:00
// Check if this message type should be ignored when unknown. This is
// used so that new commands can be added with developer control if older
// versions can safely ignore the command, or if they should crash.
ignoreUnknown := false
if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag {
msgType &= ^structs.IgnoreUnknownTypeFlag
ignoreUnknown = true
}
switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(msgType, buf[1:], log.Index)
case structs.NodeDeregisterRequestType:
return n.applyDeregisterNode(msgType, buf[1:], log.Index)
2015-07-04 01:41:36 +00:00
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(msgType, buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
return n.applyDrainUpdate(msgType, buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(msgType, buf[1:], log.Index)
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(msgType, buf[1:], log.Index)
case structs.EvalUpdateRequestType:
return n.applyUpdateEval(msgType, buf[1:], log.Index)
case structs.EvalDeleteRequestType:
return n.applyDeleteEval(buf[1:], log.Index)
2015-08-04 21:04:26 +00:00
case structs.AllocUpdateRequestType:
return n.applyAllocUpdate(msgType, buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(msgType, buf[1:], log.Index)
case structs.ReconcileJobSummariesRequestType:
return n.applyReconcileSummaries(buf[1:], log.Index)
case structs.VaultAccessorRegisterRequestType:
return n.applyUpsertVaultAccessor(buf[1:], log.Index)
2018-03-11 17:53:22 +00:00
case structs.VaultAccessorDeregisterRequestType:
return n.applyDeregisterVaultAccessor(buf[1:], log.Index)
case structs.ApplyPlanResultsRequestType:
return n.applyPlanResults(msgType, buf[1:], log.Index)
2017-06-26 21:23:52 +00:00
case structs.DeploymentStatusUpdateRequestType:
return n.applyDeploymentStatusUpdate(msgType, buf[1:], log.Index)
2017-06-26 21:23:52 +00:00
case structs.DeploymentPromoteRequestType:
return n.applyDeploymentPromotion(msgType, buf[1:], log.Index)
2017-06-26 21:23:52 +00:00
case structs.DeploymentAllocHealthRequestType:
return n.applyDeploymentAllocHealth(msgType, buf[1:], log.Index)
case structs.DeploymentDeleteRequestType:
return n.applyDeploymentDelete(buf[1:], log.Index)
2017-07-06 19:49:13 +00:00
case structs.JobStabilityRequestType:
return n.applyJobStability(buf[1:], log.Index)
2017-08-08 04:01:14 +00:00
case structs.ACLPolicyUpsertRequestType:
return n.applyACLPolicyUpsert(msgType, buf[1:], log.Index)
case structs.ACLPolicyDeleteRequestType:
return n.applyACLPolicyDelete(msgType, buf[1:], log.Index)
2017-08-12 22:44:05 +00:00
case structs.ACLTokenUpsertRequestType:
return n.applyACLTokenUpsert(msgType, buf[1:], log.Index)
2017-08-12 22:44:05 +00:00
case structs.ACLTokenDeleteRequestType:
return n.applyACLTokenDelete(msgType, buf[1:], log.Index)
2017-08-21 01:19:26 +00:00
case structs.ACLTokenBootstrapRequestType:
return n.applyACLTokenBootstrap(msgType, buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
2018-03-14 00:52:12 +00:00
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
2018-03-14 22:32:18 +00:00
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(msgType, buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
return n.applyAllocUpdateDesiredTransition(msgType, buf[1:], log.Index)
2018-02-27 00:34:42 +00:00
case structs.NodeUpdateEligibilityRequestType:
return n.applyNodeEligibilityUpdate(msgType, buf[1:], log.Index)
2018-03-09 22:15:21 +00:00
case structs.BatchNodeUpdateDrainRequestType:
return n.applyBatchDrainUpdate(msgType, buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(msgType, buf[1:], log.Index)
case structs.ClusterMetadataRequestType:
return n.applyClusterMetadata(buf[1:], log.Index)
case structs.ServiceIdentityAccessorRegisterRequestType:
return n.applyUpsertSIAccessor(buf[1:], log.Index)
case structs.ServiceIdentityAccessorDeregisterRequestType:
return n.applyDeregisterSIAccessor(buf[1:], log.Index)
case structs.CSIVolumeRegisterRequestType:
return n.applyCSIVolumeRegister(buf[1:], log.Index)
case structs.CSIVolumeDeregisterRequestType:
return n.applyCSIVolumeDeregister(buf[1:], log.Index)
case structs.CSIVolumeClaimRequestType:
return n.applyCSIVolumeClaim(buf[1:], log.Index)
case structs.ScalingEventRegisterRequestType:
return n.applyUpsertScalingEvent(buf[1:], log.Index)
case structs.CSIVolumeClaimBatchRequestType:
return n.applyCSIVolumeBatchClaim(buf[1:], log.Index)
case structs.CSIPluginDeleteRequestType:
return n.applyCSIPluginDelete(buf[1:], log.Index)
2020-10-21 04:16:25 +00:00
case structs.NamespaceUpsertRequestType:
return n.applyNamespaceUpsert(buf[1:], log.Index)
case structs.NamespaceDeleteRequestType:
return n.applyNamespaceDelete(buf[1:], log.Index)
// COMPAT(1.0): These messages were added and removed during the 1.0-beta
// series and should not be immediately reused for other purposes
case structs.EventSinkUpsertRequestType,
structs.EventSinkDeleteRequestType,
structs.BatchEventSinkUpdateProgressType:
return nil
case structs.OneTimeTokenUpsertRequestType:
return n.applyOneTimeTokenUpsert(msgType, buf[1:], log.Index)
case structs.OneTimeTokenDeleteRequestType:
return n.applyOneTimeTokenDelete(msgType, buf[1:], log.Index)
case structs.OneTimeTokenExpireRequestType:
return n.applyOneTimeTokenExpire(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationUpsertRequestType:
return n.applyUpsertServiceRegistrations(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationDeleteByIDRequestType:
return n.applyDeleteServiceRegistrationByID(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationDeleteByNodeIDRequestType:
return n.applyDeleteServiceRegistrationByNodeID(msgType, buf[1:], log.Index)
2015-06-01 15:49:10 +00:00
}
2017-09-07 23:56:15 +00:00
// Check enterprise only message types.
if applier, ok := n.enterpriseAppliers[msgType]; ok {
return applier(buf[1:], log.Index)
}
// We didn't match anything, either panic or ignore
if ignoreUnknown {
2018-09-15 23:23:13 +00:00
n.logger.Warn("ignoring unknown message type, upgrade to newer version", "msg_type", msgType)
2017-09-07 23:56:15 +00:00
return nil
}
panic(fmt.Errorf("failed to apply request: %#v", buf))
2015-06-01 15:49:10 +00:00
}
2015-07-04 01:41:36 +00:00
func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "cluster_meta"}, time.Now())
var req structs.ClusterMetadata
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.ClusterSetMetadata(index, &req); err != nil {
n.logger.Error("ClusterSetMetadata failed", "error", err)
return err
}
n.logger.Trace("ClusterSetMetadata", "cluster_id", req.ClusterID, "create_time", req.CreateTime)
return nil
}
func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
2015-07-04 01:41:36 +00:00
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2015-07-04 01:41:36 +00:00
}
// Handle upgrade paths
req.Node.Canonicalize()
if err := n.state.UpsertNode(reqType, index, req.Node); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertNode failed", "error", err)
2015-07-04 01:41:36 +00:00
return err
}
// Unblock evals for the nodes computed node class if it is in a ready
// state.
if req.Node.Status == structs.NodeStatusReady {
n.blockedEvals.Unblock(req.Node.ComputedClass, index)
}
2015-07-04 01:41:36 +00:00
return nil
}
func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
2015-07-04 01:41:36 +00:00
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2015-07-04 01:41:36 +00:00
}
if err := n.state.DeleteNode(reqType, index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeregisterNodeBatch(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now())
var req structs.NodeBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNode(reqType, index, req.NodeIDs); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeleteNode failed", "error", err)
2015-07-04 01:41:36 +00:00
return err
}
2015-07-04 01:41:36 +00:00
return nil
}
func (n *nomadFSM) applyStatusUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2015-07-04 01:41:36 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now())
var req structs.NodeUpdateStatusRequest
2015-07-04 01:41:36 +00:00
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2015-07-04 01:41:36 +00:00
}
if err := n.state.UpdateNodeStatus(msgType, index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateNodeStatus failed", "error", err)
2015-07-04 01:41:36 +00:00
return err
}
2016-01-29 23:31:32 +00:00
// Unblock evals for the nodes computed node class if it is in a ready
// state.
if req.Status == structs.NodeStatusReady {
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
node, err := n.state.NodeByID(ws, req.NodeID)
2016-01-29 23:31:32 +00:00
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("looking up node failed", "node_id", req.NodeID, "error", err)
2016-01-29 23:31:32 +00:00
return err
}
n.blockedEvals.Unblock(node.ComputedClass, index)
2019-06-21 20:44:34 +00:00
n.blockedEvals.UnblockNode(req.NodeID, index)
2016-01-29 23:31:32 +00:00
}
2015-07-04 01:41:36 +00:00
return nil
}
func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
2021-05-07 17:58:40 +00:00
accessorId := ""
if req.AuthToken != "" {
token, err := n.state.ACLTokenBySecretID(nil, req.AuthToken)
if err != nil {
n.logger.Error("error looking up ACL token from drain update", "error", err)
return fmt.Errorf("error looking up ACL token: %v", err)
}
if token == nil {
n.logger.Error("token did not exist during node drain update")
return fmt.Errorf("token did not exist during node drain update")
}
accessorId = token.AccessorID
}
if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt,
req.NodeEvent, req.Meta, accessorId); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyBatchDrainUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2018-03-09 22:15:21 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_node_drain_update"}, time.Now())
var req structs.BatchNodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2018-03-09 22:15:21 +00:00
}
if err := n.state.BatchUpdateNodeDrain(msgType, index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
2018-03-09 22:15:21 +00:00
return err
}
return nil
}
func (n *nomadFSM) applyNodeEligibilityUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2018-02-27 00:34:42 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
var req structs.NodeUpdateEligibilityRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2018-02-27 00:34:42 +00:00
}
2018-02-27 22:08:29 +00:00
// Lookup the existing node
node, err := n.state.NodeByID(nil, req.NodeID)
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateNodeEligibility failed to lookup node", "node_id", req.NodeID, "error", err)
2018-02-27 22:08:29 +00:00
return err
}
if err := n.state.UpdateNodeEligibility(msgType, index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateNodeEligibility failed", "error", err)
2018-02-27 00:34:42 +00:00
return err
}
2018-02-27 22:08:29 +00:00
// Unblock evals for the nodes computed node class if it is in a ready
// state.
if node != nil && node.SchedulingEligibility == structs.NodeSchedulingIneligible &&
req.Eligibility == structs.NodeSchedulingEligible {
n.blockedEvals.Unblock(node.ComputedClass, index)
2019-06-21 20:44:34 +00:00
n.blockedEvals.UnblockNode(req.NodeID, index)
2018-02-27 22:08:29 +00:00
}
2018-02-27 00:34:42 +00:00
return nil
}
func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
var req structs.JobRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
/* Handle upgrade paths:
* - Empty maps and slices should be treated as nil to avoid
* un-intended destructive updates in scheduler since we use
2018-03-11 18:52:59 +00:00
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanitizes
* the incoming job.
* - Migrate from old style upgrade stanza that used only a stagger.
*/
2016-07-20 23:07:15 +00:00
req.Job.Canonicalize()
2016-07-18 23:17:38 +00:00
if err := n.state.UpsertJob(msgType, index, req.Job); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertJob failed", "error", err)
return err
}
2015-12-01 22:54:57 +00:00
2015-12-21 21:25:50 +00:00
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("periodicDispatcher.Add failed", "error", err)
2018-09-15 23:42:38 +00:00
return fmt.Errorf("failed adding job to periodic dispatcher: %v", err)
2015-12-21 21:25:50 +00:00
}
2015-12-19 01:51:30 +00:00
2017-02-08 04:31:23 +00:00
// Create a watch set
ws := memdb.NewWatchSet()
// If it is an active periodic job, record the time it was inserted. This is
// necessary for recovering during leader election. It is possible that from
// the time it is added to when it was suppose to launch, leader election
// occurs and the job was not launched. In this case, we use the insertion
// time to determine if a launch was missed.
if req.Job.IsPeriodicActive() {
2017-09-07 23:56:15 +00:00
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("PeriodicLaunchByID failed", "error", err)
return err
}
2015-12-21 21:25:50 +00:00
// Record the insertion time as a launch. We overload the launch table
// such that the first entry is the insertion time.
if prevLaunch == nil {
2017-09-07 23:56:15 +00:00
launch := &structs.PeriodicLaunch{
ID: req.Job.ID,
Namespace: req.Namespace,
Launch: time.Now(),
}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertPeriodicLaunch failed", "error", err)
return err
}
2015-12-19 01:51:30 +00:00
}
}
// Check if the parent job is periodic and mark the launch time.
parentID := req.Job.ParentID
if parentID != "" {
2017-09-07 23:56:15 +00:00
parent, err := n.state.JobByID(ws, req.Namespace, parentID)
2015-12-19 01:51:30 +00:00
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("JobByID lookup for parent failed", "parent_id", parentID, "namespace", req.Namespace, "error", err)
2015-12-19 01:51:30 +00:00
return err
} else if parent == nil {
// The parent has been deregistered.
return nil
}
if parent.IsPeriodic() && !parent.IsParameterized() {
t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("LaunchTime failed", "job", req.Job.NamespacedID(), "error", err)
return err
}
2017-09-07 23:56:15 +00:00
launch := &structs.PeriodicLaunch{
ID: parentID,
Namespace: req.Namespace,
Launch: t,
}
2015-12-19 01:51:30 +00:00
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertPeriodicLaunch failed", "error", err)
2015-12-19 01:51:30 +00:00
return err
}
}
2015-12-01 22:54:57 +00:00
}
2020-07-15 15:23:49 +00:00
// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
return err
}
}
return nil
}
func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_job"}, time.Now())
var req structs.JobDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx)
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
if err != nil {
n.logger.Error("deregistering job failed",
"error", err, "job", req.JobID, "namespace", req.Namespace)
return err
}
2018-03-14 22:32:18 +00:00
return nil
})
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
2020-07-15 15:23:49 +00:00
// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
return err
}
}
if err != nil {
return err
}
return nil
2018-03-14 22:32:18 +00:00
}
func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2018-03-14 22:32:18 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now())
var req structs.JobBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2018-03-14 22:32:18 +00:00
}
2018-11-14 13:36:14 +00:00
// Perform all store updates atomically to ensure a consistent view for store readers.
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err)
return err
}
}
if err := n.state.UpsertEvalsTxn(index, req.Evals, tx); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
2018-03-14 22:32:18 +00:00
return err
}
return nil
})
if err != nil {
return err
2018-03-14 22:32:18 +00:00
}
// perform the side effects outside the transactions
n.handleUpsertedEvals(req.Evals)
return nil
2018-03-14 22:32:18 +00:00
}
// handleJobDeregister is used to deregister a job. Leaves error logging up to
// caller.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error {
2017-04-15 03:54:30 +00:00
// If it is periodic remove it from the dispatcher
2018-03-14 22:32:18 +00:00
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
2015-12-01 22:54:57 +00:00
}
if noShutdownDelay {
ws := memdb.NewWatchSet()
allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false)
if err != nil {
return err
}
transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
for _, alloc := range allocs {
err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition)
if err != nil {
return err
}
err = tx.Insert("index", &state.IndexEntry{Key: "allocs", Value: index})
if err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
}
2018-03-14 22:32:18 +00:00
if purge {
if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil {
return fmt.Errorf("DeleteJob failed: %w", err)
2017-04-15 03:54:30 +00:00
}
// We always delete from the periodic launch table because it is possible that
2018-03-11 18:37:05 +00:00
// the job was updated to be non-periodic, thus checking if it is periodic
2017-04-15 03:54:30 +00:00
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx)
2017-04-15 03:54:30 +00:00
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
2017-04-15 03:54:30 +00:00
if err != nil {
return fmt.Errorf("JobByID lookup failed: %w", err)
2017-04-15 03:54:30 +00:00
}
if current == nil {
2018-03-14 22:32:18 +00:00
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
2017-04-15 03:54:30 +00:00
}
stopped := current.Copy()
stopped.Stop = true
if err := n.state.UpsertJobTxn(index, stopped, tx); err != nil {
return fmt.Errorf("UpsertJob failed: %w", err)
2017-04-15 03:54:30 +00:00
}
}
2015-12-19 01:51:30 +00:00
return nil
}
func (n *nomadFSM) applyUpdateEval(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
var req structs.EvalUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
return n.upsertEvals(msgType, index, req.Evals)
}
func (n *nomadFSM) upsertEvals(msgType structs.MessageType, index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvals(msgType, index, evals); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertEvals failed", "error", err)
return err
}
2015-08-06 18:32:42 +00:00
2018-04-10 22:29:54 +00:00
n.handleUpsertedEvals(evals)
return nil
}
// handleUpsertingEval is a helper for taking action after upserting
// evaluations.
func (n *nomadFSM) handleUpsertedEvals(evals []*structs.Evaluation) {
for _, eval := range evals {
2018-04-10 22:29:54 +00:00
n.handleUpsertedEval(eval)
}
}
// handleUpsertingEval is a helper for taking action after upserting an eval.
func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
if eval == nil {
return
}
if eval.ShouldEnqueue() {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
2018-11-07 18:22:08 +00:00
n.blockedEvals.Untrack(eval.JobID, eval.Namespace)
2015-08-06 18:32:42 +00:00
}
}
func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now())
var req structs.EvalDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeleteEval failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyAllocUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2015-08-04 21:04:26 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2015-08-04 21:04:26 +00:00
}
2016-03-01 22:09:25 +00:00
// Attach the job to all the allocations. It is pulled out in the
2016-02-21 19:42:54 +00:00
// payload to avoid the redundancy of encoding, but should be denormalized
// prior to being inserted into MemDB.
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)
2016-02-21 19:42:54 +00:00
2016-03-01 22:09:25 +00:00
for _, alloc := range req.Alloc {
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
if alloc.Resources == nil {
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
2016-03-01 22:09:25 +00:00
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
2016-03-01 22:09:25 +00:00
}
// Handle upgrade path
alloc.Canonicalize()
2016-03-01 22:09:25 +00:00
}
if err := n.state.UpsertAllocs(msgType, index, req.Alloc); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertAllocs failed", "error", err)
2015-08-04 21:04:26 +00:00
return err
}
return nil
}
func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if len(req.Alloc) == 0 {
return nil
}
2017-02-08 04:31:23 +00:00
// Create a watch set
ws := memdb.NewWatchSet()
2016-07-21 21:43:21 +00:00
// Updating the allocs with the job id and task group name
for _, alloc := range req.Alloc {
2017-02-08 04:31:23 +00:00
if existing, _ := n.state.AllocByID(ws, alloc.ID); existing != nil {
alloc.JobID = existing.JobID
alloc.TaskGroup = existing.TaskGroup
}
}
// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateAllocFromClient failed", "error", err)
return err
}
2016-01-29 23:31:32 +00:00
// Update any evals
if len(req.Evals) > 0 {
if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
return err
}
}
2016-01-29 23:31:32 +00:00
// Unblock evals for the nodes computed node class if the client has
// finished running an allocation.
for _, alloc := range req.Alloc {
if alloc.ClientStatus == structs.AllocClientStatusComplete ||
alloc.ClientStatus == structs.AllocClientStatusFailed {
nodeID := alloc.NodeID
2017-02-08 04:31:23 +00:00
node, err := n.state.NodeByID(ws, nodeID)
if err != nil || node == nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("looking up node failed", "node_id", nodeID, "error", err)
return err
2016-01-29 23:31:32 +00:00
}
2017-10-13 21:36:02 +00:00
// Unblock any associated quota
quota, err := n.allocQuota(alloc.ID)
if err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("looking up quota associated with alloc failed", "alloc_id", alloc.ID, "error", err)
2017-10-13 21:36:02 +00:00
return err
}
n.blockedEvals.UnblockClassAndQuota(node.ComputedClass, quota, index)
2019-06-21 20:44:34 +00:00
n.blockedEvals.UnblockNode(node.ID, index)
2016-01-29 23:31:32 +00:00
}
}
return nil
}
// applyAllocUpdateDesiredTransition is used to update the desired transitions
// of a set of allocations.
func (n *nomadFSM) applyAllocUpdateDesiredTransition(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now())
var req structs.AllocUpdateDesiredTransitionRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocsDesiredTransitions(msgType, index, req.Allocs, req.Evals); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateAllocsDesiredTransitions failed", "error", err)
return err
}
2018-04-10 22:29:54 +00:00
n.handleUpsertedEvals(req.Evals)
return nil
}
2016-08-04 01:08:37 +00:00
// applyReconcileSummaries reconciles summaries for all the jobs
func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
if err := n.state.ReconcileJobSummaries(index); err != nil {
return err
}
return n.reconcileQueuedAllocations(index)
}
2018-03-14 01:04:55 +00:00
// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2018-03-14 00:52:12 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
2018-03-12 01:00:13 +00:00
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}
if err := n.state.UpsertNodeEvents(msgType, index, req.NodeEvents); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("failed to add node events", "error", err)
2018-03-12 01:00:13 +00:00
return err
}
2018-03-09 00:30:49 +00:00
return nil
}
2016-08-22 18:41:47 +00:00
// applyUpsertVaultAccessor stores the Vault accessors for a given allocation
// and task
func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_vault_accessor"}, time.Now())
var req structs.VaultAccessorsRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertVaultAccessor(index, req.Accessors); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertVaultAccessor failed", "error", err)
return err
}
return nil
}
2016-08-31 21:10:33 +00:00
// applyDeregisterVaultAccessor deregisters a set of Vault accessors
func (n *nomadFSM) applyDeregisterVaultAccessor(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_vault_accessor"}, time.Now())
var req structs.VaultAccessorsRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteVaultAccessors(index, req.Accessors); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeregisterVaultAccessor failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyUpsertSIAccessor(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_si_accessor"}, time.Now())
var request structs.SITokenAccessorsRequest
if err := structs.Decode(buf, &request); err != nil {
2022-04-02 00:24:02 +00:00
panic(fmt.Errorf("failed to decode request: %w", err))
}
if err := n.state.UpsertSITokenAccessors(index, request.Accessors); err != nil {
n.logger.Error("UpsertSITokenAccessors failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeregisterSIAccessor(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_si_accessor"}, time.Now())
var request structs.SITokenAccessorsRequest
if err := structs.Decode(buf, &request); err != nil {
2022-04-02 00:24:02 +00:00
panic(fmt.Errorf("failed to decode request: %w", err))
}
if err := n.state.DeleteSITokenAccessors(index, request.Accessors); err != nil {
n.logger.Error("DeregisterSITokenAccessor failed", "error", err)
return err
}
return nil
}
// applyPlanApply applies the results of a plan application
func (n *nomadFSM) applyPlanResults(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
var req structs.ApplyPlanResultsRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertPlanResults(msgType, index, &req); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("ApplyPlan failed", "error", err)
return err
}
// Add evals for jobs that were preempted
n.handleUpsertedEvals(req.PreemptionEvals)
return nil
}
2017-06-26 21:23:52 +00:00
// applyDeploymentStatusUpdate is used to update the status of an existing
// deployment
func (n *nomadFSM) applyDeploymentStatusUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-06-26 21:23:52 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_status_update"}, time.Now())
var req structs.DeploymentStatusUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-06-26 21:23:52 +00:00
}
if err := n.state.UpdateDeploymentStatus(msgType, index, &req); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertDeploymentStatusUpdate failed", "error", err)
2017-06-26 21:23:52 +00:00
return err
}
2018-04-10 22:29:54 +00:00
n.handleUpsertedEval(req.Eval)
2017-06-26 21:23:52 +00:00
return nil
}
// applyDeploymentPromotion is used to promote canaries in a deployment
func (n *nomadFSM) applyDeploymentPromotion(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-06-26 21:23:52 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_promotion"}, time.Now())
var req structs.ApplyDeploymentPromoteRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-06-26 21:23:52 +00:00
}
if err := n.state.UpdateDeploymentPromotion(msgType, index, &req); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertDeploymentPromotion failed", "error", err)
2017-06-26 21:23:52 +00:00
return err
}
2018-04-10 22:29:54 +00:00
n.handleUpsertedEval(req.Eval)
2017-06-26 21:23:52 +00:00
return nil
}
// applyDeploymentAllocHealth is used to set the health of allocations as part
// of a deployment
func (n *nomadFSM) applyDeploymentAllocHealth(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-06-26 21:23:52 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_alloc_health"}, time.Now())
var req structs.ApplyDeploymentAllocHealthRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-06-26 21:23:52 +00:00
}
if err := n.state.UpdateDeploymentAllocHealth(msgType, index, &req); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertDeploymentAllocHealth failed", "error", err)
2017-06-26 21:23:52 +00:00
return err
}
2018-04-10 22:29:54 +00:00
n.handleUpsertedEval(req.Eval)
2017-06-26 21:23:52 +00:00
return nil
}
// applyDeploymentDelete is used to delete a set of deployments
func (n *nomadFSM) applyDeploymentDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_delete"}, time.Now())
var req structs.DeploymentDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteDeployment(index, req.Deployments); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeleteDeployment failed", "error", err)
return err
}
return nil
}
2017-07-06 19:49:13 +00:00
// applyJobStability is used to set the stability of a job
func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_job_stability"}, time.Now())
var req structs.JobStabilityRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-07-06 19:49:13 +00:00
}
2017-09-07 23:56:15 +00:00
if err := n.state.UpdateJobStability(index, req.Namespace, req.JobID, req.JobVersion, req.Stable); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpdateJobStability failed", "error", err)
2017-07-06 19:49:13 +00:00
return err
}
return nil
}
2017-08-08 04:01:14 +00:00
// applyACLPolicyUpsert is used to upsert a set of policies
func (n *nomadFSM) applyACLPolicyUpsert(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-08-08 04:01:14 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_upsert"}, time.Now())
var req structs.ACLPolicyUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-08-08 04:01:14 +00:00
}
if err := n.state.UpsertACLPolicies(msgType, index, req.Policies); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertACLPolicies failed", "error", err)
2017-08-08 04:01:14 +00:00
return err
}
return nil
}
// applyACLPolicyDelete is used to delete a set of policies
func (n *nomadFSM) applyACLPolicyDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_delete"}, time.Now())
var req structs.ACLPolicyDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteACLPolicies(msgType, index, req.Names); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeleteACLPolicies failed", "error", err)
return err
}
return nil
}
2017-08-12 22:44:05 +00:00
// applyACLTokenUpsert is used to upsert a set of policies
func (n *nomadFSM) applyACLTokenUpsert(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-08-12 22:44:05 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_upsert"}, time.Now())
var req structs.ACLTokenUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-08-12 22:44:05 +00:00
}
if err := n.state.UpsertACLTokens(msgType, index, req.Tokens); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("UpsertACLTokens failed", "error", err)
2017-08-12 22:44:05 +00:00
return err
}
return nil
}
// applyACLTokenDelete is used to delete a set of policies
func (n *nomadFSM) applyACLTokenDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-08-12 22:44:05 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_delete"}, time.Now())
var req structs.ACLTokenDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-08-12 22:44:05 +00:00
}
if err := n.state.DeleteACLTokens(msgType, index, req.AccessorIDs); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("DeleteACLTokens failed", "error", err)
2017-08-12 22:44:05 +00:00
return err
}
return nil
}
2017-08-21 01:19:26 +00:00
// applyACLTokenBootstrap is used to bootstrap an ACL token
func (n *nomadFSM) applyACLTokenBootstrap(msgType structs.MessageType, buf []byte, index uint64) interface{} {
2017-08-21 01:19:26 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_bootstrap"}, time.Now())
var req structs.ACLTokenBootstrapRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
2017-08-21 01:19:26 +00:00
}
if err := n.state.BootstrapACLTokens(msgType, index, req.ResetIndex, req.Token); err != nil {
2018-09-15 23:23:13 +00:00
n.logger.Error("BootstrapACLToken failed", "error", err)
2017-08-21 01:19:26 +00:00
return err
}
return nil
}
// applyOneTimeTokenUpsert is used to upsert a one-time token
func (n *nomadFSM) applyOneTimeTokenUpsert(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_one_time_token_upsert"}, time.Now())
var req structs.OneTimeToken
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertOneTimeToken(msgType, index, &req); err != nil {
n.logger.Error("UpsertOneTimeToken failed", "error", err)
return err
}
return nil
}
// applyOneTimeTokenDelete is used to delete a set of one-time tokens
func (n *nomadFSM) applyOneTimeTokenDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_one_time_token_delete"}, time.Now())
var req structs.OneTimeTokenDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteOneTimeTokens(msgType, index, req.AccessorIDs); err != nil {
n.logger.Error("DeleteOneTimeTokens failed", "error", err)
return err
}
return nil
}
// applyOneTimeTokenExpire is used to delete a set of one-time tokens
func (n *nomadFSM) applyOneTimeTokenExpire(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_one_time_token_expire"}, time.Now())
var req structs.OneTimeTokenExpireRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.ExpireOneTimeTokens(msgType, index); err != nil {
n.logger.Error("ExpireOneTimeTokens failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
2018-09-15 23:42:38 +00:00
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "autopilot"}, time.Now())
if req.CAS {
act, err := n.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
}
return act
}
return n.state.AutopilotSetConfig(index, &req.Config)
}
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
var req structs.SchedulerSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
2018-10-18 03:29:29 +00:00
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_scheduler_config"}, time.Now())
req.Config.Canonicalize()
if req.CAS {
2018-10-18 03:29:29 +00:00
applied, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
}
2018-10-18 03:29:29 +00:00
return applied
}
return n.state.SchedulerSetConfig(index, &req.Config)
}
func (n *nomadFSM) applyCSIVolumeRegister(buf []byte, index uint64) interface{} {
var req structs.CSIVolumeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_register"}, time.Now())
if err := n.state.UpsertCSIVolume(index, req.Volumes); err != nil {
n.logger.Error("CSIVolumeRegister failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyCSIVolumeDeregister(buf []byte, index uint64) interface{} {
var req structs.CSIVolumeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_deregister"}, time.Now())
if err := n.state.CSIVolumeDeregister(index, req.RequestNamespace(), req.VolumeIDs, req.Force); err != nil {
n.logger.Error("CSIVolumeDeregister failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyCSIVolumeBatchClaim(buf []byte, index uint64) interface{} {
var batch *structs.CSIVolumeClaimBatchRequest
if err := structs.Decode(buf, &batch); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_batch_claim"}, time.Now())
for _, req := range batch.Claims {
err := n.state.CSIVolumeClaim(index, req.RequestNamespace(),
req.VolumeID, req.ToClaim())
if err != nil {
n.logger.Error("CSIVolumeClaim for batch failed", "error", err)
return err // note: fails the remaining batch
}
}
return nil
}
func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} {
var req structs.CSIVolumeClaimRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_claim"}, time.Now())
if err := n.state.CSIVolumeClaim(index, req.RequestNamespace(), req.VolumeID, req.ToClaim()); err != nil {
n.logger.Error("CSIVolumeClaim failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyCSIPluginDelete(buf []byte, index uint64) interface{} {
var req structs.CSIPluginDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_plugin_delete"}, time.Now())
if err := n.state.DeleteCSIPlugin(index, req.ID); err != nil {
// "plugin in use" is an error for the state store but not for typical
// callers, so reduce log noise by not logging that case here
if err.Error() != "plugin in use" {
n.logger.Error("DeleteCSIPlugin failed", "error", err)
}
return err
}
return nil
}
2020-10-21 04:16:25 +00:00
// applyNamespaceUpsert is used to upsert a set of namespaces
func (n *nomadFSM) applyNamespaceUpsert(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_upsert"}, time.Now())
var req structs.NamespaceUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
var trigger []string
for _, ns := range req.Namespaces {
old, err := n.state.NamespaceByName(nil, ns.Name)
if err != nil {
n.logger.Error("namespace lookup failed", "error", err)
return err
}
// If we are changing the quota on a namespace trigger evals for the
// older quota.
if old != nil && old.Quota != "" && old.Quota != ns.Quota {
trigger = append(trigger, old.Quota)
}
}
if err := n.state.UpsertNamespaces(index, req.Namespaces); err != nil {
n.logger.Error("UpsertNamespaces failed", "error", err)
return err
}
// Send the unblocks
for _, quota := range trigger {
n.blockedEvals.UnblockQuota(quota, index)
}
return nil
}
// applyNamespaceDelete is used to delete a set of namespaces
func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_delete"}, time.Now())
var req structs.NamespaceDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil {
n.logger.Error("DeleteNamespaces failed", "error", err)
}
return nil
}
2015-06-01 15:49:10 +00:00
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
if err != nil {
return nil, err
}
2015-08-16 00:38:13 +00:00
ns := &nomadSnapshot{
snap: snap,
timetable: n.timetable,
2015-08-16 00:38:13 +00:00
}
return ns, nil
2015-06-01 15:49:10 +00:00
}
func (n *nomadFSM) Restore(old io.ReadCloser) error {
defer old.Close()
// Create a new state store
2017-10-13 21:36:02 +00:00
config := &state.StateStoreConfig{
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventBroker,
EventBufferSize: n.config.EventBufferSize,
2017-10-13 21:36:02 +00:00
}
newState, err := state.NewStateStore(config)
if err != nil {
return err
}
// Start the state restore
restore, err := newState.Restore()
if err != nil {
return err
}
defer restore.Abort()
// Create a decoder
dec := codec.NewDecoder(old, structs.MsgpackHandle)
// Read in the header
var header snapshotHeader
if err := dec.Decode(&header); err != nil {
return err
}
// Populate the new state
msgType := make([]byte, 1)
for {
// Read the message type
_, err := old.Read(msgType)
if err == io.EOF {
break
} else if err != nil {
return err
}
// Decode
2017-09-07 23:56:15 +00:00
snapType := SnapshotType(msgType[0])
switch snapType {
2015-08-16 00:38:13 +00:00
case TimeTableSnapshot:
if err := n.timetable.Deserialize(dec); err != nil {
2018-09-15 23:42:38 +00:00
return fmt.Errorf("time table deserialize failed: %v", err)
2015-08-16 00:38:13 +00:00
}
case NodeSnapshot:
node := new(structs.Node)
if err := dec.Decode(node); err != nil {
return err
}
// Handle upgrade paths
node.Canonicalize()
if err := restore.NodeRestore(node); err != nil {
return err
}
2015-07-07 16:55:47 +00:00
case JobSnapshot:
job := new(structs.Job)
if err := dec.Decode(job); err != nil {
return err
}
2016-07-18 23:17:38 +00:00
/* Handle upgrade paths:
* - Empty maps and slices should be treated as nil to avoid
* un-intended destructive updates in scheduler since we use
2018-03-11 18:52:59 +00:00
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanitizes
* the incoming job.
* - Migrate from old style upgrade stanza that used only a stagger.
*/
2016-07-20 23:07:15 +00:00
job.Canonicalize()
2016-07-18 23:17:38 +00:00
2015-07-07 16:55:47 +00:00
if err := restore.JobRestore(job); err != nil {
return err
}
case EvalSnapshot:
eval := new(structs.Evaluation)
if err := dec.Decode(eval); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
if err := restore.EvalRestore(eval); err != nil {
return err
}
2015-08-04 21:04:26 +00:00
case AllocSnapshot:
alloc := new(structs.Allocation)
if err := dec.Decode(alloc); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
// Handle upgrade path
alloc.Canonicalize()
2015-08-04 21:04:26 +00:00
if err := restore.AllocRestore(alloc); err != nil {
return err
}
case IndexSnapshot:
idx := new(state.IndexEntry)
if err := dec.Decode(idx); err != nil {
return err
}
if err := restore.IndexRestore(idx); err != nil {
return err
}
2015-12-07 23:58:17 +00:00
case PeriodicLaunchSnapshot:
launch := new(structs.PeriodicLaunch)
if err := dec.Decode(launch); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
2015-12-07 23:58:17 +00:00
if err := restore.PeriodicLaunchRestore(launch); err != nil {
return err
}
case JobSummarySnapshot:
summary := new(structs.JobSummary)
if err := dec.Decode(summary); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
if err := restore.JobSummaryRestore(summary); err != nil {
return err
}
case VaultAccessorSnapshot:
accessor := new(structs.VaultAccessor)
if err := dec.Decode(accessor); err != nil {
return err
}
if err := restore.VaultAccessorRestore(accessor); err != nil {
return err
}
case ServiceIdentityTokenAccessorSnapshot:
accessor := new(structs.SITokenAccessor)
if err := dec.Decode(accessor); err != nil {
return err
}
if err := restore.SITokenAccessorRestore(accessor); err != nil {
return err
}
case JobVersionSnapshot:
version := new(structs.Job)
if err := dec.Decode(version); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
if err := restore.JobVersionRestore(version); err != nil {
return err
}
case DeploymentSnapshot:
deployment := new(structs.Deployment)
if err := dec.Decode(deployment); err != nil {
return err
}
2017-09-07 23:56:15 +00:00
if err := restore.DeploymentRestore(deployment); err != nil {
return err
}
case ACLPolicySnapshot:
policy := new(structs.ACLPolicy)
if err := dec.Decode(policy); err != nil {
return err
}
if err := restore.ACLPolicyRestore(policy); err != nil {
return err
}
case ACLTokenSnapshot:
token := new(structs.ACLToken)
if err := dec.Decode(token); err != nil {
return err
}
if err := restore.ACLTokenRestore(token); err != nil {
return err
}
case SchedulerConfigSnapshot:
schedConfig := new(structs.SchedulerConfiguration)
if err := dec.Decode(schedConfig); err != nil {
return err
}
schedConfig.Canonicalize()
if err := restore.SchedulerConfigRestore(schedConfig); err != nil {
return err
}
case ClusterMetadataSnapshot:
meta := new(structs.ClusterMetadata)
if err := dec.Decode(meta); err != nil {
return err
}
if err := restore.ClusterMetadataRestore(meta); err != nil {
return err
}
case ScalingEventsSnapshot:
jobScalingEvents := new(structs.JobScalingEvents)
if err := dec.Decode(jobScalingEvents); err != nil {
return err
}
if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil {
return err
}
case ScalingPolicySnapshot:
scalingPolicy := new(structs.ScalingPolicy)
if err := dec.Decode(scalingPolicy); err != nil {
return err
}
2020-09-29 21:57:46 +00:00
// Handle upgrade path:
// - Set policy type if empty
scalingPolicy.Canonicalize()
if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
case CSIPluginSnapshot:
plugin := new(structs.CSIPlugin)
if err := dec.Decode(plugin); err != nil {
return err
}
if err := restore.CSIPluginRestore(plugin); err != nil {
return err
}
case CSIVolumeSnapshot:
plugin := new(structs.CSIVolume)
if err := dec.Decode(plugin); err != nil {
return err
}
if err := restore.CSIVolumeRestore(plugin); err != nil {
return err
}
2020-10-21 04:16:25 +00:00
case NamespaceSnapshot:
namespace := new(structs.Namespace)
if err := dec.Decode(namespace); err != nil {
return err
}
if err := restore.NamespaceRestore(namespace); err != nil {
return err
}
// COMPAT(1.0): Allow 1.0-beta clusterers to gracefully handle
case EventSinkSnapshot:
return nil
case ServiceRegistrationSnapshot:
// Create a new ServiceRegistration object, so we can decode the
// message into it.
serviceRegistration := new(structs.ServiceRegistration)
if err := dec.Decode(serviceRegistration); err != nil {
return err
}
// Perform the restoration.
if err := restore.ServiceRegistrationRestore(serviceRegistration); err != nil {
return err
}
default:
2017-09-07 23:56:15 +00:00
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
if !ok {
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
}
// Restore the enterprise only object
if err := restorer(restore, dec); err != nil {
return err
}
}
}
if err := restore.Commit(); err != nil {
return err
}
// COMPAT Remove in 0.10
// Clean up active deployments that do not have a job
if err := n.failLeakedDeployments(newState); err != nil {
return err
}
// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
n.stateLock.Lock()
stateOld := n.state
n.state = newState
n.stateLock.Unlock()
// Signal that the old state store has been abandoned. This is required
// because we don't operate on it any more, we just throw it away, so
// blocking queries won't see any changes and need to be woken up.
stateOld.Abandon()
return nil
}
// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(store *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := store.Deployments(nil, state.SortDefault)
if err != nil {
2018-09-15 23:42:38 +00:00
return fmt.Errorf("failed to query deployments: %v", err)
}
dindex, err := store.Index("deployment")
if err != nil {
2018-09-15 23:42:38 +00:00
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}
for {
raw := iter.Next()
if raw == nil {
break
}
d := raw.(*structs.Deployment)
// We are only looking for active deployments where the job no longer
// exists
if !d.Active() {
continue
}
// Find the job
job, err := store.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}
// Job exists.
if job != nil {
continue
}
// Update the deployment to be terminal
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := store.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}
return nil
}
// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
iter, err := n.state.Jobs(ws)
if err != nil {
return err
}
snap, err := n.state.Snapshot()
if err != nil {
2018-09-15 23:42:38 +00:00
return fmt.Errorf("unable to create snapshot: %v", err)
}
2016-08-04 01:08:37 +00:00
// Invoking the scheduler for every job so that we can populate the number
// of queued allocations for every job
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
// Nothing to do for queued allocations if the job is a parent periodic/parameterized job
2019-01-17 18:15:42 +00:00
if job.IsParameterized() || job.IsPeriodic() {
continue
}
planner := &scheduler.Harness{
State: &snap.StateStore,
}
// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: job.JobModifyIndex + 1,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
// Ignore eval event creation during snapshot restore
snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner)
if err != nil {
return err
}
if err := sched.Process(eval); err != nil {
return err
}
// Get the job summary from the fsm state store
2017-09-07 23:56:15 +00:00
originalSummary, err := n.state.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil {
return err
}
2017-01-11 21:18:36 +00:00
summary := originalSummary.Copy()
// Add the allocations scheduler has made to queued since these
// allocations are never getting placed until the scheduler is invoked
// with a real planner
if l := len(planner.Plans); l != 1 {
return fmt.Errorf("unexpected number of plans during restore %d. Please file an issue including the logs", l)
}
for _, allocations := range planner.Plans[0].NodeAllocation {
for _, allocation := range allocations {
tgSummary, ok := summary.Summary[allocation.TaskGroup]
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", allocation.TaskGroup)
}
tgSummary.Queued += 1
summary.Summary[allocation.TaskGroup] = tgSummary
}
}
// Add the queued allocations attached to the evaluation to the queued
// counter of the job summary
if l := len(planner.Evals); l != 1 {
return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l)
}
for tg, queued := range planner.Evals[0].QueuedAllocations {
tgSummary, ok := summary.Summary[tg]
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
// We add instead of setting here because we want to take into
// consideration what the scheduler with a mock planner thinks it
// placed. Those should be counted as queued as well
tgSummary.Queued += queued
summary.Summary[tg] = tgSummary
}
2017-01-11 21:18:36 +00:00
if !reflect.DeepEqual(summary, originalSummary) {
summary.ModifyIndex = index
if err := n.state.UpsertJobSummary(index, summary); err != nil {
return err
}
}
}
2015-06-01 15:49:10 +00:00
return nil
}
func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_scaling_event"}, time.Now())
var req structs.ScalingEventRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertScalingEvent(index, &req); err != nil {
n.logger.Error("UpsertScalingEvent failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyUpsertServiceRegistrations(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_upsert"}, time.Now())
var req structs.ServiceRegistrationUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertServiceRegistrations(msgType, index, req.Services); err != nil {
n.logger.Error("UpsertServiceRegistrations failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteServiceRegistrationByID(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_id"}, time.Now())
var req structs.ServiceRegistrationDeleteByIDRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteServiceRegistrationByID(msgType, index, req.RequestNamespace(), req.ID); err != nil {
n.logger.Error("DeleteServiceRegistrationByID failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_node_id"}, time.Now())
var req structs.ServiceRegistrationDeleteByNodeIDRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteServiceRegistrationByNodeID(msgType, index, req.NodeID); err != nil {
n.logger.Error("DeleteServiceRegistrationByNodeID failed", "error", err)
return err
}
return nil
}
2015-06-01 15:49:10 +00:00
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
// Write the header
header := snapshotHeader{}
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
return err
}
2015-08-16 00:38:13 +00:00
// Write the time table
sink.Write([]byte{byte(TimeTableSnapshot)})
if err := s.timetable.Serialize(encoder); err != nil {
sink.Cancel()
return err
}
// Write all the data out
if err := s.persistIndexes(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistNodes(sink, encoder); err != nil {
sink.Cancel()
return err
}
2015-07-07 16:55:47 +00:00
if err := s.persistJobs(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistEvals(sink, encoder); err != nil {
sink.Cancel()
return err
}
2015-08-04 21:04:26 +00:00
if err := s.persistAllocs(sink, encoder); err != nil {
sink.Cancel()
return err
}
2015-12-07 23:58:17 +00:00
if err := s.persistPeriodicLaunches(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistJobSummaries(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistVaultAccessors(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistSITokenAccessors(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistJobVersions(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistDeployments(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistScalingPolicies(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistScalingEvents(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistCSIPlugins(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistCSIVolumes(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistACLPolicies(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistACLTokens(sink, encoder); err != nil {
sink.Cancel()
return err
}
2020-10-21 04:16:25 +00:00
if err := s.persistNamespaces(sink, encoder); err != nil {
sink.Cancel()
2020-10-21 04:16:25 +00:00
return err
}
2017-09-07 23:56:15 +00:00
if err := s.persistEnterpriseTables(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistSchedulerConfig(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistClusterMetadata(sink, encoder); err != nil {
sink.Cancel()
return err
}
2015-06-01 15:49:10 +00:00
return nil
}
func (s *nomadSnapshot) persistIndexes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the indexes
iter, err := s.snap.Indexes()
if err != nil {
return err
}
for {
// Get the next item
raw := iter.Next()
if raw == nil {
break
}
// Prepare the request struct
idx := raw.(*state.IndexEntry)
// Write out a node registration
sink.Write([]byte{byte(IndexSnapshot)})
if err := encoder.Encode(idx); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
nodes, err := s.snap.Nodes(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := nodes.Next()
if raw == nil {
break
}
// Prepare the request struct
node := raw.(*structs.Node)
// Write out a node registration
sink.Write([]byte{byte(NodeSnapshot)})
if err := encoder.Encode(node); err != nil {
return err
}
}
return nil
2015-06-01 15:49:10 +00:00
}
2015-07-07 16:55:47 +00:00
func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
jobs, err := s.snap.Jobs(ws)
2015-07-07 16:55:47 +00:00
if err != nil {
return err
}
for {
// Get the next item
raw := jobs.Next()
if raw == nil {
break
}
// Prepare the request struct
job := raw.(*structs.Job)
// Write out a job registration
sink.Write([]byte{byte(JobSnapshot)})
if err := encoder.Encode(job); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
evals, err := s.snap.Evals(ws, false)
if err != nil {
return err
}
for {
// Get the next item
raw := evals.Next()
if raw == nil {
break
}
// Prepare the request struct
eval := raw.(*structs.Evaluation)
// Write out the evaluation
sink.Write([]byte{byte(EvalSnapshot)})
if err := encoder.Encode(eval); err != nil {
return err
}
}
return nil
}
2015-08-04 21:04:26 +00:00
func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
allocs, err := s.snap.Allocs(ws, state.SortDefault)
2015-08-04 21:04:26 +00:00
if err != nil {
return err
}
for {
// Get the next item
raw := allocs.Next()
if raw == nil {
break
}
// Prepare the request struct
alloc := raw.(*structs.Allocation)
// Write out the evaluation
sink.Write([]byte{byte(AllocSnapshot)})
if err := encoder.Encode(alloc); err != nil {
return err
}
}
return nil
}
2015-12-07 23:58:17 +00:00
func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
launches, err := s.snap.PeriodicLaunches(ws)
2015-12-07 23:58:17 +00:00
if err != nil {
return err
}
for {
// Get the next item
raw := launches.Next()
if raw == nil {
break
}
// Prepare the request struct
launch := raw.(*structs.PeriodicLaunch)
// Write out a job registration
sink.Write([]byte{byte(PeriodicLaunchSnapshot)})
if err := encoder.Encode(launch); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
summaries, err := s.snap.JobSummaries(ws)
if err != nil {
return err
}
for {
raw := summaries.Next()
if raw == nil {
break
}
jobSummary := raw.(*structs.JobSummary)
sink.Write([]byte{byte(JobSummarySnapshot)})
if err := encoder.Encode(jobSummary); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistVaultAccessors(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
accessors, err := s.snap.VaultAccessors(ws)
if err != nil {
return err
}
for {
raw := accessors.Next()
if raw == nil {
break
}
accessor := raw.(*structs.VaultAccessor)
sink.Write([]byte{byte(VaultAccessorSnapshot)})
if err := encoder.Encode(accessor); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistSITokenAccessors(sink raft.SnapshotSink, encoder *codec.Encoder) error {
ws := memdb.NewWatchSet()
accessors, err := s.snap.SITokenAccessors(ws)
if err != nil {
return err
}
for raw := accessors.Next(); raw != nil; raw = accessors.Next() {
accessor := raw.(*structs.SITokenAccessor)
sink.Write([]byte{byte(ServiceIdentityTokenAccessorSnapshot)})
if err := encoder.Encode(accessor); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistJobVersions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
versions, err := s.snap.JobVersions(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := versions.Next()
if raw == nil {
break
}
// Prepare the request struct
job := raw.(*structs.Job)
// Write out a job registration
sink.Write([]byte{byte(JobVersionSnapshot)})
if err := encoder.Encode(job); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws, state.SortDefault)
if err != nil {
return err
}
for {
// Get the next item
raw := deployments.Next()
if raw == nil {
break
}
// Prepare the request struct
deployment := raw.(*structs.Deployment)
// Write out a job registration
sink.Write([]byte{byte(DeploymentSnapshot)})
if err := encoder.Encode(deployment); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistACLPolicies(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
2017-08-08 04:09:13 +00:00
// Get all the policies
ws := memdb.NewWatchSet()
policies, err := s.snap.ACLPolicies(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := policies.Next()
if raw == nil {
break
}
// Prepare the request struct
policy := raw.(*structs.ACLPolicy)
2017-08-08 04:09:13 +00:00
// Write out a policy registration
sink.Write([]byte{byte(ACLPolicySnapshot)})
if err := encoder.Encode(policy); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the policies
ws := memdb.NewWatchSet()
tokens, err := s.snap.ACLTokens(ws, state.SortDefault)
if err != nil {
return err
}
for {
// Get the next item
raw := tokens.Next()
if raw == nil {
break
}
// Prepare the request struct
token := raw.(*structs.ACLToken)
// Write out a token registration
sink.Write([]byte{byte(ACLTokenSnapshot)})
if err := encoder.Encode(token); err != nil {
return err
}
}
return nil
}
2020-10-21 04:16:25 +00:00
// persistNamespaces persists all the namespaces.
func (s *nomadSnapshot) persistNamespaces(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
namespaces, err := s.snap.Namespaces(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := namespaces.Next()
if raw == nil {
break
}
// Prepare the request struct
namespace := raw.(*structs.Namespace)
// Write out a namespace registration
sink.Write([]byte{byte(NamespaceSnapshot)})
if err := encoder.Encode(namespace); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get scheduler config
_, schedConfig, err := s.snap.SchedulerConfig()
if err != nil {
return err
}
if schedConfig == nil {
return nil
}
// Write out scheduler config
sink.Write([]byte{byte(SchedulerConfigSnapshot)})
if err := encoder.Encode(schedConfig); err != nil {
return err
}
return nil
}
func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get the cluster metadata
ws := memdb.NewWatchSet()
clusterMetadata, err := s.snap.ClusterMetadata(ws)
if err != nil {
return err
}
if clusterMetadata == nil {
return nil
}
// Write out the cluster metadata
sink.Write([]byte{byte(ClusterMetadataSnapshot)})
if err := encoder.Encode(clusterMetadata); err != nil {
return err
}
return nil
}
func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the scaling policies
ws := memdb.NewWatchSet()
scalingPolicies, err := s.snap.ScalingPolicies(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := scalingPolicies.Next()
if raw == nil {
break
}
// Prepare the request struct
scalingPolicy := raw.(*structs.ScalingPolicy)
// Write out a scaling policy snapshot
sink.Write([]byte{byte(ScalingPolicySnapshot)})
if err := encoder.Encode(scalingPolicy); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the scaling events
ws := memdb.NewWatchSet()
iter, err := s.snap.ScalingEvents(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := iter.Next()
if raw == nil {
break
}
// Prepare the request struct
events := raw.(*structs.JobScalingEvents)
// Write out a scaling events snapshot
sink.Write([]byte{byte(ScalingEventsSnapshot)})
if err := encoder.Encode(events); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistCSIPlugins(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the CSI plugins
ws := memdb.NewWatchSet()
plugins, err := s.snap.CSIPlugins(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := plugins.Next()
if raw == nil {
break
}
// Prepare the request struct
plugin := raw.(*structs.CSIPlugin)
// Write out a plugin snapshot
sink.Write([]byte{byte(CSIPluginSnapshot)})
if err := encoder.Encode(plugin); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the CSI volumes
ws := memdb.NewWatchSet()
volumes, err := s.snap.CSIVolumes(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := volumes.Next()
if raw == nil {
break
}
// Prepare the request struct
volume := raw.(*structs.CSIVolume)
// Write out a volume snapshot
sink.Write([]byte{byte(CSIVolumeSnapshot)})
if err := encoder.Encode(volume); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
func (s *nomadSnapshot) Release() {}