diff --git a/command/agent/agent.go b/command/agent/agent.go index 99b4d25b5..417c4552e 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -83,6 +83,9 @@ type Agent struct { // checkTTLs maps the check ID to an associated check TTL checkTTLs map[string]*CheckTTL + // checkDockers maps the check ID to an associated Docker Exec based check + checkDockers map[string]*CheckDocker + // checkLock protects updates to the check* maps checkLock sync.Mutex @@ -151,6 +154,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { checkTTLs: make(map[string]*CheckTTL), checkHTTPs: make(map[string]*CheckHTTP), checkTCPs: make(map[string]*CheckTCP), + checkDockers: make(map[string]*CheckDocker), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), shutdownCh: make(chan struct{}), @@ -905,7 +909,31 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist tcp.Start() a.checkTCPs[check.CheckID] = tcp - } else { + } else if chkType.IsDocker() { + if existing, ok := a.checkDockers[check.CheckID]; ok { + existing.Stop() + } + if chkType.Interval < MinInterval { + a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", + check.CheckID, MinInterval)) + chkType.Interval = MinInterval + } + + dockerCheck := &CheckDocker{ + Notify: &a.state, + CheckID: check.CheckID, + DockerContainerId: chkType.DockerContainerId, + Shell: chkType.Shell, + Script: chkType.Script, + Interval: chkType.Interval, + Logger: a.logger, + } + if err := dockerCheck.Init(); err != nil { + return err + } + dockerCheck.Start() + a.checkDockers[check.CheckID] = dockerCheck + } else if chkType.IsMonitor() { if existing, ok := a.checkMonitors[check.CheckID]; ok { existing.Stop() } @@ -924,6 +952,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist } monitor.Start() a.checkMonitors[check.CheckID] = monitor + } else { + return fmt.Errorf("Check type is not valid") } } diff --git a/command/agent/check.go b/command/agent/check.go index d1d316569..8ca3e3cc6 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -6,12 +6,14 @@ import ( "log" "net" "net/http" + "os" "os/exec" "sync" "syscall" "time" "github.com/armon/circbuf" + docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-cleanhttp" ) @@ -33,15 +35,17 @@ const ( // CheckType is used to create either the CheckMonitor // or the CheckTTL. -// Four types are supported: Script, HTTP, TCP and TTL -// Script, HTTP and TCP all require Interval +// Five types are supported: Script, HTTP, TCP, Docker and TTL +// Script, HTTP, Docker and TCP all require Interval // Only one of the types needs to be provided -// TTL or Script/Interval or HTTP/Interval or TCP/Interval +// TTL or Script/Interval or HTTP/Interval or TCP/Interval or Docker/Interval type CheckType struct { - Script string - HTTP string - TCP string - Interval time.Duration + Script string + HTTP string + TCP string + Interval time.Duration + DockerContainerId string + Shell string Timeout time.Duration TTL time.Duration @@ -54,7 +58,7 @@ type CheckTypes []*CheckType // Valid checks if the CheckType is valid func (c *CheckType) Valid() bool { - return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP() + return c.IsTTL() || c.IsMonitor() || c.IsHTTP() || c.IsTCP() || c.IsDocker() } // IsTTL checks if this is a TTL type @@ -64,7 +68,7 @@ func (c *CheckType) IsTTL() bool { // IsMonitor checks if this is a Monitor type func (c *CheckType) IsMonitor() bool { - return c.Script != "" && c.Interval != 0 + return c.Script != "" && c.DockerContainerId == "" && c.Interval != 0 } // IsHTTP checks if this is a HTTP type @@ -77,6 +81,10 @@ func (c *CheckType) IsTCP() bool { return c.TCP != "" && c.Interval != 0 } +func (c *CheckType) IsDocker() bool { + return c.DockerContainerId != "" && c.Script != "" && c.Interval != 0 +} + // CheckNotifier interface is used by the CheckMonitor // to notify when a check has a status update. The update // should take care to be idempotent. @@ -493,3 +501,161 @@ func (c *CheckTCP) check() { c.Logger.Printf("[DEBUG] agent: check '%v' is passing", c.CheckID) c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) } + +// A custom interface since go-dockerclient doesn't have one +// We will use this interface in our test to inject a fake client +type DockerClient interface { + CreateExec(docker.CreateExecOptions) (*docker.Exec, error) + StartExec(string, docker.StartExecOptions) error + InspectExec(string) (*docker.ExecInspect, error) +} + +// CheckDocker is used to periodically invoke a script to +// determine the health of an application running inside a +// Docker Container. We assume that the script is compatible +// with nagios plugins and expects the output in the same format. +type CheckDocker struct { + Notify CheckNotifier + CheckID string + Script string + DockerContainerId string + Shell string + Interval time.Duration + Logger *log.Logger + + dockerClient DockerClient + cmd []string + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +//Initializes the Docker Client +func (c *CheckDocker) Init() error { + //create the docker client + var err error + c.dockerClient, err = docker.NewClientFromEnv() + if err != nil { + c.Logger.Println("[DEBUG] Error creating the Docker Client : %s", err.Error()) + return err + } + return nil +} + +// Start is used to start checks. +// Docker Checks runs until stop is called +func (c *CheckDocker) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + + //figure out the shell + if c.Shell == "" { + c.Shell = shell() + } + + c.cmd = []string{c.Shell, "-c", c.Script} + + c.stop = false + c.stopCh = make(chan struct{}) + go c.run() +} + +// Stop is used to stop a docker check. +func (c *CheckDocker) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked by a goroutine to run until Stop() is called +func (c *CheckDocker) run() { + // Get the randomized initial pause time + initialPauseTime := randomStagger(c.Interval) + c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerId) + next := time.After(initialPauseTime) + for { + select { + case <-next: + c.check() + next = time.After(c.Interval) + case <-c.stopCh: + return + } + } +} + +func (c *CheckDocker) check() { + //Set up the Exec since + execOpts := docker.CreateExecOptions{ + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: false, + Cmd: c.cmd, + Container: c.DockerContainerId, + } + var ( + exec *docker.Exec + err error + ) + if exec, err = c.dockerClient.CreateExec(execOpts); err != nil { + c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error()) + c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error())) + return + } + + // Collect the output + output, _ := circbuf.NewBuffer(CheckBufSize) + + err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output}) + if err != nil { + c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error()) + c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error())) + return + } + + // Get the output, add a message about truncation + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + + c.Logger.Printf("[DEBUG] agent: check '%s' script '%s' output: %s", + c.CheckID, c.Script, outputStr) + + execInfo, err := c.dockerClient.InspectExec(exec.ID) + if err != nil { + c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error()) + c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error())) + return + } + + // Sets the status of the check to healthy if exit code is 0 + if execInfo.ExitCode == 0 { + c.Notify.UpdateCheck(c.CheckID, structs.HealthPassing, outputStr) + return + } + + // Set the status of the check to Warning if exit code is 1 + if execInfo.ExitCode == 1 { + c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode) + c.Notify.UpdateCheck(c.CheckID, structs.HealthWarning, outputStr) + return + } + + // Set the health as critical + c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) + c.Notify.UpdateCheck(c.CheckID, structs.HealthCritical, outputStr) +} + +func shell() string { + if otherShell := os.Getenv("SHELL"); otherShell != "" { + return otherShell + } else { + return "/bin/sh" + } +} diff --git a/command/agent/check_test.go b/command/agent/check_test.go index 6b9f59df7..3736f8d3a 100644 --- a/command/agent/check_test.go +++ b/command/agent/check_test.go @@ -1,15 +1,18 @@ package agent import ( + "errors" "fmt" "log" "net" "net/http" "net/http/httptest" "os" + "os/exec" "testing" "time" + docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" ) @@ -393,3 +396,269 @@ func TestCheckTCPPassing(t *testing.T) { expectTCPStatus(t, tcpServer.Addr().String(), "passing") tcpServer.Close() } + +// A fake docker client to test happy path scenario +type fakeDockerClientWithNoErrors struct { +} + +func (d *fakeDockerClientWithNoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithNoErrors) StartExec(id string, opts docker.StartExecOptions) error { + fmt.Fprint(opts.OutputStream, "output") + return nil +} + +func (d *fakeDockerClientWithNoErrors) InspectExec(id string) (*docker.ExecInspect, error) { + return &docker.ExecInspect{ + ID: "123", + ExitCode: 0, + }, nil +} + +// A fake docker client to test truncation of output +type fakeDockerClientWithLongOutput struct { +} + +func (d *fakeDockerClientWithLongOutput) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithLongOutput) StartExec(id string, opts docker.StartExecOptions) error { + b, _ := exec.Command("od", "-N", "81920", "/dev/urandom").Output() + fmt.Fprint(opts.OutputStream, string(b)) + return nil +} + +func (d *fakeDockerClientWithLongOutput) InspectExec(id string) (*docker.ExecInspect, error) { + return &docker.ExecInspect{ + ID: "123", + ExitCode: 0, + }, nil +} + +// A fake docker client to test non-zero exit codes from exec invocation +type fakeDockerClientWithExecNonZeroExitCode struct { +} + +func (d *fakeDockerClientWithExecNonZeroExitCode) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithExecNonZeroExitCode) StartExec(id string, opts docker.StartExecOptions) error { + return nil +} + +func (d *fakeDockerClientWithExecNonZeroExitCode) InspectExec(id string) (*docker.ExecInspect, error) { + return &docker.ExecInspect{ + ID: "123", + ExitCode: 127, + }, nil +} + +// A fake docker client to test exit code which result into Warning +type fakeDockerClientWithExecExitCodeOne struct { +} + +func (d *fakeDockerClientWithExecExitCodeOne) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithExecExitCodeOne) StartExec(id string, opts docker.StartExecOptions) error { + fmt.Fprint(opts.OutputStream, "output") + return nil +} + +func (d *fakeDockerClientWithExecExitCodeOne) InspectExec(id string) (*docker.ExecInspect, error) { + return &docker.ExecInspect{ + ID: "123", + ExitCode: 1, + }, nil +} + +// A fake docker client to simulate create exec failing +type fakeDockerClientWithCreateExecFailure struct { +} + +func (d *fakeDockerClientWithCreateExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return nil, errors.New("Exec Creation Failed") +} + +func (d *fakeDockerClientWithCreateExecFailure) StartExec(id string, opts docker.StartExecOptions) error { + return errors.New("Exec doesn't exist") +} + +func (d *fakeDockerClientWithCreateExecFailure) InspectExec(id string) (*docker.ExecInspect, error) { + return nil, errors.New("Exec doesn't exist") +} + +// A fake docker client to simulate start exec failing +type fakeDockerClientWithStartExecFailure struct { +} + +func (d *fakeDockerClientWithStartExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithStartExecFailure) StartExec(id string, opts docker.StartExecOptions) error { + return errors.New("Couldn't Start Exec") +} + +func (d *fakeDockerClientWithStartExecFailure) InspectExec(id string) (*docker.ExecInspect, error) { + return nil, errors.New("Exec doesn't exist") +} + +// A fake docker client to test exec info query failures +type fakeDockerClientWithExecInfoErrors struct { +} + +func (d *fakeDockerClientWithExecInfoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { + return &docker.Exec{ID: "123"}, nil +} + +func (d *fakeDockerClientWithExecInfoErrors) StartExec(id string, opts docker.StartExecOptions) error { + return nil +} + +func (d *fakeDockerClientWithExecInfoErrors) InspectExec(id string) (*docker.ExecInspect, error) { + return nil, errors.New("Unable to query exec info") +} + +func expectDockerCheckStatus(t *testing.T, dockerClient DockerClient, status string, output string) { + mock := &MockNotify{ + state: make(map[string]string), + updates: make(map[string]int), + output: make(map[string]string), + } + check := &CheckDocker{ + Notify: mock, + CheckID: "foo", + Script: "/health.sh", + DockerContainerId: "54432bad1fc7", + Shell: "/bin/sh", + Interval: 10 * time.Millisecond, + Logger: log.New(os.Stderr, "", log.LstdFlags), + dockerClient: dockerClient, + } + check.Start() + defer check.Stop() + + time.Sleep(50 * time.Millisecond) + + // Should have at least 2 updates + if mock.updates["foo"] < 2 { + t.Fatalf("should have 2 updates %v", mock.updates) + } + + if mock.state["foo"] != status { + t.Fatalf("should be %v %v", status, mock.state) + } + + if mock.output["foo"] != output { + t.Fatalf("should be %v %v", output, mock.output) + } +} + +func TestDockerCheckWhenExecReturnsSuccessExitCode(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithNoErrors{}, "passing", "output") +} + +func TestDockerCheckWhenExecCreationFails(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithCreateExecFailure{}, "critical", "Unable to create Exec, error: Exec Creation Failed") +} + +func TestDockerCheckWhenExitCodeIsNonZero(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithExecNonZeroExitCode{}, "critical", "") +} + +func TestDockerCheckWhenExitCodeIsone(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithExecExitCodeOne{}, "warning", "output") +} + +func TestDockerCheckWhenExecStartFails(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithStartExecFailure{}, "critical", "Unable to start Exec: Couldn't Start Exec") +} + +func TestDockerCheckWhenExecInfoFails(t *testing.T) { + expectDockerCheckStatus(t, &fakeDockerClientWithExecInfoErrors{}, "critical", "Unable to inspect Exec: Unable to query exec info") +} + +func TestDockerCheckDefaultToSh(t *testing.T) { + os.Setenv("SHELL", "") + mock := &MockNotify{ + state: make(map[string]string), + updates: make(map[string]int), + output: make(map[string]string), + } + check := &CheckDocker{ + Notify: mock, + CheckID: "foo", + Script: "/health.sh", + DockerContainerId: "54432bad1fc7", + Interval: 10 * time.Millisecond, + Logger: log.New(os.Stderr, "", log.LstdFlags), + dockerClient: &fakeDockerClientWithNoErrors{}, + } + check.Start() + defer check.Stop() + + time.Sleep(50 * time.Millisecond) + if check.Shell != "/bin/sh" { + t.Fatalf("Shell should be: %v , actual: %v", "/bin/sh", check.Shell) + } +} + +func TestDockerCheckUseShellFromEnv(t *testing.T) { + mock := &MockNotify{ + state: make(map[string]string), + updates: make(map[string]int), + output: make(map[string]string), + } + os.Setenv("SHELL", "/bin/bash") + check := &CheckDocker{ + Notify: mock, + CheckID: "foo", + Script: "/health.sh", + DockerContainerId: "54432bad1fc7", + Interval: 10 * time.Millisecond, + Logger: log.New(os.Stderr, "", log.LstdFlags), + dockerClient: &fakeDockerClientWithNoErrors{}, + } + check.Start() + defer check.Stop() + + time.Sleep(50 * time.Millisecond) + if check.Shell != "/bin/bash" { + t.Fatalf("Shell should be: %v , actual: %v", "/bin/bash", check.Shell) + } + os.Setenv("SHELL", "") +} + +func TestDockerCheckTruncateOutput(t *testing.T) { + mock := &MockNotify{ + state: make(map[string]string), + updates: make(map[string]int), + output: make(map[string]string), + } + check := &CheckDocker{ + Notify: mock, + CheckID: "foo", + Script: "/health.sh", + DockerContainerId: "54432bad1fc7", + Shell: "/bin/sh", + Interval: 10 * time.Millisecond, + Logger: log.New(os.Stderr, "", log.LstdFlags), + dockerClient: &fakeDockerClientWithLongOutput{}, + } + check.Start() + defer check.Stop() + + time.Sleep(50 * time.Millisecond) + + // Allow for extra bytes for the truncation message + if len(mock.output["foo"]) > CheckBufSize+100 { + t.Fatalf("output size is too long") + } + +} diff --git a/command/agent/config.go b/command/agent/config.go index e7b3322bd..6240b33f2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -773,6 +773,9 @@ func FixupCheckType(raw interface{}) error { case "service_id": rawMap["serviceid"] = v delete(rawMap, "service_id") + case "docker_container_id": + rawMap["DockerContainerId"] = v + delete(rawMap, "docker_container_id") } } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index b7a107be6..dc0517f34 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -1072,7 +1072,7 @@ func TestDecodeConfig_Service(t *testing.T) { func TestDecodeConfig_Check(t *testing.T) { // Basics - input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}` + input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}` config, err := DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -1106,6 +1106,14 @@ func TestDecodeConfig_Check(t *testing.T) { if chk.TTL != 15*time.Second { t.Fatalf("bad: %v", chk) } + + if chk.Shell != "/bin/bash" { + t.Fatalf("bad: %v", chk) + } + + if chk.DockerContainerId != "redis" { + t.Fatalf("bad: %v", chk) + } } func TestMergeConfig(t *testing.T) {