Refactor Consul Syncer into new ServiceClient

Fixes #2478 #2474 #1995 #2294

The new client only handles agent and task service advertisement. Server
discovery is mostly unchanged.

The Nomad client agent now handles all Consul operations instead of the
executor handling task-related operations. When upgrading from an
earlier version of Nomad, existing executors will be told to deregister
from Consul so that the Nomad agent can re-register the task's services
and checks.
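
In sketch form, the relevant upgrade step (excerpted from the executor
plugin-reattach helper later in this diff) is a single call:

    // 0.6 upgrade path: a reattached legacy executor is asked to deregister its
    // Consul services so the Nomad client agent can re-register them itself.
    if err := executorPlugin.DeregisterServices(); err != nil {
        return nil, nil, err
    }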

Drivers - other than qemu - now support an Exec method for executing
arbitrary commands in a task's environment. This is used to implement
script checks.
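
Each handle implements the same signature (a static assertion test below
verifies they satisfy consul.ScriptExecutor); the exec driver's handle, for
example, simply runs the command inside the task's chroot:

    // Exec runs cmd with args inside the task's environment and returns the
    // combined output, the exit code, and any error starting the command.
    func (h *execHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
        return execChroot(ctx, h.taskDir.Dir, cmd, args)
    }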

Interfaces are used extensively to avoid interacting with Consul in
tests that don't assert any Consul-related behavior.
Michael Schurter 2017-01-31 16:43:57 -08:00
parent 291f4f0b44
commit e204a287ed
44 changed files with 2360 additions and 2509 deletions

View File

@ -59,7 +59,8 @@ type AllocRunner struct {
updateCh chan *structs.Allocation
vaultClient vaultclient.VaultClient
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI
otherAllocDir *allocdir.AllocDir
@ -96,20 +97,23 @@ type allocRunnerState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
return ar
}
@ -174,7 +178,7 @@ func (r *AllocRunner) RestoreState() error {
}
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr
// Skip tasks in terminal states.
@ -512,7 +516,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

View File

@ -40,7 +40,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}
@ -323,7 +323,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
@ -415,7 +416,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
@ -573,7 +574,8 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()
// RestoreState should fail on the task state since we only test the

View File

@ -34,8 +34,12 @@ var (
// included in snapshots.
SharedDataDir = "data"
// TmpDirName is the name of the temporary directory in each alloc and
// task.
TmpDirName = "tmp"
// The set of directories that exist inside each shared alloc directory.
SharedAllocDirs = []string{LogDirName, "tmp", SharedDataDir}
SharedAllocDirs = []string{LogDirName, TmpDirName, SharedDataDir}
// The name of the directory that exists inside each task directory
// regardless of driver.
@ -46,7 +50,7 @@ var (
TaskSecrets = "secrets"
// TaskDirs is the set of directories created in each tasks directory.
TaskDirs = map[string]os.FileMode{"tmp": os.ModeSticky | 0777}
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)
type AllocDir struct {

View File

@ -49,10 +49,6 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9
// consulReaperIntv is the interval at which the Consul reaper will
// run.
consulReaperIntv = 5 * time.Second
// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
@ -142,8 +138,12 @@ type Client struct {
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService ConsulServiceAPI
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI
// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
@ -196,7 +196,7 @@ var (
)
// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
@ -210,7 +210,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
// Create the client
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
@ -285,9 +286,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
}
}
// Start Consul reaper
go c.consulReaper()
// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
@ -606,7 +604,7 @@ func (c *Client) restoreState() error {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
@ -1894,7 +1892,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
go ar.Run()
@ -2047,8 +2045,7 @@ func (c *Client) consulDiscoveryImpl() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
dcs, err := c.consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
@ -2084,7 +2081,7 @@ DISCOLOOP:
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
@ -2143,54 +2140,6 @@ DISCOLOOP:
}
}
// consulReaper periodically reaps unmatched domains from Consul. Intended to
// be called in its own goroutine. See consulReaperIntv for interval.
func (c *Client) consulReaper() {
ticker := time.NewTicker(consulReaperIntv)
defer ticker.Stop()
lastok := true
for {
select {
case <-ticker.C:
if err := c.consulReaperImpl(); err != nil {
if lastok {
c.logger.Printf("[ERR] client.consul: error reaping services in consul: %v", err)
lastok = false
}
} else {
lastok = true
}
case <-c.shutdownCh:
return
}
}
}
// consulReaperImpl reaps unmatched domains from Consul.
func (c *Client) consulReaperImpl() error {
const estInitialExecutorDomains = 8
// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
}
return c.consulSyncer.ReapUnmatched(domains)
}
// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Start collecting host stats right away and then keep collecting every

View File

@ -75,15 +75,11 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}
shutdownCh := make(chan struct{})
logger := log.New(config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
catalog := consul.NewMockCatalog(logger)
// Create server
server, err := nomad.NewServer(config, consulSyncer, logger)
server, err := nomad.NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -105,14 +101,11 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
cb(conf)
}
shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(conf.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}
logger := log.New(conf.LogOutput, "", log.LstdFlags)
client, err := NewClient(conf, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
client, err := NewClient(conf, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -754,14 +747,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
}
// Create a new client
shutdownCh := make(chan struct{})
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(c1.config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
c2, err := NewClient(c1.config, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
c2, err := NewClient(c1.config, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}

client/consul.go (new file, 13 lines)
View File

@ -0,0 +1,13 @@
package client
import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error
RemoveTask(allocID string, task *structs.Task)
}

client/consul_test.go (new file, 56 lines)
View File

@ -0,0 +1,56 @@
package client
import (
"io/ioutil"
"log"
"os"
"sync"
"testing"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// mockConsulOp represents the register/deregister operations.
type mockConsulOp struct {
allocID string
task *structs.Task
exec consul.ScriptExecutor
}
// mockConsulServiceClient implements the ConsulServiceAPI interface to record
// and log task registration/deregistration.
type mockConsulServiceClient struct {
registers []mockConsulOp
removes []mockConsulOp
mu sync.Mutex
logger *log.Logger
}
func newMockConsulServiceClient() *mockConsulServiceClient {
m := mockConsulServiceClient{
registers: make([]mockConsulOp, 0, 10),
removes: make([]mockConsulOp, 0, 10),
logger: log.New(ioutil.Discard, "", 0),
}
if testing.Verbose() {
m.logger = log.New(os.Stderr, "", log.LstdFlags)
}
return &m
}
func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error {
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec)
m.mu.Lock()
defer m.mu.Unlock()
m.registers = append(m.registers, mockConsulOp{allocID, task, exec})
return nil
}
func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) {
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name)
m.mu.Lock()
defer m.mu.Unlock()
m.removes = append(m.removes, mockConsulOp{allocID, task, nil})
}
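
A hypothetical test (not part of this commit) showing how the mock could be
used to assert registrations without touching a real Consul agent:

    func TestMockConsulServiceClient_RecordsOps(t *testing.T) {
        m := newMockConsulServiceClient()
        task := &structs.Task{Name: "web"}

        // Operations are only recorded; no Consul agent is contacted.
        if err := m.RegisterTask("alloc-1", task, nil); err != nil {
            t.Fatalf("err: %v", err)
        }
        m.RemoveTask("alloc-1", task)

        if len(m.registers) != 1 || len(m.removes) != 1 {
            t.Fatalf("expected 1 register and 1 remove, got %d and %d",
                len(m.registers), len(m.removes))
        }
    }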

View File

@ -1,6 +1,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"log"
@ -14,6 +15,7 @@ import (
"syscall"
"time"
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/cli/config/configfile"
@ -564,9 +566,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
doneCh: make(chan bool),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.collectStats()
go h.run()
return h, nil
@ -1227,10 +1226,6 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
doneCh: make(chan bool),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil {
h.logger.Printf("[ERR] driver.docker: error registering services with consul: %v", err)
}
go h.collectStats()
go h.run()
return h, nil
@ -1273,6 +1268,42 @@ func (h *DockerHandle) Update(task *structs.Task) error {
return nil
}
func (h *DockerHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
fullCmd := make([]string, len(args)+1)
fullCmd[0] = cmd
copy(fullCmd[1:], args)
createExecOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: fullCmd,
Container: h.containerID,
Context: ctx,
}
exec, err := h.client.CreateExec(createExecOpts)
if err != nil {
return nil, 0, err
}
output, _ := circbuf.NewBuffer(int64(dstructs.CheckBufSize))
startOpts := docker.StartExecOptions{
Detach: false,
Tty: false,
OutputStream: output,
ErrorStream: output,
Context: ctx,
}
if err := client.StartExec(exec.ID, startOpts); err != nil {
return nil, 0, err
}
res, err := client.InspectExec(exec.ID)
if err != nil {
return output.Bytes(), 0, err
}
return output.Bytes(), res.ExitCode, nil
}
func (h *DockerHandle) Signal(s os.Signal) error {
// Convert types
sysSig, ok := s.(syscall.Signal)
@ -1332,11 +1363,6 @@ func (h *DockerHandle) run() {
close(h.doneCh)
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.docker: error deregistering services: %v", err)
}
// Shutdown the syslog collector
if err := h.executor.Exit(); err != nil {
h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err)

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -421,3 +422,15 @@ func TestCreatedResources_CopyRemove(t *testing.T) {
t.Fatalf("res1 should not equal res2: #%v", res1)
}
}
// TestHandleScriptExecutor statically asserts that the drivers we expect to
// implement the consul.ScriptExecutor interface actually do.
func TestHandleScriptExecutor(t *testing.T) {
_ = []consul.ScriptExecutor{
&DockerHandle{},
&execHandle{},
&javaHandle{},
&rawExecHandle{},
&rktHandle{},
}
}

View File

@ -1,6 +1,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"log"
@ -163,9 +164,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
version: d.config.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err)
taskDir: ctx.TaskDir,
}
go h.run()
return h, nil
@ -222,9 +221,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.exec: error registering services with consul: %v", err)
taskDir: ctx.TaskDir,
}
go h.run()
return h, nil
@ -260,6 +257,10 @@ func (h *execHandle) Update(task *structs.Task) error {
return nil
}
func (h *execHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
return execChroot(ctx, h.taskDir.Dir, cmd, args)
}
func (h *execHandle) Signal(s os.Signal) error {
return h.executor.Signal(s)
}
@ -307,11 +308,6 @@ func (h *execHandle) run() {
}
}
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.exec: failed to deregister services: %v", err)
}
// Exit the executor
if err := h.executor.Exit(); err != nil {
h.logger.Printf("[ERR] driver.exec: error destroying executor: %v", err)

View File

@ -1,6 +1,8 @@
package driver
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"path/filepath"
@ -11,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -280,3 +283,64 @@ func TestExecDriverUser(t *testing.T) {
t.Fatalf("Expecting '%v' in '%v'", msg, err)
}
}
// TestExecDriver_HandlerExec ensures the exec driver's handle properly executes commands inside the chroot.
func TestExecDriver_HandlerExec(t *testing.T) {
ctestutils.ExecCompatible(t)
task := &structs.Task{
Name: "sleep",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"9000"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
ctx := testDriverContexts(t, task)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Exec a command that should work
out, code, err := handle.(consul.ScriptExecutor).Exec(context.TODO(), "/usr/bin/stat", []string{"/alloc"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
if code != 0 {
t.Fatalf("expected `stat /alloc` to succeed but exit code was: %d", code)
}
if expected := 100; len(out) < expected {
t.Fatalf("expected at least %d bytes of output but found %d:\n%s", expected, len(out), out)
}
// Exec a command that should fail
out, code, err = handle.(consul.ScriptExecutor).Exec(context.TODO(), "/usr/bin/stat", []string{"lkjhdsaflkjshowaisxmcvnlia"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
if code == 0 {
t.Fatalf("expected `stat` to fail but exit code was: %d", code)
}
if expected := "No such file or directory"; !bytes.Contains(out, []byte(expected)) {
t.Fatalf("expected output to contain %q but found: %q", expected, out)
}
if err := handle.Kill(); err != nil {
t.Fatalf("error killing exec handle: %v", err)
}
}

View File

@ -1,205 +0,0 @@
package executor
import (
"fmt"
"log"
"os/exec"
"sync"
"syscall"
"time"
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
var (
// We store the client globally to cache the connection to the docker daemon.
createClient sync.Once
client *docker.Client
)
const (
// The default check timeout
defaultCheckTimeout = 30 * time.Second
)
// DockerScriptCheck runs nagios compatible scripts in a docker container and
// provides the check result
type DockerScriptCheck struct {
id string // id of the check
interval time.Duration // interval of the check
timeout time.Duration // timeout of the check
containerID string // container id in which the check will be invoked
logger *log.Logger
cmd string // check command
args []string // check command arguments
dockerEndpoint string // docker endpoint
tlsCert string // path to tls certificate
tlsCa string // path to tls ca
tlsKey string // path to tls key
}
// dockerClient creates the client to interact with the docker daemon
func (d *DockerScriptCheck) dockerClient() (*docker.Client, error) {
if client != nil {
return client, nil
}
var err error
createClient.Do(func() {
if d.dockerEndpoint != "" {
if d.tlsCert+d.tlsKey+d.tlsCa != "" {
d.logger.Printf("[DEBUG] executor.checks: using TLS client connection to %s", d.dockerEndpoint)
client, err = docker.NewTLSClient(d.dockerEndpoint, d.tlsCert, d.tlsKey, d.tlsCa)
} else {
d.logger.Printf("[DEBUG] executor.checks: using standard client connection to %s", d.dockerEndpoint)
client, err = docker.NewClient(d.dockerEndpoint)
}
return
}
d.logger.Println("[DEBUG] executor.checks: using client connection initialized from environment")
client, err = docker.NewClientFromEnv()
})
return client, err
}
// Run runs a script check inside a docker container
func (d *DockerScriptCheck) Run() *cstructs.CheckResult {
var (
exec *docker.Exec
err error
execRes *docker.ExecInspect
time = time.Now()
)
if client, err = d.dockerClient(); err != nil {
return &cstructs.CheckResult{Err: err}
}
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: append([]string{d.cmd}, d.args...),
Container: d.containerID,
}
if exec, err = client.CreateExec(execOpts); err != nil {
return &cstructs.CheckResult{Err: err}
}
output, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize))
startOpts := docker.StartExecOptions{
Detach: false,
Tty: false,
OutputStream: output,
ErrorStream: output,
}
if err = client.StartExec(exec.ID, startOpts); err != nil {
return &cstructs.CheckResult{Err: err}
}
if execRes, err = client.InspectExec(exec.ID); err != nil {
return &cstructs.CheckResult{Err: err}
}
return &cstructs.CheckResult{
ExitCode: execRes.ExitCode,
Output: string(output.Bytes()),
Timestamp: time,
}
}
// ID returns the check id
func (d *DockerScriptCheck) ID() string {
return d.id
}
// Interval returns the interval at which the check has to run
func (d *DockerScriptCheck) Interval() time.Duration {
return d.interval
}
// Timeout returns the duration after which a check is timed out.
func (d *DockerScriptCheck) Timeout() time.Duration {
if d.timeout == 0 {
return defaultCheckTimeout
}
return d.timeout
}
// ExecScriptCheck runs a nagios compatible script and returns the check result
type ExecScriptCheck struct {
id string // id of the script check
interval time.Duration // interval at which the check is invoked
timeout time.Duration // timeout duration of the check
cmd string // command of the check
args []string // args passed to the check
taskDir string // the root directory of the check
FSIsolation bool // indicates whether the check has to be run within a chroot
}
// Run runs an exec script check
func (e *ExecScriptCheck) Run() *cstructs.CheckResult {
buf, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize))
cmd := exec.Command(e.cmd, e.args...)
cmd.Stdout = buf
cmd.Stderr = buf
e.setChroot(cmd)
ts := time.Now()
if err := cmd.Start(); err != nil {
return &cstructs.CheckResult{Err: err}
}
errCh := make(chan error, 2)
go func() {
errCh <- cmd.Wait()
}()
select {
case err := <-errCh:
endTime := time.Now()
if err == nil {
return &cstructs.CheckResult{
ExitCode: 0,
Output: string(buf.Bytes()),
Timestamp: ts,
}
}
exitCode := 1
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitCode = status.ExitStatus()
}
}
return &cstructs.CheckResult{
ExitCode: exitCode,
Output: string(buf.Bytes()),
Timestamp: ts,
Duration: endTime.Sub(ts),
}
case <-time.After(e.Timeout()):
errCh <- fmt.Errorf("timed out after waiting 30s")
}
return nil
}
// ID returns the check id
func (e *ExecScriptCheck) ID() string {
return e.id
}
// Interval returns the interval at which the check has to run
func (e *ExecScriptCheck) Interval() time.Duration {
return e.interval
}
// Timeout returns the duration after which a check is timed out.
func (e *ExecScriptCheck) Timeout() time.Duration {
if e.timeout == 0 {
return defaultCheckTimeout
}
return e.timeout
}

View File

@ -1,56 +0,0 @@
package executor
import (
"log"
"os"
"strings"
"testing"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/testutil"
)
func TestExecScriptCheckWithIsolation(t *testing.T) {
testutil.ExecCompatible(t)
execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
ctx, allocDir := testExecutorContextWithChroot(t)
defer allocDir.Destroy()
execCmd.FSIsolation = true
execCmd.ResourceLimits = true
execCmd.User = dstructs.DefaultUnpriviledgedUser
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
_, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
check := &ExecScriptCheck{
id: "foo",
cmd: "/bin/echo",
args: []string{"hello", "world"},
taskDir: ctx.TaskDir,
FSIsolation: true,
}
res := check.Run()
expectedOutput := "hello world"
expectedExitCode := 0
if res.Err != nil {
t.Fatalf("err: %v", res.Err)
}
if strings.TrimSpace(res.Output) != expectedOutput {
t.Fatalf("output expected: %v, actual: %v", expectedOutput, res.Output)
}
if res.ExitCode != expectedExitCode {
t.Fatalf("exitcode expected: %v, actual: %v", expectedExitCode, res.ExitCode)
}
}

View File

@ -1,96 +0,0 @@
package executor
import (
"log"
"os"
"strings"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/testutil"
)
func TestExecScriptCheckNoIsolation(t *testing.T) {
check := &ExecScriptCheck{
id: "foo",
cmd: "/bin/echo",
args: []string{"hello", "world"},
taskDir: "/tmp",
FSIsolation: false,
}
res := check.Run()
expectedOutput := "hello world"
expectedExitCode := 0
if res.Err != nil {
t.Fatalf("err: %v", res.Err)
}
if strings.TrimSpace(res.Output) != expectedOutput {
t.Fatalf("output expected: %v, actual: %v", expectedOutput, res.Output)
}
if res.ExitCode != expectedExitCode {
t.Fatalf("exitcode expected: %v, actual: %v", expectedExitCode, res.ExitCode)
}
}
func TestDockerScriptCheck(t *testing.T) {
if !testutil.DockerIsConnected(t) {
return
}
client, err := docker.NewClientFromEnv()
if err != nil {
t.Fatalf("error creating docker client: %v", err)
}
if err := client.PullImage(docker.PullImageOptions{Repository: "busybox", Tag: "latest"},
docker.AuthConfiguration{}); err != nil {
t.Fatalf("error pulling redis: %v", err)
}
container, err := client.CreateContainer(docker.CreateContainerOptions{
Config: &docker.Config{
Image: "busybox",
Cmd: []string{"/bin/sleep", "1000"},
},
})
if err != nil {
t.Fatalf("error creating container: %v", err)
}
defer removeContainer(client, container.ID)
if err := client.StartContainer(container.ID, container.HostConfig); err != nil {
t.Fatalf("error starting container: %v", err)
}
check := &DockerScriptCheck{
id: "1",
interval: 5 * time.Second,
containerID: container.ID,
logger: log.New(os.Stdout, "", log.LstdFlags),
cmd: "/bin/echo",
args: []string{"hello", "world"},
}
res := check.Run()
expectedOutput := "hello world"
expectedExitCode := 0
if res.Err != nil {
t.Fatalf("err: %v", res.Err)
}
if strings.TrimSpace(res.Output) != expectedOutput {
t.Fatalf("output expected: %v, actual: %v", expectedOutput, res.Output)
}
if res.ExitCode != expectedExitCode {
t.Fatalf("exitcode expected: %v, actual: %v", expectedExitCode, res.ExitCode)
}
}
// removeContainer kills and removes a container
func removeContainer(client *docker.Client, containerID string) {
client.KillContainer(docker.KillContainerOptions{ID: containerID})
client.RemoveContainer(docker.RemoveContainerOptions{ID: containerID, RemoveVolumes: true, Force: true})
}

View File

@ -1,18 +0,0 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package executor
import (
"os/exec"
"syscall"
)
func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) {
if e.FSIsolation {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Chroot = e.taskDir
}
cmd.Dir = "/"
}

View File

@ -1,8 +0,0 @@
// +build windows
package executor
import "os/exec"
func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) {
}

View File

@ -23,10 +23,8 @@ import (
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/logging"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/command/agent/consul"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
@ -56,38 +54,11 @@ type Executor interface {
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
UpdateTask(task *structs.Task) error
SyncServices(ctx *ConsulContext) error
DeregisterServices() error
Version() (*ExecutorVersion, error)
Stats() (*cstructs.TaskResourceUsage, error)
Signal(s os.Signal) error
}
// ConsulContext holds context to configure the Consul client and run checks
type ConsulContext struct {
// ConsulConfig contains the configuration information for talking
// with this Nomad Agent's Consul Agent.
ConsulConfig *config.ConsulConfig
// ContainerID is the ID of the container
ContainerID string
// TLSCert is the cert which docker client uses while interactng with the docker
// daemon over TLS
TLSCert string
// TLSCa is the CA which the docker client uses while interacting with the docker
// daeemon over TLS
TLSCa string
// TLSKey is the TLS key which the docker client uses while interacting with
// the docker daemon
TLSKey string
// DockerEndpoint is the endpoint of the docker daemon
DockerEndpoint string
}
// ExecutorContext holds context to configure the command user
// wants to run and isolate it
type ExecutorContext struct {
@ -196,8 +167,6 @@ type UniversalExecutor struct {
resConCtx resourceContainerContext
consulSyncer *consul.Syncer
consulCtx *ConsulContext
totalCpuStats *stats.CpuStats
userCpuStats *stats.CpuStats
systemCpuStats *stats.CpuStats
@ -224,7 +193,7 @@ func NewExecutor(logger *log.Logger) Executor {
// Version returns the api version of the executor
func (e *UniversalExecutor) Version() (*ExecutorVersion, error) {
return &ExecutorVersion{Version: "1.0.0"}, nil
return &ExecutorVersion{Version: "1.1.0"}, nil
}
// SetContext is used to set the executors context and should be the first call
@ -377,28 +346,9 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
e.lre.FileSize = fileSize
}
e.rotatorLock.Unlock()
// Re-syncing task with Consul agent
if e.consulSyncer != nil {
e.interpolateServices(e.ctx.Task)
domain := consul.NewExecutorDomain(e.ctx.AllocID, task.Name)
serviceMap := generateServiceKeys(e.ctx.AllocID, task.Services)
e.consulSyncer.SetServices(domain, serviceMap)
}
return nil
}
// generateServiceKeys takes a list of interpolated Nomad Services and returns a map
// of ServiceKeys to Nomad Services.
func generateServiceKeys(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service {
keys := make(map[consul.ServiceKey]*structs.Service, len(services))
for _, service := range services {
key := consul.GenerateServiceKey(service)
keys[key] = service
}
return keys
}
func (e *UniversalExecutor) wait() {
defer close(e.processExited)
err := e.cmd.Wait()
@ -464,10 +414,6 @@ func (e *UniversalExecutor) Exit() error {
e.lro.Close()
}
if e.consulSyncer != nil {
e.consulSyncer.Shutdown()
}
// If the executor did not launch a process, return.
if e.command == nil {
return nil
@ -514,38 +460,6 @@ func (e *UniversalExecutor) ShutDown() error {
return nil
}
// SyncServices syncs the services of the task that the executor is running with
// Consul
func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
e.logger.Printf("[INFO] executor: registering services")
e.consulCtx = ctx
if e.consulSyncer == nil {
cs, err := consul.NewSyncer(ctx.ConsulConfig, e.shutdownCh, e.logger)
if err != nil {
return err
}
e.consulSyncer = cs
go e.consulSyncer.Run()
}
e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
domain := consul.NewExecutorDomain(e.ctx.AllocID, e.ctx.Task.Name)
serviceMap := generateServiceKeys(e.ctx.AllocID, e.ctx.Task.Services)
e.consulSyncer.SetServices(domain, serviceMap)
return nil
}
// DeregisterServices removes the services of the task that the executor is
// running from Consul
func (e *UniversalExecutor) DeregisterServices() error {
e.logger.Printf("[INFO] executor: de-registering services and shutting down consul service")
if e.consulSyncer != nil {
return e.consulSyncer.Shutdown()
}
return nil
}
// pidStats returns the resource usage stats per pid
func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, error) {
stats := make(map[string]*cstructs.ResourceUsage)
@ -677,66 +591,6 @@ func (e *UniversalExecutor) listenerUnix() (net.Listener, error) {
return net.Listen("unix", path)
}
// createCheckMap creates a map of checks that the executor will handle on it's
// own
func (e *UniversalExecutor) createCheckMap() map[string]struct{} {
checks := map[string]struct{}{
"script": struct{}{},
}
return checks
}
// createCheck creates NomadCheck from a ServiceCheck
func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID string) (consul.Check, error) {
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" {
return &DockerScriptCheck{
id: checkID,
interval: check.Interval,
timeout: check.Timeout,
containerID: e.consulCtx.ContainerID,
logger: e.logger,
cmd: check.Command,
args: check.Args,
}, nil
}
if check.Type == structs.ServiceCheckScript && (e.ctx.Driver == "exec" ||
e.ctx.Driver == "raw_exec" || e.ctx.Driver == "java") {
return &ExecScriptCheck{
id: checkID,
interval: check.Interval,
timeout: check.Timeout,
cmd: check.Command,
args: check.Args,
taskDir: e.ctx.TaskDir,
FSIsolation: e.command.FSIsolation,
}, nil
}
return nil, fmt.Errorf("couldn't create check for %v", check.Name)
}
// interpolateServices interpolates tags in a service and checks with values from the
// task's environment.
func (e *UniversalExecutor) interpolateServices(task *structs.Task) {
e.ctx.TaskEnv.Build()
for _, service := range task.Services {
for _, check := range service.Checks {
check.Name = e.ctx.TaskEnv.ReplaceEnv(check.Name)
check.Type = e.ctx.TaskEnv.ReplaceEnv(check.Type)
check.Command = e.ctx.TaskEnv.ReplaceEnv(check.Command)
check.Args = e.ctx.TaskEnv.ParseAndReplace(check.Args)
check.Path = e.ctx.TaskEnv.ReplaceEnv(check.Path)
check.Protocol = e.ctx.TaskEnv.ReplaceEnv(check.Protocol)
check.PortLabel = e.ctx.TaskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = e.ctx.TaskEnv.ReplaceEnv(check.InitialStatus)
}
service.Name = e.ctx.TaskEnv.ReplaceEnv(service.Name)
service.PortLabel = e.ctx.TaskEnv.ReplaceEnv(service.PortLabel)
service.Tags = e.ctx.TaskEnv.ParseAndReplace(service.Tags)
}
}
// collectPids collects the pids of the child processes that the executor is
// running every 5 seconds
func (e *UniversalExecutor) collectPids() {

View File

@ -5,7 +5,6 @@ import (
"log"
"os"
"path/filepath"
"reflect"
"strings"
"syscall"
"testing"
@ -259,31 +258,6 @@ func TestExecutor_MakeExecutable(t *testing.T) {
}
}
func TestExecutorInterpolateServices(t *testing.T) {
task := mock.Job().TaskGroups[0].Tasks[0]
// Make a fake exececutor
ctx, allocDir := testExecutorContext(t)
defer allocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
executor.(*UniversalExecutor).ctx = ctx
executor.(*UniversalExecutor).interpolateServices(task)
expectedTags := []string{"pci:true", "datacenter:dc1"}
if !reflect.DeepEqual(task.Services[0].Tags, expectedTags) {
t.Fatalf("expected: %v, actual: %v", expectedTags, task.Services[0].Tags)
}
expectedCheckCmd := "/usr/local/check-table-mysql"
expectedCheckArgs := []string{"5.6"}
if !reflect.DeepEqual(task.Services[0].Checks[0].Command, expectedCheckCmd) {
t.Fatalf("expected: %v, actual: %v", expectedCheckCmd, task.Services[0].Checks[0].Command)
}
if !reflect.DeepEqual(task.Services[0].Checks[0].Args, expectedCheckArgs) {
t.Fatalf("expected: %v, actual: %v", expectedCheckArgs, task.Services[0].Checks[0].Args)
}
}
func TestScanPids(t *testing.T) {
p1 := NewFakeProcess(2, 5)
p2 := NewFakeProcess(10, 2)

View File

@ -33,11 +33,6 @@ type LaunchCmdArgs struct {
Cmd *executor.ExecCommand
}
// SyncServicesArgs wraps the consul context for the purposes of RPC
type SyncServicesArgs struct {
Ctx *executor.ConsulContext
}
func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
var ps *executor.ProcessState
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd}, &ps)
@ -76,10 +71,6 @@ func (e *ExecutorRPC) UpdateTask(task *structs.Task) error {
return e.client.Call("Plugin.UpdateTask", task, new(interface{}))
}
func (e *ExecutorRPC) SyncServices(ctx *executor.ConsulContext) error {
return e.client.Call("Plugin.SyncServices", SyncServicesArgs{Ctx: ctx}, new(interface{}))
}
func (e *ExecutorRPC) DeregisterServices() error {
return e.client.Call("Plugin.DeregisterServices", new(interface{}), new(interface{}))
}
@ -149,12 +140,9 @@ func (e *ExecutorRPCServer) UpdateTask(args *structs.Task, resp *interface{}) er
return e.Impl.UpdateTask(args)
}
func (e *ExecutorRPCServer) SyncServices(args SyncServicesArgs, resp *interface{}) error {
return e.Impl.SyncServices(args.Ctx)
}
func (e *ExecutorRPCServer) DeregisterServices(args interface{}, resp *interface{}) error {
return e.Impl.DeregisterServices()
// In 0.6 this is a noop. Goes away in 0.7.
return nil
}
func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error {

View File

@ -2,6 +2,7 @@ package driver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
@ -59,6 +60,7 @@ type javaHandle struct {
userPid int
executor executor.Executor
isolationConfig *dstructs.IsolationConfig
taskDir string
killTimeout time.Duration
maxKillTimeout time.Duration
@ -284,6 +286,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
executor: execIntf,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: ctx.TaskDir.Dir,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version,
@ -291,9 +294,6 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil
}
@ -306,6 +306,7 @@ type javaId struct {
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *dstructs.IsolationConfig
TaskDir string
UserPid int
}
@ -352,10 +353,6 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.java: error registering services with consul: %v", err)
}
go h.run()
return h, nil
}
@ -368,6 +365,7 @@ func (h *javaHandle) ID() string {
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
IsolationConfig: h.isolationConfig,
TaskDir: h.taskDir,
}
data, err := json.Marshal(id)
@ -390,6 +388,10 @@ func (h *javaHandle) Update(task *structs.Task) error {
return nil
}
func (h *javaHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
return execChroot(ctx, h.taskDir, cmd, args)
}
func (h *javaHandle) Signal(s os.Signal) error {
return h.executor.Signal(s)
}
@ -436,11 +438,6 @@ func (h *javaHandle) run() {
}
}
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.java: failed to kill the deregister services: %v", err)
}
// Exit the executor
h.executor.Exit()
h.pluginClient.Kill()

View File

@ -3,6 +3,7 @@
package driver
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -234,6 +235,11 @@ func (h *mockDriverHandle) WaitCh() chan *dstructs.WaitResult {
return h.waitCh
}
func (h *mockDriverHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
h.logger.Printf("[DEBUG] driver.mock: Exec(%q, %q)", cmd, args)
return []byte(fmt.Sprintf("Exec(%q, %q)", cmd, args)), 0, nil
}
// TODO Implement when we need it.
func (h *mockDriverHandle) Update(task *structs.Task) error {
return nil

View File

@ -273,10 +273,6 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.qemu: error registering services for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil
}
@ -322,9 +318,6 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err)
}
go h.run()
return h, nil
}
@ -402,11 +395,6 @@ func (h *qemuHandle) run() {
}
close(h.doneCh)
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.qemu: failed to deregister services: %v", err)
}
// Exit the executor
h.executor.Exit()
h.pluginClient.Kill()

View File

@ -1,6 +1,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"log"
@ -164,9 +165,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil
}
@ -214,9 +212,6 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err)
}
go h.run()
return h, nil
}
@ -250,6 +245,10 @@ func (h *rawExecHandle) Update(task *structs.Task) error {
return nil
}
func (h *rawExecHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
return execChroot(ctx, "", cmd, args)
}
func (h *rawExecHandle) Signal(s os.Signal) error {
return h.executor.Signal(s)
}
@ -289,10 +288,6 @@ func (h *rawExecHandle) run() {
h.logger.Printf("[ERR] driver.raw_exec: error killing user process: %v", e)
}
}
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.raw_exec: failed to deregister services: %v", err)
}
// Exit the executor
if err := h.executor.Exit(); err != nil {

View File

@ -1,6 +1,8 @@
package driver
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"path/filepath"
@ -11,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -298,3 +301,62 @@ func TestRawExecDriverUser(t *testing.T) {
t.Fatalf("Expecting '%v' in '%v'", msg, err)
}
}
func TestRawExecDriver_HandlerExec(t *testing.T) {
task := &structs.Task{
Name: "sleep",
Driver: "raw_exec",
Config: map[string]interface{}{
"command": testtask.Path(),
"args": []string{"sleep", "9000"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
ctx := testDriverContexts(t, task)
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Exec a command that should work
out, code, err := handle.(consul.ScriptExecutor).Exec(context.TODO(), "/usr/bin/stat", []string{"/tmp"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
if code != 0 {
t.Fatalf("expected `stat /alloc` to succeed but exit code was: %d", code)
}
if expected := 100; len(out) < expected {
t.Fatalf("expected at least %d bytes of output but found %d:\n%s", expected, len(out), out)
}
// Exec a command that should fail
out, code, err = handle.(consul.ScriptExecutor).Exec(context.TODO(), "/usr/bin/stat", []string{"lkjhdsaflkjshowaisxmcvnlia"})
if err != nil {
t.Fatalf("error exec'ing stat: %v", err)
}
if code == 0 {
t.Fatalf("expected `stat` to fail but exit code was: %d", code)
}
if expected := "No such file or directory"; !bytes.Contains(out, []byte(expected)) {
t.Fatalf("expected output to contain %q but found: %q", expected, out)
}
if err := handle.Kill(); err != nil {
t.Fatalf("error killing exec handle: %v", err)
}
}

View File

@ -2,8 +2,10 @@ package driver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
@ -51,6 +53,9 @@ const (
// rktCmd is the command rkt is installed as.
rktCmd = "rkt"
// rktUuidDeadline is how long to wait for the uuid file to be written
rktUuidDeadline = 5 * time.Second
)
// RktDriver is a driver for running images via Rkt
@ -81,6 +86,7 @@ type RktDriverConfig struct {
// rktHandle is returned from Start/Open as a handle to the PID
type rktHandle struct {
uuid string
pluginClient *plugin.Client
executorPid int
executor executor.Executor
@ -94,6 +100,7 @@ type rktHandle struct {
// rktPID is a struct to map the pid running the process to the vm image on
// disk
type rktPID struct {
UUID string
PluginConfig *PluginReattachConfig
ExecutorPid int
KillTimeout time.Duration
@ -229,7 +236,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
img := driverConfig.ImageName
// Build the command.
var cmdArgs []string
cmdArgs := make([]string, 0, 32)
// Add debug option to rkt command.
debug := driverConfig.Debug
@ -253,6 +260,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
cmdArgs = append(cmdArgs, "run")
// Write the UUID out to a file in the state dir so we can read it back
// in and access the pod by UUID from other commands
uuidPath := filepath.Join(ctx.TaskDir.Dir, "rkt.uuid")
cmdArgs = append(cmdArgs, fmt.Sprintf("--uuid-file-save=%s", uuidPath))
// Convert underscores to dashes in task names for use in volume names #2358
sanitizedName := strings.Replace(task.Name, "_", "-", -1)
@ -439,9 +451,28 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return nil, err
}
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmdArgs)
// Wait for UUID file to get written
uuid := ""
deadline := time.Now().Add(rktUuidDeadline)
var lastErr error
for time.Now().Before(deadline) {
if uuidBytes, err := ioutil.ReadFile(uuidPath); err != nil {
lastErr = err
} else {
uuid = string(uuidBytes)
break
}
time.Sleep(400 * time.Millisecond)
}
if uuid == "" {
d.logger.Printf("[WARN] driver.rkt: reading uuid from %q failed; unable to run script checks for task %q. Last error: %v",
uuidPath, d.taskName, lastErr)
}
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q (UUID: %s) for task %q with: %v", img, uuid, d.taskName, cmdArgs)
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,
pluginClient: pluginClient,
executor: execIntf,
executorPid: ps.Pid,
@ -451,9 +482,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err)
}
go h.run()
return h, nil
}
@ -484,6 +512,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
d.logger.Printf("[DEBUG] driver.rkt: version of executor: %v", ver.Version)
// Return a driver handle
h := &rktHandle{
uuid: id.UUID,
pluginClient: pluginClient,
executorPid: id.ExecutorPid,
executor: exec,
@ -493,9 +522,6 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil {
h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err)
}
go h.run()
return h, nil
}
@ -503,6 +529,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
func (h *rktHandle) ID() string {
// Return a handle to the PID
pid := &rktPID{
UUID: h.uuid,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
@ -528,6 +555,19 @@ func (h *rktHandle) Update(task *structs.Task) error {
return nil
}
func (h *rktHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
if h.uuid == "" {
return nil, 0, fmt.Errorf("unable to find rkt pod UUID")
}
// enter + UUID + cmd + args...
enterArgs := make([]string, 3+len(args))
enterArgs[0] = "enter"
enterArgs[1] = h.uuid
enterArgs[2] = cmd
copy(enterArgs[3:], args)
return execChroot(ctx, "", rktCmd, enterArgs)
}
func (h *rktHandle) Signal(s os.Signal) error {
return fmt.Errorf("Rkt does not support signals")
}
@ -556,10 +596,6 @@ func (h *rktHandle) run() {
h.logger.Printf("[ERROR] driver.rkt: error killing user process: %v", e)
}
}
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
h.logger.Printf("[ERR] driver.rkt: failed to deregister services: %v", err)
}
// Exit the executor
if err := h.executor.Exit(); err != nil {

View File

@ -1,6 +1,8 @@
package driver
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@ -12,6 +14,7 @@ import (
"time"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -489,3 +492,74 @@ func TestRktDriver_PortsMapping(t *testing.T) {
t.Fatalf("timeout")
}
}
func TestRktDriver_HandlerExec(t *testing.T) {
if os.Getenv("NOMAD_TEST_RKT") == "" {
t.Skip("skipping rkt tests")
}
ctestutils.RktCompatible(t)
task := &structs.Task{
Name: "etcd",
Driver: "rkt",
Config: map[string]interface{}{
"trust_prefix": "coreos.com/etcd",
"image": "coreos.com/etcd:v2.0.4",
"command": "/etcd",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: &structs.Resources{
MemoryMB: 128,
CPU: 100,
},
}
ctx := testDriverContexts(t, task)
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Give the pod a second to start
time.Sleep(time.Second)
// Exec a command that should work
out, code, err := handle.(consul.ScriptExecutor).Exec(context.TODO(), "/etcd", []string{"--version"})
if err != nil {
t.Fatalf("error exec'ing etcd --version: %v", err)
}
if code != 0 {
t.Fatalf("expected `etcd --version` to succeed but exit code was: %d\n%s", code, string(out))
}
if expected := []byte("etcd version "); !bytes.HasPrefix(out, expected) {
t.Fatalf("expected output to start with %q but found:\n%q", expected, out)
}
// Exec a command that should fail
out, code, err = handle.(consul.ScriptExecutor).Exec(context.TODO(), "/etcd", []string{"--kaljdshf"})
if err != nil {
t.Fatalf("error exec'ing bad command: %v", err)
}
if code == 0 {
t.Fatalf("expected `stat` to fail but exit code was: %d", code)
}
if expected := "flag provided but not defined"; !bytes.Contains(out, []byte(expected)) {
t.Fatalf("expected output to contain %q but found: %q", expected, out)
}
if err := handle.Kill(); err != nil {
t.Fatalf("error killing handle: %v", err)
}
}

View File

@ -1,6 +1,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"io"
@ -8,8 +9,10 @@ import (
"os/exec"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/armon/circbuf"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
@ -85,19 +88,16 @@ func createExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executo
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
return executorPlugin, executorClient, nil
}
func consulContext(clientConfig *config.Config, containerID string) *executor.ConsulContext {
return &executor.ConsulContext{
ConsulConfig: clientConfig.ConsulConfig,
ContainerID: containerID,
DockerEndpoint: clientConfig.Read("docker.endpoint"),
TLSCa: clientConfig.Read("docker.tls.ca"),
TLSCert: clientConfig.Read("docker.tls.cert"),
TLSKey: clientConfig.Read("docker.tls.key"),
executorPlugin, ok := raw.(*ExecutorRPC)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}
// 0.6 Upgrade path: Deregister services from the executor as the Nomad
// client agent now handles all Consul interactions.
if err := executorPlugin.DeregisterServices(); err != nil {
return nil, nil, err
}
return executorPlugin, executorClient, nil
}
// killProcess kills a process with the given pid
@ -181,3 +181,36 @@ func getExecutorUser(task *structs.Task) string {
}
return task.User
}
// execChroot executes cmd with args inside chroot if set and returns the
// output, exit code, and error. If chroot is an empty string the command is
// executed on the host.
func execChroot(ctx context.Context, chroot, name string, args []string) ([]byte, int, error) {
buf, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize))
cmd := exec.CommandContext(ctx, name, args...)
cmd.Dir = "/"
cmd.Stdout = buf
cmd.Stderr = buf
if chroot != "" {
setChroot(cmd, chroot)
}
if err := cmd.Run(); err != nil {
exitErr, ok := err.(*exec.ExitError)
if !ok {
// Non-exit error, return it and let the caller treat
// it as a critical failure
return nil, 0, err
}
// Some kind of error happened; default to critical
exitCode := 2
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitCode = status.ExitStatus()
}
// Don't return the exitError as the caller only needs the
// output and code.
return buf.Bytes(), exitCode, nil
}
return buf.Bytes(), 0, nil
}
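As a rough illustration, execChroot has exactly the signature a driver handle needs to satisfy consul.ScriptExecutor; the execHandle type below is hypothetical and not part of this change (real handles forward Exec to their executor over RPC):
// execHandle is a hypothetical handle used only to illustrate the wiring.
type execHandle struct {
	chroot string // task chroot directory; empty means run on the host
}
// Exec satisfies consul.ScriptExecutor by delegating to execChroot and
// returning the combined output, exit code, and error.
func (h *execHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
	return execChroot(ctx, h.chroot, cmd, args)
}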

View File

@ -16,3 +16,11 @@ func isolateCommand(cmd *exec.Cmd) {
}
cmd.SysProcAttr.Setsid = true
}
// setChroot on a command
func setChroot(cmd *exec.Cmd, chroot string) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Chroot = chroot
}
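setChroot composes with the existing isolateCommand helper on a single exec.Cmd; the runInChroot function below is a hypothetical sketch (chrooting requires root privileges and a populated chroot directory):
// runInChroot is a hypothetical helper showing how isolateCommand and
// setChroot are applied to the same command before it is started.
func runInChroot(chroot string) error {
	cmd := exec.Command("/bin/true")
	isolateCommand(cmd)    // run the process in its own session
	setChroot(cmd, chroot) // confine it to the task directory
	return cmd.Run()
}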

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/client/driver/env"
@ -61,6 +62,7 @@ type TaskRunner struct {
logger *log.Logger
alloc *structs.Allocation
restartTracker *RestartTracker
consul ConsulServiceAPI
// running marks whether the task is running
running bool
@ -173,7 +175,7 @@ type SignalEvent struct {
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, taskDir *allocdir.TaskDir,
alloc *structs.Allocation, task *structs.Task,
vaultClient vaultclient.VaultClient) *TaskRunner {
vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner {
// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
@ -195,6 +197,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
task: task,
taskDir: taskDir,
createdResources: driver.NewCreatedResources(),
consul: consulClient,
vaultClient: vaultClient,
vaultFuture: NewTokenFuture().Set(""),
updateCh: make(chan *structs.Allocation, 64),
@ -289,6 +292,19 @@ func (r *TaskRunner) RestoreState() error {
r.task.Name, r.alloc.ID, err)
return nil
}
//FIXME is there a better place to do this? used to be in executor
// Prepare services
interpolateServices(r.getTaskEnv(), r.task)
// Ensure the service is registered
scriptExec, _ := handle.(consul.ScriptExecutor)
if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
//FIXME What to do if this fails?
r.logger.Printf("[WARN] client: failed to register services and checks for task %q alloc %q: %v",
r.task.Name, r.alloc.ID, err)
}
r.handleLock.Lock()
r.handle = handle
r.handleLock.Unlock()
@ -1220,9 +1236,43 @@ func (r *TaskRunner) startTask() error {
r.handleLock.Lock()
r.handle = handle
r.handleLock.Unlock()
//FIXME is there a better place to do this? used to be in executor
// Prepare services
interpolateServices(r.getTaskEnv(), r.task)
// RegisterTask properly handles scriptExec being nil, so just
// ignore the ok value.
scriptExec, _ := handle.(consul.ScriptExecutor)
if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
//FIXME handle errors?!
//FIXME could break into prepare & submit steps as only preparation can error...
r.logger.Printf("[ERR] client: failed to register services and checks for task %q alloc %q: %v", r.task.Name, r.alloc.ID, err)
}
return nil
}
// interpolateServices interpolates a task's services and checks with values
// from the task's environment.
func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) {
for _, service := range task.Services {
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
check.Type = taskEnv.ReplaceEnv(check.Type)
check.Command = taskEnv.ReplaceEnv(check.Command)
check.Args = taskEnv.ParseAndReplace(check.Args)
check.Path = taskEnv.ReplaceEnv(check.Path)
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
}
service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
}
}
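For example, a service whose name or tags reference Nomad environment variables is rewritten in place before registration; in the sketch below the values are illustrative and taskEnv stands for an already-built *env.TaskEnvironment:
// Illustrative only: the service definition and environment values are assumptions.
service := &structs.Service{
	Name:      "web-${NOMAD_JOB_NAME}",
	PortLabel: "http",
	Tags:      []string{"dc-${NOMAD_DC}"},
}
task := &structs.Task{Name: "web", Services: []*structs.Service{service}}
interpolateServices(taskEnv, task)
// service.Name and service.Tags now hold the concrete values from taskEnv.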
// buildTaskDir creates the task directory before driver.Prestart. It is safe
// to call multiple times as its state is persisted.
func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error {
@ -1335,13 +1385,16 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Merge in the task resources
updatedTask.Resources = update.TaskResources[updatedTask.Name]
// Update will update resources and store the new kill timeout.
var mErr multierror.Error
var scriptExec consul.ScriptExecutor
r.handleLock.Lock()
if r.handle != nil {
// Update will update resources and store the new kill timeout.
if err := r.handle.Update(updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
}
// Not all drivers support Exec (eg QEMU)
scriptExec, _ = r.handle.(consul.ScriptExecutor)
}
r.handleLock.Unlock()
@ -1350,9 +1403,21 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}
// Deregister the old service+checks
r.consul.RemoveTask(r.alloc.ID, r.task)
// Store the updated alloc.
r.alloc = update
r.task = updatedTask
//FIXME is there a better place to do this? used to be in executor
// Prepare services
interpolateServices(r.getTaskEnv(), r.task)
// Register the new service+checks
if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error registering updated task with consul: %v", err))
}
return mErr.ErrorOrNil()
}
@ -1361,6 +1426,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
// Remove from Consul
r.consul.RemoveTask(r.alloc.ID, r.task)
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
if err = r.handle.Kill(); err != nil {

View File

@ -104,7 +104,8 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
}
vclient := vaultclient.NewMockVaultClient()
tr := NewTaskRunner(logger, conf, upd.Update, taskDir, alloc, task, vclient)
cclient := newMockConsulServiceClient()
tr := NewTaskRunner(logger, conf, upd.Update, taskDir, alloc, task, vclient, cclient)
if !restarts {
tr.restartTracker = noRestartsTracker()
}
@ -366,7 +367,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
// Create a new task runner
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver}
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.upd.Update,
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient)
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul)
tr2.restartTracker = noRestartsTracker()
if err := tr2.RestoreState(); err != nil {
t.Fatalf("err: %v", err)

View File

@ -8,17 +8,18 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
const (
@ -30,6 +31,10 @@ const (
serverRpcCheckTimeout = 3 * time.Second
serverSerfCheckInterval = 10 * time.Second
serverSerfCheckTimeout = 3 * time.Second
// roles used in identifying Consul entries for Nomad agents
consulRoleServer = "server"
consulRoleClient = "client"
)
// Agent is a long running daemon that is used to run both
@ -42,8 +47,12 @@ type Agent struct {
logger *log.Logger
logOutput io.Writer
// consulSyncer registers the Nomad agent with the Consul Agent
consulSyncer *consul.Syncer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService *consul.ServiceClient
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI
client *client.Client
@ -63,8 +72,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
shutdownCh: make(chan struct{}),
}
if err := a.setupConsulSyncer(); err != nil {
return nil, fmt.Errorf("Failed to initialize Consul syncer task: %v", err)
if err := a.setupConsul(config.Consul); err != nil {
return nil, fmt.Errorf("Failed to initialize Consul client: %v", err)
}
if err := a.setupServer(); err != nil {
return nil, err
@ -76,15 +85,6 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
// The Nomad Agent runs the consul.Syncer regardless of whether or not the
// Agent is running in Client or Server mode (or both), and regardless of
// the consul.auto_advertise parameter. The Client and Server both reuse the
// same consul.Syncer instance. This Syncer task periodically executes
// callbacks that update Consul. The reason the Syncer is always running is
// because one of the callbacks attempts to self-bootstrap Nomad using
// information found in Consul.
go a.consulSyncer.Run()
return a, nil
}
@ -339,7 +339,7 @@ func (a *Agent) setupServer() error {
}
// Create the server
server, err := nomad.NewServer(conf, a.consulSyncer, a.logger)
server, err := nomad.NewServer(conf, a.consulCatalog, a.logger)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}
@ -405,14 +405,16 @@ func (a *Agent) setupServer() error {
// Add the http port check if TLS isn't enabled
// TODO Add TLS check when Consul 0.7.1 comes out.
consulServices := map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(rpcServ): rpcServ,
consul.GenerateServiceKey(serfServ): serfServ,
consulServices := []*structs.Service{
rpcServ,
serfServ,
}
if !conf.TLSConfig.EnableHTTP {
consulServices[consul.GenerateServiceKey(httpServ)] = httpServ
consulServices = append(consulServices, httpServ)
}
if err := a.consulService.RegisterAgent(consulRoleServer, consulServices); err != nil {
return err
}
a.consulSyncer.SetServices(consul.ServerDomain, consulServices)
}
return nil
@ -462,7 +464,7 @@ func (a *Agent) setupClient() error {
}
// Create the client
client, err := client.NewClient(conf, a.consulSyncer, a.logger)
client, err := client.NewClient(conf, a.consulCatalog, a.consulService, a.logger)
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
@ -495,9 +497,9 @@ func (a *Agent) setupClient() error {
},
}
if !conf.TLSConfig.EnableHTTP {
a.consulSyncer.SetServices(consul.ClientDomain, map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(httpServ): httpServ,
})
if err := a.consulService.RegisterAgent(consulRoleClient, []*structs.Service{httpServ}); err != nil {
return err
}
}
}
@ -612,8 +614,8 @@ func (a *Agent) Shutdown() error {
}
}
if err := a.consulSyncer.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err)
if err := a.consulService.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down Consul client failed: %v", err)
}
a.logger.Println("[INFO] agent: shutdown complete")
@ -659,46 +661,22 @@ func (a *Agent) Stats() map[string]map[string]string {
return stats
}
// setupConsulSyncer creates the Consul tasks used by this Nomad Agent
// (either Client or Server mode).
func (a *Agent) setupConsulSyncer() error {
var err error
a.consulSyncer, err = consul.NewSyncer(a.config.Consul, a.shutdownCh, a.logger)
// setupConsul creates the Consul client and starts its main Run loop.
func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
apiConf, err := consulConfig.ApiConfig()
if err != nil {
return err
}
client, err := api.NewClient(apiConf)
if err != nil {
return err
}
a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) {
host, port, err := net.SplitHostPort(portLabel)
if err != nil {
p, err := strconv.Atoi(port)
if err != nil {
return "", 0
}
return "", p
}
// If the addr for the service is ":port", then we fall back
// to Nomad's default address resolution protocol.
//
// TODO(sean@): This should poll Consul to figure out what
// its advertise address is and use that in order to handle
// the case where there is something funky like NAT on this
// host. For now we just use the BindAddr if set, otherwise
// we fall back to a loopback addr.
if host == "" {
if a.config.BindAddr != "" {
host = a.config.BindAddr
} else {
host = "127.0.0.1"
}
}
p, err := strconv.Atoi(port)
if err != nil {
return host, 0
}
return host, p
})
// Create Consul Catalog client for service discovery.
a.consulCatalog = client.Catalog()
// Create Consul Service client for service advertisement and checks.
a.consulService = consul.NewServiceClient(client.Agent(), a.logger)
go a.consulService.Run()
return nil
}

View File

@ -1,193 +0,0 @@
// +build chaos
package consul
import (
"fmt"
"io/ioutil"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
func TestSyncerChaos(t *testing.T) {
// Create an embedded Consul server
testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
defer testconsul.Stop()
// Configure Syncer to talk to the test server
cconf := config.DefaultConsulConfig()
cconf.Addr = testconsul.HTTPAddr
clientSyncer, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Error creating Syncer: %v", err)
}
defer clientSyncer.Shutdown()
execSyncer, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Error creating Syncer: %v", err)
}
defer execSyncer.Shutdown()
clientService := &structs.Service{Name: "nomad-client"}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(clientService): clientService,
}
if err := clientSyncer.SetServices("client", services); err != nil {
t.Fatalf("error setting client service: %v", err)
}
const execn = 100
const reapern = 2
errors := make(chan error, 100)
wg := sync.WaitGroup{}
// Start goroutines to concurrently SetServices
for i := 0; i < execn; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
domain := ServiceDomain(fmt.Sprintf("exec-%d", i))
services := map[ServiceKey]*structs.Service{}
for ii := 0; ii < 10; ii++ {
s := &structs.Service{Name: fmt.Sprintf("exec-%d-%d", i, ii)}
services[GenerateServiceKey(s)] = s
if err := execSyncer.SetServices(domain, services); err != nil {
select {
case errors <- err:
default:
}
return
}
time.Sleep(1)
}
}(i)
}
// SyncServices runs a timer started by Syncer.Run which we don't use
// in this test, so run SyncServices concurrently
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < execn; i++ {
if err := execSyncer.SyncServices(); err != nil {
select {
case errors <- err:
default:
}
return
}
time.Sleep(100)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := clientSyncer.ReapUnmatched([]ServiceDomain{"nomad-client"}); err != nil {
select {
case errors <- err:
default:
}
return
}
}()
// Reap all but exec-0-*
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < execn; i++ {
if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0", ServiceDomain(fmt.Sprintf("exec-%d", i))}); err != nil {
select {
case errors <- err:
default:
}
}
time.Sleep(100)
}
}()
go func() {
wg.Wait()
close(errors)
}()
for err := range errors {
if err != nil {
t.Errorf("error setting service from executor goroutine: %v", err)
}
}
// Do a final ReapUnmatched to get consul back into a deterministic state
if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0"}); err != nil {
t.Fatalf("error doing final reap: %v", err)
}
// flattenedServices should be fully populated as ReapUnmatched doesn't
// touch Syncer's internal state
expected := map[string]struct{}{}
for i := 0; i < execn; i++ {
for ii := 0; ii < 10; ii++ {
expected[fmt.Sprintf("exec-%d-%d", i, ii)] = struct{}{}
}
}
for _, s := range execSyncer.flattenedServices() {
_, ok := expected[s.Name]
if !ok {
t.Errorf("%s unexpected", s.Name)
}
delete(expected, s.Name)
}
if len(expected) > 0 {
left := []string{}
for s := range expected {
left = append(left, s)
}
sort.Strings(left)
t.Errorf("Couldn't find %d names in flattened services:\n%s", len(expected), strings.Join(left, "\n"))
}
// All but exec-0 and possibly some of exec-99 should have been reaped
{
services, err := execSyncer.client.Agent().Services()
if err != nil {
t.Fatalf("Error getting services: %v", err)
}
expected := []int{}
for k, service := range services {
if service.Service == "consul" {
continue
}
i := -1
ii := -1
fmt.Sscanf(service.Service, "exec-%d-%d", &i, &ii)
switch {
case i == -1 || ii == -1:
t.Errorf("invalid service: %s -> %s", k, service.Service)
case i != 0 || ii > 9:
t.Errorf("unexpected service: %s -> %s", k, service.Service)
default:
expected = append(expected, ii)
}
}
if len(expected) != 10 {
t.Errorf("expected 0-9 but found: %#q", expected)
}
}
}

View File

@ -1,91 +0,0 @@
package consul
import (
"log"
"sync"
"time"
"github.com/hashicorp/consul/lib"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
// CheckRunner runs a given check at a specific interval and updates a
// corresponding Consul TTL check
type CheckRunner struct {
check Check
runCheck func(Check)
logger *log.Logger
stop bool
stopCh chan struct{}
stopLock sync.Mutex
started bool
startedLock sync.Mutex
}
// NewCheckRunner configures and returns a CheckRunner
func NewCheckRunner(check Check, runCheck func(Check), logger *log.Logger) *CheckRunner {
cr := CheckRunner{
check: check,
runCheck: runCheck,
logger: logger,
stopCh: make(chan struct{}),
}
return &cr
}
// Start is used to start the check. The check runs until stop is called
func (r *CheckRunner) Start() {
r.startedLock.Lock()
defer r.startedLock.Unlock()
if r.started {
return
}
r.stopLock.Lock()
defer r.stopLock.Unlock()
go r.run()
r.started = true
}
// Started returns if the check runner has started running
func (r *CheckRunner) Started() bool {
r.startedLock.Lock()
defer r.startedLock.Unlock()
return r.started
}
// Stop is used to stop the check.
func (r *CheckRunner) Stop() {
r.stopLock.Lock()
defer r.stopLock.Unlock()
if !r.stop {
r.stop = true
close(r.stopCh)
}
}
// run is invoked by a goroutine to run until Stop() is called
func (r *CheckRunner) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(r.check.Interval())
r.logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, r.check.ID())
next := time.NewTimer(initialPauseTime)
for {
select {
case <-next.C:
r.runCheck(r.check)
next.Reset(r.check.Interval())
case <-r.stopCh:
next.Stop()
return
}
}
}
// Check is an interface which check providers can implement for Nomad to run
type Check interface {
Run() *cstructs.CheckResult
ID() string
Interval() time.Duration
Timeout() time.Duration
}

View File

@ -0,0 +1,636 @@
package consul
import (
"context"
"fmt"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
var mark = struct{}{}
const (
// nomadServicePrefix is the first prefix that scopes all Nomad registered
// services
nomadServicePrefix = "_nomad"
// The periodic time interval for syncing services and checks with Consul
defaultSyncInterval = 6 * time.Second
// ttlCheckBuffer is the time interval Nomad can take to report the
// check result to Consul
ttlCheckBuffer = 31 * time.Second
// defaultShutdownWait is how long Shutdown() should block waiting for
// enqueued operations to sync to Consul by default.
defaultShutdownWait = time.Minute
// DefaultQueryWaitDuration is the max duration the Consul Agent will
// spend waiting for a response from a Consul Query.
DefaultQueryWaitDuration = 2 * time.Second
// ServiceTagHTTP is the tag assigned to HTTP services
ServiceTagHTTP = "http"
// ServiceTagRPC is the tag assigned to RPC services
ServiceTagRPC = "rpc"
// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"
)
// ScriptExecutor is the interface the ServiceClient uses to execute script
// checks inside a container.
type ScriptExecutor interface {
Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
// CatalogAPI is the consul/api.Catalog API used by Nomad.
type CatalogAPI interface {
Datacenters() ([]string, error)
Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error)
}
// AgentAPI is the consul/api.Agent API used by Nomad.
type AgentAPI interface {
CheckRegister(check *api.AgentCheckRegistration) error
CheckDeregister(checkID string) error
ServiceRegister(service *api.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
UpdateTTL(id, output, status string) error
}
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
client AgentAPI
logger *log.Logger
retryInterval time.Duration
syncInterval time.Duration
// runningCh is closed when the main Run loop exits
runningCh chan struct{}
// shutdownCh is closed when the client should shutdown
shutdownCh chan struct{}
// shutdownWait is how long Shutdown() blocks waiting for the final
// sync() to finish. Defaults to defaultShutdownWait
shutdownWait time.Duration
// syncCh triggers a sync in the main Run loop
syncCh chan struct{}
// services and checks to be registered
regServices map[string]*api.AgentServiceRegistration
regChecks map[string]*api.AgentCheckRegistration
// services and checks to be deregistered
deregServices map[string]struct{}
deregChecks map[string]struct{}
// script checks to be run() after their corresponding check is
// registered
regScripts map[string]*scriptCheck
// script check cancel funcs to be called before their corresponding
// check is removed. Only accessed in sync() so not covered by regLock
runningScripts map[string]*scriptHandle
// regLock must be held while accessing reg and dereg maps
regLock sync.Mutex
// Registered agent services and checks
agentServices map[string]struct{}
agentChecks map[string]struct{}
// agentLock must be held while accessing agent maps
agentLock sync.Mutex
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
return &ServiceClient{
client: consulClient,
logger: logger,
retryInterval: defaultSyncInterval, //TODO what should this default to?!
syncInterval: defaultSyncInterval,
runningCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
syncCh: make(chan struct{}, 1),
regServices: make(map[string]*api.AgentServiceRegistration),
regChecks: make(map[string]*api.AgentCheckRegistration),
deregServices: make(map[string]struct{}),
deregChecks: make(map[string]struct{}),
regScripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
agentServices: make(map[string]struct{}, 8),
agentChecks: make(map[string]struct{}, 8),
}
}
// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func (c *ServiceClient) Run() {
defer close(c.runningCh)
timer := time.NewTimer(0)
defer timer.Stop()
// Drain the initial tick so we don't sync until instructed
<-timer.C
lastOk := true
for {
select {
case <-c.syncCh:
timer.Reset(0)
case <-timer.C:
if err := c.sync(); err != nil {
if lastOk {
lastOk = false
c.logger.Printf("[WARN] consul: failed to update services in Consul: %v", err)
}
//TODO Log? and jitter/backoff
timer.Reset(c.retryInterval)
} else {
if !lastOk {
c.logger.Printf("[INFO] consul: successfully updated services in Consul")
lastOk = true
}
}
case <-c.shutdownCh:
return
}
}
}
// forceSync asynchronously causes a sync to happen. Any operations enqueued
// prior to calling forceSync will be synced.
func (c *ServiceClient) forceSync() {
select {
case c.syncCh <- mark:
default:
}
}
// sync enqueued operations.
func (c *ServiceClient) sync() error {
// Shallow copy and reset the pending operations fields
c.regLock.Lock()
regServices := make(map[string]*api.AgentServiceRegistration, len(c.regServices))
for k, v := range c.regServices {
regServices[k] = v
}
c.regServices = map[string]*api.AgentServiceRegistration{}
regChecks := make(map[string]*api.AgentCheckRegistration, len(c.regChecks))
for k, v := range c.regChecks {
regChecks[k] = v
}
c.regChecks = map[string]*api.AgentCheckRegistration{}
regScripts := make(map[string]*scriptCheck, len(c.regScripts))
for k, v := range c.regScripts {
regScripts[k] = v
}
c.regScripts = map[string]*scriptCheck{}
deregServices := make(map[string]struct{}, len(c.deregServices))
for k := range c.deregServices {
deregServices[k] = mark
}
c.deregServices = map[string]struct{}{}
deregChecks := make(map[string]struct{}, len(c.deregChecks))
for k := range c.deregChecks {
deregChecks[k] = mark
}
c.deregChecks = map[string]struct{}{}
c.regLock.Unlock()
var err error
regServiceN, regCheckN, deregServiceN, deregCheckN := len(regServices), len(regChecks), len(deregServices), len(deregChecks)
// Register Services
for id, service := range regServices {
if err = c.client.ServiceRegister(service); err != nil {
goto ERROR
}
delete(regServices, id)
}
// Register Checks
for id, check := range regChecks {
if err = c.client.CheckRegister(check); err != nil {
goto ERROR
}
delete(regChecks, id)
// Run the script for this check if one exists
if script, ok := regScripts[id]; ok {
// This check is a script check; run it
c.runningScripts[id] = script.run()
}
}
// Deregister Checks
for id := range deregChecks {
if h, ok := c.runningScripts[id]; ok {
// This check is a script check; stop it
h.cancel()
delete(c.runningScripts, id)
}
if err = c.client.CheckDeregister(id); err != nil {
goto ERROR
}
delete(deregChecks, id)
}
// Deregister Services
for id := range deregServices {
if err = c.client.ServiceDeregister(id); err != nil {
goto ERROR
}
delete(deregServices, id)
}
c.logger.Printf("[DEBUG] consul: registered %d services / %d checks; deregisterd %d services / %d checks", regServiceN, regCheckN, deregServiceN, deregCheckN)
return nil
//TODO Labels and gotos are nasty; move to a function?
ERROR:
// An error occurred, repopulate the operation maps omitting any keys
// that have been updated while sync() ran.
c.regLock.Lock()
for id, service := range regServices {
if _, ok := c.regServices[id]; ok {
continue
}
if _, ok := c.deregServices[id]; ok {
continue
}
c.regServices[id] = service
}
for id, check := range regChecks {
if _, ok := c.regChecks[id]; ok {
continue
}
if _, ok := c.deregChecks[id]; ok {
continue
}
c.regChecks[id] = check
}
for id, script := range regScripts {
if _, ok := c.regScripts[id]; ok {
// a new version of this script was added, drop this one
continue
}
c.regScripts[id] = script
}
for id := range deregServices {
if _, ok := c.regServices[id]; ok {
continue
}
c.deregServices[id] = mark
}
for id := range deregChecks {
if _, ok := c.regChecks[id]; ok {
continue
}
c.deregChecks[id] = mark
}
c.regLock.Unlock()
return err
}
// RegisterAgent registers Nomad agents (client or server). Script checks are
// not supported and will return an error. Registration is asynchronous.
//
// Agents will be deregistered when Shutdown is called.
func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error {
regs := make([]*api.AgentServiceRegistration, len(services))
checks := make([]*api.AgentCheckRegistration, 0, len(services))
for i, service := range services {
id := makeAgentServiceID(role, service)
host, rawport, err := net.SplitHostPort(service.PortLabel)
if err != nil {
return fmt.Errorf("error parsing port label %q from service %q: %v", service.PortLabel, service.Name, err)
}
port, err := strconv.Atoi(rawport)
if err != nil {
return fmt.Errorf("error parsing port %q from service %q: %v", rawport, service.Name, err)
}
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: service.Tags,
Address: host,
Port: port,
}
regs[i] = serviceReg
for _, check := range service.Checks {
checkID := createCheckID(id, check)
if check.Type == structs.ServiceCheckScript {
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
}
checkHost, checkPort := serviceReg.Address, serviceReg.Port
if check.PortLabel != "" {
host, rawport, err := net.SplitHostPort(check.PortLabel)
if err != nil {
return fmt.Errorf("error parsing port label %q from check %q: %v", service.PortLabel, check.Name, err)
}
port, err := strconv.Atoi(rawport)
if err != nil {
return fmt.Errorf("error parsing port %q from check %q: %v", rawport, check.Name, err)
}
checkHost, checkPort = host, port
}
checkReg, err := createCheckReg(id, checkID, check, checkHost, checkPort)
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
checks = append(checks, checkReg)
}
}
// Now add them to the registration queue
c.enqueueRegs(regs, checks, nil)
// Record IDs for deregistering on shutdown
c.agentLock.Lock()
for _, s := range regs {
c.agentServices[s.ID] = mark
}
for _, ch := range checks {
c.agentChecks[ch.ID] = mark
}
c.agentLock.Unlock()
return nil
}
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
// exec is nil and a script check exists an error is returned.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec ScriptExecutor) error {
regs := make([]*api.AgentServiceRegistration, len(task.Services))
checks := make([]*api.AgentCheckRegistration, 0, len(task.Services)*2) // just guess at size
var scriptChecks []*scriptCheck
for i, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service)
host, port := task.FindHostAndPortFor(service.PortLabel)
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: make([]string, len(service.Tags)),
Address: host,
Port: port,
}
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy(serviceReg.Tags, service.Tags)
regs[i] = serviceReg
for _, check := range service.Checks {
checkID := createCheckID(id, check)
if check.Type == structs.ServiceCheckScript {
if exec == nil {
return fmt.Errorf("driver %q doesn't support script checks", task.Driver)
}
scriptChecks = append(scriptChecks, newScriptCheck(checkID, check, exec, c.client, c.logger, c.shutdownCh))
}
host, port := serviceReg.Address, serviceReg.Port
if check.PortLabel != "" {
host, port = task.FindHostAndPortFor(check.PortLabel)
}
checkReg, err := createCheckReg(id, checkID, check, host, port)
if err != nil {
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
checks = append(checks, checkReg)
}
}
// Now add them to the registration queue
c.enqueueRegs(regs, checks, scriptChecks)
return nil
}
// RemoveTask from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
deregs := make([]string, len(task.Services))
checks := make([]string, 0, len(task.Services)*2) // just guess at size
for i, service := range task.Services {
id := makeTaskServiceID(allocID, task.Name, service)
deregs[i] = id
for _, check := range service.Checks {
checks = append(checks, createCheckID(id, check))
}
}
// Now add them to the deregistration fields; main Run loop will update
c.enqueueDeregs(deregs, checks)
}
// enqueueRegs enqueues service and check registrations for the next time
// operations are sync'd to Consul.
func (c *ServiceClient) enqueueRegs(regs []*api.AgentServiceRegistration, checks []*api.AgentCheckRegistration, scriptChecks []*scriptCheck) {
c.regLock.Lock()
for _, reg := range regs {
// Add reg
c.regServices[reg.ID] = reg
// Make sure it's not being removed
delete(c.deregServices, reg.ID)
}
for _, check := range checks {
// Add check
c.regChecks[check.ID] = check
// Make sure it's not being removed
delete(c.deregChecks, check.ID)
}
for _, script := range scriptChecks {
c.regScripts[script.id] = script
}
c.regLock.Unlock()
c.forceSync()
}
// enqueueDeregs enqueues service and check removals for the next time
// operations are sync'd to Consul.
func (c *ServiceClient) enqueueDeregs(deregs []string, checks []string) {
c.regLock.Lock()
for _, dereg := range deregs {
// Add dereg
c.deregServices[dereg] = mark
// Make sure it's not being added
delete(c.regServices, dereg)
}
for _, check := range checks {
// Add check for removal
c.deregChecks[check] = mark
// Make sure it's not being added
delete(c.regChecks, check)
}
c.regLock.Unlock()
c.forceSync()
}
// Shutdown the Consul client. Update running task registrations and deregister
// the agent from Consul. Blocks up to shutdownWait before giving up on syncing
// operations.
func (c *ServiceClient) Shutdown() error {
select {
case <-c.shutdownCh:
return nil
default:
close(c.shutdownCh)
}
var mErr multierror.Error
// Don't let Shutdown block indefinitely
deadline := time.After(c.shutdownWait)
// Deregister agent services and checks
c.agentLock.Lock()
for id := range c.agentServices {
if err := c.client.ServiceDeregister(id); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
// Deregister Checks
for id := range c.agentChecks {
if err := c.client.CheckDeregister(id); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.agentLock.Unlock()
// Wait for Run to finish any outstanding sync() calls and exit
select {
case <-c.runningCh:
// sync one last time to ensure all enqueued operations are applied
if err := c.sync(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
case <-deadline:
// Don't wait forever though
mErr.Errors = append(mErr.Errors, fmt.Errorf("timed out waiting for Consul operations to complete"))
return mErr.ErrorOrNil()
}
// Give script checks time to exit (no need to lock as Run() has exited)
for _, h := range c.runningScripts {
select {
case <-h.wait():
case <-deadline:
mErr.Errors = append(mErr.Errors, fmt.Errorf("timed out waiting for script checks to run"))
return mErr.ErrorOrNil()
}
}
return mErr.ErrorOrNil()
}
// makeAgentServiceID creates a unique ID for identifying an agent service in
// Consul.
//
// Agent service IDs are of the form:
//
// {nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...}
// Example Server ID: _nomad-server-nomad-serf
// Example Client ID: _nomad-client-nomad-client-http
//
func makeAgentServiceID(role string, service *structs.Service) string {
parts := make([]string, len(service.Tags)+3)
parts[0] = nomadServicePrefix
parts[1] = role
parts[2] = service.Name
copy(parts[3:], service.Tags)
return strings.Join(parts, "-")
}
// makeTaskServiceID creates a unique ID for identifying a task service in
// Consul.
//
// Task service IDs are of the form:
//
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
//
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
parts := make([]string, len(service.Tags)+5)
parts[0] = nomadServicePrefix
parts[1] = "executor"
parts[2] = allocID
parts[3] = taskName
parts[4] = service.Name
copy(parts[5:], service.Tags)
return strings.Join(parts, "-")
}
// createCheckID creates a unique ID for a check.
func createCheckID(serviceID string, check *structs.ServiceCheck) string {
return check.Hash(serviceID)
}
// createCheckReg creates a Check that can be registered with Consul.
//
// Only supports HTTP(S) and TCP checks. Script checks must be handled
// externally.
func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host string, port int) (*api.AgentCheckRegistration, error) {
chkReg := api.AgentCheckRegistration{
ID: checkID,
Name: check.Name,
ServiceID: serviceID,
}
chkReg.Status = check.InitialStatus
chkReg.Timeout = check.Timeout.String()
chkReg.Interval = check.Interval.String()
switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
check.Protocol = "http"
}
base := url.URL{
Scheme: check.Protocol,
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
relative, err := url.Parse(check.Path)
if err != nil {
return nil, err
}
checkURL := base.ResolveReference(relative)
chkReg.HTTP = checkURL.String()
case structs.ServiceCheckTCP:
chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
case structs.ServiceCheckScript:
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
default:
return nil, fmt.Errorf("check type %+q not valid", check.Type)
}
return &chkReg, nil
}
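Taken together, the agent creates one ServiceClient, drives it with a single Run goroutine, and callers merely enqueue work that Run syncs and retries; a condensed usage sketch (logger, alloc, task, and scriptExec are assumed to exist, and error handling is abbreviated):
// Illustrative only; this mirrors how the Nomad agent wires up the client.
consulAPI, err := api.NewClient(api.DefaultConfig())
if err != nil {
	logger.Fatalf("error creating consul client: %v", err)
}
sc := NewServiceClient(consulAPI.Agent(), logger)
go sc.Run()         // single long-running sync loop
defer sc.Shutdown() // deregisters agent entries and flushes pending operations

// Registration only enqueues operations; Run pushes them to Consul and retries failures.
if err := sc.RegisterTask(alloc.ID, task, scriptExec); err != nil {
	logger.Printf("[ERR] consul: error registering task: %v", err)
}
// ...and when the task stops:
sc.RemoveTask(alloc.ID, task)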

View File

@ -0,0 +1,228 @@
package consul_test
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
"golang.org/x/sys/unix"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
func testLogger() *log.Logger {
if testing.Verbose() {
return log.New(os.Stderr, "", log.LstdFlags)
}
return log.New(ioutil.Discard, "", 0)
}
// TestConsul_Integration asserts TaskRunner properly registers and deregisters
// services and checks with Consul using an embedded Consul agent.
func TestConsul_Integration(t *testing.T) {
if _, ok := driver.BuiltinDrivers["mock_driver"]; !ok {
t.Skip(`test requires mock_driver; run with "-tags nomad_test"`)
}
if testing.Short() {
t.Skip("-short set; skipping")
}
if unix.Geteuid() != 0 {
t.Skip("Must be run as root")
}
// Create an embedded Consul server
testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
defer testconsul.Stop()
conf := config.DefaultConfig()
conf.ConsulConfig.Addr = testconsul.HTTPAddr
consulConfig, err := conf.ConsulConfig.ApiConfig()
if err != nil {
t.Fatalf("error generating consul config: %v", err)
}
conf.StateDir, err = ioutil.TempDir("", "nomadtest-consulstate")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(conf.StateDir)
conf.AllocDir, err = ioutil.TempDir("", "nomdtest-consulalloc")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(conf.AllocDir)
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1h",
}
// Choose a port that shouldn't be in use
task.Resources.Networks[0].ReservedPorts = []structs.Port{{Label: "http", Value: 3}}
task.Services = []*structs.Service{
{
Name: "httpd",
PortLabel: "http",
Tags: []string{"nomad", "test", "http"},
Checks: []*structs.ServiceCheck{
{
Name: "httpd-http-check",
Type: "http",
Path: "/",
Protocol: "http",
PortLabel: "http",
Interval: 9000 * time.Hour,
Timeout: 1, // fail as fast as possible
},
{
Name: "httpd-script-check",
Type: "script",
Command: "/bin/true",
Interval: 10 * time.Second,
Timeout: 10 * time.Second,
},
},
},
{
Name: "httpd2",
PortLabel: "http",
Tags: []string{"test", "http2"},
},
}
logger := testLogger()
logUpdate := func(name, state string, event *structs.TaskEvent) {
logger.Printf("[TEST] test.updater: name=%q state=%q event=%v", name, state, event)
}
allocDir := allocdir.NewAllocDir(logger, filepath.Join(conf.AllocDir, alloc.ID))
if err := allocDir.Build(); err != nil {
t.Fatalf("error building alloc dir: %v", err)
}
taskDir := allocDir.NewTaskDir(task.Name)
vclient := vaultclient.NewMockVaultClient()
consulClient, err := consulapi.NewClient(consulConfig)
if err != nil {
t.Fatalf("error creating consul client: %v", err)
}
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger)
defer serviceClient.Shutdown() // just-in-case cleanup
consulRan := make(chan struct{})
go func() {
serviceClient.Run()
close(consulRan)
}()
tr := client.NewTaskRunner(logger, conf, logUpdate, taskDir, alloc, task, vclient, serviceClient)
tr.MarkReceived()
go tr.Run()
defer func() {
// Just in case cleanup
select {
case <-tr.WaitCh():
// Exited cleanly, no need to kill
default:
tr.Kill("", "", true) // just in case
}
}()
// Block waiting for the service to appear
catalog := consulClient.Catalog()
res, meta, err := catalog.Service("httpd2", "test", nil)
for len(res) == 0 {
// Expected initial request to fail, do a blocking query
res, meta, err = catalog.Service("httpd2", "test", &consulapi.QueryOptions{WaitIndex: meta.LastIndex + 1, WaitTime: 3 * time.Second})
if err != nil {
t.Fatalf("error querying for service: %v", err)
}
}
if len(res) != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", len(res), res)
}
res = res[:0] // truncate so the loop below queries for the "httpd" service
// Assert the service with the checks exists
for len(res) == 0 {
res, meta, err = catalog.Service("httpd", "http", &consulapi.QueryOptions{WaitIndex: meta.LastIndex + 1, WaitTime: 3 * time.Second})
if err != nil {
t.Fatalf("error querying for service: %v", err)
}
}
if len(res) != 1 {
t.Fatalf("exepcted 1 service but found %d:\n%#v", len(res), res)
}
// Assert the script check passes (mock_driver script checks always
// pass) after having time to run once
time.Sleep(2 * time.Second)
checks, _, err := consulClient.Health().Checks("httpd", nil)
if err != nil {
t.Fatalf("error querying checks: %v", err)
}
if expected := 2; len(checks) != expected {
t.Fatalf("expected %d checks but found %d:\n%#v", expected, len(checks), checks)
}
for _, check := range checks {
if expected := "httpd"; check.ServiceName != expected {
t.Fatalf("expected checks to be for %q but found service name = %q", expected, check.ServiceName)
}
switch check.Name {
case "httpd-http-check":
// Port check should fail
if expected := consulapi.HealthCritical; check.Status != expected {
t.Errorf("expected %q status to be %q but found %q", check.Name, expected, check.Status)
}
case "httpd-script-check":
// mock_driver script checks always succeed
if expected := consulapi.HealthPassing; check.Status != expected {
t.Errorf("expected %q status to be %q but found %q", check.Name, expected, check.Status)
}
default:
t.Errorf("unexpected check %q with status %q", check.Name, check.Status)
}
}
logger.Printf("[TEST] consul.test: killing task")
// Kill the task
tr.Kill("", "", false)
select {
case <-tr.WaitCh():
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for Run() to exit")
}
// Shutdown Consul ServiceClient to ensure all pending operations complete
if err := serviceClient.Shutdown(); err != nil {
t.Errorf("error shutting down Consul ServiceClient: %v", err)
}
// Ensure Consul is clean
services, _, err := catalog.Services(nil)
if err != nil {
t.Fatalf("error query services: %v", err)
}
if len(services) != 1 {
t.Fatalf("expected only 1 service in Consul but found %d:\n%#v", len(services), services)
}
if _, ok := services["consul"]; !ok {
t.Fatalf(`expected only the "consul" key in Consul but found: %#v`, services)
}
}

View File

@ -0,0 +1,27 @@
package consul
import (
"log"
"github.com/hashicorp/consul/api"
)
// MockCatalog can be used for testing where the CatalogAPI is needed.
type MockCatalog struct {
logger *log.Logger
}
func NewMockCatalog(l *log.Logger) *MockCatalog {
return &MockCatalog{logger: l}
}
func (m *MockCatalog) Datacenters() ([]string, error) {
dcs := []string{"dc1"}
m.logger.Printf("[DEBUG] mock_consul: Datacenters() -> (%q, nil)", dcs)
return dcs, nil
}
func (m *MockCatalog) Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error) {
m.logger.Printf("[DEBUG] mock_consul: Service(%q, %q, %#v) -> (nil, nil, nil)", service, tag, q)
return nil, nil, nil
}

View File

@ -0,0 +1,136 @@
package consul
import (
"context"
"log"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/structs"
)
// heartbeater is the subset of consul agent functionality needed by script
// checks to heartbeat
type heartbeater interface {
UpdateTTL(id, output, status string) error
}
type scriptHandle struct {
// cancel the script
cancel func()
done chan struct{}
}
// wait returns a chan that's closed when the script exits
func (s *scriptHandle) wait() <-chan struct{} {
return s.done
}
type scriptCheck struct {
id string
check *structs.ServiceCheck
exec ScriptExecutor
agent heartbeater
// lastCheckOk is true if the last check was ok; otherwise false
lastCheckOk bool
logger *log.Logger
shutdownCh <-chan struct{}
}
func newScriptCheck(id string, check *structs.ServiceCheck, exec ScriptExecutor, agent heartbeater, logger *log.Logger, shutdownCh <-chan struct{}) *scriptCheck {
return &scriptCheck{
id: id,
check: check,
exec: exec,
agent: agent,
lastCheckOk: true, // start logging on first failure
logger: logger,
shutdownCh: shutdownCh,
}
}
// run this script check and return a handle for cancelling it. If the
// shutdownCh is closed the check will be run once more before exiting.
func (s *scriptCheck) run() *scriptHandle {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
timer := time.NewTimer(0)
defer timer.Stop()
for {
// Block until check is removed, Nomad is shutting
// down, or the check interval is up
select {
case <-ctx.Done():
// check has been removed
return
case <-s.shutdownCh:
// unblock but don't exit until after we heartbeat once more
case <-timer.C:
timer.Reset(s.check.Interval)
}
// Execute check script with timeout
execctx, cancel := context.WithTimeout(ctx, s.check.Timeout)
output, code, err := s.exec.Exec(execctx, s.check.Command, s.check.Args)
switch execctx.Err() {
case context.Canceled:
// check removed during execution; exit
return
case context.DeadlineExceeded:
// Log deadline exceeded every time, but flip last check to false
s.lastCheckOk = false
s.logger.Printf("[WARN] consul.checks: check %q timed out (%s)", s.check.Name, s.check.Timeout)
}
// cleanup context
cancel()
state := api.HealthCritical
switch code {
case 0:
state = api.HealthPassing
case 1:
state = api.HealthWarning
}
if err != nil {
state = api.HealthCritical
output = []byte(err.Error())
}
// Actually heartbeat the check
err = s.agent.UpdateTTL(s.id, string(output), state)
select {
case <-ctx.Done():
// check has been removed; don't report errors
return
default:
}
if err != nil {
//FIXME Backoff? Retry faster?
if s.lastCheckOk {
s.lastCheckOk = false
s.logger.Printf("[WARN] consul.checks: update for check %q failed: %v", s.check.Name, err)
} else {
s.logger.Printf("[DEBUG] consul.checks: update for check %q still failing: %v", s.check.Name, err)
}
} else if !s.lastCheckOk {
// Succeeded for the first time or after failing; log
s.lastCheckOk = true
s.logger.Printf("[INFO] consul.checks: update for check %q succeeded", s.check.Name)
}
select {
case <-s.shutdownCh:
// We've been told to exit
return
default:
}
}
}()
return &scriptHandle{cancel: cancel, done: done}
}

View File

@ -0,0 +1,165 @@
package consul
import (
"context"
"os/exec"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/structs"
)
// blockingScriptExec implements ScriptExecutor by running a subcommand that never
// exits.
type blockingScriptExec struct {
// running is signaled before blocking to allow synchronizing operations
running chan struct{}
// set to true if Exec is called and has exited
exited bool
}
func newBlockingScriptExec() *blockingScriptExec {
return &blockingScriptExec{running: make(chan struct{})}
}
func (b *blockingScriptExec) Exec(ctx context.Context, _ string, _ []string) ([]byte, int, error) {
b.running <- mark
cmd := exec.CommandContext(ctx, "/bin/sleep", "9000")
err := cmd.Run()
code := 0
if exitErr, ok := err.(*exec.ExitError); ok {
if !exitErr.Success() {
code = 1
}
}
b.exited = true
return []byte{}, code, err
}
// TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits
// any running scripts.
func TestConsulScript_Exec_Cancel(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Hour,
}
exec := newBlockingScriptExec()
// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("checkid", &serviceCheck, exec, nil, testLogger(), nil)
handle := check.run()
// wait until Exec is called
<-exec.running
// cancel now that we're blocked in exec
handle.cancel()
select {
case <-handle.wait():
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
if !exec.exited {
t.Errorf("expected script executor to run and exit but it has not")
}
}
type fakeHeartbeater struct {
updates chan string
}
func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error {
f.updates <- status
return nil
}
func newFakeHeartbeater() *fakeHeartbeater {
return &fakeHeartbeater{updates: make(chan string)}
}
// TestConsulScript_Exec_Timeout asserts a script will be killed when the
// timeout is reached.
func TestConsulScript_Exec_Timeout(t *testing.T) {
t.Parallel() // run the slow tests in parallel
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Second,
}
exec := newBlockingScriptExec()
hb := newFakeHeartbeater()
check := newScriptCheck("checkid", &serviceCheck, exec, hb, testLogger(), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
// Check for UpdateTTL call
select {
case update := <-hb.updates:
if update != api.HealthCritical {
t.Error("expected %q due to timeout but received %q", api.HealthCritical, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
if !exec.exited {
t.Errorf("expected script executor to run and exit but it has not")
}
// Cancel and watch for exit
handle.cancel()
select {
case <-handle.wait():
// ok!
case update := <-hb.updates:
t.Errorf("unexpected UpdateTTL call on exit with status=%q", update)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
type noopExec struct{}
func (noopExec) Exec(context.Context, string, []string) ([]byte, int, error) {
return []byte{}, 0, nil
}
// TestConsulScript_Exec_Shutdown asserts a script will be executed once more
// when told to shutdown.
func TestConsulScript_Exec_Shutdown(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: 3 * time.Second,
}
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
check := newScriptCheck("checkid", &serviceCheck, noopExec{}, hb, testLogger(), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
// Tell scriptCheck to exit
close(shutdown)
select {
case update := <-hb.updates:
if update != api.HealthPassing {
t.Error("expected %q due to timeout but received %q", api.HealthPassing, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
select {
case <-handle.wait():
// ok!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}

File diff suppressed because it is too large

View File

@ -1,358 +0,0 @@
package consul
import (
"io/ioutil"
"log"
"net"
"os"
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
const (
allocID = "12"
serviceRegPrefix = "test"
serviceGroupName = "executor"
)
var logger = log.New(os.Stdout, "", log.LstdFlags)
func TestSyncNow(t *testing.T) {
cs, testconsul := testConsul(t)
defer cs.Shutdown()
defer testconsul.Stop()
cs.SetAddrFinder(func(h string) (string, int) {
a, pstr, _ := net.SplitHostPort(h)
p, _ := net.LookupPort("tcp", pstr)
return a, p
})
cs.syncInterval = 9000 * time.Hour
service := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(service): service,
}
// Run syncs once on startup and then blocks forever
go cs.Run()
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
synced := false
for i := 0; !synced && i < 10; i++ {
time.Sleep(250 * time.Millisecond)
agentServices, err := cs.queryAgentServices()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
synced = len(agentServices) == 1
}
if !synced {
t.Fatalf("initial sync never occurred")
}
// SetServices again should cause another sync
service1 := &structs.Service{Name: "foo1", Tags: []string{"Y", "Z"}}
service2 := &structs.Service{Name: "bar"}
services = map[ServiceKey]*structs.Service{
GenerateServiceKey(service1): service1,
GenerateServiceKey(service2): service2,
}
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
synced = false
for i := 0; !synced && i < 10; i++ {
time.Sleep(250 * time.Millisecond)
agentServices, err := cs.queryAgentServices()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
synced = len(agentServices) == 2
}
if !synced {
t.Fatalf("SetServices didn't sync immediately")
}
}
func TestCheckRegistration(t *testing.T) {
cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
if err != nil {
t.Fatalf("Err: %v", err)
}
check1 := structs.ServiceCheck{
Name: "check-foo-1",
Type: structs.ServiceCheckTCP,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
InitialStatus: api.HealthPassing,
}
check2 := structs.ServiceCheck{
Name: "check1",
Type: "tcp",
PortLabel: "port2",
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
}
check3 := structs.ServiceCheck{
Name: "check3",
Type: "http",
PortLabel: "port3",
Path: "/health?p1=1&p2=2",
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
}
service1 := structs.Service{
Name: "foo-1",
Tags: []string{"tag1", "tag2"},
PortLabel: "port1",
Checks: []*structs.ServiceCheck{
&check1, &check2,
},
}
task := structs.Task{
Name: "foo",
Services: []*structs.Service{&service1},
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
IP: "10.10.11.5",
DynamicPorts: []structs.Port{
structs.Port{
Label: "port1",
Value: 20002,
},
structs.Port{
Label: "port2",
Value: 20003,
},
structs.Port{
Label: "port3",
Value: 20004,
},
},
},
},
},
}
cs.SetAddrFinder(task.FindHostAndPortFor)
srvReg, _ := cs.createService(&service1, "domain", "key")
check1Reg, _ := cs.createCheckReg(&check1, srvReg)
check2Reg, _ := cs.createCheckReg(&check2, srvReg)
check3Reg, _ := cs.createCheckReg(&check3, srvReg)
expected := "10.10.11.5:20002"
if check1Reg.TCP != expected {
t.Fatalf("expected: %v, actual: %v", expected, check1Reg.TCP)
}
expected = "10.10.11.5:20003"
if check2Reg.TCP != expected {
t.Fatalf("expected: %v, actual: %v", expected, check2Reg.TCP)
}
expected = "http://10.10.11.5:20004/health?p1=1&p2=2"
if check3Reg.HTTP != expected {
t.Fatalf("expected: %v, actual: %v", expected, check3Reg.HTTP)
}
expected = api.HealthPassing
if check1Reg.Status != expected {
t.Fatalf("expected: %v, actual: %v", expected, check1Reg.Status)
}
}
// testConsul returns a Syncer configured with an embedded Consul server.
//
// Callers must defer Syncer.Shutdown() and TestServer.Stop()
//
func testConsul(t *testing.T) (*Syncer, *testutil.TestServer) {
// Create an embedded Consul server
testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
// Configure Syncer to talk to the test server
cconf := config.DefaultConsulConfig()
cconf.Addr = testconsul.HTTPAddr
cs, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Error creating Syncer: %v", err)
}
return cs, testconsul
}
func TestConsulServiceRegisterServices(t *testing.T) {
cs, testconsul := testConsul(t)
defer cs.Shutdown()
defer testconsul.Stop()
service1 := &structs.Service{Name: "foo", Tags: []string{"a", "b"}}
service2 := &structs.Service{Name: "foo"}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(service1): service1,
GenerateServiceKey(service2): service2,
}
// Call SetServices to update services in consul
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
// Manually call SyncServers to cause a synchronous consul update
if err := cs.SyncServices(); err != nil {
t.Fatalf("error syncing services: %v", err)
}
numservices := len(cs.flattenedServices())
if numservices != 2 {
t.Fatalf("expected 2 services but found %d", numservices)
}
numchecks := len(cs.flattenedChecks())
if numchecks != 0 {
t.Fatalf("expected 0 checks but found %d", numchecks)
}
// Assert services are in consul
agentServices, err := cs.client.Agent().Services()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
found := 0
for id, as := range agentServices {
if id == "consul" {
found++
continue
}
if _, ok := services[ServiceKey(as.Service)]; ok {
found++
continue
}
t.Errorf("unexpected service in consul: %s", id)
}
if found != 3 {
t.Fatalf("expected 3 services in consul but found %d:\nconsul: %#v", len(agentServices), agentServices)
}
agentChecks, err := cs.queryChecks()
if err != nil {
t.Fatalf("error querying consul checks: %v", err)
}
if len(agentChecks) != numchecks {
t.Fatalf("expected %d checks in consul but found %d:\n%#v", numservices, len(agentChecks), agentChecks)
}
}
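// TestConsulServiceUpdateService tests that updating registered services is
// reflected both in the Syncer's internal state and in Consul.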
func TestConsulServiceUpdateService(t *testing.T) {
cs, testconsul := testConsul(t)
defer cs.Shutdown()
defer testconsul.Stop()
cs.SetAddrFinder(func(h string) (string, int) {
a, pstr, _ := net.SplitHostPort(h)
p, _ := net.LookupPort("tcp", pstr)
return a, p
})
service1 := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
service2 := &structs.Service{Name: "foo2"}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(service1): service1,
GenerateServiceKey(service2): service2,
}
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
if err := cs.SyncServices(); err != nil {
t.Fatalf("error syncing services: %v", err)
}
// Now update both services and add a third
service1 = &structs.Service{Name: "foo1", Tags: []string{"a", "z"}}
service2 = &structs.Service{Name: "foo2", PortLabel: ":8899"}
service3 := &structs.Service{Name: "foo3"}
services = map[ServiceKey]*structs.Service{
GenerateServiceKey(service1): service1,
GenerateServiceKey(service2): service2,
GenerateServiceKey(service3): service3,
}
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
if err := cs.SyncServices(); err != nil {
t.Fatalf("error syncing services: %v", err)
}
agentServices, err := cs.queryAgentServices()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
if len(agentServices) != 3 {
t.Fatalf("expected 3 services in consul but found %d:\n%#v", len(agentServices), agentServices)
}
consulServices := make(map[string]*api.AgentService, 3)
for _, as := range agentServices {
consulServices[as.ID] = as
}
found := 0
for _, s := range cs.flattenedServices() {
// Assert changes were applied to internal state
switch s.Name {
case "foo1":
found++
if !reflect.DeepEqual(service1.Tags, s.Tags) {
t.Errorf("incorrect tags on foo1:\n expected: %v\n found: %v", service1.Tags, s.Tags)
}
case "foo2":
found++
if s.Address != "" {
t.Errorf("expected empty host on foo2 but found %q", s.Address)
}
if s.Port != 8899 {
t.Errorf("expected port 8899 on foo2 but found %d", s.Port)
}
case "foo3":
found++
default:
t.Errorf("unexpected service: %s", s.Name)
}
// Assert internal state equals consul's state
cs, ok := consulServices[s.ID]
if !ok {
t.Errorf("service not in consul: %s id: %s", s.Name, s.ID)
continue
}
if !reflect.DeepEqual(s.Tags, cs.Tags) {
t.Errorf("mismatched tags in syncer state and consul for %s:\nsyncer: %v\nconsul: %v", s.Name, s.Tags, cs.Tags)
}
if cs.Port != s.Port {
t.Errorf("mismatched port in syncer state and consul for %s\nsyncer: %v\nconsul: %v", s.Name, s.Port, cs.Port)
}
if cs.Address != s.Address {
t.Errorf("mismatched address in syncer state and consul for %s\nsyncer: %v\nconsul: %v", s.Name, s.Address, cs.Address)
}
}
if found != 3 {
t.Fatalf("expected 3 services locally but found %d", found)
}
}

View File

@ -0,0 +1,497 @@
package consul
import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/structs"
)
func testLogger() *log.Logger {
if testing.Verbose() {
return log.New(os.Stderr, "", log.LstdFlags)
}
return log.New(ioutil.Discard, "", 0)
}
func testTask() *structs.Task {
return &structs.Task{
Name: "taskname",
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
{
DynamicPorts: []structs.Port{{Label: "x", Value: 1234}},
},
},
},
Services: []*structs.Service{
{
Name: "taskname-service",
PortLabel: "x",
Tags: []string{"tag1", "tag2"},
},
},
}
}
// testFakeCtx contains a fake Consul AgentAPI and implements the Exec
// interface to allow testing without running Consul.
type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *fakeConsul
Task *structs.Task
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
// Exec implements the ScriptExecutor interface and will use an alternate
// implementation t.ExecFunc if non-nil.
func (t *testFakeCtx) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
if t.ExecFunc == nil {
// Default impl is just "ok"
return []byte("ok"), 0, nil
}
return t.ExecFunc(ctx, cmd, args)
}
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
// A test Task is also provided.
func setupFake() *testFakeCtx {
fc := &fakeConsul{
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
checkTTLs: make(map[string]int),
}
return &testFakeCtx{
ServiceClient: NewServiceClient(fc, testLogger()),
FakeConsul: fc,
Task: testTask(),
}
}
// fakeConsul is a fake in-memory Consul backend for ServiceClient.
type fakeConsul struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
mu sync.Mutex
// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
}
func (c *fakeConsul) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.checks[check.ID] = check
return nil
}
func (c *fakeConsul) CheckDeregister(checkID string) error {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
return nil
}
func (c *fakeConsul) ServiceRegister(service *api.AgentServiceRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.services[service.ID] = service
return nil
}
func (c *fakeConsul) ServiceDeregister(serviceID string) error {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.services, serviceID)
return nil
}
func (c *fakeConsul) UpdateTTL(id string, output string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)
}
// Flip initial status to passing
check.Status = "passing"
c.checkTTLs[id]++
return nil
}
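// TestConsul_ChangeTags asserts that changing a service's tags replaces the
// old Consul registration with a new one under a different key.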
func TestConsul_ChangeTags(t *testing.T) {
ctx := setupFake()
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Manually call sync() since Run() isn't running
if err := ctx.ServiceClient.sync(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
origKey := ""
for k, v := range ctx.FakeConsul.services {
origKey = k
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
}
// Changing a tag removes old entry before adding new one
ctx.ServiceClient.RemoveTask("allocid", ctx.Task)
ctx.Task.Services[0].Tags[0] = "newtag"
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.ServiceClient.sync(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for k, v := range ctx.FakeConsul.services {
if k == origKey {
t.Errorf("expected key to change but found %q", k)
}
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
}
}
// TestConsul_RegServices tests basic service registration.
func TestConsul_RegServices(t *testing.T) {
ctx := setupFake()
port := ctx.Task.Resources.Networks[0].DynamicPorts[0].Value
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Manually call sync() since Run() isn't running
if err := ctx.ServiceClient.sync(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for _, v := range ctx.FakeConsul.services {
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
if v.Port != port {
t.Errorf("expected Port=%d != %d", port, v.Port)
}
}
// Make a change which will register a new service
ctx.Task.Services[0].Name = "taskname-service2"
ctx.Task.Services[0].Tags[0] = "tag3"
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil); err != nil {
t.Fatalf("unpexpected error registering task: %v", err)
}
// Make sure changes don't take effect until sync() is called (since
// Run() isn't running)
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for _, v := range ctx.FakeConsul.services {
if reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags to differ, changes applied before sync()")
}
}
// Now sync() and re-check for the applied updates
if err := ctx.ServiceClient.sync(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 2 {
t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services)
}
found := false
for _, v := range ctx.FakeConsul.services {
if v.Name == ctx.Task.Services[0].Name {
if found {
t.Fatalf("found new service name %q twice", v.Name)
}
found = true
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
}
}
}
if !found {
t.Fatalf("did not find new service %q", ctx.Task.Services[0].Name)
}
// Remove the new task
ctx.ServiceClient.RemoveTask("allocid", ctx.Task)
if err := ctx.ServiceClient.sync(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if n := len(ctx.FakeConsul.services); n != 1 {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for _, v := range ctx.FakeConsul.services {
if v.Name != "taskname-service" {
t.Errorf("expected original task to survive not %q", v.Name)
}
}
}
// TestConsul_ShutdownOK tests the ok path for the shutdown logic in
// ServiceClient.
func TestConsul_ShutdownOK(t *testing.T) {
ctx := setupFake()
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheck",
Type: "script",
Command: "true",
// Make check block until shutdown
Interval: 9000 * time.Hour,
Timeout: 10 * time.Second,
InitialStatus: "warning",
},
}
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
agentServices := []*structs.Service{
{
Name: "http",
Tags: []string{"nomad"},
PortLabel: "localhost:2345",
},
}
if err := ctx.ServiceClient.RegisterAgent("client", agentServices); err != nil {
t.Fatalf("unexpected error registering agent: %v", err)
}
// Shutdown should block until all enqueued operations finish.
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
// assert Run() exits in a timely fashion after Shutdown() exits
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("expected Run() to exit, but it did not")
}
// Nothing should be enqueued anymore
enqueued := (len(ctx.ServiceClient.regServices) + len(ctx.ServiceClient.deregServices) +
len(ctx.ServiceClient.regChecks) + len(ctx.ServiceClient.deregChecks))
if enqueued > 0 {
t.Errorf("%d operations still enqueued", enqueued)
}
// UpdateTTL should have been called once for the script check
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
}
for _, v := range ctx.FakeConsul.checkTTLs {
if v != 1 {
t.Fatalf("expected script check to be updated once but found %d", v)
}
}
for _, v := range ctx.FakeConsul.checks {
if v.Status != "passing" {
t.Fatalf("expected check to be passing but found %q", v.Status)
}
}
}
// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in
// ServiceClient.
func TestConsul_ShutdownSlow(t *testing.T) {
t.Parallel() // run the slow tests in parallel
ctx := setupFake()
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheck",
Type: "script",
Command: "true",
// Make check block until shutdown
Interval: 9000 * time.Hour,
Timeout: 5 * time.Second,
InitialStatus: "warning",
},
}
// Make Exec slow, but not too slow
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
time.Sleep(time.Second)
return []byte{}, 0, nil
}
// Make shutdown wait time just a bit longer than ctx.Exec takes
ctx.ServiceClient.shutdownWait = 3 * time.Second
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Shutdown should block until all enqueued operations finish.
preShutdown := time.Now()
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
// assert Run() exits in a timely fashion after Shutdown() exits
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("expected Run() to exit, but it did not")
}
// Shutdown time should have taken: 1s <= shutdown <= 3s
shutdownTime := time.Now().Sub(preShutdown)
if shutdownTime < time.Second || shutdownTime > ctx.ServiceClient.shutdownWait {
t.Errorf("expected shutdown to take >1s and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime)
}
// UpdateTTL should have been called once for the script check
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
}
for _, v := range ctx.FakeConsul.checkTTLs {
if v != 1 {
t.Fatalf("expected script check to be updated once but found %d", v)
}
}
for _, v := range ctx.FakeConsul.checks {
if v.Status != "passing" {
t.Fatalf("expected check to be passing but found %q", v.Status)
}
}
}
// TestConsul_ShutdownBlocked tests the blocked past deadline path for the
// shutdown logic in ServiceClient.
func TestConsul_ShutdownBlocked(t *testing.T) {
t.Parallel() // run the slow tests in parallel
ctx := setupFake()
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheck",
Type: "script",
Command: "true",
// Make check block until shutdown
Interval: 9000 * time.Hour,
Timeout: 9000 * time.Hour,
InitialStatus: "warning",
},
}
block := make(chan struct{})
defer close(block) // cleanup after test
// Make Exec block until the test closes the block channel
ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
<-block
return []byte{}, 0, nil
}
// Use a short shutdown deadline since we're intentionally blocking forever
ctx.ServiceClient.shutdownWait = time.Second
hasShutdown := make(chan struct{})
go func() {
ctx.ServiceClient.Run()
close(hasShutdown)
}()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// Shutdown should block until all enqueued operations finish.
preShutdown := time.Now()
err := ctx.ServiceClient.Shutdown()
if err == nil {
t.Errorf("expected a timed out error from shutdown")
}
// assert Run() exits in a timely fashion after Shutdown() exits
maxWait := 10 * time.Second
select {
case <-hasShutdown:
// ok! Run() exited as expected
case <-time.After(maxWait):
t.Fatalf("expected Run() to exit, but it did not")
}
// Shutdown time should have taken 1s; to avoid timing related errors
// simply test for 1s <= shutdown <= 10s
shutdownTime := time.Now().Sub(preShutdown)
if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait {
t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime)
}
// UpdateTTL should not have been called for the script check
if n := len(ctx.FakeConsul.checkTTLs); n != 0 {
t.Fatalf("expected 0 checkTTL entry but found: %d", n)
}
for _, v := range ctx.FakeConsul.checks {
if expected := "warning"; v.Status != expected {
t.Fatalf("expected check to be %q but found %q", expected, v.Status)
}
}
}

View File

@ -63,6 +63,14 @@ const (
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
defaultConsulDiscoveryInterval time.Duration = 9 * time.Second
// defaultConsulDiscoveryIntervalRetry is how often to poll Consul for
// new servers if there is no leader and the last Consul query failed.
defaultConsulDiscoveryIntervalRetry time.Duration = 3 * time.Second
)
// Server is Nomad server which manages the job queues,
@ -136,8 +144,8 @@ type Server struct {
heartbeatTimers map[string]*time.Timer
heartbeatTimersLock sync.Mutex
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulCatalog is used for discovering other Nomad Servers via Consul
consulCatalog consul.CatalogAPI
// vault is the client for communicating with Vault.
vault VaultClient
@ -167,7 +175,7 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Server, error) {
func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logger) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
@ -212,20 +220,20 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger)
// Create the server
s := &Server{
config: config,
consulSyncer: consulSyncer,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
planQueue: planQueue,
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
config: config,
consulCatalog: consulCatalog,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
planQueue: planQueue,
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
}
// Create the periodic dispatcher for launching periodic jobs.
@ -542,8 +550,7 @@ func (s *Server) setupBootstrapHandler() error {
s.logger.Printf("[DEBUG] server.nomad: lost contact with Nomad quorum, falling back to Consul for server list")
consulCatalog := s.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
dcs, err := s.consulCatalog.Datacenters()
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("server.nomad: unable to query Consul datacenters: %v", err)
@ -570,7 +577,7 @@ func (s *Server) setupBootstrapHandler() error {
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
consulServices, _, err := s.consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
if err != nil {
err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err)
s.logger.Printf("[WARN] server.nomad: %v", err)
@ -618,7 +625,28 @@ func (s *Server) setupBootstrapHandler() error {
return nil
}
s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
// Hacky replacement for old ConsulSyncer Periodic Handler.
go func() {
lastOk := true
sync := time.NewTimer(0)
for {
select {
case <-sync.C:
d := defaultConsulDiscoveryInterval
if err := bootstrapFn(); err != nil {
// Only log if it worked last time
if lastOk {
lastOk = false
s.logger.Printf("[ERR] consul: error looking up Nomad servers: %v", err)
}
d = defaultConsulDiscoveryIntervalRetry
}
sync.Reset(d)
case <-s.shutdownCh:
return
}
}
}()
return nil
}

View File

@ -76,15 +76,11 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
shutdownCh := make(chan struct{})
logger := log.New(config.LogOutput, fmt.Sprintf("[%s] ", config.NodeName), log.LstdFlags)
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
catalog := consul.NewMockCatalog(logger)
// Create server
server, err := NewServer(config, consulSyncer, logger)
server, err := NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -106,7 +106,7 @@ does not automatically enable service discovery.
~> **Caveat:** The command must be the path to the command on disk, and no
shell exists by default. That means operators like `||` or `&&` are not
available. Additionally, all arguments must be supplied via the `args`
parameter. The achieve the behavior of shell operators, specify the command
parameter. To achieve the behavior of shell operators, specify the command
as a shell, like `/bin/bash` and then use `args` to run the check.
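
For example, a check that needs shell operators can wrap them in a shell
invocation, roughly like the following sketch (the check name, endpoint, and
command are illustrative, not part of any real job):

```hcl
check {
  # Hypothetical script check; the pipeline runs inside bash because
  # `command` itself cannot contain shell operators like `||`.
  name     = "app-health"
  type     = "script"
  command  = "/bin/bash"
  args     = ["-c", "curl -sf http://localhost:8080/health || exit 1"]
  interval = "30s"
  timeout  = "5s"
}
```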
- `initial_status` `(string: <enum>)` - Specifies the originating status of the