agent: replace docker check

This patch replaces the Docker client which is used
for health checks with a simplified version tailored
for that purpose.

See #3254
See #3257
Fixes #3270
This commit is contained in:
Frank Schroeder 2017-07-12 07:01:42 -07:00
parent 25aaf35522
commit b4e5c0647b
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
8 changed files with 552 additions and 380 deletions

View File

@ -128,6 +128,9 @@ type Agent struct {
// checkLock protects updates to the check* maps
checkLock sync.Mutex
// dockerClient is the client for performing docker health checks.
dockerClient *DockerClient
// eventCh is used to receive user events
eventCh chan serf.UserEvent
@ -1637,9 +1640,12 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
// Check if already registered
if chkType != nil {
if chkType.IsTTL() {
switch {
case chkType.IsTTL():
if existing, ok := a.checkTTLs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTTLs, check.CheckID)
}
ttl := &CheckTTL{
@ -1658,9 +1664,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
ttl.Start()
a.checkTTLs[check.CheckID] = ttl
} else if chkType.IsHTTP() {
case chkType.IsHTTP():
if existing, ok := a.checkHTTPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkHTTPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
@ -1682,9 +1689,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
http.Start()
a.checkHTTPs[check.CheckID] = http
} else if chkType.IsTCP() {
case chkType.IsTCP():
if existing, ok := a.checkTCPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTCPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
@ -1703,9 +1711,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tcp.Start()
a.checkTCPs[check.CheckID] = tcp
} else if chkType.IsDocker() {
case chkType.IsDocker():
if existing, ok := a.checkDockers[check.CheckID]; ok {
existing.Stop()
delete(a.checkDockers, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
@ -1713,6 +1722,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
chkType.Interval = MinInterval
}
if a.dockerClient == nil {
dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize)
if err != nil {
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
return err
}
a.dockerClient = dc
}
dockerCheck := &CheckDocker{
Notify: a.state,
CheckID: check.CheckID,
@ -1721,15 +1739,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
}
if err := dockerCheck.Init(); err != nil {
return err
client: a.dockerClient,
}
dockerCheck.Start()
a.checkDockers[check.CheckID] = dockerCheck
} else if chkType.IsMonitor() {
case chkType.IsMonitor():
if existing, ok := a.checkMonitors[check.CheckID]; ok {
existing.Stop()
delete(a.checkMonitors, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
@ -1747,7 +1765,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
} else {
default:
return fmt.Errorf("Check type is not valid")
}

View File

@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -14,7 +15,6 @@ import (
"time"
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
@ -516,14 +516,6 @@ func (c *CheckTCP) check() {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}
// DockerClient defines an interface for a docker client
// which is used for injecting a fake client during tests.
type DockerClient interface {
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
StartExec(string, docker.StartExecOptions) error
InspectExec(string) (*docker.ExecInspect, error)
}
// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
@ -537,137 +529,100 @@ type CheckDocker struct {
Interval time.Duration
Logger *log.Logger
dockerClient DockerClient
cmd []string
stop bool
stopCh chan struct{}
stopLock sync.Mutex
client *DockerClient
stop chan struct{}
}
// Init initializes the Docker Client
func (c *CheckDocker) Init() error {
var err error
c.dockerClient, err = docker.NewClientFromEnv()
if err != nil {
c.Logger.Printf("[DEBUG] Error creating the Docker client: %s", err.Error())
return err
}
return nil
}
// Start is used to start checks.
// Docker Checks runs until stop is called
func (c *CheckDocker) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
//figure out the shell
if c.Shell == "" {
c.Shell = shell()
if c.stop != nil {
panic("Docker check already started")
}
c.cmd = []string{c.Shell, "-c", c.Script}
if c.Logger == nil {
c.Logger = log.New(ioutil.Discard, "", 0)
}
c.stop = false
c.stopCh = make(chan struct{})
if c.Shell == "" {
c.Shell = os.Getenv("SHELL")
if c.Shell == "" {
c.Shell = "/bin/sh"
}
}
c.stop = make(chan struct{})
go c.run()
}
// Stop is used to stop a docker check.
func (c *CheckDocker) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
if c.stop == nil {
panic("Stop called before start")
}
close(c.stop)
}
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
next := time.After(initialPauseTime)
firstWait := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", firstWait, c.Shell, c.Script, c.DockerContainerID)
next := time.After(firstWait)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
case <-c.stop:
return
}
}
}
func (c *CheckDocker) check() {
//Set up the Exec since
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: c.cmd,
Container: c.DockerContainerID,
}
var (
exec *docker.Exec
err error
)
if exec, err = c.dockerClient.CreateExec(execOpts); err != nil {
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error()))
return
}
// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)
err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output})
var out string
status, b, err := c.doCheck()
if err != nil {
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error()))
return
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
out = err.Error()
} else {
// out is already limited to CheckBufSize since we're getting a
// limited buffer. So we don't need to truncate it just report
// that it was truncated.
out = string(b.Bytes())
if int(b.TotalWritten()) > len(out) {
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
}
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
}
// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
c.CheckID, c.Script, outputStr)
execInfo, err := c.dockerClient.InspectExec(exec.ID)
if err != nil {
c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error()))
return
}
// Sets the status of the check to healthy if exit code is 0
if execInfo.ExitCode == 0 {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
return
}
// Set the status of the check to Warning if exit code is 1
if execInfo.ExitCode == 1 {
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
return
}
// Set the health as critical
if status == api.HealthCritical {
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
}
func shell() string {
if sh := os.Getenv("SHELL"); sh != "" {
return sh
c.Notify.UpdateCheck(c.CheckID, status, out)
}
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
cmd := []string{c.Shell, "-c", c.Script}
execID, err := c.client.CreateExec(c.DockerContainerID, cmd)
if err != nil {
return api.HealthCritical, nil, err
}
buf, err := c.client.StartExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}
exitCode, err := c.client.InspectExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}
switch exitCode {
case 0:
return api.HealthPassing, buf, nil
case 1:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthWarning, buf, nil
default:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthCritical, buf, nil
}
return "/bin/sh"
}

