Merge pull request #910 from hashicorp/f-reserved-resources

Reserve Client Resources + Config Validation
This commit is contained in:
Alex Dadgar 2016-03-15 21:09:13 -07:00
commit 719f5d34ed
10 changed files with 1102 additions and 264 deletions

View file

@ -157,6 +157,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
return nil, fmt.Errorf("driver setup failed: %v", err)
}
// Setup the reserved resources
c.reservePorts()
// Set up the known servers list
c.SetServers(c.config.Servers)
@ -537,6 +540,9 @@ func (c *Client) setupNode() error {
if node.Resources == nil {
node.Resources = &structs.Resources{}
}
if node.Reserved == nil {
node.Reserved = &structs.Resources{}
}
if node.Datacenter == "" {
node.Datacenter = "dc1"
}
@ -550,6 +556,49 @@ func (c *Client) setupNode() error {
return nil
}
// reservePorts is used to reserve ports on the fingerprinted network devices.
func (c *Client) reservePorts() {
c.configLock.RLock()
defer c.configLock.RUnlock()
global := c.config.GloballyReservedPorts
if len(global) == 0 {
return
}
node := c.config.Node
networks := node.Resources.Networks
reservedIndex := make(map[string]*structs.NetworkResource, len(networks))
for _, resNet := range node.Reserved.Networks {
reservedIndex[resNet.IP] = resNet
}
// Go through each network device and reserve ports on it.
for _, net := range networks {
res, ok := reservedIndex[net.IP]
if !ok {
res = net.Copy()
reservedIndex[net.IP] = res
}
for _, portVal := range global {
p := structs.Port{Value: portVal}
res.ReservedPorts = append(res.ReservedPorts, p)
}
}
// Clear the reserved networks.
if node.Reserved == nil {
node.Reserved = new(structs.Resources)
} else {
node.Reserved.Networks = nil
}
// Restore the reserved networks
for _, net := range reservedIndex {
node.Reserved.Networks = append(node.Reserved.Networks, net)
}
}
// fingerprint is used to fingerprint the client and setup the node
func (c *Client) fingerprint() error {
whitelist := c.config.ReadStringListToMap("fingerprint.whitelist")

View file

@ -58,13 +58,17 @@ type Config struct {
Node *structs.Node
// ClientMaxPort is the upper range of the ports that the client uses for
// communicating with plugin subsystems
// communicating with plugin subsystems over loopback
ClientMaxPort uint
// ClientMinPort is the lower range of the ports that the client uses for
// communicating with plugin subsystems
// communicating with plugin subsystems over loopback
ClientMinPort uint
// GloballyReservedPorts are ports that are reserved across all network
// devices and IPs.
GloballyReservedPorts []int
// Options provides arbitrary key-value configuration for nomad internals,
// like fingerprinters and drivers. The format is:
//

View file

@ -224,6 +224,19 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
}
conf.Node.HTTPAddr = httpAddr
conf.Version = a.config.Version
// Reserve resources on the node.
r := conf.Node.Reserved
if r == nil {
r = new(structs.Resources)
conf.Node.Reserved = r
}
r.CPU = a.config.Client.Reserved.CPU
r.MemoryMB = a.config.Client.Reserved.MemoryMB
r.DiskMB = a.config.Client.Reserved.DiskMB
r.IOPS = a.config.Client.Reserved.IOPS
conf.GloballyReservedPorts = a.config.Client.Reserved.ParsedReservedPorts
return conf, nil
}

View file

@ -0,0 +1,83 @@
region = "foobar"
datacenter = "dc2"
name = "my-web"
data_dir = "/tmp/nomad"
log_level = "ERR"
bind_addr = "192.168.0.1"
enable_debug = true
ports {
http = 1234
rpc = 2345
serf = 3456
}
addresses {
http = "127.0.0.1"
rpc = "127.0.0.2"
serf = "127.0.0.3"
}
advertise {
rpc = "127.0.0.3"
serf = "127.0.0.4"
}
client {
enabled = true
state_dir = "/tmp/client-state"
alloc_dir = "/tmp/alloc"
servers = ["a.b.c:80", "127.0.0.1:1234"]
node_class = "linux-medium-64bit"
meta {
foo = "bar"
baz = "zip"
}
options {
foo = "bar"
baz = "zip"
}
network_interface = "eth0"
network_speed = 100
reserved {
cpu = 10
memory = 10
disk = 10
iops = 10
reserved_ports = "1,100,10-12"
}
client_min_port = 1000
client_max_port = 2000
max_kill_timeout = "10s"
}
server {
enabled = true
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
heartbeat_grace = "30s"
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
}
telemetry {
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
disable_hostname = true
}
leave_on_interrupt = true
leave_on_terminate = true
enable_syslog = true
syslog_facility = "LOCAL1"
disable_update_check = true
disable_anonymous_signature = true
atlas {
infrastructure = "armon/test"
token = "abcd"
join = true
endpoint = "127.0.0.1:1234"
}
http_api_response_headers {
Access-Control-Allow-Origin = "*"
}

View file

@ -3,16 +3,15 @@ package agent
import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/hashicorp/hcl"
client "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad"
)
@ -20,79 +19,79 @@ import (
// Config is the configuration for the Nomad agent.
type Config struct {
// Region is the region this agent is in. Defaults to global.
Region string `hcl:"region"`
Region string `mapstructure:"region"`
// Datacenter is the datacenter this agent is in. Defaults to dc1
Datacenter string `hcl:"datacenter"`
Datacenter string `mapstructure:"datacenter"`
// NodeName is the name we register as. Defaults to hostname.
NodeName string `hcl:"name"`
NodeName string `mapstructure:"name"`
// DataDir is the directory to store our state in
DataDir string `hcl:"data_dir"`
DataDir string `mapstructure:"data_dir"`
// LogLevel is the level of the logs to putout
LogLevel string `hcl:"log_level"`
LogLevel string `mapstructure:"log_level"`
// BindAddr is the address on which all of nomad's services will
// be bound. If not specified, this defaults to 127.0.0.1.
BindAddr string `hcl:"bind_addr"`
BindAddr string `mapstructure:"bind_addr"`
// EnableDebug is used to enable debugging HTTP endpoints
EnableDebug bool `hcl:"enable_debug"`
EnableDebug bool `mapstructure:"enable_debug"`
// Ports is used to control the network ports we bind to.
Ports *Ports `hcl:"ports"`
Ports *Ports `mapstructure:"ports"`
// Addresses is used to override the network addresses we bind to.
Addresses *Addresses `hcl:"addresses"`
Addresses *Addresses `mapstructure:"addresses"`
// AdvertiseAddrs is used to control the addresses we advertise.
AdvertiseAddrs *AdvertiseAddrs `hcl:"advertise"`
AdvertiseAddrs *AdvertiseAddrs `mapstructure:"advertise"`
// Client has our client related settings
Client *ClientConfig `hcl:"client"`
Client *ClientConfig `mapstructure:"client"`
// Server has our server related settings
Server *ServerConfig `hcl:"server"`
Server *ServerConfig `mapstructure:"server"`
// Telemetry is used to configure sending telemetry
Telemetry *Telemetry `hcl:"telemetry"`
Telemetry *Telemetry `mapstructure:"telemetry"`
// LeaveOnInt is used to gracefully leave on the interrupt signal
LeaveOnInt bool `hcl:"leave_on_interrupt"`
LeaveOnInt bool `mapstructure:"leave_on_interrupt"`
// LeaveOnTerm is used to gracefully leave on the terminate signal
LeaveOnTerm bool `hcl:"leave_on_terminate"`
LeaveOnTerm bool `mapstructure:"leave_on_terminate"`
// EnableSyslog is used to enable sending logs to syslog
EnableSyslog bool `hcl:"enable_syslog"`
EnableSyslog bool `mapstructure:"enable_syslog"`
// SyslogFacility is used to control the syslog facility used.
SyslogFacility string `hcl:"syslog_facility"`
SyslogFacility string `mapstructure:"syslog_facility"`
// DisableUpdateCheck is used to disable the periodic update
// and security bulletin checking.
DisableUpdateCheck bool `hcl:"disable_update_check"`
DisableUpdateCheck bool `mapstructure:"disable_update_check"`
// DisableAnonymousSignature is used to disable setting the
// anonymous signature when doing the update check and looking
// for security bulletins
DisableAnonymousSignature bool `hcl:"disable_anonymous_signature"`
DisableAnonymousSignature bool `mapstructure:"disable_anonymous_signature"`
// AtlasConfig is used to configure Atlas
Atlas *AtlasConfig `hcl:"atlas"`
Atlas *AtlasConfig `mapstructure:"atlas"`
// NomadConfig is used to override the default config.
// This is largly used for testing purposes.
NomadConfig *nomad.Config `hcl:"-" json:"-"`
NomadConfig *nomad.Config `mapstructure:"-" json:"-"`
// ClientConfig is used to override the default config.
// This is largly used for testing purposes.
ClientConfig *client.Config `hcl:"-" json:"-"`
ClientConfig *client.Config `mapstructure:"-" json:"-"`
// DevMode is set by the -dev CLI flag.
DevMode bool `hcl:"-"`
DevMode bool `mapstructure:"-"`
// Version information is set at compilation time
Revision string
@ -100,164 +99,237 @@ type Config struct {
VersionPrerelease string
// List of config files that have been loaded (in order)
Files []string
Files []string `mapstructure:"-"`
// HTTPAPIResponseHeaders allows users to configure the Nomad http agent to
// set arbritrary headers on API responses
HTTPAPIResponseHeaders map[string]string `hcl:"http_api_response_headers"`
HTTPAPIResponseHeaders map[string]string `mapstructure:"http_api_response_headers"`
}
// AtlasConfig is used to enable an parameterize the Atlas integration
type AtlasConfig struct {
// Infrastructure is the name of the infrastructure
// we belong to. e.g. hashicorp/stage
Infrastructure string `hcl:"infrastructure"`
Infrastructure string `mapstructure:"infrastructure"`
// Token is our authentication token from Atlas
Token string `hcl:"token" json:"-"`
Token string `mapstructure:"token" json:"-"`
// Join controls if Atlas will attempt to auto-join the node
// to it's cluster. Requires Atlas integration.
Join bool `hcl:"join"`
Join bool `mapstructure:"join"`
// Endpoint is the SCADA endpoint used for Atlas integration. If
// empty, the defaults from the provider are used.
Endpoint string `hcl:"endpoint"`
Endpoint string `mapstructure:"endpoint"`
}
// ClientConfig is configuration specific to the client mode
type ClientConfig struct {
// Enabled controls if we are a client
Enabled bool `hcl:"enabled"`
Enabled bool `mapstructure:"enabled"`
// StateDir is the state directory
StateDir string `hcl:"state_dir"`
StateDir string `mapstructure:"state_dir"`
// AllocDir is the directory for storing allocation data
AllocDir string `hcl:"alloc_dir"`
AllocDir string `mapstructure:"alloc_dir"`
// Servers is a list of known server addresses. These are as "host:port"
Servers []string `hcl:"servers"`
Servers []string `mapstructure:"servers"`
// NodeClass is used to group the node by class
NodeClass string `hcl:"node_class"`
NodeClass string `mapstructure:"node_class"`
// Options is used for configuration of nomad internals,
// like fingerprinters and drivers. The format is:
//
// namespace.option = value
Options map[string]string `hcl:"options"`
Options map[string]string `mapstructure:"options"`
// Metadata associated with the node
Meta map[string]string `hcl:"meta"`
Meta map[string]string `mapstructure:"meta"`
// Interface to use for network fingerprinting
NetworkInterface string `hcl:"network_interface"`
NetworkInterface string `mapstructure:"network_interface"`
// The network link speed to use if it can not be determined dynamically.
NetworkSpeed int `hcl:"network_speed"`
NetworkSpeed int `mapstructure:"network_speed"`
// MaxKillTimeout allows capping the user-specifiable KillTimeout.
MaxKillTimeout string `hcl:"max_kill_timeout"`
MaxKillTimeout string `mapstructure:"max_kill_timeout"`
// ClientMaxPort is the upper range of the ports that the client uses for
// communicating with plugin subsystems
ClientMaxPort int `hcl:"client_max_port"`
ClientMaxPort int `mapstructure:"client_max_port"`
// ClientMinPort is the lower range of the ports that the client uses for
// communicating with plugin subsystems
ClientMinPort int `hcl:"client_min_port"`
ClientMinPort int `mapstructure:"client_min_port"`
// Reserved is used to reserve resources from being used by Nomad. This can
// be used to target a certain utilization or to prevent Nomad from using a
// particular set of ports.
Reserved *Resources `mapstructure:"reserved"`
}
// ServerConfig is configuration specific to the server mode
type ServerConfig struct {
// Enabled controls if we are a server
Enabled bool `hcl:"enabled"`
Enabled bool `mapstructure:"enabled"`
// BootstrapExpect tries to automatically bootstrap the Consul cluster,
// by witholding peers until enough servers join.
BootstrapExpect int `hcl:"bootstrap_expect"`
BootstrapExpect int `mapstructure:"bootstrap_expect"`
// DataDir is the directory to store our state in
DataDir string `hcl:"data_dir"`
DataDir string `mapstructure:"data_dir"`
// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
ProtocolVersion int `mapstructure:"protocol_version"`
// NumSchedulers is the number of scheduler thread that are run.
// This can be as many as one per core, or zero to disable this server
// from doing any scheduling work.
NumSchedulers int `hcl:"num_schedulers"`
NumSchedulers int `mapstructure:"num_schedulers"`
// EnabledSchedulers controls the set of sub-schedulers that are
// enabled for this server to handle. This will restrict the evaluations
// that the workers dequeue for processing.
EnabledSchedulers []string `hcl:"enabled_schedulers"`
EnabledSchedulers []string `mapstructure:"enabled_schedulers"`
// NodeGCThreshold controls how "old" a node must be to be collected by GC.
NodeGCThreshold string `hcl:"node_gc_threshold"`
NodeGCThreshold string `mapstructure:"node_gc_threshold"`
// HeartbeatGrace is the grace period beyond the TTL to account for network,
// processing delays and clock skew before marking a node as "down".
HeartbeatGrace string `hcl:"heartbeat_grace"`
HeartbeatGrace string `mapstructure:"heartbeat_grace"`
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartJoin []string `hcl:"start_join"`
StartJoin []string `mapstructure:"start_join"`
// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `hcl:"retry_join"`
RetryJoin []string `mapstructure:"retry_join"`
// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `hcl:"retry_max"`
RetryMaxAttempts int `mapstructure:"retry_max"`
// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `hcl:"retry_interval"`
retryInterval time.Duration `hcl:"-"`
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`
// RejoinAfterLeave controls our interaction with the cluster after leave.
// When set to false (default), a leave causes Consul to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `hcl:"rejoin_after_leave"`
RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"`
}
// Telemetry is the telemetry configuration for the server
type Telemetry struct {
StatsiteAddr string `hcl:"statsite_address"`
StatsdAddr string `hcl:"statsd_address"`
DisableHostname bool `hcl:"disable_hostname"`
StatsiteAddr string `mapstructure:"statsite_address"`
StatsdAddr string `mapstructure:"statsd_address"`
DisableHostname bool `mapstructure:"disable_hostname"`
}
// Ports is used to encapsulate the various ports we bind to for network
// services. If any are not specified then the defaults are used instead.
type Ports struct {
HTTP int `hcl:"http"`
RPC int `hcl:"rpc"`
Serf int `hcl:"serf"`
HTTP int `mapstructure:"http"`
RPC int `mapstructure:"rpc"`
Serf int `mapstructure:"serf"`
}
// Addresses encapsulates all of the addresses we bind to for various
// network services. Everything is optional and defaults to BindAddr.
type Addresses struct {
HTTP string `hcl:"http"`
RPC string `hcl:"rpc"`
Serf string `hcl:"serf"`
HTTP string `mapstructure:"http"`
RPC string `mapstructure:"rpc"`
Serf string `mapstructure:"serf"`
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. Not all network services support an
// advertise address. All are optional and default to BindAddr.
type AdvertiseAddrs struct {
HTTP string `hcl:"http"`
RPC string `hcl:"rpc"`
Serf string `hcl:"serf"`
HTTP string `mapstructure:"http"`
RPC string `mapstructure:"rpc"`
Serf string `mapstructure:"serf"`
}
type Resources struct {
CPU int `mapstructure:"cpu"`
MemoryMB int `mapstructure:"memory"`
DiskMB int `mapstructure:"disk"`
IOPS int `mapstructure:"iops"`
ReservedPorts string `mapstructure:"reserved_ports"`
ParsedReservedPorts []int `mapstructure:"-"`
}
// ParseReserved expands the ReservedPorts string into a slice of port numbers.
// The supported syntax is comma seperated integers or ranges seperated by
// hyphens. For example, "80,120-150,160"
func (r *Resources) ParseReserved() error {
parts := strings.Split(r.ReservedPorts, ",")
// Hot path the empty case
if len(parts) == 1 && parts[0] == "" {
return nil
}
ports := make(map[int]struct{})
for _, part := range parts {
part = strings.TrimSpace(part)
rangeParts := strings.Split(part, "-")
l := len(rangeParts)
switch l {
case 1:
if val := rangeParts[0]; val == "" {
return fmt.Errorf("can't specify empty port")
} else {
port, err := strconv.Atoi(val)
if err != nil {
return err
}
ports[port] = struct{}{}
}
case 2:
// We are parsing a range
start, err := strconv.Atoi(rangeParts[0])
if err != nil {
return err
}
end, err := strconv.Atoi(rangeParts[1])
if err != nil {
return err
}
if end < start {
return fmt.Errorf("invalid range: starting value (%v) less than ending (%v) value", end, start)
}
for i := start; i <= end; i++ {
ports[i] = struct{}{}
}
default:
return fmt.Errorf("can only parse single port numbers or port ranges (ex. 80,100-120,150)")
}
}
for port := range ports {
r.ParsedReservedPorts = append(r.ParsedReservedPorts, port)
}
sort.Ints(r.ParsedReservedPorts)
return nil
}
// DevConfig is a Config that is used for dev mode of Nomad.
@ -302,6 +374,7 @@ func DefaultConfig() *Config {
MaxKillTimeout: "30s",
ClientMinPort: 14000,
ClientMaxPort: 14512,
Reserved: &Resources{},
},
Server: &ServerConfig{
Enabled: false,
@ -528,6 +601,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.ClientMinPort != 0 {
result.ClientMinPort = b.ClientMinPort
}
if b.Reserved != nil {
result.Reserved = result.Reserved.Merge(b.Reserved)
}
// Add the servers
result.Servers = append(result.Servers, b.Servers...)
@ -634,6 +710,29 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig {
return &result
}
func (r *Resources) Merge(b *Resources) *Resources {
result := *r
if b.CPU != 0 {
result.CPU = b.CPU
}
if b.MemoryMB != 0 {
result.MemoryMB = b.MemoryMB
}
if b.DiskMB != 0 {
result.DiskMB = b.DiskMB
}
if b.IOPS != 0 {
result.IOPS = b.IOPS
}
if b.ReservedPorts != "" {
result.ReservedPorts = b.ReservedPorts
}
if len(b.ParsedReservedPorts) != 0 {
result.ParsedReservedPorts = b.ParsedReservedPorts
}
return &result
}
// LoadConfig loads the configuration at the given path, regardless if
// its a file or directory.
func LoadConfig(path string) (*Config, error) {
@ -645,40 +744,15 @@ func LoadConfig(path string) (*Config, error) {
if fi.IsDir() {
return LoadConfigDir(path)
}
return LoadConfigFile(filepath.Clean(path))
}
// LoadConfigString is used to parse a config string
func LoadConfigString(s string) (*Config, error) {
// Parse!
obj, err := hcl.Parse(s)
cleaned := filepath.Clean(path)
config, err := ParseConfigFile(cleaned)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error loading %s: %s", cleaned, err)
}
// Start building the result
var result Config
if err := hcl.DecodeObject(&result, obj); err != nil {
return nil, err
}
return &result, nil
}
// LoadConfigFile loads the configuration from the given file.
func LoadConfigFile(path string) (*Config, error) {
// Read the file
d, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
config, err := LoadConfigString(string(d))
if err == nil {
config.Files = append(config.Files, path)
}
return config, err
config.Files = append(config.Files, cleaned)
return config, nil
}
// LoadConfigDir loads all the configurations in the given directory
@ -696,8 +770,7 @@ func LoadConfigDir(dir string) (*Config, error) {
}
if !fi.IsDir() {
return nil, fmt.Errorf(
"configuration path must be a directory: %s",
dir)
"configuration path must be a directory: %s", dir)
}
var files []string
@ -741,10 +814,11 @@ func LoadConfigDir(dir string) (*Config, error) {
var result *Config
for _, f := range files {
config, err := LoadConfigFile(f)
config, err := ParseConfigFile(f)
if err != nil {
return nil, fmt.Errorf("Error loading %s: %s", f, err)
}
config.Files = append(config.Files, f)
if result == nil {
result = config

View file

@ -0,0 +1,557 @@
package agent
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/mitchellh/mapstructure"
)
// ParseConfigFile parses the given path as a config file.
func ParseConfigFile(path string) (*Config, error) {
path, err := filepath.Abs(path)
if err != nil {
return nil, err
}
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
config, err := ParseConfig(f)
if err != nil {
return nil, err
}
return config, nil
}
// ParseConfig parses the config from the given io.Reader.
//
// Due to current internal limitations, the entire contents of the
// io.Reader will be copied into memory first before parsing.
func ParseConfig(r io.Reader) (*Config, error) {
// Copy the reader into an in-memory buffer first since HCL requires it.
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {
return nil, err
}
// Parse the buffer
root, err := hcl.Parse(buf.String())
if err != nil {
return nil, fmt.Errorf("error parsing: %s", err)
}
buf.Reset()
// Top-level item should be a list
list, ok := root.Node.(*ast.ObjectList)
if !ok {
return nil, fmt.Errorf("error parsing: root should be an object")
}
var config Config
if err := parseConfig(&config, list); err != nil {
return nil, fmt.Errorf("error parsing 'config': %v", err)
}
return &config, nil
}
func parseConfig(result *Config, list *ast.ObjectList) error {
// Check for invalid keys
valid := []string{
"region",
"datacenter",
"name",
"data_dir",
"log_level",
"bind_addr",
"enable_debug",
"ports",
"addresses",
"advertise",
"client",
"server",
"telemetry",
"leave_on_interrupt",
"leave_on_terminate",
"enable_syslog",
"syslog_facility",
"disable_update_check",
"disable_anonymous_signature",
"atlas",
"http_api_response_headers",
}
if err := checkHCLKeys(list, valid); err != nil {
return multierror.Prefix(err, "config:")
}
// Decode the full thing into a map[string]interface for ease
var m map[string]interface{}
if err := hcl.DecodeObject(&m, list); err != nil {
return err
}
delete(m, "ports")
delete(m, "addresses")
delete(m, "advertise")
delete(m, "client")
delete(m, "server")
delete(m, "telemetry")
delete(m, "atlas")
delete(m, "http_api_response_headers")
// Decode the rest
if err := mapstructure.WeakDecode(m, result); err != nil {
return err
}
// Parse ports
if o := list.Filter("ports"); len(o.Items) > 0 {
if err := parsePorts(&result.Ports, o); err != nil {
return multierror.Prefix(err, "ports ->")
}
}
// Parse addresses
if o := list.Filter("addresses"); len(o.Items) > 0 {
if err := parseAddresses(&result.Addresses, o); err != nil {
return multierror.Prefix(err, "addresses ->")
}
}
// Parse advertise
if o := list.Filter("advertise"); len(o.Items) > 0 {
if err := parseAdvertise(&result.AdvertiseAddrs, o); err != nil {
return multierror.Prefix(err, "advertise ->")
}
}
// Parse client config
if o := list.Filter("client"); len(o.Items) > 0 {
if err := parseClient(&result.Client, o); err != nil {
return multierror.Prefix(err, "client ->")
}
}
// Parse server config
if o := list.Filter("server"); len(o.Items) > 0 {
if err := parseServer(&result.Server, o); err != nil {
return multierror.Prefix(err, "server ->")
}
}
// Parse telemetry config
if o := list.Filter("telemetry"); len(o.Items) > 0 {
if err := parseTelemetry(&result.Telemetry, o); err != nil {
return multierror.Prefix(err, "telemetry ->")
}
}
// Parse atlas config
if o := list.Filter("atlas"); len(o.Items) > 0 {
if err := parseAtlas(&result.Atlas, o); err != nil {
return multierror.Prefix(err, "atlas ->")
}
}
// Parse out http_api_response_headers fields. These are in HCL as a list so
// we need to iterate over them and merge them.
if headersO := list.Filter("http_api_response_headers"); len(headersO.Items) > 0 {
for _, o := range headersO.Elem().Items {
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &result.HTTPAPIResponseHeaders); err != nil {
return err
}
}
}
return nil
}
func parsePorts(result **Ports, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'ports' block allowed")
}
// Get our ports object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"http",
"rpc",
"serf",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var ports Ports
if err := mapstructure.WeakDecode(m, &ports); err != nil {
return err
}
*result = &ports
return nil
}
func parseAddresses(result **Addresses, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'addresses' block allowed")
}
// Get our addresses object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"http",
"rpc",
"serf",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var addresses Addresses
if err := mapstructure.WeakDecode(m, &addresses); err != nil {
return err
}
*result = &addresses
return nil
}
func parseAdvertise(result **AdvertiseAddrs, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'advertise' block allowed")
}
// Get our advertise object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"http",
"rpc",
"serf",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var advertise AdvertiseAddrs
if err := mapstructure.WeakDecode(m, &advertise); err != nil {
return err
}
*result = &advertise
return nil
}
func parseClient(result **ClientConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'client' block allowed")
}
// Get our client object
obj := list.Items[0]
// Value should be an object
var listVal *ast.ObjectList
if ot, ok := obj.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("client value: should be an object")
}
// Check for invalid keys
valid := []string{
"enabled",
"state_dir",
"alloc_dir",
"servers",
"node_class",
"options",
"meta",
"network_interface",
"network_speed",
"max_kill_timeout",
"client_max_port",
"client_min_port",
"reserved",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
delete(m, "options")
delete(m, "meta")
delete(m, "reserved")
var config ClientConfig
if err := mapstructure.WeakDecode(m, &config); err != nil {
return err
}
// Parse out options fields. These are in HCL as a list so we need to
// iterate over them and merge them.
if optionsO := listVal.Filter("options"); len(optionsO.Items) > 0 {
for _, o := range optionsO.Elem().Items {
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &config.Options); err != nil {
return err
}
}
}
// Parse out options meta. These are in HCL as a list so we need to
// iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
for _, o := range metaO.Elem().Items {
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &config.Meta); err != nil {
return err
}
}
}
// Parse reserved config
if o := listVal.Filter("reserved"); len(o.Items) > 0 {
if err := parseReserved(&config.Reserved, o); err != nil {
return multierror.Prefix(err, "reserved ->")
}
}
*result = &config
return nil
}
func parseReserved(result **Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'reserved' block allowed")
}
// Get our reserved object
obj := list.Items[0]
// Value should be an object
var listVal *ast.ObjectList
if ot, ok := obj.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("client value: should be an object")
}
// Check for invalid keys
valid := []string{
"cpu",
"memory",
"disk",
"iops",
"reserved_ports",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var reserved Resources
if err := mapstructure.WeakDecode(m, &reserved); err != nil {
return err
}
if err := reserved.ParseReserved(); err != nil {
return err
}
*result = &reserved
return nil
}
func parseServer(result **ServerConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'server' block allowed")
}
// Get our server object
obj := list.Items[0]
// Value should be an object
var listVal *ast.ObjectList
if ot, ok := obj.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("client value: should be an object")
}
// Check for invalid keys
valid := []string{
"enabled",
"bootstrap_expect",
"data_dir",
"protocol_version",
"num_schedulers",
"enabled_schedulers",
"node_gc_threshold",
"heartbeat_grace",
"start_join",
"retry_join",
"retry_max",
"retry_interval",
"rejoin_after_leave",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var config ServerConfig
if err := mapstructure.WeakDecode(m, &config); err != nil {
return err
}
*result = &config
return nil
}
func parseTelemetry(result **Telemetry, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'telemetry' block allowed")
}
// Get our telemetry object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"statsite_address",
"statsd_address",
"disable_hostname",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var telemetry Telemetry
if err := mapstructure.WeakDecode(m, &telemetry); err != nil {
return err
}
*result = &telemetry
return nil
}
func parseAtlas(result **AtlasConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'atlas' block allowed")
}
// Get our atlas object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"infrastructure",
"token",
"join",
"endpoint",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var atlas AtlasConfig
if err := mapstructure.WeakDecode(m, &atlas); err != nil {
return err
}
*result = &atlas
return nil
}
func checkHCLKeys(node ast.Node, valid []string) error {
var list *ast.ObjectList
switch n := node.(type) {
case *ast.ObjectList:
list = n
case *ast.ObjectType:
list = n.List
default:
return fmt.Errorf("cannot check HCL keys of type %T", n)
}
validMap := make(map[string]struct{}, len(valid))
for _, v := range valid {
validMap[v] = struct{}{}
}
var result error
for _, item := range list.Items {
key := item.Keys[0].Token.Value().(string)
if _, ok := validMap[key]; !ok {
result = multierror.Append(result, fmt.Errorf(
"invalid key: %s", key))
}
}
return result
}

