Merge branches 'master' and 'f-config-consul-block' of github.com:hashicorp/nomad into f-config-consul-block
commit d94e651c6e
@@ -1173,15 +1173,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
 
 // setupConsulClient creates a ConsulService
 func (c *Client) setupConsulClient() error {
-	cfg := consul.ConsulConfig{
-		Addr:      c.config.ReadDefault("consul.address", "127.0.0.1:8500"),
-		Token:     c.config.Read("consul.token"),
-		Auth:      c.config.Read("consul.auth"),
-		EnableSSL: c.config.ReadBoolDefault("consul.ssl", false),
-		VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true),
-	}
-
-	cs, err := consul.NewConsulService(&cfg, c.logger, "")
+	cs, err := consul.NewConsulService(c.config.ConsulConfig, c.logger)
 	c.consulService = cs
 	return err
 }
@@ -1216,12 +1208,14 @@ func (c *Client) syncConsul() {
 			if taskState.State == structs.TaskStateRunning {
 				if tr, ok := ar.tasks[taskName]; ok {
 					for _, service := range tr.task.Services {
-						services[service.ID(ar.alloc.ID, tr.task.Name)] = struct{}{}
+						svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name)
+						services[service.ID(svcIdentifier)] = struct{}{}
 					}
 				}
 			}
 		}
 	}
 
 	if err := c.consulService.KeepServices(services); err != nil {
 		c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
 	}
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/consul"
 	"github.com/hashicorp/nomad/nomad"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -71,6 +72,7 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
 func testClient(t *testing.T, cb func(c *config.Config)) *Client {
 	conf := DefaultConfig()
 	conf.DevMode = true
+	conf.ConsulConfig = &consul.ConsulConfig{}
 	if cb != nil {
 		cb(conf)
 	}
@@ -7,6 +7,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/hashicorp/nomad/client/consul"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -108,6 +109,11 @@ type Config struct {
 
 	// Revision is the commit number of the Nomad client
 	Revision string
+
+	ClientServiceName string
+
+	// ConsulConfig is the configuration to connect with Consul Agent
+	ConsulConfig *consul.ConsulConfig
 }
 
 func (c *Config) Copy() *Config {
@@ -22,10 +22,10 @@ type ConsulService struct {
 	client   *consul.Client
 	availble bool
 
-	task           *structs.Task
-	allocID        string
-	delegateChecks map[string]struct{}
-	createCheck    func(*structs.ServiceCheck, string) (Check, error)
+	serviceIdentifier string
+	delegateChecks    map[string]struct{}
+	createCheck       func(*structs.ServiceCheck, string) (Check, error)
+	addrFinder        func(string) (string, int)
 
 	trackedServices map[string]*consul.AgentService
 	trackedChecks   map[string]*consul.AgentCheckRegistration
@@ -60,7 +60,7 @@ const (
 )
 
 // NewConsulService returns a new ConsulService
-func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) (*ConsulService, error) {
+func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) {
 	var err error
 	var c *consul.Client
 	cfg := consul.DefaultConfig()
@@ -114,7 +114,6 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
 	}
 	consulService := ConsulService{
 		client:          c,
-		allocID:         allocID,
 		logger:          logger,
 		trackedServices: make(map[string]*consul.AgentService),
 		trackedChecks:   make(map[string]*consul.AgentCheckRegistration),
@@ -133,15 +132,26 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c
 	return c
 }
 
-// SyncTask sync the services and task with consul
-func (c *ConsulService) SyncTask(task *structs.Task) error {
+// SetAddrFinder sets a function to find the host and port for a Service
+func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService {
+	c.addrFinder = addrFinder
+	return c
+}
+
+// SetServiceIdentifier sets the identifier of the services we are syncing with Consul
+func (c *ConsulService) SetServiceIdentifier(serviceIdentifier string) *ConsulService {
+	c.serviceIdentifier = serviceIdentifier
+	return c
+}
+
+// SyncServices sync the services with consul
+func (c *ConsulService) SyncServices(services []*structs.Service) error {
 	var mErr multierror.Error
-	c.task = task
 	taskServices := make(map[string]*consul.AgentService)
 	taskChecks := make(map[string]*consul.AgentCheckRegistration)
 
 	// Register Services and Checks that we don't know about or has changed
-	for _, service := range task.Services {
+	for _, service := range services {
 		srv, err := c.createService(service)
 		if err != nil {
 			mErr.Errors = append(mErr.Errors, err)
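For readers following the API change in this hunk: the single SyncTask(task) entry point is replaced by a small builder-style setup (SetServiceIdentifier, SetAddrFinder, then SyncServices). Below is a minimal illustrative sketch of the new call sequence, not part of this commit; the method names and types come from the diff, while the wrapper function and the hard-coded local Consul address are assumptions.

package example // illustrative sketch only, not part of this changeset

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/nomad/structs"
)

// registerTaskServices shows the call sequence that replaces SyncTask(task).
// The ConsulService methods are the ones introduced in this diff; the helper
// itself and the agent address are illustrative assumptions.
func registerTaskServices(allocID string, task *structs.Task, logger *log.Logger) (*consul.ConsulService, error) {
	cfg := &consul.ConsulConfig{Addr: "127.0.0.1:8500"} // assumed local Consul agent
	cs, err := consul.NewConsulService(cfg, logger)
	if err != nil {
		return nil, err
	}
	cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name))
	cs.SetAddrFinder(task.FindHostAndPortFor)
	if err := cs.SyncServices(task.Services); err != nil {
		return nil, err
	}
	go cs.PeriodicSync() // background re-sync; stopped by Shutdown()
	return cs, nil
}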
@@ -296,11 +306,11 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con
 // createService creates a Consul AgentService from a Nomad Service
 func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) {
 	srv := consul.AgentService{
-		ID:      service.ID(c.allocID, c.task.Name),
+		ID:      service.ID(c.serviceIdentifier),
 		Service: service.Name,
 		Tags:    service.Tags,
 	}
-	host, port := c.task.FindHostAndPortFor(service.PortLabel)
+	host, port := c.addrFinder(service.PortLabel)
 	if host != "" {
 		srv.Address = host
 	}
@@ -350,7 +360,7 @@ func (c *ConsulService) PeriodicSync() {
 		case <-sync.C:
 			if err := c.performSync(); err != nil {
 				if c.availble {
-					c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err)
+					c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err)
 				}
 				c.availble = false
 			} else {
@@ -358,7 +368,7 @@ func (c *ConsulService) PeriodicSync() {
 			}
 		case <-c.shutdownCh:
 			sync.Stop()
-			c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name)
+			c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier)
 			return
 		}
 	}
@@ -401,7 +411,8 @@ func (c *ConsulService) performSync() error {
 func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
 	nomadServices := make(map[string]*consul.AgentService)
 	for _, srv := range srvcs {
-		if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) {
+		if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) &&
+			!strings.HasPrefix(srv.ID, fmt.Sprintf("%s-%s", structs.NomadConsulPrefix, "agent")) {
 			nomadServices[srv.ID] = srv
 		}
 	}
@@ -12,6 +12,10 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	allocID = "12"
+)
+
 var (
 	logger = log.New(os.Stdout, "", log.LstdFlags)
 	check1 = structs.ServiceCheck{
@@ -37,8 +41,7 @@ var (
 )
 
 func TestConsulServiceRegisterServices(t *testing.T) {
-	allocID := "12"
-	cs, err := NewConsulService(&ConsulConfig{}, logger, allocID)
+	cs, err := NewConsulService(&ConsulConfig{}, logger)
 	if err != nil {
 		t.Fatalf("Err: %v", err)
 	}
@@ -47,13 +50,15 @@ func TestConsulServiceRegisterServices(t *testing.T) {
 		return
 	}
 	task := mockTask()
-	if err := cs.SyncTask(task); err != nil {
+	cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name))
+	cs.SetAddrFinder(task.FindHostAndPortFor)
+	if err := cs.SyncServices(task.Services); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	defer cs.Shutdown()
 
-	service1ID := service1.ID(allocID, task.Name)
-	service2ID := service2.ID(allocID, task.Name)
+	service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
+	service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
 	if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
 		t.Fatalf("err : %v", err)
 	}
@@ -63,8 +68,7 @@ func TestConsulServiceRegisterServices(t *testing.T) {
 }
 
 func TestConsulServiceUpdateService(t *testing.T) {
-	allocID := "12"
-	cs, err := NewConsulService(&ConsulConfig{}, logger, allocID)
+	cs, err := NewConsulService(&ConsulConfig{}, logger)
 	if err != nil {
 		t.Fatalf("Err: %v", err)
 	}
@@ -74,7 +78,9 @@ func TestConsulServiceUpdateService(t *testing.T) {
 	}
 
 	task := mockTask()
-	if err := cs.SyncTask(task); err != nil {
+	cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name))
+	cs.SetAddrFinder(task.FindHostAndPortFor)
+	if err := cs.SyncServices(task.Services); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	defer cs.Shutdown()
@@ -82,12 +88,12 @@ func TestConsulServiceUpdateService(t *testing.T) {
 	//Update Service defn 1
 	newTags := []string{"tag3"}
 	task.Services[0].Tags = newTags
-	if err := cs.SyncTask(task); err != nil {
+	if err := cs.SyncServices(task.Services); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	// Make sure all the services and checks are still present
-	service1ID := service1.ID(allocID, task.Name)
-	service2ID := service2.ID(allocID, task.Name)
+	service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
+	service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
 	if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
 		t.Fatalf("err : %v", err)
 	}
@@ -317,7 +317,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
 
 	// Re-syncing task with consul service
 	if e.consulService != nil {
-		if err := e.consulService.SyncTask(task); err != nil {
+		if err := e.consulService.SyncServices(task.Services); err != nil {
 			return err
 		}
 	}
@@ -431,17 +431,19 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
 	e.logger.Printf("[INFO] executor: registering services")
 	e.consulCtx = ctx
 	if e.consulService == nil {
-		cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID)
+		cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger)
 		if err != nil {
 			return err
 		}
 		cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
+		cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", e.ctx.AllocID, e.ctx.Task.Name))
+		cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
 		e.consulService = cs
 	}
 	if e.ctx != nil {
 		e.interpolateServices(e.ctx.Task)
 	}
-	err := e.consulService.SyncTask(e.ctx.Task)
+	err := e.consulService.SyncServices(e.ctx.Task.Services)
 	go e.consulService.PeriodicSync()
 	return err
 }
@@ -8,11 +8,13 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/hashicorp/nomad/client"
 	clientconfig "github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/consul"
 	"github.com/hashicorp/nomad/nomad"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -27,6 +29,11 @@ type Agent struct {
 	logger    *log.Logger
 	logOutput io.Writer
 
+	consulService  *consul.ConsulService
+	consulConfig   *consul.ConsulConfig
+	serverHTTPAddr string
+	clientHTTPAddr string
+
 	server *nomad.Server
 	client *client.Client
@@ -49,6 +56,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
 		shutdownCh: make(chan struct{}),
 	}
 
+	a.createConsulConfig()
+
 	if err := a.setupServer(); err != nil {
 		return nil, err
 	}
@@ -58,6 +67,12 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
 	if a.client == nil && a.server == nil {
 		return nil, fmt.Errorf("must have at least client or server mode enabled")
 	}
+	if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil {
+		a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err)
+	}
+	if a.consulService != nil {
+		go a.consulService.PeriodicSync()
+	}
 	return a, nil
 }
@@ -140,6 +155,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
 	if port := a.config.Ports.Serf; port != 0 {
 		conf.SerfConfig.MemberlistConfig.BindPort = port
 	}
+	a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
 
 	if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
 		dur, err := time.ParseDuration(gcThreshold)
@@ -226,6 +242,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
 		httpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
 	}
 	conf.Node.HTTPAddr = httpAddr
+	a.clientHTTPAddr = httpAddr
 
 	// Reserve resources on the node.
 	r := conf.Node.Reserved
@@ -242,6 +259,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
 	conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
 	conf.Revision = a.config.Revision
 
+	conf.ConsulConfig = a.consulConfig
+
 	return conf, nil
 }
@@ -403,6 +422,12 @@ func (a *Agent) Shutdown() error {
 		}
 	}
 
+	if a.consulService != nil {
+		if err := a.consulService.Shutdown(); err != nil {
+			a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err)
+		}
+	}
+
 	a.logger.Println("[INFO] agent: shutdown complete")
 	a.shutdown = true
 	close(a.shutdownCh)
@@ -445,3 +470,67 @@ func (a *Agent) Stats() map[string]map[string]string {
 	}
 	return stats
 }
+
+func (a *Agent) createConsulConfig() {
+	cfg := &consul.ConsulConfig{
+		Addr:      a.config.ConsulConfig.Addr,
+		Token:     a.config.ConsulConfig.Token,
+		Auth:      a.config.ConsulConfig.Auth,
+		EnableSSL: a.config.ConsulConfig.EnableSSL,
+		VerifySSL: a.config.ConsulConfig.VerifySSL,
+		CAFile:    a.config.ConsulConfig.CAFile,
+		CertFile:  a.config.ConsulConfig.CertFile,
+		KeyFile:   a.config.ConsulConfig.KeyFile,
+	}
+	a.consulConfig = cfg
+}
+
+// syncAgentServicesWithConsul syncs the client and server services with Consul
+func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error {
+	cs, err := consul.NewConsulService(a.consulConfig, a.logger)
+	if err != nil {
+		return err
+	}
+	a.consulService = cs
+	var services []*structs.Service
+	addrs := make(map[string]string)
+	if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" {
+		if err != nil {
+			return err
+		}
+		clientService := &structs.Service{
+			Name:      a.config.ConsulConfig.ClientServiceName,
+			PortLabel: "clienthttpaddr",
+		}
+		addrs["clienthttpaddr"] = clientHttpAddr
+		services = append(services, clientService)
+		cs.SetServiceIdentifier("agent-client")
+	}
+	if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" {
+		serverService := &structs.Service{
+			Name:      a.config.ConsulConfig.ServerServiceName,
+			PortLabel: "serverhttpaddr",
+		}
+		addrs["serverhttpaddr"] = serverHttpAddr
+		services = append(services, serverService)
+		cs.SetServiceIdentifier("agent-server")
+	}
+
+	cs.SetAddrFinder(func(portLabel string) (string, int) {
+		addr := addrs[portLabel]
+		if addr == "" {
+			return "", 0
+		}
+		host, port, err := net.SplitHostPort(addr)
+		if err != nil {
+			return "", 0
+		}
+		p, err := strconv.Atoi(port)
+		if err != nil {
+			return "", 0
+		}
+		return host, p
+	})
+
+	return cs.SyncServices(services)
+}
@@ -42,6 +42,7 @@ func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) {
 		Serf: getPort(),
 	}
 	conf.NodeName = fmt.Sprintf("Node %d", conf.Ports.RPC)
+	conf.ConsulConfig = &ConsulConfig{}
 
 	// Tighten the Serf timing
 	config.SerfConfig.MemberlistConfig.SuspicionMult = 2
@@ -58,10 +58,11 @@ func (c *Command) readConfig() *Config {
 
 	// Make a new, empty config.
 	cmdConfig := &Config{
-		Atlas:  &AtlasConfig{},
-		Client: &ClientConfig{},
-		Ports:  &Ports{},
-		Server: &ServerConfig{},
+		Atlas:        &AtlasConfig{},
+		ConsulConfig: &ConsulConfig{},
+		Client:       &ClientConfig{},
+		Ports:        &Ports{},
+		Server:       &ServerConfig{},
 	}
 
 	flags := flag.NewFlagSet("agent", flag.ContinueOnError)
@@ -81,3 +81,17 @@ atlas {
 http_api_response_headers {
 	Access-Control-Allow-Origin = "*"
 }
+consul {
+	server_service_name = "nomad-server"
+	client_service_name = "nomad-client"
+	addr = "127.0.0.1:9500"
+	token = "token1"
+	auth = "username:pass"
+	ssl = true
+	verify_ssl = false
+	ca_file = "/path/to/ca/file"
+	cert_file = "/path/to/cert/file"
+	key_file = "/path/to/key/file"
+	server_auto_join = true
+	client_auto_join = true
+}
@@ -82,6 +82,10 @@ type Config struct {
 	// AtlasConfig is used to configure Atlas
 	Atlas *AtlasConfig `mapstructure:"atlas"`
 
+	// ConsulConfig is used to configure Consul clients and register the nomad
+	// server and client services with Consul
+	ConsulConfig *ConsulConfig `mapstructure:"consul"`
+
 	// NomadConfig is used to override the default config.
 	// This is largly used for testing purposes.
 	NomadConfig *nomad.Config `mapstructure:"-" json:"-"`
@@ -124,6 +128,57 @@ type AtlasConfig struct {
 	Endpoint string `mapstructure:"endpoint"`
 }
 
+// ConsulConfig is used to configure Consul clients and register the nomad
+// server and client services with Consul
+type ConsulConfig struct {
+
+	// ServerServiceName is the name of the service that Nomad uses to register
+	// servers with Consul
+	ServerServiceName string `mapstructure:"server_service_name"`
+
+	// ClientServiceName is the name of the service that Nomad uses to register
+	// clients with Consul
+	ClientServiceName string `mapstructure:"client_service_name"`
+
+	// AutoRegister determines if Nomad will register the Nomad client and
+	// server agents with Consul
+	AutoRegister bool `mapstructure:"auto_register"`
+
+	// Addr is the address of the local Consul agent
+	Addr string `mapstructure:"addr"`
+
+	// Token is used to provide a per-request ACL token.This options overrides
+	// the agent's default token
+	Token string `mapstructure:"token"`
+
+	// Auth is the information to use for http access to Consul agent
+	Auth string `mapstructure:"auth"`
+
+	// EnableSSL sets the transport scheme to talk to the Consul agent as https
+	EnableSSL bool `mapstructure:"ssl"`
+
+	// VerifySSL enables or disables SSL verification when the transport scheme
+	// for the consul api client is https
+	VerifySSL bool `mapstructure:"verify_ssl"`
+
+	// CAFile is the path to the ca certificate used for Consul communication
+	CAFile string `mapstructure:"ca_file"`
+
+	// CertFile is the path to the certificate for Consul communication
+	CertFile string `mapstructure:"cert_file"`
+
+	// KeyFile is the path to the private key for Consul communication
+	KeyFile string `mapstructure:"key_file"`
+
+	// ServerAutoJoin enables Nomad servers to find peers by querying Consul and
+	// joining them
+	ServerAutoJoin bool `mapstructure:"server_auto_join"`
+
+	// ClientAutoJoin enables Nomad servers to find addresses of Nomad servers
+	// and register with them
+	ClientAutoJoin bool `mapstructure:"client_auto_join"`
+}
+
 // ClientConfig is configuration specific to the client mode
 type ClientConfig struct {
 	// Enabled controls if we are a client
@@ -368,6 +423,11 @@ func DefaultConfig() *Config {
 		Addresses:      &Addresses{},
 		AdvertiseAddrs: &AdvertiseAddrs{},
 		Atlas:          &AtlasConfig{},
+		ConsulConfig: &ConsulConfig{
+			ServerServiceName: "nomad-server",
+			ClientServiceName: "nomad-client",
+			AutoRegister:      true,
+		},
 		Client: &ClientConfig{
 			Enabled:      false,
 			NetworkSpeed: 100,
@@ -512,6 +572,14 @@ func (c *Config) Merge(b *Config) *Config {
 		result.Atlas = result.Atlas.Merge(b.Atlas)
 	}
 
+	// Apply the Consul Configuration
+	if result.ConsulConfig == nil && b.ConsulConfig != nil {
+		consulConfig := *b.ConsulConfig
+		result.ConsulConfig = &consulConfig
+	} else if b.ConsulConfig != nil {
+		result.ConsulConfig = result.ConsulConfig.Merge(b.ConsulConfig)
+	}
+
 	// Merge config files lists
 	result.Files = append(result.Files, b.Files...)
@@ -710,6 +778,49 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig {
 	return &result
 }
 
+// Merge merges two Consul Configurations together.
+func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig {
+	result := *a
+
+	if b.ServerServiceName != "" {
+		result.ServerServiceName = b.ServerServiceName
+	}
+	if b.ClientServiceName != "" {
+		result.ClientServiceName = b.ClientServiceName
+	}
+	if b.Addr != "" {
+		result.Addr = b.Addr
+	}
+	if b.Token != "" {
+		result.Token = b.Token
+	}
+	if b.Auth != "" {
+		result.Auth = b.Auth
+	}
+	if b.EnableSSL {
+		result.EnableSSL = true
+	}
+	if b.VerifySSL {
+		result.VerifySSL = true
+	}
+	if b.CAFile != "" {
+		result.CAFile = b.CAFile
+	}
+	if b.CertFile != "" {
+		result.CertFile = b.CertFile
+	}
+	if b.KeyFile != "" {
+		result.KeyFile = b.KeyFile
+	}
+	if b.ServerAutoJoin {
+		result.ServerAutoJoin = true
+	}
+	if b.ClientAutoJoin {
+		result.ClientAutoJoin = true
+	}
+	return &result
+}
+
 func (r *Resources) Merge(b *Resources) *Resources {
 	result := *r
 	if b.CPU != 0 {
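The merge semantics in ConsulConfig.Merge above are worth noting: non-empty strings from the later config win, and a boolean sticks once it has been set to true. A short illustrative snippet (written as if it sat next to config.go in the agent package; the values below are made up, only ConsulConfig and Merge come from this diff):

package agent // illustrative snippet only; not part of this commit

import "fmt"

func exampleConsulMerge() {
	defaults := &ConsulConfig{
		ServerServiceName: "nomad-server",
		ClientServiceName: "nomad-client",
		AutoRegister:      true,
	}
	fromFile := &ConsulConfig{
		Addr:      "127.0.0.1:9500", // non-empty string overrides the default
		EnableSSL: true,             // a true boolean "wins" in the merge
	}
	merged := defaults.Merge(fromFile)
	fmt.Println(merged.ServerServiceName) // "nomad-server" (kept from defaults)
	fmt.Println(merged.Addr)              // "127.0.0.1:9500" (taken from the file)
	fmt.Println(merged.EnableSSL)         // true
}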
@@ -90,6 +90,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
 		"disable_update_check",
 		"disable_anonymous_signature",
 		"atlas",
+		"consul",
 		"http_api_response_headers",
 	}
 	if err := checkHCLKeys(list, valid); err != nil {
@@ -109,6 +110,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
 	delete(m, "server")
 	delete(m, "telemetry")
 	delete(m, "atlas")
+	delete(m, "consul")
 	delete(m, "http_api_response_headers")
 
 	// Decode the rest
@@ -165,6 +167,13 @@ func parseConfig(result *Config, list *ast.ObjectList) error {
 		}
 	}
 
+	// Parse the consul config
+	if o := list.Filter("consul"); len(o.Items) > 0 {
+		if err := parseConsulConfig(&result.ConsulConfig, o); err != nil {
+			return multierror.Prefix(err, "consul ->")
+		}
+	}
+
 	// Parse out http_api_response_headers fields. These are in HCL as a list so
 	// we need to iterate over them and merge them.
 	if headersO := list.Filter("http_api_response_headers"); len(headersO.Items) > 0 {
@@ -530,6 +539,49 @@ func parseAtlas(result **AtlasConfig, list *ast.ObjectList) error {
 	return nil
 }
 
+func parseConsulConfig(result **ConsulConfig, list *ast.ObjectList) error {
+	list = list.Elem()
+	if len(list.Items) > 1 {
+		return fmt.Errorf("only one 'consul' block allowed")
+	}
+
+	// Get our consul object
+	listVal := list.Items[0].Val
+
+	// Check for invalid keys
+	valid := []string{
+		"server_service_name",
+		"client_service_name",
+		"addr",
+		"token",
+		"auth",
+		"ssl",
+		"verify_ssl",
+		"ca_file",
+		"cert_file",
+		"key_file",
+		"client_auto_join",
+		"server_auto_join",
+	}
+
+	if err := checkHCLKeys(listVal, valid); err != nil {
+		return err
+	}
+
+	var m map[string]interface{}
+	if err := hcl.DecodeObject(&m, listVal); err != nil {
+		return err
+	}
+
+	var consulConfig ConsulConfig
+	if err := mapstructure.WeakDecode(m, &consulConfig); err != nil {
+		return err
+	}
+
+	*result = &consulConfig
+	return nil
+}
+
 func checkHCLKeys(node ast.Node, valid []string) error {
 	var list *ast.ObjectList
 	switch n := node.(type) {
@@ -96,6 +96,20 @@ func TestConfig_Parse(t *testing.T) {
 					Join:     true,
 					Endpoint: "127.0.0.1:1234",
 				},
+				ConsulConfig: &ConsulConfig{
+					ServerServiceName: "nomad-server",
+					ClientServiceName: "nomad-client",
+					Addr:              "127.0.0.1:9500",
+					Token:             "token1",
+					Auth:              "username:pass",
+					EnableSSL:         true,
+					VerifySSL:         false,
+					CAFile:            "/path/to/ca/file",
+					CertFile:          "/path/to/cert/file",
+					KeyFile:           "/path/to/key/file",
+					ServerAutoJoin:    true,
+					ClientAutoJoin:    true,
+				},
 				HTTPAPIResponseHeaders: map[string]string{
 					"Access-Control-Allow-Origin": "*",
 				},
command/check.go (new file, 130 lines)

@@ -0,0 +1,130 @@
+package command
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const (
+	HealthCritical = 2
+	HealthWarn     = 1
+	HealthPass     = 0
+	HealthUnknown  = 3
+)
+
+type AgentCheckCommand struct {
+	Meta
+}
+
+func (c *AgentCheckCommand) Help() string {
+	helpText := `
+Usage: nomad check
+
+  Display state of the Nomad agent. The exit code of the command is Nagios
+  compatible and could be used with alerting systems.
+
+General Options:
+
+  ` + generalOptionsUsage() + `
+
+Agent Check Options:
+
+  -min-peers
+    Minimum number of peers that a server is expected to know.
+
+  -min-servers
+    Minumum number of servers that a client is expected to know.
+`
+
+	return strings.TrimSpace(helpText)
+}
+
+func (c *AgentCheckCommand) Synopsis() string {
+	return "Displays health of the local Nomad agent"
+}
+
+func (c *AgentCheckCommand) Run(args []string) int {
+	var minPeers, minServers int
+
+	flags := c.Meta.FlagSet("check", FlagSetClient)
+	flags.Usage = func() { c.Ui.Output(c.Help()) }
+	flags.IntVar(&minPeers, "min-peers", 0, "")
+	flags.IntVar(&minServers, "min-servers", 1, "")
+
+	if err := flags.Parse(args); err != nil {
+		return 1
+	}
+
+	client, err := c.Meta.Client()
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("error initializing client: %s", err))
+		return HealthCritical
+	}
+
+	info, err := client.Agent().Self()
+	if err != nil {
+		c.Ui.Output(fmt.Sprintf("unable to query agent info: %v", err))
+		return HealthCritical
+	}
+	if _, ok := info["stats"]["nomad"]; ok {
+		return c.checkServerHealth(info["stats"], minPeers)
+	}
+
+	if _, ok := info["stats"]["client"]; ok {
+		return c.checkClientHealth(info["stats"], minServers)
+	}
+	return HealthWarn
+}
+
+// checkServerHealth returns the health of a server.
+// TODO Add more rules for determining server health
+func (c *AgentCheckCommand) checkServerHealth(info map[string]interface{}, minPeers int) int {
+	raft := info["raft"].(map[string]interface{})
+	knownPeers, err := strconv.Atoi(raft["num_peers"].(string))
+	if err != nil {
+		c.Ui.Output(fmt.Sprintf("unable to get known peers: %v", err))
+		return HealthCritical
+	}
+
+	if knownPeers < minPeers {
+		c.Ui.Output(fmt.Sprintf("known peers: %v, is less than expected number of peers: %v", knownPeers, minPeers))
+		return HealthCritical
+	}
+	return HealthPass
+}
+
+// checkClientHealth retuns the health of a client
+func (c *AgentCheckCommand) checkClientHealth(info map[string]interface{}, minServers int) int {
+	clientStats := info["client"].(map[string]interface{})
+	knownServers, err := strconv.Atoi(clientStats["known_servers"].(string))
+	if err != nil {
+		c.Ui.Output(fmt.Sprintf("unable to get known servers: %v", err))
+		return HealthCritical
+	}
+
+	heartbeatTTL, err := time.ParseDuration(clientStats["heartbeat_ttl"].(string))
+	if err != nil {
+		c.Ui.Output(fmt.Sprintf("unable to parse heartbeat TTL: %v", err))
+		return HealthCritical
+	}
+
+	lastHeartbeat, err := time.ParseDuration(clientStats["last_heartbeat"].(string))
+	if err != nil {
+		c.Ui.Output(fmt.Sprintf("unable to parse last heartbeat: %v", err))
+		return HealthCritical
+	}
+
+	if lastHeartbeat > heartbeatTTL {
+		c.Ui.Output(fmt.Sprintf("last heartbeat was %q time ago, expected heartbeat ttl: %q", lastHeartbeat, heartbeatTTL))
+		return HealthCritical
+	}
+
+	if knownServers < minServers {
+		c.Ui.Output(fmt.Sprintf("known servers: %v, is less than expected number of servers: %v", knownServers, minServers))
+		return HealthCritical
+	}
+
+	return HealthPass
+}
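The constants above follow the Nagios plugin exit-code convention, which is what the help text refers to. A tiny illustrative mapping of the codes defined in command/check.go (this snippet is not part of the commit; the values simply mirror the constants above):

package main // illustrative only

import "fmt"

func main() {
	// Nagios-style states corresponding to the nomad check exit codes.
	states := map[int]string{
		0: "OK       (HealthPass)",
		1: "WARNING  (HealthWarn)",
		2: "CRITICAL (HealthCritical)",
		3: "UNKNOWN  (HealthUnknown)",
	}
	for code := 0; code <= 3; code++ {
		fmt.Printf("exit %d => %s\n", code, states[code])
	}
}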
command/check_test.go (new file, 29 lines)

@@ -0,0 +1,29 @@
+package command
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/mitchellh/cli"
+)
+
+func TestAgentCheckCommand_ServerHealth(t *testing.T) {
+	srv, _, url := testServer(t, nil)
+	defer srv.Stop()
+
+	ui := new(cli.MockUi)
+	cmd := &AgentCheckCommand{Meta: Meta{Ui: ui}}
+	address := fmt.Sprintf("-address=%s", url)
+
+	code := cmd.Run([]string{address})
+	if code != HealthPass {
+		t.Fatalf("expected exit: %v, actual: %d", HealthPass, code)
+	}
+
+	minPeers := fmt.Sprintf("-min-peers=%v", 3)
+	code = cmd.Run([]string{address, minPeers})
+	if code != HealthCritical {
+		t.Fatalf("expected exitcode: %v, actual: %v", HealthCritical, code)
+	}
+
+}
@@ -30,7 +30,6 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
 			Meta: meta,
 		}, nil
 	},
-
 	"agent": func() (cli.Command, error) {
 		return &agent.Command{
 			Revision: GitCommit,
@@ -40,13 +39,16 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
 			ShutdownCh: make(chan struct{}),
 		}, nil
 	},
-
 	"agent-info": func() (cli.Command, error) {
 		return &command.AgentInfoCommand{
 			Meta: meta,
 		}, nil
 	},
+
+	"check": func() (cli.Command, error) {
+		return &command.AgentCheckCommand{
+			Meta: meta,
+		}, nil
+	},
 	"client-config": func() (cli.Command, error) {
 		return &command.ClientConfigCommand{
 			Meta: meta,
@@ -1519,8 +1519,8 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
 	}
 }
 
-func (s *Service) ID(allocID string, taskName string) string {
-	return fmt.Sprintf("%s-%s-%s-%s", NomadConsulPrefix, allocID, taskName, s.Hash())
+func (s *Service) ID(identifier string) string {
+	return fmt.Sprintf("%s-%s-%s", NomadConsulPrefix, identifier, s.Hash())
 }
 
 // Validate checks if the Check definition is valid