Implement /v1/agent/health/service/<service name> endpoint (#3551)

This endpoint aggregates all checks related to <service id> on the agent
and return an appropriate http code + the string describing the worst
check.

This allows to cleanly expose service status to other component, hiding
complexity of multiple checks.
This is especially useful to use consul to feed a load balancer which
would delegate health checking to consul agent.

Exposing this endpoint on the agent is necessary to avoid a hit on
consul servers and avoid decreasing resiliency (this endpoint will work
even if there is no consul leader in the cluster).
This commit is contained in:
Grégoire Seux 2019-01-07 15:39:23 +01:00 committed by Matt Keeler
parent bf6db3934c
commit 6a57c7fec5
12 changed files with 1085 additions and 77 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
@ -153,6 +154,62 @@ func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (i
}
}
func buildAgentService(s *structs.NodeService, proxies map[string]*local.ManagedProxy) api.AgentService {
weights := api.AgentWeights{Passing: 1, Warning: 1}
if s.Weights != nil {
if s.Weights.Passing > 0 {
weights.Passing = s.Weights.Passing
}
weights.Warning = s.Weights.Warning
}
as := api.AgentService{
Kind: api.ServiceKind(s.Kind),
ID: s.ID,
Service: s.Service,
Tags: s.Tags,
Meta: s.Meta,
Port: s.Port,
Address: s.Address,
EnableTagOverride: s.EnableTagOverride,
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
Weights: weights,
}
if as.Tags == nil {
as.Tags = []string{}
}
if as.Meta == nil {
as.Meta = map[string]string{}
}
// Attach Unmanaged Proxy config if exists
if s.Kind == structs.ServiceKindConnectProxy {
as.Proxy = s.Proxy.ToAPI()
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// Also set the deprecated ProxyDestination
as.ProxyDestination = as.Proxy.DestinationServiceName
}
// Attach Connect configs if they exist. We use the actual proxy state since
// that may have had defaults filled in compared to the config that was
// provided with the service as stored in the NodeService here.
if proxy, ok := proxies[s.ID+"-proxy"]; ok {
as.Connect = &api.AgentServiceConnect{
Proxy: &api.AgentServiceConnectProxy{
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
Command: proxy.Proxy.Command,
Config: proxy.Proxy.Config,
Upstreams: proxy.Proxy.Upstreams.ToAPI(),
},
}
} else if s.Connect.Native {
as.Connect = &api.AgentServiceConnect{
Native: true,
}
}
return as
}
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
@ -173,59 +230,8 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
// Use empty list instead of nil
for id, s := range services {
weights := api.AgentWeights{Passing: 1, Warning: 1}
if s.Weights != nil {
if s.Weights.Passing > 0 {
weights.Passing = s.Weights.Passing
}
weights.Warning = s.Weights.Warning
}
as := &api.AgentService{
Kind: api.ServiceKind(s.Kind),
ID: s.ID,
Service: s.Service,
Tags: s.Tags,
Meta: s.Meta,
Port: s.Port,
Address: s.Address,
EnableTagOverride: s.EnableTagOverride,
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
Weights: weights,
}
if as.Tags == nil {
as.Tags = []string{}
}
if as.Meta == nil {
as.Meta = map[string]string{}
}
// Attach Unmanaged Proxy config if exists
if s.Kind == structs.ServiceKindConnectProxy {
as.Proxy = s.Proxy.ToAPI()
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// Also set the deprecated ProxyDestination
as.ProxyDestination = as.Proxy.DestinationServiceName
}
// Attach Connect configs if they exist. We use the actual proxy state since
// that may have had defaults filled in compared to the config that was
// provided with the service as stored in the NodeService here.
if proxy, ok := proxies[id+"-proxy"]; ok {
as.Connect = &api.AgentServiceConnect{
Proxy: &api.AgentServiceConnectProxy{
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
Command: proxy.Proxy.Command,
Config: proxy.Proxy.Config,
Upstreams: proxy.Proxy.Upstreams.ToAPI(),
},
}
} else if s.Connect.Native {
as.Connect = &api.AgentServiceConnect{
Native: true,
}
}
agentSvcs[id] = as
agentService := buildAgentService(s, proxies)
agentSvcs[id] = &agentService
}
return agentSvcs, nil
@ -704,6 +710,124 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
return nil, nil
}
// agentHealthService Returns Health for a given service ID
func agentHealthService(serviceID string, s *HTTPServer) (int, string, api.HealthChecks) {
checks := s.agent.State.Checks()
serviceChecks := make(api.HealthChecks, 0)
for _, c := range checks {
if c.ServiceID == serviceID || c.ServiceID == "" {
// TODO: harmonize struct.HealthCheck and api.HealthCheck (or at least extract conversion function)
healthCheck := &api.HealthCheck{
Node: c.Node,
CheckID: string(c.CheckID),
Name: c.Name,
Status: c.Status,
Notes: c.Notes,
Output: c.Output,
ServiceID: c.ServiceID,
ServiceName: c.ServiceName,
ServiceTags: c.ServiceTags,
}
serviceChecks = append(serviceChecks, healthCheck)
}
}
status := serviceChecks.AggregatedStatus()
switch status {
case api.HealthWarning:
return http.StatusTooManyRequests, status, serviceChecks
case api.HealthPassing:
return http.StatusOK, status, serviceChecks
default:
return http.StatusServiceUnavailable, status, serviceChecks
}
}
func returnTextPlain(req *http.Request) bool {
if contentType := req.Header.Get("Accept"); strings.HasPrefix(contentType, "text/plain") {
return true
}
if format := req.URL.Query().Get("format"); format != "" {
return format == "text"
}
return false
}
// AgentHealthServiceByID return the local Service Health given its ID
func (s *HTTPServer) AgentHealthServiceByID(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Pull out the service id (service id since there may be several instance of the same service on this host)
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/id/")
if serviceID == "" {
return nil, &BadRequestError{Reason: "Missing serviceID"}
}
services := s.agent.State.Services()
proxies := s.agent.State.Proxies()
for _, service := range services {
if service.ID == serviceID {
code, status, healthChecks := agentHealthService(serviceID, s)
if returnTextPlain(req) {
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
}
serviceInfo := buildAgentService(service, proxies)
result := &api.AgentServiceChecksInfo{
AggregatedStatus: status,
Checks: healthChecks,
Service: &serviceInfo,
}
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
}
}
notFoundReason := fmt.Sprintf("ServiceId %s not found", serviceID)
if returnTextPlain(req) {
return notFoundReason, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("ServiceId %s not found", serviceID), ContentType: "application/json"}
}
return &api.AgentServiceChecksInfo{
AggregatedStatus: api.HealthCritical,
Checks: nil,
Service: nil,
}, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"}
}
// AgentHealthServiceByName return the worse status of all the services with given name on an agent
func (s *HTTPServer) AgentHealthServiceByName(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Pull out the service name
serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/name/")
if serviceName == "" {
return nil, &BadRequestError{Reason: "Missing service Name"}
}
code := http.StatusNotFound
status := fmt.Sprintf("ServiceName %s Not Found", serviceName)
services := s.agent.State.Services()
result := make([]api.AgentServiceChecksInfo, 0, 16)
proxies := s.agent.State.Proxies()
for _, service := range services {
if service.Service == serviceName {
scode, sstatus, healthChecks := agentHealthService(service.ID, s)
serviceInfo := buildAgentService(service, proxies)
res := api.AgentServiceChecksInfo{
AggregatedStatus: sstatus,
Checks: healthChecks,
Service: &serviceInfo,
}
result = append(result, res)
// When service is not found, we ignore it and keep existing HTTP status
if code == http.StatusNotFound {
code = scode
status = sstatus
}
// We take the worst of all statuses, so we keep iterating
// passing: 200 < warning: 429 < critical: 503
if code < scode {
code = scode
status = sstatus
}
}
}
if returnTextPlain(req) {
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
}
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
}
func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.ServiceDefinition
// Fixup the type decode of TTL or Interval if a check if provided.