View file

@ -0,0 +1,126 @@
package agent
import (
"path/filepath"
"reflect"
"testing"
)
func TestConfig_Parse(t *testing.T) {
cases := []struct {
File string
Result *Config
Err bool
}{
{
"basic.hcl",
&Config{
Region: "foobar",
Datacenter: "dc2",
NodeName: "my-web",
DataDir: "/tmp/nomad",
LogLevel: "ERR",
BindAddr: "192.168.0.1",
EnableDebug: true,
Ports: &Ports{
HTTP: 1234,
RPC: 2345,
Serf: 3456,
},
Addresses: &Addresses{
HTTP: "127.0.0.1",
RPC: "127.0.0.2",
Serf: "127.0.0.3",
},
AdvertiseAddrs: &AdvertiseAddrs{
RPC: "127.0.0.3",
Serf: "127.0.0.4",
},
Client: &ClientConfig{
Enabled: true,
StateDir: "/tmp/client-state",
AllocDir: "/tmp/alloc",
Servers: []string{"a.b.c:80", "127.0.0.1:1234"},
NodeClass: "linux-medium-64bit",
Meta: map[string]string{
"foo": "bar",
"baz": "zip",
},
Options: map[string]string{
"foo": "bar",
"baz": "zip",
},
NetworkInterface: "eth0",
NetworkSpeed: 100,
MaxKillTimeout: "10s",
ClientMinPort: 1000,
ClientMaxPort: 2000,
Reserved: &Resources{
CPU: 10,
MemoryMB: 10,
DiskMB: 10,
IOPS: 10,
ReservedPorts: "1,100,10-12",
ParsedReservedPorts: []int{1, 10, 11, 12, 100},
},
},
Server: &ServerConfig{
Enabled: true,
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
NumSchedulers: 2,
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
HeartbeatGrace: "30s",
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
},
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
DisableHostname: true,
},
LeaveOnInt: true,
LeaveOnTerm: true,
EnableSyslog: true,
SyslogFacility: "LOCAL1",
DisableUpdateCheck: true,
DisableAnonymousSignature: true,
Atlas: &AtlasConfig{
Infrastructure: "armon/test",
Token: "abcd",
Join: true,
Endpoint: "127.0.0.1:1234",
},
HTTPAPIResponseHeaders: map[string]string{
"Access-Control-Allow-Origin": "*",
},
},
false,
},
}
for _, tc := range cases {
t.Logf("Testing parse: %s", tc.File)
path, err := filepath.Abs(filepath.Join("./config-test-fixtures", tc.File))
if err != nil {
t.Fatalf("file: %s\n\n%s", tc.File, err)
continue
}
actual, err := ParseConfigFile(path)
if (err != nil) != tc.Err {
t.Fatalf("file: %s\n\n%s", tc.File, err)
continue
}
if !reflect.DeepEqual(actual, tc.Result) {
t.Fatalf("file: %s\n\n%#v\n\n%#v", tc.File, actual, tc.Result)
}
}
}