View File

@ -2,21 +2,18 @@ package agent
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"reflect"
"regexp"
"strings"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/agent/mock"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil/retry"
@ -512,260 +509,288 @@ func TestCheckTCPPassing(t *testing.T) {
tcpServer.Close()
}
// A fake docker client to test happy path scenario
type fakeDockerClientWithNoErrors struct {
func TestCheck_Docker(t *testing.T) {
tests := []struct {
desc string
handlers map[string]http.HandlerFunc
out *regexp.Regexp
state string
}{
{
desc: "create exec: bad container id",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
},
},
out: regexp.MustCompile("^create exec failed for unknown container 123$"),
state: api.HealthCritical,
},
{
desc: "create exec: paused container",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(409)
},
},
out: regexp.MustCompile("^create exec failed since container 123 is paused or stopped$"),
state: api.HealthCritical,
},
{
desc: "create exec: bad status code",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(999)
fmt.Fprint(w, "some output")
},
},
out: regexp.MustCompile("^create exec failed for container 123 with status 999: some output$"),
state: api.HealthCritical,
},
{
desc: "create exec: bad json",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `this is not json`)
},
},
out: regexp.MustCompile("^create exec response for container 123 cannot be parsed: .*$"),
state: api.HealthCritical,
},
{
desc: "start exec: bad exec id",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
},
},
out: regexp.MustCompile("^start exec failed for container 123: invalid exec id 456$"),
state: api.HealthCritical,
},
{
desc: "start exec: paused container",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(409)
},
},
out: regexp.MustCompile("^start exec failed since container 123 is paused or stopped$"),
state: api.HealthCritical,
},
{
desc: "start exec: bad status code",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(999)
fmt.Fprint(w, "some output")
},
},
out: regexp.MustCompile("^start exec failed for container 123 with status 999: some output$"),
state: api.HealthCritical,
},
{
desc: "inspect exec: bad exec id",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "OK")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
},
},
out: regexp.MustCompile("^inspect exec failed for container 123: invalid exec id 456$"),
state: api.HealthCritical,
},
{
desc: "inspect exec: bad status code",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "OK")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(999)
fmt.Fprint(w, "some output")
},
},
out: regexp.MustCompile("^inspect exec failed for container 123 with status 999: some output$"),
state: api.HealthCritical,
},
{
desc: "inspect exec: bad json",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "OK")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `this is not json`)
},
},
out: regexp.MustCompile("^inspect exec response for container 123 cannot be parsed: .*$"),
state: api.HealthCritical,
},
{
desc: "inspect exec: exit code 0: passing",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "OK")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"ExitCode":0}`)
},
},
out: regexp.MustCompile("^OK$"),
state: api.HealthPassing,
},
{
desc: "inspect exec: exit code 0: passing: truncated",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "01234567890123456789OK") // more than 20 bytes
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"ExitCode":0}`)
},
},
out: regexp.MustCompile("^Captured 20 of 22 bytes\n...\n234567890123456789OK$"),
state: api.HealthPassing,
},
{
desc: "inspect exec: exit code 1: warning",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "WARN")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"ExitCode":1}`)
},
},
out: regexp.MustCompile("^WARN$"),
state: api.HealthWarning,
},
{
desc: "inspect exec: exit code 2: critical",
handlers: map[string]http.HandlerFunc{
"POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(201)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"Id":"456"}`)
},
"POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "NOK")
},
"GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"ExitCode":2}`)
},
},
out: regexp.MustCompile("^NOK$"),
state: api.HealthCritical,
},
}
func (d *fakeDockerClientWithNoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
x := r.Method + " " + r.RequestURI
h := tt.handlers[x]
if h == nil {
t.Fatalf("bad url %s", x)
}
h(w, r)
}))
defer srv.Close()
// create a docker client with a tiny output buffer
// to test the truncation
c, err := NewDockerClient(srv.URL, 20)
if err != nil {
t.Fatal(err)
}
func (d *fakeDockerClientWithNoErrors) StartExec(id string, opts docker.StartExecOptions) error {
fmt.Fprint(opts.OutputStream, "output")
return nil
}
func (d *fakeDockerClientWithNoErrors) InspectExec(id string) (*docker.ExecInspect, error) {
return &docker.ExecInspect{
ID: "123",
ExitCode: 0,
}, nil
}
// A fake docker client to test truncation of output
type fakeDockerClientWithLongOutput struct {
}
func (d *fakeDockerClientWithLongOutput) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
}
func (d *fakeDockerClientWithLongOutput) StartExec(id string, opts docker.StartExecOptions) error {
b, _ := exec.Command("od", "-N", "81920", "/dev/urandom").Output()
fmt.Fprint(opts.OutputStream, string(b))
return nil
}
func (d *fakeDockerClientWithLongOutput) InspectExec(id string) (*docker.ExecInspect, error) {
return &docker.ExecInspect{
ID: "123",
ExitCode: 0,
}, nil
}
// A fake docker client to test non-zero exit codes from exec invocation
type fakeDockerClientWithExecNonZeroExitCode struct {
}
func (d *fakeDockerClientWithExecNonZeroExitCode) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
}
func (d *fakeDockerClientWithExecNonZeroExitCode) StartExec(id string, opts docker.StartExecOptions) error {
return nil
}
func (d *fakeDockerClientWithExecNonZeroExitCode) InspectExec(id string) (*docker.ExecInspect, error) {
return &docker.ExecInspect{
ID: "123",
ExitCode: 127,
}, nil
}
// A fake docker client to test exit code which result into Warning
type fakeDockerClientWithExecExitCodeOne struct {
}
func (d *fakeDockerClientWithExecExitCodeOne) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
}
func (d *fakeDockerClientWithExecExitCodeOne) StartExec(id string, opts docker.StartExecOptions) error {
fmt.Fprint(opts.OutputStream, "output")
return nil
}
func (d *fakeDockerClientWithExecExitCodeOne) InspectExec(id string) (*docker.ExecInspect, error) {
return &docker.ExecInspect{
ID: "123",
ExitCode: 1,
}, nil
}
// A fake docker client to simulate create exec failing
type fakeDockerClientWithCreateExecFailure struct {
}
func (d *fakeDockerClientWithCreateExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return nil, errors.New("Exec Creation Failed")
}
func (d *fakeDockerClientWithCreateExecFailure) StartExec(id string, opts docker.StartExecOptions) error {
return errors.New("Exec doesn't exist")
}
func (d *fakeDockerClientWithCreateExecFailure) InspectExec(id string) (*docker.ExecInspect, error) {
return nil, errors.New("Exec doesn't exist")
}
// A fake docker client to simulate start exec failing
type fakeDockerClientWithStartExecFailure struct {
}
func (d *fakeDockerClientWithStartExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
}
func (d *fakeDockerClientWithStartExecFailure) StartExec(id string, opts docker.StartExecOptions) error {
return errors.New("Couldn't Start Exec")
}
func (d *fakeDockerClientWithStartExecFailure) InspectExec(id string) (*docker.ExecInspect, error) {
return nil, errors.New("Exec doesn't exist")
}
// A fake docker client to test exec info query failures
type fakeDockerClientWithExecInfoErrors struct {
}
func (d *fakeDockerClientWithExecInfoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) {
return &docker.Exec{ID: "123"}, nil
}
func (d *fakeDockerClientWithExecInfoErrors) StartExec(id string, opts docker.StartExecOptions) error {
return nil
}
func (d *fakeDockerClientWithExecInfoErrors) InspectExec(id string) (*docker.ExecInspect, error) {
return nil, errors.New("Unable to query exec info")
}
func expectDockerCheckStatus(t *testing.T, dockerClient DockerClient, status string, output string) {
notif := mock.NewNotify()
notif, upd := mock.NewNotifyChan()
id := types.CheckID("chk")
check := &CheckDocker{
Notify: notif,
CheckID: types.CheckID("foo"),
CheckID: id,
Script: "/health.sh",
DockerContainerID: "54432bad1fc7",
Shell: "/bin/sh",
DockerContainerID: "123",
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
dockerClient: dockerClient,
client: c,
}
check.Start()
defer check.Stop()
time.Sleep(250 * time.Millisecond)
<-upd // wait for update
// Should have at least 2 updates
if notif.Updates("foo") < 2 {
t.Fatalf("should have 2 updates %v", notif.UpdatesMap())
if got, want := notif.Output(id), tt.out; !want.MatchString(got) {
t.Fatalf("got %q want %q", got, want)
}
if notif.State("foo") != status {
t.Fatalf("should be %v %v", status, notif.StateMap())
if got, want := notif.State(id), tt.state; got != want {
t.Fatalf("got status %q want %q", got, want)
}
if notif.Output("foo") != output {
t.Fatalf("should be %v %v", output, notif.OutputMap())
}
}
func TestDockerCheckWhenExecReturnsSuccessExitCode(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithNoErrors{}, api.HealthPassing, "output")
}
func TestDockerCheckWhenExecCreationFails(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithCreateExecFailure{}, api.HealthCritical, "Unable to create Exec, error: Exec Creation Failed")
}
func TestDockerCheckWhenExitCodeIsNonZero(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithExecNonZeroExitCode{}, api.HealthCritical, "")
}
func TestDockerCheckWhenExitCodeIsone(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithExecExitCodeOne{}, api.HealthWarning, "output")
}
func TestDockerCheckWhenExecStartFails(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithStartExecFailure{}, api.HealthCritical, "Unable to start Exec: Couldn't Start Exec")
}
func TestDockerCheckWhenExecInfoFails(t *testing.T) {
t.Parallel()
expectDockerCheckStatus(t, &fakeDockerClientWithExecInfoErrors{}, api.HealthCritical, "Unable to inspect Exec: Unable to query exec info")
}
func TestDockerCheckDefaultToSh(t *testing.T) {
t.Parallel()
os.Setenv("SHELL", "")
notif := mock.NewNotify()
check := &CheckDocker{
Notify: notif,
CheckID: types.CheckID("foo"),
Script: "/health.sh",
DockerContainerID: "54432bad1fc7",
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
dockerClient: &fakeDockerClientWithNoErrors{},
}
check.Start()
defer check.Stop()
time.Sleep(50 * time.Millisecond)
if check.Shell != "/bin/sh" {
t.Fatalf("Shell should be: %v , actual: %v", "/bin/sh", check.Shell)
}
}
func TestDockerCheckUseShellFromEnv(t *testing.T) {
t.Parallel()
notif := mock.NewNotify()
os.Setenv("SHELL", "/bin/bash")
check := &CheckDocker{
Notify: notif,
CheckID: types.CheckID("foo"),
Script: "/health.sh",
DockerContainerID: "54432bad1fc7",
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
dockerClient: &fakeDockerClientWithNoErrors{},
}
check.Start()
defer check.Stop()
time.Sleep(50 * time.Millisecond)
if check.Shell != "/bin/bash" {
t.Fatalf("Shell should be: %v , actual: %v", "/bin/bash", check.Shell)
}
os.Setenv("SHELL", "")
}
func TestDockerCheckTruncateOutput(t *testing.T) {
t.Parallel()
notif := mock.NewNotify()
check := &CheckDocker{
Notify: notif,
CheckID: types.CheckID("foo"),
Script: "/health.sh",
DockerContainerID: "54432bad1fc7",
Shell: "/bin/sh",
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags),
dockerClient: &fakeDockerClientWithLongOutput{},
}
check.Start()
defer check.Stop()
time.Sleep(50 * time.Millisecond)
// Allow for extra bytes for the truncation message
if len(notif.Output("foo")) > CheckBufSize+100 {
t.Fatalf("output size is too long")
})
}
}