View File

@ -627,6 +627,444 @@ func TestAgent_Checks(t *testing.T) {
}
}
func TestAgent_HealthServiceByID(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
service := &structs.NodeService{
ID: "mysql",
Service: "mysql",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "mysql2",
Service: "mysql2",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "mysql3",
Service: "mysql3",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
chk1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql",
Name: "mysql",
ServiceID: "mysql",
Status: api.HealthPassing,
}
err := a.State.AddCheck(chk1, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql",
Name: "mysql",
ServiceID: "mysql",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk2, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk3 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql2",
Name: "mysql2",
ServiceID: "mysql2",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk3, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk4 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql2",
Name: "mysql2",
ServiceID: "mysql2",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk4, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk5 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql3",
Name: "mysql3",
ServiceID: "mysql3",
Status: api.HealthMaint,
}
err = a.State.AddCheck(chk5, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk6 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql3",
Name: "mysql3",
ServiceID: "mysql3",
Status: api.HealthCritical,
}
err = a.State.AddCheck(chk6, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
eval := func(t *testing.T, url string, expectedCode int, expected string) {
t.Helper()
t.Run("format=text", func(t *testing.T) {
t.Helper()
req, _ := http.NewRequest("GET", url+"?format=text", nil)
resp := httptest.NewRecorder()
data, err := a.srv.AgentHealthServiceByID(resp, req)
codeWithPayload, ok := err.(CodeWithPayloadError)
if !ok {
t.Fatalf("Err: %v", err)
}
if got, want := codeWithPayload.StatusCode, expectedCode; got != want {
t.Fatalf("returned bad status: expected %d, but had: %d in %#v", expectedCode, codeWithPayload.StatusCode, codeWithPayload)
}
body, ok := data.(string)
if !ok {
t.Fatalf("Cannot get result as string in := %#v", data)
}
if got, want := body, expected; got != want {
t.Fatalf("got body %q want %q", got, want)
}
if got, want := codeWithPayload.Reason, expected; got != want {
t.Fatalf("got body %q want %q", got, want)
}
})
t.Run("format=json", func(t *testing.T) {
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
dataRaw, err := a.srv.AgentHealthServiceByID(resp, req)
codeWithPayload, ok := err.(CodeWithPayloadError)
if !ok {
t.Fatalf("Err: %v", err)
}
if got, want := codeWithPayload.StatusCode, expectedCode; got != want {
t.Fatalf("returned bad status: expected %d, but had: %d in %#v", expectedCode, codeWithPayload.StatusCode, codeWithPayload)
}
data, ok := dataRaw.(*api.AgentServiceChecksInfo)
if !ok {
t.Fatalf("Cannot connvert result to JSON: %#v", dataRaw)
}
if codeWithPayload.StatusCode != http.StatusNotFound {
if data != nil && data.AggregatedStatus != expected {
t.Fatalf("got body %v want %v", data, expected)
}
}
})
}
t.Run("passing checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql", http.StatusOK, "passing")
})
t.Run("warning checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql2", http.StatusTooManyRequests, "warning")
})
t.Run("critical checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql3", http.StatusServiceUnavailable, "critical")
})
t.Run("unknown serviceid", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql1", http.StatusNotFound, "ServiceId mysql1 not found")
})
nodeCheck := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "diskCheck",
Name: "diskCheck",
Status: api.HealthCritical,
}
err = a.State.AddCheck(nodeCheck, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
t.Run("critical check on node", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql", http.StatusServiceUnavailable, "critical")
})
err = a.State.RemoveCheck(nodeCheck.CheckID)
if err != nil {
t.Fatalf("Err: %v", err)
}
nodeCheck = &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "_node_maintenance",
Name: "_node_maintenance",
Status: api.HealthMaint,
}
err = a.State.AddCheck(nodeCheck, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
t.Run("maintenance check on node", func(t *testing.T) {
eval(t, "/v1/agent/health/service/id/mysql", http.StatusServiceUnavailable, "maintenance")
})
}
func TestAgent_HealthServiceByName(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
service := &structs.NodeService{
ID: "mysql1",
Service: "mysql-pool-r",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "mysql2",
Service: "mysql-pool-r",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "mysql3",
Service: "mysql-pool-rw",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "mysql4",
Service: "mysql-pool-rw",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "httpd1",
Service: "httpd",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
service = &structs.NodeService{
ID: "httpd2",
Service: "httpd",
}
if err := a.AddService(service, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
chk1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql1",
Name: "mysql1",
ServiceID: "mysql1",
ServiceName: "mysql-pool-r",
Status: api.HealthPassing,
}
err := a.State.AddCheck(chk1, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql1",
Name: "mysql1",
ServiceID: "mysql1",
ServiceName: "mysql-pool-r",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk2, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk3 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql2",
Name: "mysql2",
ServiceID: "mysql2",
ServiceName: "mysql-pool-r",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk3, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk4 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql2",
Name: "mysql2",
ServiceID: "mysql2",
ServiceName: "mysql-pool-r",
Status: api.HealthCritical,
}
err = a.State.AddCheck(chk4, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk5 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql3",
Name: "mysql3",
ServiceID: "mysql3",
ServiceName: "mysql-pool-rw",
Status: api.HealthWarning,
}
err = a.State.AddCheck(chk5, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk6 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql4",
Name: "mysql4",
ServiceID: "mysql4",
ServiceName: "mysql-pool-rw",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk6, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk7 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "httpd1",
Name: "httpd1",
ServiceID: "httpd1",
ServiceName: "httpd",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk7, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
chk8 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "httpd2",
Name: "httpd2",
ServiceID: "httpd2",
ServiceName: "httpd",
Status: api.HealthPassing,
}
err = a.State.AddCheck(chk8, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
eval := func(t *testing.T, url string, expectedCode int, expected string) {
t.Helper()
t.Run("format=text", func(t *testing.T) {
t.Helper()
req, _ := http.NewRequest("GET", url+"?format=text", nil)
resp := httptest.NewRecorder()
data, err := a.srv.AgentHealthServiceByName(resp, req)
codeWithPayload, ok := err.(CodeWithPayloadError)
if !ok {
t.Fatalf("Err: %v", err)
}
if got, want := codeWithPayload.StatusCode, expectedCode; got != want {
t.Fatalf("returned bad status: %d. Body: %q", resp.Code, resp.Body.String())
}
if got, want := codeWithPayload.Reason, expected; got != want {
t.Fatalf("got reason %q want %q", got, want)
}
if got, want := data, expected; got != want {
t.Fatalf("got body %q want %q", got, want)
}
})
t.Run("format=json", func(t *testing.T) {
t.Helper()
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
dataRaw, err := a.srv.AgentHealthServiceByName(resp, req)
codeWithPayload, ok := err.(CodeWithPayloadError)
if !ok {
t.Fatalf("Err: %v", err)
}
data, ok := dataRaw.([]api.AgentServiceChecksInfo)
if !ok {
t.Fatalf("Cannot connvert result to JSON")
}
if got, want := codeWithPayload.StatusCode, expectedCode; got != want {
t.Fatalf("returned bad code: %d. Body: %#v", resp.Code, data)
}
if resp.Code != http.StatusNotFound {
if codeWithPayload.Reason != expected {
t.Fatalf("got wrong status %#v want %#v", codeWithPayload, expected)
}
}
})
}
t.Run("passing checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/httpd", http.StatusOK, "passing")
})
t.Run("warning checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/mysql-pool-rw", http.StatusTooManyRequests, "warning")
})
t.Run("critical checks", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/mysql-pool-r", http.StatusServiceUnavailable, "critical")
})
t.Run("unknown serviceName", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/test", http.StatusNotFound, "ServiceName test Not Found")
})
nodeCheck := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "diskCheck",
Name: "diskCheck",
Status: api.HealthCritical,
}
err = a.State.AddCheck(nodeCheck, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
t.Run("critical check on node", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/mysql-pool-r", http.StatusServiceUnavailable, "critical")
})
err = a.State.RemoveCheck(nodeCheck.CheckID)
if err != nil {
t.Fatalf("Err: %v", err)
}
nodeCheck = &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "_node_maintenance",
Name: "_node_maintenance",
Status: api.HealthMaint,
}
err = a.State.AddCheck(nodeCheck, "")
if err != nil {
t.Fatalf("Err: %v", err)
}
t.Run("maintenance check on node", func(t *testing.T) {
eval(t, "/v1/agent/health/service/name/mysql-pool-r", http.StatusServiceUnavailable, "maintenance")
})
}
func TestAgent_Checks_ACLFilter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), TestACLConfig())