View file

@ -41,6 +41,15 @@ func TestConfig_Merge(t *testing.T) {
},
NetworkSpeed: 100,
MaxKillTimeout: "20s",
ClientMaxPort: 19996,
Reserved: &Resources{
CPU: 10,
MemoryMB: 10,
DiskMB: 10,
IOPS: 10,
ReservedPorts: "1,10-30,55",
ParsedReservedPorts: []int{1, 2, 4},
},
},
Server: &ServerConfig{
Enabled: false,
@ -109,6 +118,14 @@ func TestConfig_Merge(t *testing.T) {
ClientMinPort: 22000,
NetworkSpeed: 105,
MaxKillTimeout: "50s",
Reserved: &Resources{
CPU: 15,
MemoryMB: 15,
DiskMB: 15,
IOPS: 15,
ReservedPorts: "2,10-30,55",
ParsedReservedPorts: []int{1, 2, 3},
},
},
Server: &ServerConfig{
Enabled: true,
@ -149,13 +166,13 @@ func TestConfig_Merge(t *testing.T) {
result := c1.Merge(c2)
if !reflect.DeepEqual(result, c2) {
t.Fatalf("bad:\n%#v\n%#v", result.Server, c2.Server)
t.Fatalf("bad:\n%#v\n%#v", result, c2)
}
}
func TestConfig_LoadConfigFile(t *testing.T) {
func TestConfig_ParseConfigFile(t *testing.T) {
// Fails if the file doesn't exist
if _, err := LoadConfigFile("/unicorns/leprechauns"); err == nil {
if _, err := ParseConfigFile("/unicorns/leprechauns"); err == nil {
t.Fatalf("expected error, got nothing")
}
@ -169,7 +186,7 @@ func TestConfig_LoadConfigFile(t *testing.T) {
if _, err := fh.WriteString("nope;!!!"); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := LoadConfigFile(fh.Name()); err == nil {
if _, err := ParseConfigFile(fh.Name()); err == nil {
t.Fatalf("expected load error, got nothing")
}
@ -184,7 +201,7 @@ func TestConfig_LoadConfigFile(t *testing.T) {
t.Fatalf("err: %s", err)
}
config, err := LoadConfigFile(fh.Name())
config, err := ParseConfigFile(fh.Name())
if err != nil {
t.Fatalf("err: %s", err)
}
@ -380,167 +397,50 @@ func TestConfig_Listener(t *testing.T) {
}
}
func TestConfig_LoadConfigString(t *testing.T) {
// Load the config
config, err := LoadConfigString(testConfig)
if err != nil {
t.Fatalf("err: %s", err)
}
// Expected output
expect := &Config{
Region: "foobar",
Datacenter: "dc2",
NodeName: "my-web",
DataDir: "/tmp/nomad",
LogLevel: "ERR",
BindAddr: "192.168.0.1",
EnableDebug: true,
Ports: &Ports{
HTTP: 1234,
RPC: 2345,
Serf: 3456,
func TestResources_ParseReserved(t *testing.T) {
cases := []struct {
Input string
Parsed []int
Err bool
}{
{
"1,2,3",
[]int{1, 2, 3},
false,
},
Addresses: &Addresses{
HTTP: "127.0.0.1",
RPC: "127.0.0.2",
Serf: "127.0.0.3",
{
"3,1,2,1,2,3,1-3",
[]int{1, 2, 3},
false,
},
AdvertiseAddrs: &AdvertiseAddrs{
RPC: "127.0.0.3",
Serf: "127.0.0.4",
{
"3-1",
nil,
true,
},
Client: &ClientConfig{
Enabled: true,
StateDir: "/tmp/client-state",
AllocDir: "/tmp/alloc",
Servers: []string{"a.b.c:80", "127.0.0.1:1234"},
NodeClass: "linux-medium-64bit",
Meta: map[string]string{
"foo": "bar",
"baz": "zip",
},
Options: map[string]string{
"foo": "bar",
"baz": "zip",
},
NetworkSpeed: 100,
{
"1-3,2-4",
[]int{1, 2, 3, 4},
false,
},
Server: &ServerConfig{
Enabled: true,
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
NumSchedulers: 2,
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
HeartbeatGrace: "30s",
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
},
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
DisableHostname: true,
},
LeaveOnInt: true,
LeaveOnTerm: true,
EnableSyslog: true,
SyslogFacility: "LOCAL1",
DisableUpdateCheck: true,
DisableAnonymousSignature: true,
Atlas: &AtlasConfig{
Infrastructure: "armon/test",
Token: "abcd",
Join: true,
Endpoint: "127.0.0.1:1234",
},
HTTPAPIResponseHeaders: map[string]string{
"Access-Control-Allow-Origin": "*",
{
"1-3,4,5-5,6,7,8-10",
[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
false,
},
}
// Check parsing
if !reflect.DeepEqual(config, expect) {
t.Fatalf("bad: got: %#v\nexpect: %#v", config, expect)
}
}
for i, tc := range cases {
r := &Resources{ReservedPorts: tc.Input}
err := r.ParseReserved()
if (err != nil) != tc.Err {
t.Fatalf("test case %d: %v", i, err)
continue
}
if !reflect.DeepEqual(r.ParsedReservedPorts, tc.Parsed) {
t.Fatalf("test case %d: \n\n%#v\n\n%#v", i, r.ParsedReservedPorts, tc.Parsed)
}
const testConfig = `
region = "foobar"
datacenter = "dc2"
name = "my-web"
data_dir = "/tmp/nomad"
log_level = "ERR"
bind_addr = "192.168.0.1"
enable_debug = true
ports {
http = 1234
rpc = 2345
serf = 3456
}
addresses {
http = "127.0.0.1"
rpc = "127.0.0.2"
serf = "127.0.0.3"
}
advertise {
rpc = "127.0.0.3"
serf = "127.0.0.4"
}
client {
enabled = true
state_dir = "/tmp/client-state"
alloc_dir = "/tmp/alloc"
servers = ["a.b.c:80", "127.0.0.1:1234"]
node_id = "xyz123"
node_class = "linux-medium-64bit"
meta {
foo = "bar"
baz = "zip"
}
options {
foo = "bar"
baz = "zip"
}
network_speed = 100
}
server {
enabled = true
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
heartbeat_grace = "30s"
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
}
telemetry {
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
disable_hostname = true
}
leave_on_interrupt = true
leave_on_terminate = true
enable_syslog = true
syslog_facility = "LOCAL1"
disable_update_check = true
disable_anonymous_signature = true
atlas {
infrastructure = "armon/test"
token = "abcd"
join = true
endpoint = "127.0.0.1:1234"
}
http_api_response_headers {
Access-Control-Allow-Origin = "*"
}
`

View file

@ -18,6 +18,14 @@ client {
options {
"driver.raw_exec.enable" = "1"
}
reserved {
cpu = 300
memory = 301
disk = 302
iops = 303
reserved_ports = "1-3,80,81-83"
}
}
# Modify our port to avoid a collision with server1

View file

@ -62,6 +62,9 @@ server {
client {
enabled = true
network_speed = 10
options {
"driver.raw_exec.enable" = "1"
}
}
atlas {
@ -190,7 +193,7 @@ nodes, unless otherwise specified:
* `disable_anonymous_signature`: Disables providing an anonymous signature
for de-duplication with the update check. See `disable_update_check`.
* `http_api_response_headers`: This object allows adding headers to the
* `http_api_response_headers`: This object allows adding headers to the
HTTP API responses. For example, the following config can be used to enable
CORS on the HTTP API endpoints:
```
@ -296,6 +299,27 @@ configured on server nodes.
task specifies a `kill_timeout` greater than `max_kill_timeout`,
`max_kill_timeout` is used. This is to prevent a user being able to set an
unreasonable timeout. If unset, a default is used.
* `reserved`: `reserved` is used to reserve a portion of the nodes resources
from being used by Nomad when placing tasks. It can be used to target
a certain capacity usage for the node. For example, 20% of the nodes CPU
could be reserved to target a CPU utilization of 80%. The block has the
following format:
```
reserved {
cpu = 500
memory = 512
disk = 1024
reserved_ports = "22,80,8500-8600"
}
```
* `cpu`: `cpu` is given as MHz to reserve.
* `memory`: `memory` is given as MB to reserve.
* `disk`: `disk` is given as MB to reserve.
* `reserved_ports`: `reserved_ports` is a comma seperated list of ports
to reserve on all fingerprinted network devices. Ranges can be
specified by using a hyphen seperated the two inclusive ends.
### Client Options Map <a id="options_map"></a>