open-nomad/nomad/structs/structs.go

1588 lines
44 KiB
Go

package structs
import (
"bytes"
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
)
var (
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoRegionPath = fmt.Errorf("No path to region")
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 2,
Interval: 1 * time.Minute,
}
defaultBatchJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 15,
}
)
type MessageType uint8
const (
NodeRegisterRequestType MessageType = iota
NodeDeregisterRequestType
NodeUpdateStatusRequestType
NodeUpdateDrainRequestType
JobRegisterRequestType
JobDeregisterRequestType
EvalUpdateRequestType
EvalDeleteRequestType
AllocUpdateRequestType
AllocClientUpdateRequestType
)
const (
// IgnoreUnknownTypeFlag is set along with a MessageType
// to indicate that the message type can be safely ignored
// if it is not recognized. This is for future proofing, so
// that new commands can be added in a way that won't cause
// old servers to crash when the FSM attempts to process them.
IgnoreUnknownTypeFlag MessageType = 128
)
// RPCInfo is used to describe common information about query
type RPCInfo interface {
RequestRegion() string
IsRead() bool
AllowStaleRead() bool
}
// QueryOptions is used to specify various flags for read queries
type QueryOptions struct {
// The target region for this query
Region string
// If set, wait until query exceeds given index. Must be provided
// with MaxQueryTime.
MinQueryIndex uint64
// Provided with MinQueryIndex to wait for change.
MaxQueryTime time.Duration
// If set, any follower can service the request. Results
// may be arbitrarily stale.
AllowStale bool
}
func (q QueryOptions) RequestRegion() string {
return q.Region
}
// QueryOption only applies to reads, so always true
func (q QueryOptions) IsRead() bool {
return true
}
func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}
type WriteRequest struct {
// The target region for this write
Region string
}
func (w WriteRequest) RequestRegion() string {
// The target region for this request
return w.Region
}
// WriteRequest only applies to writes, always false
func (w WriteRequest) IsRead() bool {
return false
}
func (w WriteRequest) AllowStaleRead() bool {
return false
}
// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
// This is the index associated with the read
Index uint64
// If AllowStale is used, this is time elapsed since
// last contact between the follower and leader. This
// can be used to gauge staleness.
LastContact time.Duration
// Used to indicate if there is a known leader node
KnownLeader bool
}
// WriteMeta allows a write response to includ e potentially
// useful metadata about the write
type WriteMeta struct {
// This is the index associated with the write
Index uint64
}
// NodeRegisterRequest is used for Node.Register endpoint
// to register a node as being a schedulable entity.
type NodeRegisterRequest struct {
Node *Node
WriteRequest
}
// NodeDeregisterRequest is used for Node.Deregister endpoint
// to deregister a node as being a schedulable entity.
type NodeDeregisterRequest struct {
NodeID string
WriteRequest
}
// NodeUpdateStatusRequest is used for Node.UpdateStatus endpoint
// to update the status of a node.
type NodeUpdateStatusRequest struct {
NodeID string
Status string
WriteRequest
}
// NodeUpdateDrainRequest is used for updatin the drain status
type NodeUpdateDrainRequest struct {
NodeID string
Drain bool
WriteRequest
}
// NodeEvaluateRequest is used to re-evaluate the ndoe
type NodeEvaluateRequest struct {
NodeID string
WriteRequest
}
// NodeSpecificRequest is used when we just need to specify a target node
type NodeSpecificRequest struct {
NodeID string
QueryOptions
}
// JobRegisterRequest is used for Job.Register endpoint
// to register a job as being a schedulable entity.
type JobRegisterRequest struct {
Job *Job
WriteRequest
}
// JobDeregisterRequest is used for Job.Deregister endpoint
// to deregister a job as being a schedulable entity.
type JobDeregisterRequest struct {
JobID string
WriteRequest
}
// JobEvaluateRequest is used when we just need to re-evaluate a target job
type JobEvaluateRequest struct {
JobID string
WriteRequest
}
// JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct {
JobID string
QueryOptions
}
// JobListRequest is used to parameterize a list request
type JobListRequest struct {
QueryOptions
}
// NodeListRequest is used to parameterize a list request
type NodeListRequest struct {
QueryOptions
}
// EvalUpdateRequest is used for upserting evaluations.
type EvalUpdateRequest struct {
Evals []*Evaluation
EvalToken string
WriteRequest
}
// EvalDeleteRequest is used for deleting an evaluation.
type EvalDeleteRequest struct {
Evals []string
Allocs []string
WriteRequest
}
// EvalSpecificRequest is used when we just need to specify a target evaluation
type EvalSpecificRequest struct {
EvalID string
QueryOptions
}
// EvalAckRequest is used to Ack/Nack a specific evaluation
type EvalAckRequest struct {
EvalID string
Token string
WriteRequest
}
// EvalDequeueRequest is used when we want to dequeue an evaluation
type EvalDequeueRequest struct {
Schedulers []string
Timeout time.Duration
WriteRequest
}
// EvalListRequest is used to list the evaluations
type EvalListRequest struct {
QueryOptions
}
// PlanRequest is used to submit an allocation plan to the leader
type PlanRequest struct {
Plan *Plan
WriteRequest
}
// AllocUpdateRequest is used to submit changes to allocations, either
// to cause evictions or to assign new allocaitons. Both can be done
// within a single transaction
type AllocUpdateRequest struct {
// Alloc is the list of new allocations to assign
Alloc []*Allocation
WriteRequest
}
// AllocListRequest is used to request a list of allocations
type AllocListRequest struct {
QueryOptions
}
// AllocSpecificRequest is used to query a specific allocation
type AllocSpecificRequest struct {
AllocID string
QueryOptions
}
// GenericRequest is used to request where no
// specific information is needed.
type GenericRequest struct {
QueryOptions
}
// GenericResponse is used to respond to a request where no
// specific response information is needed.
type GenericResponse struct {
WriteMeta
}
const (
ProtocolVersion = "protocol"
APIMajorVersion = "api.major"
APIMinorVersion = "api.minor"
)
// VersionResponse is used for the Status.Version reseponse
type VersionResponse struct {
Build string
Versions map[string]int
QueryMeta
}
// JobRegisterResponse is used to respond to a job registration
type JobRegisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}
// JobDeregisterResponse is used to respond to a job deregistration
type JobDeregisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}
// NodeUpdateResponse is used to respond to a node update
type NodeUpdateResponse struct {
HeartbeatTTL time.Duration
EvalIDs []string
EvalCreateIndex uint64
NodeModifyIndex uint64
QueryMeta
}
// NodeDrainUpdateResponse is used to respond to a node drain update
type NodeDrainUpdateResponse struct {
EvalIDs []string
EvalCreateIndex uint64
NodeModifyIndex uint64
QueryMeta
}
// NodeAllocsResponse is used to return allocs for a single node
type NodeAllocsResponse struct {
Allocs []*Allocation
QueryMeta
}
// SingleNodeResponse is used to return a single node
type SingleNodeResponse struct {
Node *Node
QueryMeta
}
// JobListResponse is used for a list request
type NodeListResponse struct {
Nodes []*NodeListStub
QueryMeta
}
// SingleJobResponse is used to return a single job
type SingleJobResponse struct {
Job *Job
QueryMeta
}
// JobListResponse is used for a list request
type JobListResponse struct {
Jobs []*JobListStub
QueryMeta
}
// SingleAllocResponse is used to return a single allocation
type SingleAllocResponse struct {
Alloc *Allocation
QueryMeta
}
// JobAllocationsResponse is used to return the allocations for a job
type JobAllocationsResponse struct {
Allocations []*AllocListStub
QueryMeta
}
// JobEvaluationsResponse is used to return the evaluations for a job
type JobEvaluationsResponse struct {
Evaluations []*Evaluation
QueryMeta
}
// SingleEvalResponse is used to return a single evaluation
type SingleEvalResponse struct {
Eval *Evaluation
QueryMeta
}
// EvalDequeueResponse is used to return from a dequeue
type EvalDequeueResponse struct {
Eval *Evaluation
Token string
QueryMeta
}
// PlanResponse is used to return from a PlanRequest
type PlanResponse struct {
Result *PlanResult
WriteMeta
}
// AllocListResponse is used for a list request
type AllocListResponse struct {
Allocations []*AllocListStub
QueryMeta
}
// EvalListResponse is used for a list request
type EvalListResponse struct {
Evaluations []*Evaluation
QueryMeta
}
// EvalAllocationsResponse is used to return the allocations for an evaluation
type EvalAllocationsResponse struct {
Allocations []*AllocListStub
QueryMeta
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusDown = "down"
)
// ShouldDrainNode checks if a given node status should trigger an
// evaluation. Some states don't require any further action.
func ShouldDrainNode(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady:
return false
case NodeStatusDown:
return true
default:
panic(fmt.Sprintf("unhandled node status %s", status))
}
}
// ValidNodeStatus is used to check if a node status is valid
func ValidNodeStatus(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady, NodeStatusDown:
return true
default:
return false
}
}
// Node is a representation of a schedulable client node
type Node struct {
// ID is a unique identifier for the node. It can be constructed
// by doing a concatenation of the Name and Datacenter as a simple
// approach. Alternatively a UUID may be used.
ID string
// Datacenter for this node
Datacenter string
// Node name
Name string
// Attributes is an arbitrary set of key/value
// data that can be used for constraints. Examples
// include "kernel.name=linux", "arch=386", "driver.docker=1",
// "docker.runtime=1.8.3"
Attributes map[string]string
// Resources is the available resources on the client.
// For example 'cpu=2' 'memory=2048'
Resources *Resources
// Reserved is the set of resources that are reserved,
// and should be subtracted from the total resources for
// the purposes of scheduling. This may be provide certain
// high-watermark tolerances or because of external schedulers
// consuming resources.
Reserved *Resources
// Links are used to 'link' this client to external
// systems. For example 'consul=foo.dc1' 'aws=i-83212'
// 'ami=ami-123'
Links map[string]string
// Meta is used to associate arbitrary metadata with this
// client. This is opaque to Nomad.
Meta map[string]string
// NodeClass is an opaque identifier used to group nodes
// together for the purpose of determining scheduling pressure.
NodeClass string
// Drain is controlled by the servers, and not the client.
// If true, no jobs will be scheduled to this node, and existing
// allocations will be drained.
Drain bool
// Status of this node
Status string
// StatusDescription is meant to provide more human useful information
StatusDescription string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (n *Node) TerminalStatus() bool {
switch n.Status {
case NodeStatusDown:
return true
default:
return false
}
}
// Stub returns a summarized version of the node
func (n *Node) Stub() *NodeListStub {
return &NodeListStub{
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Drain: n.Drain,
Status: n.Status,
StatusDescription: n.StatusDescription,
CreateIndex: n.CreateIndex,
ModifyIndex: n.ModifyIndex,
}
}
// NodeListStub is used to return a subset of job information
// for the job list
type NodeListStub struct {
ID string
Datacenter string
Name string
NodeClass string
Drain bool
Status string
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
}
// Resources is used to define the resources available
// on a client
type Resources struct {
CPU int
MemoryMB int `mapstructure:"memory"`
DiskMB int `mapstructure:"disk"`
IOPS int
Networks []*NetworkResource
}
// Copy returns a deep copy of the resources
func (r *Resources) Copy() *Resources {
newR := new(Resources)
*newR = *r
n := len(r.Networks)
newR.Networks = make([]*NetworkResource, n)
for i := 0; i < n; i++ {
newR.Networks[i] = r.Networks[i].Copy()
}
return newR
}
// NetIndex finds the matching net index using device name
func (r *Resources) NetIndex(n *NetworkResource) int {
for idx, net := range r.Networks {
if net.Device == n.Device {
return idx
}
}
return -1
}
// Superset checks if one set of resources is a superset
// of another. This ignores network resources, and the NetworkIndex
// should be used for that.
func (r *Resources) Superset(other *Resources) (bool, string) {
if r.CPU < other.CPU {
return false, "cpu exhausted"
}
if r.MemoryMB < other.MemoryMB {
return false, "memory exhausted"
}
if r.DiskMB < other.DiskMB {
return false, "disk exhausted"
}
if r.IOPS < other.IOPS {
return false, "iops exhausted"
}
return true, ""
}
// Add adds the resources of the delta to this, potentially
// returning an error if not possible.
func (r *Resources) Add(delta *Resources) error {
if delta == nil {
return nil
}
r.CPU += delta.CPU
r.MemoryMB += delta.MemoryMB
r.DiskMB += delta.DiskMB
r.IOPS += delta.IOPS
for _, n := range delta.Networks {
// Find the matching interface by IP or CIDR
idx := r.NetIndex(n)
if idx == -1 {
r.Networks = append(r.Networks, n.Copy())
} else {
r.Networks[idx].Add(n)
}
}
return nil
}
func (r *Resources) GoString() string {
return fmt.Sprintf("*%#v", *r)
}
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
Device string // Name of the device
CIDR string // CIDR block of addresses
IP string // IP address
MBits int // Throughput
ReservedPorts []int `mapstructure:"reserved_ports"` // Reserved ports
DynamicPorts []string `mapstructure:"dynamic_ports"` // Dynamically assigned ports
}
// Copy returns a deep copy of the network resource
func (n *NetworkResource) Copy() *NetworkResource {
newR := new(NetworkResource)
*newR = *n
if n.ReservedPorts != nil {
newR.ReservedPorts = make([]int, len(n.ReservedPorts))
copy(newR.ReservedPorts, n.ReservedPorts)
}
return newR
}
// Add adds the resources of the delta to this, potentially
// returning an error if not possible.
func (n *NetworkResource) Add(delta *NetworkResource) {
if len(delta.ReservedPorts) > 0 {
n.ReservedPorts = append(n.ReservedPorts, delta.ReservedPorts...)
}
n.MBits += delta.MBits
n.DynamicPorts = append(n.DynamicPorts, delta.DynamicPorts...)
}
func (n *NetworkResource) GoString() string {
return fmt.Sprintf("*%#v", *n)
}
// MapDynamicPorts returns a mapping of Label:PortNumber for dynamic ports
// allocated on this NetworkResource. The ordering of Label:Port pairs is
// random.
//
// Details:
//
// The jobspec lets us ask for two types of ports: Reserved ports and Dynamic
// ports. Reserved ports are identified by the port number, while Dynamic ports
// are identified by a Label.
//
// When we ask nomad to run a job it checks to see if the Reserved ports we
// requested are available. If they are, it then tries to provision any Dynamic
// ports that we have requested. When available ports are found to satisfy our
// dynamic port requirements, they are APPENDED to the reserved ports list. In
// effect, the reserved ports list serves double-duty. First it indicates the
// ports we *want*, and then it indicates the ports we are *using*.
//
// After the the offer process is complete and the job is scheduled we want to
// see which ports were made available to us. To see the dynamic ports that
// were allocated to us we look at the last N ports in our reservation, where N
// is how many dynamic ports we requested.
//
// MapDynamicPorts matches these port numbers with their labels and gives you
// the port mapping.
//
// Also, be aware that this is intended to be called in the context of
// task.Resources after an offer has been made. If you call it in some other
// context the behavior is unspecified, including maybe crashing. So don't do that.
func (n *NetworkResource) MapDynamicPorts() map[string]int {
ports := n.ReservedPorts[len(n.ReservedPorts)-len(n.DynamicPorts):]
mapping := make(map[string]int, len(n.DynamicPorts))
for idx, label := range n.DynamicPorts {
mapping[label] = ports[idx]
}
return mapping
}
// ListStaticPorts returns the list of Static ports allocated to this
// NetworkResource. These are presumed to have known semantics so there is no
// mapping information.
func (n *NetworkResource) ListStaticPorts() []int {
return n.ReservedPorts[:len(n.ReservedPorts)-len(n.DynamicPorts)]
}
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
)
const (
JobStatusPending = "pending" // Pending means the job is waiting on scheduling
JobStatusRunning = "running" // Running means the entire job is running
JobStatusComplete = "complete" // Complete means there was a clean termination
JobStatusDead = "dead" // Dead means there was abnormal termination
)
const (
// JobMinPriority is the minimum allowed priority
JobMinPriority = 1
// JobDefaultPriority is the default priority if not
// not specified.
JobDefaultPriority = 50
// JobMaxPriority is the maximum allowed priority
JobMaxPriority = 100
// Ensure CoreJobPriority is higher than any user
// specified job so that it gets priority. This is important
// for the system to remain healthy.
CoreJobPriority = JobMaxPriority * 2
)
// Job is the scope of a scheduling request to Nomad. It is the largest
// scoped object, and is a named collection of task groups. Each task group
// is further composed of tasks. A task group (TG) is the unit of scheduling
// however.
type Job struct {
// Region is the Nomad region that handles scheduling this job
Region string
// ID is a unique identifier for the job per region. It can be
// specified hierarchically like LineOfBiz/OrgName/Team/Project
ID string
// Name is the logical name of the job used to refer to it. This is unique
// per region, but not unique globally.
Name string
// Type is used to control various behaviors about the job. Most jobs
// are service jobs, meaning they are expected to be long lived.
// Some jobs are batch oriented meaning they run and then terminate.
// This can be extended in the future to support custom schedulers.
Type string
// Priority is used to control scheduling importance and if this job
// can preempt other jobs.
Priority int
// AllAtOnce is used to control if incremental scheduling of task groups
// is allowed or if we must do a gang scheduling of the entire job. This
// can slow down larger jobs if resources are not available.
AllAtOnce bool `mapstructure:"all_at_once"`
// Datacenters contains all the datacenters this job is allowed to span
Datacenters []string
// Constraints can be specified at a job level and apply to
// all the task groups and tasks.
Constraints []*Constraint
// TaskGroups are the collections of task groups that this job needs
// to run. Each task group is an atomic unit of scheduling and placement.
TaskGroups []*TaskGroup
// Update is used to control the update strategy
Update UpdateStrategy
// Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad.
Meta map[string]string
// Job status
Status string
// StatusDescription is meant to provide more human useful information
StatusDescription string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// Validate is used to sanity check a job input
func (j *Job) Validate() error {
var mErr multierror.Error
if j.Region == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing job region"))
}
if j.ID == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing job ID"))
} else if strings.Contains(j.ID, " ") {
mErr.Errors = append(mErr.Errors, errors.New("Job ID contains a space"))
}
if j.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing job name"))
}
if j.Type == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
}
if j.Priority < JobMinPriority || j.Priority > JobMaxPriority {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Job priority must be between [%d, %d]", JobMinPriority, JobMaxPriority))
}
if len(j.Datacenters) == 0 {
mErr.Errors = append(mErr.Errors, errors.New("Missing job datacenters"))
}
if len(j.TaskGroups) == 0 {
mErr.Errors = append(mErr.Errors, errors.New("Missing job task groups"))
}
for idx, constr := range j.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
// Check for duplicate task groups
taskGroups := make(map[string]int)
for idx, tg := range j.TaskGroups {
if tg.Name == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Job task group %d missing name", idx+1))
} else if existing, ok := taskGroups[tg.Name]; ok {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Job task group %d redefines '%s' from group %d", idx+1, tg.Name, existing+1))
} else {
taskGroups[tg.Name] = idx
}
if j.Type == "system" && tg.Count != 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %d has count %d. Only count of 1 is supported with system scheduler",
idx+1, tg.Count))
}
}
// Validate the task group
for idx, tg := range j.TaskGroups {
if err := tg.Validate(); err != nil {
outer := fmt.Errorf("Task group %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
return mErr.ErrorOrNil()
}
// LookupTaskGroup finds a task group by name
func (j *Job) LookupTaskGroup(name string) *TaskGroup {
for _, tg := range j.TaskGroups {
if tg.Name == name {
return tg
}
}
return nil
}
// Stub is used to return a summary of the job
func (j *Job) Stub() *JobListStub {
return &JobListStub{
ID: j.ID,
Name: j.Name,
Type: j.Type,
Priority: j.Priority,
Status: j.Status,
StatusDescription: j.StatusDescription,
CreateIndex: j.CreateIndex,
ModifyIndex: j.ModifyIndex,
}
}
// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {
ID string
Name string
Type string
Priority int
Status string
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
}
// UpdateStrategy is used to modify how updates are done
type UpdateStrategy struct {
// Stagger is the amount of time between the updates
Stagger time.Duration
// MaxParallel is how many updates can be done in parallel
MaxParallel int `mapstructure:"max_parallel"`
}
// Rolling returns if a rolling strategy should be used
func (u *UpdateStrategy) Rolling() bool {
return u.Stagger > 0 && u.MaxParallel > 0
}
// RestartPolicy influences how Nomad restarts Tasks when they
// crash or fail.
type RestartPolicy struct {
Attempts int
Interval time.Duration
Delay time.Duration
}
func (r *RestartPolicy) Validate() error {
if time.Duration(r.Attempts)*r.Delay > r.Interval {
return fmt.Errorf("Nomad can't restart the TaskGroup %v times in an interval of %v with a delay of %v", r.Attempts, r.Interval, r.Delay)
}
return nil
}
func NewRestartPolicy(jobType string) *RestartPolicy {
switch jobType {
case JobTypeService, JobTypeSystem:
rp := defaultServiceJobRestartPolicy
return &rp
case JobTypeBatch:
rp := defaultBatchJobRestartPolicy
return &rp
}
return nil
}
// TaskGroup is an atomic unit of placement. Each task group belongs to
// a job and may contain any number of tasks. A task group support running
// in many replicas using the same configuration..
type TaskGroup struct {
// Name of the task group
Name string
// Count is the number of replicas of this task group that should
// be scheduled.
Count int
// Constraints can be specified at a task group level and apply to
// all the tasks contained.
Constraints []*Constraint
//RestartPolicy of a TaskGroup
RestartPolicy *RestartPolicy
// Tasks are the collection of tasks that this task group needs to run
Tasks []*Task
// Meta is used to associate arbitrary metadata with this
// task group. This is opaque to Nomad.
Meta map[string]string
}
// Validate is used to sanity check a task group
func (tg *TaskGroup) Validate() error {
var mErr multierror.Error
if tg.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task group name"))
}
if tg.Count <= 0 {
mErr.Errors = append(mErr.Errors, errors.New("Task group count must be positive"))
}
if len(tg.Tasks) == 0 {
mErr.Errors = append(mErr.Errors, errors.New("Missing tasks for task group"))
}
for idx, constr := range tg.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
if err := tg.RestartPolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Check for duplicate tasks
tasks := make(map[string]int)
for idx, task := range tg.Tasks {
if task.Name == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d missing name", idx+1))
} else if existing, ok := tasks[task.Name]; ok {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d redefines '%s' from task %d", idx+1, task.Name, existing+1))
} else {
tasks[task.Name] = idx
}
}
// Validate the tasks
for idx, task := range tg.Tasks {
if err := task.Validate(); err != nil {
outer := fmt.Errorf("Task %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
return mErr.ErrorOrNil()
}
// LookupTask finds a task by name
func (tg *TaskGroup) LookupTask(name string) *Task {
for _, t := range tg.Tasks {
if t.Name == name {
return t
}
}
return nil
}
func (tg *TaskGroup) GoString() string {
return fmt.Sprintf("*%#v", *tg)
}
// Task is a single process typically that is executed as part of a task group.
type Task struct {
// Name of the task
Name string
// Driver is used to control which driver is used
Driver string
// Config is provided to the driver to initialize
Config map[string]string
// Map of environment variables to be used by the driver
Env map[string]string
// Constraints can be specified at a task level and apply only to
// the particular task.
Constraints []*Constraint
// Resources is the resources needed by this task
Resources *Resources
// Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad.
Meta map[string]string
}
func (t *Task) GoString() string {
return fmt.Sprintf("*%#v", *t)
}
// Validate is used to sanity check a task group
func (t *Task) Validate() error {
var mErr multierror.Error
if t.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task name"))
}
if t.Driver == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task driver"))
}
if t.Resources == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing task resources"))
}
for idx, constr := range t.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
return mErr.ErrorOrNil()
}
const (
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
)
// Constraints are used to restrict placement options.
type Constraint struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), contains, near
}
func (c *Constraint) String() string {
return fmt.Sprintf("%s %s %s", c.LTarget, c.Operand, c.RTarget)
}
func (c *Constraint) Validate() error {
var mErr multierror.Error
if c.Operand == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing constraint operand"))
}
// Perform additional validation based on operand
switch c.Operand {
case ConstraintRegex:
if _, err := regexp.Compile(c.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Regular expression failed to compile: %v", err))
}
case ConstraintVersion:
if _, err := version.NewConstraint(c.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Version constraint is invalid: %v", err))
}
}
return mErr.ErrorOrNil()
}
const (
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
AllocDesiredStatusEvict = "evict" // Allocation should stop, and was evicted
AllocDesiredStatusFailed = "failed" // Allocation failed to be done
)
const (
AllocClientStatusPending = "pending"
AllocClientStatusRunning = "running"
AllocClientStatusDead = "dead"
AllocClientStatusFailed = "failed"
)
// Allocation is used to allocate the placement of a task group to a node.
type Allocation struct {
// ID of the allocation (UUID)
ID string
// ID of the evaluation that generated this allocation
EvalID string
// Name is a logical name of the allocation.
Name string
// NodeID is the node this is being placed on
NodeID string
// Job is the parent job of the task group being allocated.
// This is copied at allocation time to avoid issues if the job
// definition is updated.
JobID string
Job *Job
// TaskGroup is the name of the task group that should be run
TaskGroup string
// Resources is the total set of resources allocated as part
// of this allocation of the task group.
Resources *Resources
// TaskResources is the set of resources allocated to each
// task. These should sum to the total Resources.
TaskResources map[string]*Resources
// Metrics associated with this allocation
Metrics *AllocMetric
// Desired Status of the allocation on the client
DesiredStatus string
// DesiredStatusDescription is meant to provide more human useful information
DesiredDescription string
// Status of the allocation on the client
ClientStatus string
// ClientStatusDescription is meant to provide more human useful information
ClientDescription string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// TerminalStatus returns if the desired status is terminal and
// will no longer transition. This is not based on the current client status.
func (a *Allocation) TerminalStatus() bool {
switch a.DesiredStatus {
case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
return true
default:
return false
}
}
// Stub returns a list stub for the allocation
func (a *Allocation) Stub() *AllocListStub {
return &AllocListStub{
ID: a.ID,
EvalID: a.EvalID,
Name: a.Name,
NodeID: a.NodeID,
JobID: a.JobID,
TaskGroup: a.TaskGroup,
DesiredStatus: a.DesiredStatus,
DesiredDescription: a.DesiredDescription,
ClientStatus: a.ClientStatus,
ClientDescription: a.ClientDescription,
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
}
}
// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string
EvalID string
Name string
NodeID string
JobID string
TaskGroup string
DesiredStatus string
DesiredDescription string
ClientStatus string
ClientDescription string
CreateIndex uint64
ModifyIndex uint64
}
// AllocMetric is used to track various metrics while attempting
// to make an allocation. These are used to debug a job, or to better
// understand the pressure within the system.
type AllocMetric struct {
// NodesEvaluated is the number of nodes that were evaluated
NodesEvaluated int
// NodesFiltered is the number of nodes filtered due to a constraint
NodesFiltered int
// ClassFiltered is the number of nodes filtered by class
ClassFiltered map[string]int
// ConstraintFiltered is the number of failures caused by constraint
ConstraintFiltered map[string]int
// NodesExhausted is the number of nodes skipped due to being
// exhausted of at least one resource
NodesExhausted int
// ClassExhausted is the number of nodes exhausted by class
ClassExhausted map[string]int
// DimensionExhausted provides the count by dimension or reason
DimensionExhausted map[string]int
// Scores is the scores of the final few nodes remaining
// for placement. The top score is typically selected.
Scores map[string]float64
// AllocationTime is a measure of how long the allocation
// attempt took. This can affect performance and SLAs.
AllocationTime time.Duration
// CoalescedFailures indicates the number of other
// allocations that were coalesced into this failed allocation.
// This is to prevent creating many failed allocations for a
// single task group.
CoalescedFailures int
}
func (a *AllocMetric) EvaluateNode() {
a.NodesEvaluated += 1
}
func (a *AllocMetric) FilterNode(node *Node, constraint string) {
a.NodesFiltered += 1
if node != nil && node.NodeClass != "" {
if a.ClassFiltered == nil {
a.ClassFiltered = make(map[string]int)
}
a.ClassFiltered[node.NodeClass] += 1
}
if constraint != "" {
if a.ConstraintFiltered == nil {
a.ConstraintFiltered = make(map[string]int)
}
a.ConstraintFiltered[constraint] += 1
}
}
func (a *AllocMetric) ExhaustedNode(node *Node, dimension string) {
a.NodesExhausted += 1
if node != nil && node.NodeClass != "" {
if a.ClassExhausted == nil {
a.ClassExhausted = make(map[string]int)
}
a.ClassExhausted[node.NodeClass] += 1
}
if dimension != "" {
if a.DimensionExhausted == nil {
a.DimensionExhausted = make(map[string]int)
}
a.DimensionExhausted[dimension] += 1
}
}
func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
if a.Scores == nil {
a.Scores = make(map[string]float64)
}
key := fmt.Sprintf("%s.%s", node.ID, name)
a.Scores[key] = score
}
const (
EvalStatusPending = "pending"
EvalStatusComplete = "complete"
EvalStatusFailed = "failed"
)
const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
)
const (
// CoreJobEvalGC is used for the garbage collection of evaluations
// and allocations. We periodically scan evaluations in a terminal state,
// in which all the corresponding allocations are also terminal. We
// delete these out of the system to bound the state.
CoreJobEvalGC = "eval-gc"
// CoreJobNodeGC is used for the garbage collection of failed nodes.
// We periodically scan nodes in a terminal state, and if they have no
// corresponding allocations we delete these out of the system.
CoreJobNodeGC = "node-gc"
)
// Evaluation is used anytime we need to apply business logic as a result
// of a change to our desired state (job specification) or the emergent state
// (registered nodes). When the inputs change, we need to "evaluate" them,
// potentially taking action (allocation of work) or doing nothing if the state
// of the world does not require it.
type Evaluation struct {
// ID is a randonly generated UUID used for this evaluation. This
// is assigned upon the creation of the evaluation.
ID string
// Priority is used to control scheduling importance and if this job
// can preempt other jobs.
Priority int
// Type is used to control which schedulers are available to handle
// this evaluation.
Type string
// TriggeredBy is used to give some insight into why this Eval
// was created. (Job change, node failure, alloc failure, etc).
TriggeredBy string
// JobID is the job this evaluation is scoped to. Evaluations cannot
// be run in parallel for a given JobID, so we serialize on this.
JobID string
// JobModifyIndex is the modify index of the job at the time
// the evaluation was created
JobModifyIndex uint64
// NodeID is the node that was affected triggering the evaluation.
NodeID string
// NodeModifyIndex is the modify index of the node at the time
// the evaluation was created
NodeModifyIndex uint64
// Status of the evaluation
Status string
// StatusDescription is meant to provide more human useful information
StatusDescription string
// Wait is a minimum wait time for running the eval. This is used to
// support a rolling upgrade.
Wait time.Duration
// NextEval is the evaluation ID for the eval created to do a followup.
// This is used to support rolling upgrades, where we need a chain of evaluations.
NextEval string
// PreviousEval is the evaluation ID for the eval creating this one to do a followup.
// This is used to support rolling upgrades, where we need a chain of evaluations.
PreviousEval string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (e *Evaluation) TerminalStatus() bool {
switch e.Status {
case EvalStatusComplete, EvalStatusFailed:
return true
default:
return false
}
}
func (e *Evaluation) GoString() string {
return fmt.Sprintf("<Eval '%s' JobID: '%s'>", e.ID, e.JobID)
}
func (e *Evaluation) Copy() *Evaluation {
ne := new(Evaluation)
*ne = *e
return ne
}
// ShouldEnqueue checks if a given evaluation should be enqueued
func (e *Evaluation) ShouldEnqueue() bool {
switch e.Status {
case EvalStatusPending:
return true
case EvalStatusComplete, EvalStatusFailed:
return false
default:
panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status))
}
}
// MakePlan is used to make a plan from the given evaluation
// for a given Job
func (e *Evaluation) MakePlan(j *Job) *Plan {
p := &Plan{
EvalID: e.ID,
Priority: e.Priority,
NodeUpdate: make(map[string][]*Allocation),
NodeAllocation: make(map[string][]*Allocation),
}
if j != nil {
p.AllAtOnce = j.AllAtOnce
}
return p
}
// NextRollingEval creates an evaluation to followup this eval for rolling updates
func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation {
return &Evaluation{
ID: GenerateUUID(),
Priority: e.Priority,
Type: e.Type,
TriggeredBy: EvalTriggerRollingUpdate,
JobID: e.JobID,
JobModifyIndex: e.JobModifyIndex,
Status: EvalStatusPending,
Wait: wait,
PreviousEval: e.ID,
}
}
// Plan is used to submit a commit plan for task allocations. These
// are submitted to the leader which verifies that resources have
// not been overcommitted before admiting the plan.
type Plan struct {
// EvalID is the evaluation ID this plan is associated with
EvalID string
// EvalToken is used to prevent a split-brain processing of
// an evaluation. There should only be a single scheduler running
// an Eval at a time, but this could be violated after a leadership
// transition. This unique token is used to reject plans that are
// being submitted from a different leader.
EvalToken string
// Priority is the priority of the upstream job
Priority int
// AllAtOnce is used to control if incremental scheduling of task groups
// is allowed or if we must do a gang scheduling of the entire job.
// If this is false, a plan may be partially applied. Otherwise, the
// entire plan must be able to make progress.
AllAtOnce bool
// NodeUpdate contains all the allocations for each node. For each node,
// this is a list of the allocations to update to either stop or evict.
NodeUpdate map[string][]*Allocation
// NodeAllocation contains all the allocations for each node.
// The evicts must be considered prior to the allocations.
NodeAllocation map[string][]*Allocation
// FailedAllocs are allocations that could not be made,
// but are persisted so that the user can use the feedback
// to determine the cause.
FailedAllocs []*Allocation
}
func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
newAlloc := new(Allocation)
*newAlloc = *alloc
newAlloc.DesiredStatus = status
newAlloc.DesiredDescription = desc
node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
}
func (p *Plan) PopUpdate(alloc *Allocation) {
existing := p.NodeUpdate[alloc.NodeID]
n := len(existing)
if n > 0 && existing[n-1].ID == alloc.ID {
existing = existing[:n-1]
if len(existing) > 0 {
p.NodeUpdate[alloc.NodeID] = existing
} else {
delete(p.NodeUpdate, alloc.NodeID)
}
}
}
func (p *Plan) AppendAlloc(alloc *Allocation) {
node := alloc.NodeID
existing := p.NodeAllocation[node]
p.NodeAllocation[node] = append(existing, alloc)
}
func (p *Plan) AppendFailed(alloc *Allocation) {
p.FailedAllocs = append(p.FailedAllocs, alloc)
}
// IsNoOp checks if this plan would do nothing
func (p *Plan) IsNoOp() bool {
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && len(p.FailedAllocs) == 0
}
// PlanResult is the result of a plan submitted to the leader.
type PlanResult struct {
// NodeUpdate contains all the updates that were committed.
NodeUpdate map[string][]*Allocation
// NodeAllocation contains all the allocations that were committed.
NodeAllocation map[string][]*Allocation
// FailedAllocs are allocations that could not be made,
// but are persisted so that the user can use the feedback
// to determine the cause.
FailedAllocs []*Allocation
// RefreshIndex is the index the worker should refresh state up to.
// This allows all evictions and allocations to be materialized.
// If any allocations were rejected due to stale data (node state,
// over committed) this can be used to force a worker refresh.
RefreshIndex uint64
// AllocIndex is the Raft index in which the evictions and
// allocations took place. This is used for the write index.
AllocIndex uint64
}
// IsNoOp checks if this plan result would do nothing
func (p *PlanResult) IsNoOp() bool {
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && len(p.FailedAllocs) == 0
}
// FullCommit is used to check if all the allocations in a plan
// were committed as part of the result. Returns if there was
// a match, and the number of expected and actual allocations.
func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
expected := 0
actual := 0
for name, allocList := range plan.NodeAllocation {
didAlloc, _ := p.NodeAllocation[name]
expected += len(allocList)
actual += len(didAlloc)
}
return actual == expected, expected, actual
}
// msgpackHandle is a shared handle for encoding/decoding of structs
var msgpackHandle = &codec.MsgpackHandle{}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
}
// Encode is used to encode a MsgPack object with type prefix
func Encode(t MessageType, msg interface{}) ([]byte, error) {
var buf bytes.Buffer
buf.WriteByte(uint8(t))
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
return buf.Bytes(), err
}