153
agent/docker.go Normal file
View File

@ -0,0 +1,153 @@
package agent
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"github.com/armon/circbuf"
)
// DockerClient is a simplified client for the Docker Engine API
// to execute the health checks and avoid significant dependencies.
// It also consumes all data returned from the Docker API through
// a ring buffer with a fixed limit to avoid excessive resource
// consumption.
type DockerClient struct {
network string
addr string
baseurl string
maxbuf int64
client *http.Client
}
func NewDockerClient(host string, maxbuf int64) (*DockerClient, error) {
if host == "" {
host = DefaultDockerHost
}
p := strings.SplitN(host, "://", 2)
if len(p) != 2 {
return nil, fmt.Errorf("invalid docker host: %s", host)
}
network, addr := p[0], p[1]
basepath := "http://" + addr
if network == "unix" {
basepath = "http://unix"
}
client := &http.Client{}
if network == "unix" {
client.Transport = &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return net.Dial(network, addr)
},
}
}
return &DockerClient{network, addr, basepath, maxbuf, client}, nil
}
func (c *DockerClient) call(method, uri string, v interface{}) (*circbuf.Buffer, int, error) {
urlstr := c.baseurl + uri
req, err := http.NewRequest(method, urlstr, nil)
if err != nil {
return nil, 0, err
}
if v != nil {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(v); err != nil {
return nil, 0, err
}
req.Body = ioutil.NopCloser(&b)
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.client.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
b, err := circbuf.NewBuffer(c.maxbuf)
if err != nil {
return nil, 0, err
}
_, err = io.Copy(b, resp.Body)
return b, resp.StatusCode, err
}
func (c *DockerClient) CreateExec(containerID string, cmd []string) (string, error) {
data := struct {
AttachStdin bool
AttachStdout bool
AttachStderr bool
Tty bool
Cmd []string
}{
AttachStderr: true,
AttachStdout: true,
Cmd: cmd,
}
uri := fmt.Sprintf("/containers/%s/exec", url.QueryEscape(containerID))
b, code, err := c.call("POST", uri, data)
switch {
case err != nil:
return "", fmt.Errorf("create exec failed for container %s: %s", containerID, err)
case code == 201:
var resp struct{ Id string }
if err = json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil {
return "", fmt.Errorf("create exec response for container %s cannot be parsed: %s", containerID, err)
}
return resp.Id, nil
case code == 404:
return "", fmt.Errorf("create exec failed for unknown container %s", containerID)
case code == 409:
return "", fmt.Errorf("create exec failed since container %s is paused or stopped", containerID)
default:
return "", fmt.Errorf("create exec failed for container %s with status %d: %s", containerID, code, b)
}
}
func (c *DockerClient) StartExec(containerID, execID string) (*circbuf.Buffer, error) {
data := struct{ Detach, Tty bool }{Detach: false, Tty: true}
uri := fmt.Sprintf("/exec/%s/start", execID)
b, code, err := c.call("POST", uri, data)
switch {
case err != nil:
return nil, fmt.Errorf("start exec failed for container %s: %s", containerID, err)
case code == 200:
return b, nil
case code == 404:
return nil, fmt.Errorf("start exec failed for container %s: invalid exec id %s", containerID, execID)
case code == 409:
return nil, fmt.Errorf("start exec failed since container %s is paused or stopped", containerID)
default:
return nil, fmt.Errorf("start exec failed for container %s with status %d: %s", containerID, code, b)
}
}
func (c *DockerClient) InspectExec(containerID, execID string) (int, error) {
uri := fmt.Sprintf("/exec/%s/json", execID)
b, code, err := c.call("GET", uri, nil)
switch {
case err != nil:
return 0, fmt.Errorf("inspect exec failed for container %s: %s", containerID, err)
case code == 200:
var resp struct{ ExitCode int }
if err := json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil {
return 0, fmt.Errorf("inspect exec response for container %s cannot be parsed: %s", containerID, err)
}
return resp.ExitCode, nil
case code == 404:
return 0, fmt.Errorf("inspect exec failed for container %s: invalid exec id %s", containerID, execID)
default:
return 0, fmt.Errorf("inspect exec failed for container %s with status %d: %s", containerID, code, b)
}
}

3
agent/docker_unix.go Normal file
View File

@ -0,0 +1,3 @@
package agent
const DefaultDockerHost = "unix:///var/run/docker.sock"

3
agent/docker_windows.go Normal file
View File

@ -0,0 +1,3 @@
package agent
const DefaultDockerHost = "npipe:////./pipe/docker_engine"

View File

@ -8,14 +8,15 @@ import (
)
type Notify struct {
state map[types.CheckID]string
updates map[types.CheckID]int
output map[types.CheckID]string
updated chan int
// A guard to protect an access to the internal attributes
// of the notification mock in order to prevent panics
// raised by the race conditions detector.
sync.RWMutex
state map[types.CheckID]string
updates map[types.CheckID]int
output map[types.CheckID]string
}
func NewNotify() *Notify {
@ -26,6 +27,16 @@ func NewNotify() *Notify {
}
}
func NewNotifyChan() (*Notify, chan int) {
n := &Notify{
updated: make(chan int),
state: make(map[types.CheckID]string),
updates: make(map[types.CheckID]int),
output: make(map[types.CheckID]string),
}
return n, n.updated
}
func (m *Notify) sprintf(v interface{}) string {
m.RLock()
defer m.RUnlock()
@ -38,12 +49,15 @@ func (m *Notify) OutputMap() string { return m.sprintf(m.output) }
func (m *Notify) UpdateCheck(id types.CheckID, status, output string) {
m.Lock()
defer m.Unlock()
m.state[id] = status
old := m.updates[id]
m.updates[id] = old + 1
m.output[id] = output
m.Unlock()
if m.updated != nil {
m.updated <- 1
}
}
// State returns the state of the specified health-check.