diff --git a/agent/agent.go b/agent/agent.go index 32c651e05..4650d0c6a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -128,6 +128,9 @@ type Agent struct { // checkLock protects updates to the check* maps checkLock sync.Mutex + // dockerClient is the client for performing docker health checks. + dockerClient *DockerClient + // eventCh is used to receive user events eventCh chan serf.UserEvent @@ -1637,9 +1640,12 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, // Check if already registered if chkType != nil { - if chkType.IsTTL() { + switch { + + case chkType.IsTTL(): if existing, ok := a.checkTTLs[check.CheckID]; ok { existing.Stop() + delete(a.checkTTLs, check.CheckID) } ttl := &CheckTTL{ @@ -1658,9 +1664,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, ttl.Start() a.checkTTLs[check.CheckID] = ttl - } else if chkType.IsHTTP() { + case chkType.IsHTTP(): if existing, ok := a.checkHTTPs[check.CheckID]; ok { existing.Stop() + delete(a.checkHTTPs, check.CheckID) } if chkType.Interval < MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", @@ -1682,9 +1689,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, http.Start() a.checkHTTPs[check.CheckID] = http - } else if chkType.IsTCP() { + case chkType.IsTCP(): if existing, ok := a.checkTCPs[check.CheckID]; ok { existing.Stop() + delete(a.checkTCPs, check.CheckID) } if chkType.Interval < MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", @@ -1703,9 +1711,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, tcp.Start() a.checkTCPs[check.CheckID] = tcp - } else if chkType.IsDocker() { + case chkType.IsDocker(): if existing, ok := a.checkDockers[check.CheckID]; ok { existing.Stop() + delete(a.checkDockers, check.CheckID) } if chkType.Interval < MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", @@ -1713,6 +1722,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, chkType.Interval = MinInterval } + if a.dockerClient == nil { + dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize) + if err != nil { + a.logger.Printf("[ERR] agent: error creating docker client: %s", err) + return err + } + a.dockerClient = dc + } + dockerCheck := &CheckDocker{ Notify: a.state, CheckID: check.CheckID, @@ -1721,15 +1739,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, Script: chkType.Script, Interval: chkType.Interval, Logger: a.logger, - } - if err := dockerCheck.Init(); err != nil { - return err + client: a.dockerClient, } dockerCheck.Start() a.checkDockers[check.CheckID] = dockerCheck - } else if chkType.IsMonitor() { + + case chkType.IsMonitor(): if existing, ok := a.checkMonitors[check.CheckID]; ok { existing.Stop() + delete(a.checkMonitors, check.CheckID) } if chkType.Interval < MinInterval { a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v", @@ -1747,7 +1765,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } monitor.Start() a.checkMonitors[check.CheckID] = monitor - } else { + + default: return fmt.Errorf("Check type is not valid") } diff --git a/agent/check.go b/agent/check.go index d577a901a..3060405fa 100644 --- a/agent/check.go +++ b/agent/check.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "fmt" "io" + "io/ioutil" "log" "net" "net/http" @@ -14,7 +15,6 @@ import ( "time" "github.com/armon/circbuf" - docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -516,14 +516,6 @@ func (c *CheckTCP) check() { c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) } -// DockerClient defines an interface for a docker client -// which is used for injecting a fake client during tests. -type DockerClient interface { - CreateExec(docker.CreateExecOptions) (*docker.Exec, error) - StartExec(string, docker.StartExecOptions) error - InspectExec(string) (*docker.ExecInspect, error) -} - // CheckDocker is used to periodically invoke a script to // determine the health of an application running inside a // Docker Container. We assume that the script is compatible @@ -537,137 +529,100 @@ type CheckDocker struct { Interval time.Duration Logger *log.Logger - dockerClient DockerClient - cmd []string - stop bool - stopCh chan struct{} - stopLock sync.Mutex + client *DockerClient + stop chan struct{} } -// Init initializes the Docker Client -func (c *CheckDocker) Init() error { - var err error - c.dockerClient, err = docker.NewClientFromEnv() - if err != nil { - c.Logger.Printf("[DEBUG] Error creating the Docker client: %s", err.Error()) - return err - } - return nil -} - -// Start is used to start checks. -// Docker Checks runs until stop is called func (c *CheckDocker) Start() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - - //figure out the shell - if c.Shell == "" { - c.Shell = shell() + if c.stop != nil { + panic("Docker check already started") } - c.cmd = []string{c.Shell, "-c", c.Script} + if c.Logger == nil { + c.Logger = log.New(ioutil.Discard, "", 0) + } - c.stop = false - c.stopCh = make(chan struct{}) + if c.Shell == "" { + c.Shell = os.Getenv("SHELL") + if c.Shell == "" { + c.Shell = "/bin/sh" + } + } + c.stop = make(chan struct{}) go c.run() } -// Stop is used to stop a docker check. func (c *CheckDocker) Stop() { - c.stopLock.Lock() - defer c.stopLock.Unlock() - if !c.stop { - c.stop = true - close(c.stopCh) + if c.stop == nil { + panic("Stop called before start") } + close(c.stop) } -// run is invoked by a goroutine to run until Stop() is called func (c *CheckDocker) run() { - // Get the randomized initial pause time - initialPauseTime := lib.RandomStagger(c.Interval) - c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID) - next := time.After(initialPauseTime) + firstWait := lib.RandomStagger(c.Interval) + c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", firstWait, c.Shell, c.Script, c.DockerContainerID) + next := time.After(firstWait) for { select { case <-next: c.check() next = time.After(c.Interval) - case <-c.stopCh: + case <-c.stop: return } } } func (c *CheckDocker) check() { - //Set up the Exec since - execOpts := docker.CreateExecOptions{ - AttachStdin: false, - AttachStdout: true, - AttachStderr: true, - Tty: false, - Cmd: c.cmd, - Container: c.DockerContainerID, - } - var ( - exec *docker.Exec - err error - ) - if exec, err = c.dockerClient.CreateExec(execOpts); err != nil { - c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error()) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error())) - return - } - - // Collect the output - output, _ := circbuf.NewBuffer(CheckBufSize) - - err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output}) + var out string + status, b, err := c.doCheck() if err != nil { - c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error()) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error())) - return + c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err) + out = err.Error() + } else { + // out is already limited to CheckBufSize since we're getting a + // limited buffer. So we don't need to truncate it just report + // that it was truncated. + out = string(b.Bytes()) + if int(b.TotalWritten()) > len(out) { + out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out) + } + c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out) } - // Get the output, add a message about truncation - outputStr := string(output.Bytes()) - if output.TotalWritten() > output.Size() { - outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", - output.Size(), output.TotalWritten(), outputStr) + if status == api.HealthCritical { + c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) } - c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", - c.CheckID, c.Script, outputStr) - - execInfo, err := c.dockerClient.InspectExec(exec.ID) - if err != nil { - c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error()) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error())) - return - } - - // Sets the status of the check to healthy if exit code is 0 - if execInfo.ExitCode == 0 { - c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr) - return - } - - // Set the status of the check to Warning if exit code is 1 - if execInfo.ExitCode == 1 { - c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode) - c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr) - return - } - - // Set the health as critical - c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID) - c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr) + c.Notify.UpdateCheck(c.CheckID, status, out) } -func shell() string { - if sh := os.Getenv("SHELL"); sh != "" { - return sh +func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) { + cmd := []string{c.Shell, "-c", c.Script} + execID, err := c.client.CreateExec(c.DockerContainerID, cmd) + if err != nil { + return api.HealthCritical, nil, err + } + + buf, err := c.client.StartExec(c.DockerContainerID, execID) + if err != nil { + return api.HealthCritical, nil, err + } + + exitCode, err := c.client.InspectExec(c.DockerContainerID, execID) + if err != nil { + return api.HealthCritical, nil, err + } + + switch exitCode { + case 0: + return api.HealthPassing, buf, nil + case 1: + c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) + return api.HealthWarning, buf, nil + default: + c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode) + return api.HealthCritical, buf, nil } - return "/bin/sh" } diff --git a/agent/check_test.go b/agent/check_test.go index f18795e37..d89302c52 100644 --- a/agent/check_test.go +++ b/agent/check_test.go @@ -2,21 +2,18 @@ package agent import ( "bytes" - "errors" "fmt" "io/ioutil" "log" "net" "net/http" "net/http/httptest" - "os" - "os/exec" "reflect" + "regexp" "strings" "testing" "time" - docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/consul/agent/mock" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil/retry" @@ -512,260 +509,288 @@ func TestCheckTCPPassing(t *testing.T) { tcpServer.Close() } -// A fake docker client to test happy path scenario -type fakeDockerClientWithNoErrors struct { -} - -func (d *fakeDockerClientWithNoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithNoErrors) StartExec(id string, opts docker.StartExecOptions) error { - fmt.Fprint(opts.OutputStream, "output") - return nil -} - -func (d *fakeDockerClientWithNoErrors) InspectExec(id string) (*docker.ExecInspect, error) { - return &docker.ExecInspect{ - ID: "123", - ExitCode: 0, - }, nil -} - -// A fake docker client to test truncation of output -type fakeDockerClientWithLongOutput struct { -} - -func (d *fakeDockerClientWithLongOutput) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithLongOutput) StartExec(id string, opts docker.StartExecOptions) error { - b, _ := exec.Command("od", "-N", "81920", "/dev/urandom").Output() - fmt.Fprint(opts.OutputStream, string(b)) - return nil -} - -func (d *fakeDockerClientWithLongOutput) InspectExec(id string) (*docker.ExecInspect, error) { - return &docker.ExecInspect{ - ID: "123", - ExitCode: 0, - }, nil -} - -// A fake docker client to test non-zero exit codes from exec invocation -type fakeDockerClientWithExecNonZeroExitCode struct { -} - -func (d *fakeDockerClientWithExecNonZeroExitCode) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithExecNonZeroExitCode) StartExec(id string, opts docker.StartExecOptions) error { - return nil -} - -func (d *fakeDockerClientWithExecNonZeroExitCode) InspectExec(id string) (*docker.ExecInspect, error) { - return &docker.ExecInspect{ - ID: "123", - ExitCode: 127, - }, nil -} - -// A fake docker client to test exit code which result into Warning -type fakeDockerClientWithExecExitCodeOne struct { -} - -func (d *fakeDockerClientWithExecExitCodeOne) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithExecExitCodeOne) StartExec(id string, opts docker.StartExecOptions) error { - fmt.Fprint(opts.OutputStream, "output") - return nil -} - -func (d *fakeDockerClientWithExecExitCodeOne) InspectExec(id string) (*docker.ExecInspect, error) { - return &docker.ExecInspect{ - ID: "123", - ExitCode: 1, - }, nil -} - -// A fake docker client to simulate create exec failing -type fakeDockerClientWithCreateExecFailure struct { -} - -func (d *fakeDockerClientWithCreateExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return nil, errors.New("Exec Creation Failed") -} - -func (d *fakeDockerClientWithCreateExecFailure) StartExec(id string, opts docker.StartExecOptions) error { - return errors.New("Exec doesn't exist") -} - -func (d *fakeDockerClientWithCreateExecFailure) InspectExec(id string) (*docker.ExecInspect, error) { - return nil, errors.New("Exec doesn't exist") -} - -// A fake docker client to simulate start exec failing -type fakeDockerClientWithStartExecFailure struct { -} - -func (d *fakeDockerClientWithStartExecFailure) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithStartExecFailure) StartExec(id string, opts docker.StartExecOptions) error { - return errors.New("Couldn't Start Exec") -} - -func (d *fakeDockerClientWithStartExecFailure) InspectExec(id string) (*docker.ExecInspect, error) { - return nil, errors.New("Exec doesn't exist") -} - -// A fake docker client to test exec info query failures -type fakeDockerClientWithExecInfoErrors struct { -} - -func (d *fakeDockerClientWithExecInfoErrors) CreateExec(opts docker.CreateExecOptions) (*docker.Exec, error) { - return &docker.Exec{ID: "123"}, nil -} - -func (d *fakeDockerClientWithExecInfoErrors) StartExec(id string, opts docker.StartExecOptions) error { - return nil -} - -func (d *fakeDockerClientWithExecInfoErrors) InspectExec(id string) (*docker.ExecInspect, error) { - return nil, errors.New("Unable to query exec info") -} - -func expectDockerCheckStatus(t *testing.T, dockerClient DockerClient, status string, output string) { - notif := mock.NewNotify() - check := &CheckDocker{ - Notify: notif, - CheckID: types.CheckID("foo"), - Script: "/health.sh", - DockerContainerID: "54432bad1fc7", - Shell: "/bin/sh", - Interval: 25 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), - dockerClient: dockerClient, - } - check.Start() - defer check.Stop() - - time.Sleep(250 * time.Millisecond) - - // Should have at least 2 updates - if notif.Updates("foo") < 2 { - t.Fatalf("should have 2 updates %v", notif.UpdatesMap()) +func TestCheck_Docker(t *testing.T) { + tests := []struct { + desc string + handlers map[string]http.HandlerFunc + out *regexp.Regexp + state string + }{ + { + desc: "create exec: bad container id", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + }, + }, + out: regexp.MustCompile("^create exec failed for unknown container 123$"), + state: api.HealthCritical, + }, + { + desc: "create exec: paused container", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(409) + }, + }, + out: regexp.MustCompile("^create exec failed since container 123 is paused or stopped$"), + state: api.HealthCritical, + }, + { + desc: "create exec: bad status code", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(999) + fmt.Fprint(w, "some output") + }, + }, + out: regexp.MustCompile("^create exec failed for container 123 with status 999: some output$"), + state: api.HealthCritical, + }, + { + desc: "create exec: bad json", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `this is not json`) + }, + }, + out: regexp.MustCompile("^create exec response for container 123 cannot be parsed: .*$"), + state: api.HealthCritical, + }, + { + desc: "start exec: bad exec id", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + }, + }, + out: regexp.MustCompile("^start exec failed for container 123: invalid exec id 456$"), + state: api.HealthCritical, + }, + { + desc: "start exec: paused container", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(409) + }, + }, + out: regexp.MustCompile("^start exec failed since container 123 is paused or stopped$"), + state: api.HealthCritical, + }, + { + desc: "start exec: bad status code", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(999) + fmt.Fprint(w, "some output") + }, + }, + out: regexp.MustCompile("^start exec failed for container 123 with status 999: some output$"), + state: api.HealthCritical, + }, + { + desc: "inspect exec: bad exec id", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "OK") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + }, + }, + out: regexp.MustCompile("^inspect exec failed for container 123: invalid exec id 456$"), + state: api.HealthCritical, + }, + { + desc: "inspect exec: bad status code", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "OK") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(999) + fmt.Fprint(w, "some output") + }, + }, + out: regexp.MustCompile("^inspect exec failed for container 123 with status 999: some output$"), + state: api.HealthCritical, + }, + { + desc: "inspect exec: bad json", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "OK") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `this is not json`) + }, + }, + out: regexp.MustCompile("^inspect exec response for container 123 cannot be parsed: .*$"), + state: api.HealthCritical, + }, + { + desc: "inspect exec: exit code 0: passing", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "OK") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"ExitCode":0}`) + }, + }, + out: regexp.MustCompile("^OK$"), + state: api.HealthPassing, + }, + { + desc: "inspect exec: exit code 0: passing: truncated", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "01234567890123456789OK") // more than 20 bytes + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"ExitCode":0}`) + }, + }, + out: regexp.MustCompile("^Captured 20 of 22 bytes\n...\n234567890123456789OK$"), + state: api.HealthPassing, + }, + { + desc: "inspect exec: exit code 1: warning", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "WARN") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"ExitCode":1}`) + }, + }, + out: regexp.MustCompile("^WARN$"), + state: api.HealthWarning, + }, + { + desc: "inspect exec: exit code 2: critical", + handlers: map[string]http.HandlerFunc{ + "POST /containers/123/exec": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(201) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"Id":"456"}`) + }, + "POST /exec/456/start": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + fmt.Fprint(w, "NOK") + }, + "GET /exec/456/json": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"ExitCode":2}`) + }, + }, + out: regexp.MustCompile("^NOK$"), + state: api.HealthCritical, + }, } - if notif.State("foo") != status { - t.Fatalf("should be %v %v", status, notif.StateMap()) - } + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + x := r.Method + " " + r.RequestURI + h := tt.handlers[x] + if h == nil { + t.Fatalf("bad url %s", x) + } + h(w, r) + })) + defer srv.Close() - if notif.Output("foo") != output { - t.Fatalf("should be %v %v", output, notif.OutputMap()) - } -} - -func TestDockerCheckWhenExecReturnsSuccessExitCode(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithNoErrors{}, api.HealthPassing, "output") -} - -func TestDockerCheckWhenExecCreationFails(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithCreateExecFailure{}, api.HealthCritical, "Unable to create Exec, error: Exec Creation Failed") -} - -func TestDockerCheckWhenExitCodeIsNonZero(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithExecNonZeroExitCode{}, api.HealthCritical, "") -} - -func TestDockerCheckWhenExitCodeIsone(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithExecExitCodeOne{}, api.HealthWarning, "output") -} - -func TestDockerCheckWhenExecStartFails(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithStartExecFailure{}, api.HealthCritical, "Unable to start Exec: Couldn't Start Exec") -} - -func TestDockerCheckWhenExecInfoFails(t *testing.T) { - t.Parallel() - expectDockerCheckStatus(t, &fakeDockerClientWithExecInfoErrors{}, api.HealthCritical, "Unable to inspect Exec: Unable to query exec info") -} - -func TestDockerCheckDefaultToSh(t *testing.T) { - t.Parallel() - os.Setenv("SHELL", "") - notif := mock.NewNotify() - check := &CheckDocker{ - Notify: notif, - CheckID: types.CheckID("foo"), - Script: "/health.sh", - DockerContainerID: "54432bad1fc7", - Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), - dockerClient: &fakeDockerClientWithNoErrors{}, - } - check.Start() - defer check.Stop() - - time.Sleep(50 * time.Millisecond) - if check.Shell != "/bin/sh" { - t.Fatalf("Shell should be: %v , actual: %v", "/bin/sh", check.Shell) - } -} - -func TestDockerCheckUseShellFromEnv(t *testing.T) { - t.Parallel() - notif := mock.NewNotify() - os.Setenv("SHELL", "/bin/bash") - check := &CheckDocker{ - Notify: notif, - CheckID: types.CheckID("foo"), - Script: "/health.sh", - DockerContainerID: "54432bad1fc7", - Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), - dockerClient: &fakeDockerClientWithNoErrors{}, - } - check.Start() - defer check.Stop() - - time.Sleep(50 * time.Millisecond) - if check.Shell != "/bin/bash" { - t.Fatalf("Shell should be: %v , actual: %v", "/bin/bash", check.Shell) - } - os.Setenv("SHELL", "") -} - -func TestDockerCheckTruncateOutput(t *testing.T) { - t.Parallel() - notif := mock.NewNotify() - check := &CheckDocker{ - Notify: notif, - CheckID: types.CheckID("foo"), - Script: "/health.sh", - DockerContainerID: "54432bad1fc7", - Shell: "/bin/sh", - Interval: 10 * time.Millisecond, - Logger: log.New(ioutil.Discard, UniqueID(), log.LstdFlags), - dockerClient: &fakeDockerClientWithLongOutput{}, - } - check.Start() - defer check.Stop() - - time.Sleep(50 * time.Millisecond) - - // Allow for extra bytes for the truncation message - if len(notif.Output("foo")) > CheckBufSize+100 { - t.Fatalf("output size is too long") + // create a docker client with a tiny output buffer + // to test the truncation + c, err := NewDockerClient(srv.URL, 20) + if err != nil { + t.Fatal(err) + } + + notif, upd := mock.NewNotifyChan() + id := types.CheckID("chk") + check := &CheckDocker{ + Notify: notif, + CheckID: id, + Script: "/health.sh", + DockerContainerID: "123", + Interval: 25 * time.Millisecond, + client: c, + } + check.Start() + defer check.Stop() + + <-upd // wait for update + + if got, want := notif.Output(id), tt.out; !want.MatchString(got) { + t.Fatalf("got %q want %q", got, want) + } + if got, want := notif.State(id), tt.state; got != want { + t.Fatalf("got status %q want %q", got, want) + } + }) } } diff --git a/agent/docker.go b/agent/docker.go new file mode 100644 index 000000000..012080a56 --- /dev/null +++ b/agent/docker.go @@ -0,0 +1,153 @@ +package agent + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + + "github.com/armon/circbuf" +) + +// DockerClient is a simplified client for the Docker Engine API +// to execute the health checks and avoid significant dependencies. +// It also consumes all data returned from the Docker API through +// a ring buffer with a fixed limit to avoid excessive resource +// consumption. +type DockerClient struct { + network string + addr string + baseurl string + maxbuf int64 + client *http.Client +} + +func NewDockerClient(host string, maxbuf int64) (*DockerClient, error) { + if host == "" { + host = DefaultDockerHost + } + p := strings.SplitN(host, "://", 2) + if len(p) != 2 { + return nil, fmt.Errorf("invalid docker host: %s", host) + } + network, addr := p[0], p[1] + basepath := "http://" + addr + if network == "unix" { + basepath = "http://unix" + } + client := &http.Client{} + if network == "unix" { + client.Transport = &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return net.Dial(network, addr) + }, + } + } + return &DockerClient{network, addr, basepath, maxbuf, client}, nil +} + +func (c *DockerClient) call(method, uri string, v interface{}) (*circbuf.Buffer, int, error) { + urlstr := c.baseurl + uri + req, err := http.NewRequest(method, urlstr, nil) + if err != nil { + return nil, 0, err + } + + if v != nil { + var b bytes.Buffer + if err := json.NewEncoder(&b).Encode(v); err != nil { + return nil, 0, err + } + req.Body = ioutil.NopCloser(&b) + req.Header.Set("Content-Type", "application/json") + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + b, err := circbuf.NewBuffer(c.maxbuf) + if err != nil { + return nil, 0, err + } + _, err = io.Copy(b, resp.Body) + return b, resp.StatusCode, err +} + +func (c *DockerClient) CreateExec(containerID string, cmd []string) (string, error) { + data := struct { + AttachStdin bool + AttachStdout bool + AttachStderr bool + Tty bool + Cmd []string + }{ + AttachStderr: true, + AttachStdout: true, + Cmd: cmd, + } + + uri := fmt.Sprintf("/containers/%s/exec", url.QueryEscape(containerID)) + b, code, err := c.call("POST", uri, data) + switch { + case err != nil: + return "", fmt.Errorf("create exec failed for container %s: %s", containerID, err) + case code == 201: + var resp struct{ Id string } + if err = json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil { + return "", fmt.Errorf("create exec response for container %s cannot be parsed: %s", containerID, err) + } + return resp.Id, nil + case code == 404: + return "", fmt.Errorf("create exec failed for unknown container %s", containerID) + case code == 409: + return "", fmt.Errorf("create exec failed since container %s is paused or stopped", containerID) + default: + return "", fmt.Errorf("create exec failed for container %s with status %d: %s", containerID, code, b) + } +} + +func (c *DockerClient) StartExec(containerID, execID string) (*circbuf.Buffer, error) { + data := struct{ Detach, Tty bool }{Detach: false, Tty: true} + uri := fmt.Sprintf("/exec/%s/start", execID) + b, code, err := c.call("POST", uri, data) + switch { + case err != nil: + return nil, fmt.Errorf("start exec failed for container %s: %s", containerID, err) + case code == 200: + return b, nil + case code == 404: + return nil, fmt.Errorf("start exec failed for container %s: invalid exec id %s", containerID, execID) + case code == 409: + return nil, fmt.Errorf("start exec failed since container %s is paused or stopped", containerID) + default: + return nil, fmt.Errorf("start exec failed for container %s with status %d: %s", containerID, code, b) + } +} + +func (c *DockerClient) InspectExec(containerID, execID string) (int, error) { + uri := fmt.Sprintf("/exec/%s/json", execID) + b, code, err := c.call("GET", uri, nil) + switch { + case err != nil: + return 0, fmt.Errorf("inspect exec failed for container %s: %s", containerID, err) + case code == 200: + var resp struct{ ExitCode int } + if err := json.NewDecoder(bytes.NewReader(b.Bytes())).Decode(&resp); err != nil { + return 0, fmt.Errorf("inspect exec response for container %s cannot be parsed: %s", containerID, err) + } + return resp.ExitCode, nil + case code == 404: + return 0, fmt.Errorf("inspect exec failed for container %s: invalid exec id %s", containerID, execID) + default: + return 0, fmt.Errorf("inspect exec failed for container %s with status %d: %s", containerID, code, b) + } +} diff --git a/agent/docker_unix.go b/agent/docker_unix.go new file mode 100644 index 000000000..529a5ead1 --- /dev/null +++ b/agent/docker_unix.go @@ -0,0 +1,3 @@ +package agent + +const DefaultDockerHost = "unix:///var/run/docker.sock" diff --git a/agent/docker_windows.go b/agent/docker_windows.go new file mode 100644 index 000000000..0046ea975 --- /dev/null +++ b/agent/docker_windows.go @@ -0,0 +1,3 @@ +package agent + +const DefaultDockerHost = "npipe:////./pipe/docker_engine" diff --git a/agent/mock/notify.go b/agent/mock/notify.go index 93a756556..00bc5380e 100644 --- a/agent/mock/notify.go +++ b/agent/mock/notify.go @@ -8,14 +8,15 @@ import ( ) type Notify struct { - state map[types.CheckID]string - updates map[types.CheckID]int - output map[types.CheckID]string + updated chan int // A guard to protect an access to the internal attributes // of the notification mock in order to prevent panics // raised by the race conditions detector. sync.RWMutex + state map[types.CheckID]string + updates map[types.CheckID]int + output map[types.CheckID]string } func NewNotify() *Notify { @@ -26,6 +27,16 @@ func NewNotify() *Notify { } } +func NewNotifyChan() (*Notify, chan int) { + n := &Notify{ + updated: make(chan int), + state: make(map[types.CheckID]string), + updates: make(map[types.CheckID]int), + output: make(map[types.CheckID]string), + } + return n, n.updated +} + func (m *Notify) sprintf(v interface{}) string { m.RLock() defer m.RUnlock() @@ -38,12 +49,15 @@ func (m *Notify) OutputMap() string { return m.sprintf(m.output) } func (m *Notify) UpdateCheck(id types.CheckID, status, output string) { m.Lock() - defer m.Unlock() - m.state[id] = status old := m.updates[id] m.updates[id] = old + 1 m.output[id] = output + m.Unlock() + + if m.updated != nil { + m.updated <- 1 + } } // State returns the state of the specified health-check. diff --git a/vendor/vendor.json b/vendor/vendor.json index 54d5ace64..6f609243a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -163,4 +163,4 @@ {"checksumSHA1":"yHpUeGwKoqqwd3cbEp3lkcnvft0=","path":"google.golang.org/grpc/transport","revision":"50955793b0183f9de69bd78e2ec251cf20aab121","revisionTime":"2017-01-11T19:10:52Z"} ], "rootPath": "github.com/hashicorp/consul" -} +} \ No newline at end of file