View File

@ -740,6 +740,7 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register nodes.
args := &structs.RegisterRequest{
@ -831,6 +832,7 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
assert := assert.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register
args := structs.TestRegisterRequestProxy(t)
@ -860,6 +862,7 @@ func TestCatalogConnectServiceNodes_good(t *testing.T) {
assert := assert.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register
args := structs.TestRegisterRequestProxy(t)

View File

@ -42,6 +42,18 @@ func (e BadRequestError) Error() string {
return fmt.Sprintf("Bad request: %s", e.Reason)
}
// CodeWithPayloadError allow returning non HTTP 200
// Error codes while not returning PlainText payload
type CodeWithPayloadError struct {
Reason string
StatusCode int
ContentType string
}
func (e CodeWithPayloadError) Error() string {
return e.Reason
}
// HTTPServer provides an HTTP api for an agent.
type HTTPServer struct {
*http.Server
@ -364,21 +376,41 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc {
// Invoke the handler
obj, err = handler(resp, req)
}
contentType := "application/json"
httpCode := http.StatusOK
if err != nil {
handleErr(err)
return
if errPayload, ok := err.(CodeWithPayloadError); ok {
httpCode = errPayload.StatusCode
if errPayload.ContentType != "" {
contentType = errPayload.ContentType
}
if errPayload.Reason != "" {
resp.Header().Add("X-Consul-Reason", errPayload.Reason)
}
} else {
handleErr(err)
return
}
}
if obj == nil {
return
}
buf, err := s.marshalJSON(req, obj)
if err != nil {
handleErr(err)
return
var buf []byte
if contentType == "application/json" {
buf, err = s.marshalJSON(req, obj)
if err != nil {
handleErr(err)
return
}
} else {
if strings.HasPrefix(contentType, "text/") {
if val, ok := obj.(string); ok {
buf = []byte(val)
}
}
}
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("Content-Type", contentType)
resp.WriteHeader(httpCode)
resp.Write(buf)
}
}

View File

@ -34,6 +34,8 @@ func init() {
registerEndpoint("/v1/agent/join/", []string{"PUT"}, (*HTTPServer).AgentJoin)
registerEndpoint("/v1/agent/leave", []string{"PUT"}, (*HTTPServer).AgentLeave)
registerEndpoint("/v1/agent/force-leave/", []string{"PUT"}, (*HTTPServer).AgentForceLeave)
registerEndpoint("/v1/agent/health/service/id/", []string{"GET"}, (*HTTPServer).AgentHealthServiceByID)
registerEndpoint("/v1/agent/health/service/name/", []string{"GET"}, (*HTTPServer).AgentHealthServiceByName)
registerEndpoint("/v1/agent/check/register", []string{"PUT"}, (*HTTPServer).AgentRegisterCheck)
registerEndpoint("/v1/agent/check/deregister/", []string{"PUT"}, (*HTTPServer).AgentDeregisterCheck)
registerEndpoint("/v1/agent/check/pass/", []string{"PUT"}, (*HTTPServer).AgentCheckPass)

View File

@ -12,6 +12,7 @@ const (
errNotReadyForConsistentReads = "Not ready to serve consistent reads"
errSegmentsNotSupported = "Network segments are not supported in this version of Consul"
errRPCRateExceeded = "RPC rate limit exceeded"
errServiceNotFound = "Service not found: "
)
var (
@ -30,3 +31,7 @@ func IsErrNoLeader(err error) bool {
func IsErrRPCRateExceeded(err error) bool {
return err != nil && strings.Contains(err.Error(), errRPCRateExceeded)
}
func IsErrServiceNotFound(err error) bool {
return err != nil && strings.Contains(err.Error(), errServiceNotFound)
}

View File

@ -3,6 +3,8 @@ package api
import (
"bufio"
"fmt"
"net/http"
"net/url"
)
// ServiceKind is the kind of service being registered.
@ -89,6 +91,13 @@ type AgentService struct {
Connect *AgentServiceConnect `json:",omitempty"`
}
// AgentServiceChecksInfo returns information about a Service and its checks
type AgentServiceChecksInfo struct {
AggregatedStatus string
Service *AgentService
Checks HealthChecks
}
// AgentServiceConnect represents the Connect configuration of a service.
type AgentServiceConnect struct {
Native bool `json:",omitempty"`
@ -407,6 +416,73 @@ func (a *Agent) Services() (map[string]*AgentService, error) {
return out, nil
}
// AgentHealthServiceByID returns for a given serviceID: the aggregated health status, the service definition or an error if any
// - If the service is not found, will return status (critical, nil, nil)
// - If the service is found, will return (critical|passing|warning), AgentServiceChecksInfo, nil)
// - In all other cases, will return an error
func (a *Agent) AgentHealthServiceByID(serviceID string) (string, *AgentServiceChecksInfo, error) {
path := fmt.Sprintf("/v1/agent/health/service/id/%v", url.PathEscape(serviceID))
r := a.c.newRequest("GET", path)
r.params.Add("format", "json")
r.header.Set("Accept", "application/json")
_, resp, err := a.c.doRequest(r)
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
// Service not Found
if resp.StatusCode == http.StatusNotFound {
return HealthCritical, nil, nil
}
var out *AgentServiceChecksInfo
if err := decodeBody(resp, &out); err != nil {
return HealthCritical, out, err
}
switch resp.StatusCode {
case http.StatusOK:
return HealthPassing, out, nil
case http.StatusTooManyRequests:
return HealthWarning, out, nil
case http.StatusServiceUnavailable:
return HealthCritical, out, nil
}
return HealthCritical, out, fmt.Errorf("Unexpected Error Code %v for %s", resp.StatusCode, path)
}
// AgentHealthServiceByName returns for a given service name: the aggregated health status for all services
// having the specified name.
// - If no service is not found, will return status (critical, [], nil)
// - If the service is found, will return (critical|passing|warning), []api.AgentServiceChecksInfo, nil)
// - In all other cases, will return an error
func (a *Agent) AgentHealthServiceByName(service string) (string, []AgentServiceChecksInfo, error) {
path := fmt.Sprintf("/v1/agent/health/service/name/%v", url.PathEscape(service))
r := a.c.newRequest("GET", path)
r.params.Add("format", "json")
r.header.Set("Accept", "application/json")
_, resp, err := a.c.doRequest(r)
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
// Service not Found
if resp.StatusCode == http.StatusNotFound {
return HealthCritical, nil, nil
}
var out []AgentServiceChecksInfo
if err := decodeBody(resp, &out); err != nil {
return HealthCritical, out, err
}
switch resp.StatusCode {
case http.StatusOK:
return HealthPassing, out, nil
case http.StatusTooManyRequests:
return HealthWarning, out, nil
case http.StatusServiceUnavailable:
return HealthCritical, out, nil
}
return HealthCritical, out, fmt.Errorf("Unexpected Error Code %v for %s", resp.StatusCode, path)
}
// Service returns a locally registered service instance and allows for
// hash-based blocking.
//

View File

@ -1,6 +1,7 @@
package api
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -170,6 +171,7 @@ func TestAPI_AgentServices(t *testing.T) {
reg := &AgentServiceRegistration{
Name: "foo",
ID: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Check: &AgentServiceCheck{
@ -185,7 +187,7 @@ func TestAPI_AgentServices(t *testing.T) {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo"]; !ok {
t.Fatalf("missing service: %v", services)
t.Fatalf("missing service: %#v", services)
}
checks, err := agent.Checks()
if err != nil {
@ -201,6 +203,23 @@ func TestAPI_AgentServices(t *testing.T) {
t.Fatalf("Bad: %#v", chk)
}
state, out, err := agent.AgentHealthServiceByID("foo2")
require.Nil(t, err)
require.Nil(t, out)
require.Equal(t, HealthCritical, state)
state, out, err = agent.AgentHealthServiceByID("foo")
require.Nil(t, err)
require.NotNil(t, out)
require.Equal(t, HealthCritical, state)
require.Equal(t, 8000, out.Service.Port)
state, outs, err := agent.AgentHealthServiceByName("foo")
require.Nil(t, err)
require.NotNil(t, outs)
require.Equal(t, HealthCritical, state)
require.Equal(t, 8000, out.Service.Port)
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
@ -1368,3 +1387,101 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
require.Equal(t, expectConfig, config)
require.Equal(t, expectConfig.ContentHash, qm.LastContentHash)
}
func TestAPI_AgentHealthService(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
requireServiceHealthID := func(t *testing.T, serviceID, expected string, shouldExist bool) {
msg := fmt.Sprintf("service id:%s, shouldExist:%v, expectedStatus:%s : bad %%s", serviceID, shouldExist, expected)
state, out, err := agent.AgentHealthServiceByID(serviceID)
require.Nil(t, err, msg, "err")
require.Equal(t, expected, state, msg, "state")
if !shouldExist {
require.Nil(t, out, msg, "shouldExist")
} else {
require.NotNil(t, out, msg, "output")
require.Equal(t, serviceID, out.Service.ID, msg, "output")
}
}
requireServiceHealthName := func(t *testing.T, serviceName, expected string, shouldExist bool) {
msg := fmt.Sprintf("service name:%s, shouldExist:%v, expectedStatus:%s : bad %%s", serviceName, shouldExist, expected)
state, outs, err := agent.AgentHealthServiceByName(serviceName)
require.Nil(t, err, msg, "err")
require.Equal(t, expected, state, msg, "state")
if !shouldExist {
require.Equal(t, 0, len(outs), msg, "output")
} else {
require.True(t, len(outs) > 0, msg, "output")
for _, o := range outs {
require.Equal(t, serviceName, o.Service.Service, msg, "output")
}
}
}
requireServiceHealthID(t, "_i_do_not_exist_", HealthCritical, false)
requireServiceHealthName(t, "_i_do_not_exist_", HealthCritical, false)
testServiceID1 := "foo"
testServiceID2 := "foofoo"
testServiceName := "bar"
// register service
reg := &AgentServiceRegistration{
Name: testServiceName,
ID: testServiceID1,
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
},
}
err := agent.ServiceRegister(reg)
require.Nil(t, err)
requireServiceHealthID(t, testServiceID1, HealthCritical, true)
requireServiceHealthName(t, testServiceName, HealthCritical, true)
err = agent.WarnTTL(fmt.Sprintf("service:%s", testServiceID1), "I am warn")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthWarning, true)
requireServiceHealthID(t, testServiceID1, HealthWarning, true)
err = agent.PassTTL(fmt.Sprintf("service:%s", testServiceID1), "I am good :)")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthPassing, true)
requireServiceHealthID(t, testServiceID1, HealthPassing, true)
err = agent.FailTTL(fmt.Sprintf("service:%s", testServiceID1), "I am dead.")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthCritical, true)
requireServiceHealthID(t, testServiceID1, HealthCritical, true)
// register another service
reg = &AgentServiceRegistration{
Name: testServiceName,
ID: testServiceID2,
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
},
}
err = agent.ServiceRegister(reg)
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthCritical, true)
err = agent.PassTTL(fmt.Sprintf("service:%s", testServiceID1), "I am good :)")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthCritical, true)
err = agent.WarnTTL(fmt.Sprintf("service:%s", testServiceID2), "I am warn")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthWarning, true)
err = agent.PassTTL(fmt.Sprintf("service:%s", testServiceID2), "I am good :)")
require.Nil(t, err)
requireServiceHealthName(t, testServiceName, HealthPassing, true)
}

View File

@ -773,7 +773,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.doRequest(r))
rtt, resp, err := c.doRequest(r)
if err != nil {
return nil, err
}

View File

@ -46,16 +46,18 @@ func TestRegisterMonitor_heartbeat(t *testing.T) {
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
m, _ := testMonitor(t, client)
defer m.Close()
// Get the check and verify that it is passing
checks, err := client.Agent().Checks()
require.NoError(err)
require.Contains(checks, m.checkID())
require.Equal("passing", checks[m.checkID()].Status)
// Purposely fail the TTL check, verify it becomes healthy again
require.NoError(client.Agent().FailTTL(m.checkID(), ""))
retry.Run(t, func(r *retry.R) {
// Get the check and verify that it is passing
checks, err := client.Agent().Checks()
require.NoError(err)
require.Contains(checks, m.checkID())
require.Equal("passing", checks[m.checkID()].Status)
// Purposely fail the TTL check, verify it becomes healthy again
require.NoError(client.Agent().FailTTL(m.checkID(), ""))
})
retry.Run(t, func(r *retry.R) {
checks, err := client.Agent().Checks()
if err != nil {
r.Fatalf("err: %s", err)

View File

@ -4,9 +4,8 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/testrpc"
"github.com/mitchellh/cli"
)

View File

@ -151,6 +151,216 @@ contains the [hash-based blocking
query](/api/index.html#hash-based-blocking-queries) hash for the result. The
same hash is also present in `X-Consul-ContentHash`.
## Get local service health
Retrieve an aggregated state of service(s) on the local agent by name.
This endpoints support JSON format and text/plain formats, JSON being the
default. In order to get the text format, you can append `?format=text` to
the URL or use Mime Content negotiation by specifying a HTTP Header
`Accept` starting with `text/plain`.
| Method | Path | Produces |
| ------ | --------------------------------------------------------- | ------------------ |
| `GET` | `/v1/agent/health/service/name/:service_name` | `application/json` |
| `GET` | `/v1/agent/health/service/name/:service_name?format=text` | `text/plain` |
The table below shows this endpoint's support for
[blocking queries](/api/index.html#blocking-queries),
[consistency modes](/api/index.html#consistency-modes),
[agent caching](/api/index.html#agent-caching), and
[required ACLs](/api/index.html#acls).
| Blocking Queries | Consistency Modes | Agent Caching | ACL Required |
| ---------------- | ----------------- | ------------- | -------------- |
| `NO` | `none` | `none` | `service:read` |
Those endpoints return the aggregated values of all healthchecks for the
service instance(s) and will return the corresponding HTTP codes:
| Result | Meaning |
| ------ | ----------------------------------------------------------------|
| `200` | All healthchecks of every matching service instance are passing |
| `400` | Bad parameter (missing service name of id) |
| `404` | No such service id or name |
| `429` | Some healthchecks are passing, at least one is warning |
| `503` | At least one of the healthchecks is critical |
Those endpoints might be usefull for the following use-cases:
* a load-balancer wants to check IP connectivity with an agent and retrieve
the aggregated status of given service
* create aliases for a given service (thus, the healthcheck of alias uses
http://localhost:8500/v1/agent/service/id/aliased_service_id healthcheck)
##### Note
If you know the ID of service you want to target, it is recommended to use
[`/v1/agent/health/service/id/:service_id`](/api/service.html#get-local-service-health-by-id)
so you have the result for the service only. When requesting
`/v1/agent/health/service/name/:service_name`, the caller will receive the
worst state of all services having the given name.
### Sample Requests
Given 2 services with name `web`, with web2 critical and web1 passing:
#### List worst statuses of all instances of web-demo services (HTTP 503)
##### By Name, Text
```shell
curl http://localhost:8500/v1/agent/health/service/name/web?format=text
critical
```
##### By Name, JSON
In JSON, the detail of passing/warning/critical services is present in output,
in a array.
```shell
curl localhost:8500/v1/agent/health/service/name/web
```
```json
{
"critical": [
{
"ID": "web2",
"Service": "web",
"Tags": [
"rails"
],
"Address": "",
"Meta": null,
"Port": 80,
"EnableTagOverride": false,
"ProxyDestination": "",
"Connect": {
"Native": false,
"Proxy": null
},
"CreateIndex": 0,
"ModifyIndex": 0
}
],
"passing": [
{
"ID": "web1",
"Service": "web",
"Tags": [
"rails"
],
"Address": "",
"Meta": null,
"Port": 80,
"EnableTagOverride": false,
"ProxyDestination": "",
"Connect": {
"Native": false,
"Proxy": null
},
"CreateIndex": 0,
"ModifyIndex": 0
}
]
}
```
#### List status of web2 (HTTP 503)
##### Failure By ID, Text
```shell
curl http://localhost:8500/v1/agent/health/service/id/web2?format=text
critical
```
##### Failure By ID, JSON
In JSON, the output per ID is not an array, but only contains the value
of service.
```shell
curl localhost:8500/v1/agent/health/service/id/web2
```
```json
{
"critical": {
"ID": "web2",
"Service": "web",
"Tags": [
"rails"
],
"Address": "",
"Meta": null,
"Port": 80,
"EnableTagOverride": false,
"ProxyDestination": "",
"Connect": {
"Native": false,
"Proxy": null
},
"CreateIndex": 0,
"ModifyIndex": 0
}
}
```
#### List status of web2 (HTTP 200)
##### Success By ID, Text
```shell
curl localhost:8500/v1/agent/health/service/id/web1?format=text
passing
```
#### Success By ID, JSON
```shell
curl localhost:8500/v1/agent/health/service/id/web1
```
```json
{
"passing": {
"ID": "web1",
"Service": "web",
"Tags": [
"rails"
],
"Address": "",
"Meta": null,
"Port": 80,
"EnableTagOverride": false,
"ProxyDestination": "",
"Connect": {
"Native": false,
"Proxy": null
},
"CreateIndex": 0,
"ModifyIndex": 0
}
}
```
## Get local service health by its ID
Retrive an aggregated state of service(s) on the local agent by ID.
See:
| Method | Path | Produces |
| ------ | ------------------------------------------------------ | ------------------ |
| `GET` | `/v1/agent/health/service/id/:service_id` | `application/json` |
| `GET` | `/v1/agent/health/service/id/:service_id?format=text` | `text/plain` |
Parameters and response format are the same as
[`/v1/agent/health/service/name/:service_name`](/api/service.html#get-local-service-health).
## Register Service
This endpoint adds a new service, with an optional health check, to the local