Add node metadata filtering to remaining health/catalog endpoints

This commit is contained in:
Kyle Havlovitz 2017-01-13 20:08:43 -05:00
parent 73821c769e
commit e37f25dc02
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
17 changed files with 1085 additions and 13 deletions

View File

@ -222,6 +222,36 @@ func TestCatalog_Service(t *testing.T) {
}) })
} }
func TestCatalog_Service_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Service("consul", "", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Node(t *testing.T) { func TestCatalog_Node(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)

View File

@ -208,6 +208,46 @@ func TestHealth_Checks(t *testing.T) {
}) })
} }
func TestHealth_Checks_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
agent := c.Agent()
health := c.Health()
// Make a service with a check
reg := &AgentServiceRegistration{
Name: "foo",
Check: &AgentServiceCheck{
TTL: "15s",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
defer agent.ServiceDeregister("foo")
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.Checks("foo", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_Service(t *testing.T) { func TestHealth_Service(t *testing.T) {
c, s := makeClient(t) c, s := makeClient(t)
defer s.Stop() defer s.Stop()
@ -235,6 +275,36 @@ func TestHealth_Service(t *testing.T) {
}) })
} }
func TestHealth_Service_NodeMetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
// consul service should always exist...
checks, meta, err := health.Service("consul", "", true, &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
if _, ok := checks[0].Node.TaggedAddresses["wan"]; !ok {
return false, fmt.Errorf("Bad: %v", checks[0].Node)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_State(t *testing.T) { func TestHealth_State(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)
@ -258,3 +328,30 @@ func TestHealth_State(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
}) })
} }
func TestHealth_State_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.State("any", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

View File

@ -103,6 +103,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source) s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }

View File

@ -608,6 +608,72 @@ func TestCatalogServiceNodes(t *testing.T) {
} }
} }
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
// Make sure an empty list is returned, not a nil
{
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
nodes := obj.(structs.ServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
}
// Register node
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
},
Service: &structs.NodeService{
Service: "api",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
nodes := obj.(structs.ServiceNodes)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
}
func TestCatalogServiceNodes_WanTranslation(t *testing.T) { func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
dir1, srv1 := makeHTTPServerWithConfig(t, dir1, srv1 := makeHTTPServerWithConfig(t,
func(c *Config) { func(c *Config) {

View File

@ -11,6 +11,7 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
// Set default DC // Set default DC
args := structs.ChecksInStateRequest{} args := structs.ChecksInStateRequest{}
s.parseSource(req, &args.Source) s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }
@ -70,6 +71,7 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source) s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }
@ -100,6 +102,7 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source) s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }

View File

@ -65,6 +65,49 @@ func TestHealthChecksInState(t *testing.T) {
}) })
} }
func TestHealthChecksInState_NodeMetaFilter(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Check: &structs.HealthCheck{
Node: "bar",
Name: "node check",
Status: structs.HealthCritical,
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/health/state/critical?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
resp := httptest.NewRecorder()
obj, err := srv.HealthChecksInState(resp, req)
if err != nil {
return false, err
}
if err := checkIndex(resp); err != nil {
return false, err
}
// Should be 1 health check for the server
nodes := obj.(structs.HealthChecks)
if len(nodes) != 1 {
return false, fmt.Errorf("bad: %v", obj)
}
return true, nil
}, func(err error) { t.Fatalf("err: %v", err) })
})
}
func TestHealthChecksInState_DistanceSort(t *testing.T) { func TestHealthChecksInState_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -258,6 +301,69 @@ func TestHealthServiceChecks(t *testing.T) {
} }
} }
func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
req, err := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.HealthServiceChecks(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list
nodes := obj.(structs.HealthChecks)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
// Create a service check
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Check: &structs.HealthCheck{
Node: srv.agent.config.NodeName,
Name: "consul check",
ServiceID: "consul",
},
}
var out struct{}
if err = srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err = http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.HealthServiceChecks(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be 1 health check for consul
nodes = obj.(structs.HealthChecks)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
}
func TestHealthServiceChecks_DistanceSort(t *testing.T) { func TestHealthServiceChecks_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -429,6 +535,69 @@ func TestHealthServiceNodes(t *testing.T) {
} }
} }
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list
nodes := obj.(structs.CheckServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list for checks
nodes = obj.(structs.CheckServiceNodes)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}
}
func TestHealthServiceNodes_DistanceSort(t *testing.T) { func TestHealthServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)

View File

@ -243,6 +243,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return err return err
} }
reply.Index, reply.ServiceNodes = index, services reply.Index, reply.ServiceNodes = index, services
if len(args.NodeMetaFilters) > 0 {
var filtered structs.ServiceNodes
for _, service := range services {
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
filtered = append(filtered, service)
}
}
reply.ServiceNodes = filtered
}
if err := c.srv.filterACL(args.Token, reply); err != nil { if err := c.srv.filterACL(args.Token, reply); err != nil {
return err return err
} }

View File

@ -592,7 +592,7 @@ func TestCatalog_ListNodes(t *testing.T) {
} }
} }
func TestCatalog_ListNodes_MetaFilter(t *testing.T) { func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -1060,7 +1060,7 @@ func TestCatalog_ListServices(t *testing.T) {
} }
} }
func TestCatalog_ListServices_MetaFilter(t *testing.T) { func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -1308,6 +1308,106 @@ func TestCatalog_ListServiceNodes(t *testing.T) {
} }
} }
func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Add 2 nodes with specific meta maps
node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue", "common": "1"}}
if err := s1.fsm.State().EnsureNode(1, node); err != nil {
t.Fatalf("err: %v", err)
}
node2 := &structs.Node{Node: "bar", Address: "127.0.0.2", Meta: map[string]string{"common": "1"}}
if err := s1.fsm.State().EnsureNode(2, node2); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
tag string
services structs.ServiceNodes
}{
// Basic meta filter
{
filters: map[string]string{"somekey": "somevalue"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Basic meta filter, tag
{
filters: map[string]string{"somekey": "somevalue"},
tag: "primary",
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Common meta filter
{
filters: map[string]string{"common": "1"},
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
&structs.ServiceNode{Node: "foo", ServiceID: "db"},
},
},
// Common meta filter, tag
{
filters: map[string]string{"common": "1"},
tag: "secondary",
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
services: structs.ServiceNodes{},
},
// Multiple filter values
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Multiple filter values, tag
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
tag: "primary",
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
}
for _, tc := range cases {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
ServiceTag: tc.tag,
TagFilter: tc.tag != "",
}
var out structs.IndexedServiceNodes
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.ServiceNodes) != len(tc.services) {
t.Fatalf("bad: %v", out)
}
for i, serviceNode := range out.ServiceNodes {
if serviceNode.Node != tc.services[i].Node || serviceNode.ServiceID != tc.services[i].ServiceID {
t.Fatalf("bad: %v, %v filters: %v", serviceNode, tc.services[i], tc.filters)
}
}
}
}
func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) { func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -25,7 +25,14 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
&reply.QueryMeta, &reply.QueryMeta,
state.GetQueryWatch("ChecksInState"), state.GetQueryWatch("ChecksInState"),
func() error { func() error {
index, checks, err := state.ChecksInState(args.State) var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters)
} else {
index, checks, err = state.ChecksInState(args.State)
}
if err != nil { if err != nil {
return err return err
} }
@ -80,7 +87,14 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
&reply.QueryMeta, &reply.QueryMeta,
state.GetQueryWatch("ServiceChecks"), state.GetQueryWatch("ServiceChecks"),
func() error { func() error {
index, checks, err := state.ServiceChecks(args.ServiceName) var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters)
} else {
index, checks, err = state.ServiceChecks(args.ServiceName)
}
if err != nil { if err != nil {
return err return err
} }
@ -123,6 +137,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
} }
reply.Index, reply.Nodes = index, nodes reply.Index, reply.Nodes = index, nodes
if len(args.NodeMetaFilters) > 0 {
var filtered structs.CheckServiceNodes
for _, node := range nodes {
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
filtered = append(filtered, node)
}
}
reply.Nodes = filtered
}
if err := h.srv.filterACL(args.Token, reply); err != nil { if err := h.srv.filterACL(args.Token, reply); err != nil {
return err return err
} }

View File

@ -57,6 +57,101 @@ func TestHealth_ChecksInState(t *testing.T) {
} }
} }
func TestHealth_ChecksInState_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthPassing,
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
checkNames []string
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
checkNames: []string{"memory utilization"},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
checkNames: []string{"disk space", "memory utilization"},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
checkNames: []string{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
checkNames: []string{"memory utilization"},
},
}
for _, tc := range cases {
var out structs.IndexedHealthChecks
inState := structs.ChecksInStateRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
State: structs.HealthPassing,
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out); err != nil {
t.Fatalf("err: %v", err)
}
checks := out.HealthChecks
if len(checks) != len(tc.checkNames) {
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
}
for i, check := range checks {
if tc.checkNames[i] != check.Name {
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
}
}
}
}
func TestHealth_ChecksInState_DistanceSort(t *testing.T) { func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -221,6 +316,111 @@ func TestHealth_ServiceChecks(t *testing.T) {
} }
} }
func TestHealth_ServiceChecks_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
checkNames []string
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
checkNames: []string{"memory utilization"},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
checkNames: []string{"disk space", "memory utilization"},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
checkNames: []string{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
checkNames: []string{"memory utilization"},
},
}
for _, tc := range cases {
var out structs.IndexedHealthChecks
inState := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &inState, &out); err != nil {
t.Fatalf("err: %v", err)
}
checks := out.HealthChecks
if len(checks) != len(tc.checkNames) {
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
}
for i, check := range checks {
if tc.checkNames[i] != check.Name {
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
}
}
}
}
func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -392,6 +592,136 @@ func TestHealth_ServiceNodes(t *testing.T) {
} }
} }
func TestHealth_ServiceNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthWarning,
ServiceID: "db",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
nodes structs.CheckServiceNodes
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "bar"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "disk space"}},
},
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
nodes: structs.CheckServiceNodes{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
}
for _, tc := range cases {
var out structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Nodes) != len(tc.nodes) {
t.Fatalf("bad: %v, %v, filters: %v", out.Nodes, tc.nodes, tc.filters)
}
for i, node := range out.Nodes {
checks := tc.nodes[i].Checks
if len(node.Checks) != len(checks) {
t.Fatalf("bad: %v, %v, filters: %v", node.Checks, checks, tc.filters)
}
for j, check := range node.Checks {
if check.Name != checks[j].Name {
t.Fatalf("bad: %v, %v, filters: %v", check, checks[j], tc.filters)
}
}
}
}
}
func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -877,6 +877,24 @@ func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthCh
return s.parseChecks(idx, checks) return s.parseChecks(idx, checks)
} }
// ServiceChecksByNodeMeta is used to get all checks associated with a
// given service ID. The query is performed against a service
// _name_ instead of a service ID.
func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...)
// Return the checks.
checks, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// ChecksInState is used to query the state store for all checks // ChecksInState is used to query the state store for all checks
// which are in the provided state. // which are in the provided state.
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) { func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) {
@ -903,6 +921,34 @@ func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks,
return s.parseChecks(idx, checks) return s.parseChecks(idx, checks)
} }
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...)
// Query all checks if HealthAny is passed
var checks memdb.ResultIterator
var err error
if state == structs.HealthAny {
checks, err = tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
} else {
// Any other state we need to query for explicitly
checks, err = tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// parseChecks is a helper function used to deduplicate some // parseChecks is a helper function used to deduplicate some
// repetitive code for returning health checks. // repetitive code for returning health checks.
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) { func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
@ -914,6 +960,27 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64,
return idx, results, nil return idx, results, nil
} }
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn,
filters map[string]string) (uint64, structs.HealthChecks, error) {
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
healthCheck := check.(*structs.HealthCheck)
node, err := tx.First("nodes", "id", healthCheck.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return 0, nil, ErrMissingNode
}
if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) {
results = append(results, healthCheck)
}
}
return idx, results, nil
}
// DeleteCheck is used to delete a health check registration. // DeleteCheck is used to delete a health check registration.
func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error { func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
) )
func TestStateStore_EnsureRegistration(t *testing.T) { func TestStateStore_EnsureRegistration(t *testing.T) {
@ -1546,6 +1547,63 @@ func TestStateStore_ServiceChecks(t *testing.T) {
} }
} }
func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Create the first node and service with some checks
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterService(t, s, 1, "node1", "service1")
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
// Create a second node/service with a different set of checks
testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"})
testRegisterService(t, s, 5, "node2", "service1")
testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing)
cases := []struct {
filters map[string]string
checks []string
}{
// Basic meta filter
{
filters: map[string]string{"somekey": "somevalue"},
checks: []string{"check1", "check2"},
},
// Common meta field
{
filters: map[string]string{"common": "1"},
checks: []string{"check1", "check2", "check3"},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
checks: []string{},
},
// Multiple filters
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
checks: []string{"check1", "check2"},
},
}
// Try querying for all checks associated with service1
for _, tc := range cases {
_, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != len(tc.checks) {
t.Fatalf("bad checks: %#v", checks)
}
for i, check := range checks {
if check.CheckID != types.CheckID(tc.checks[i]) {
t.Fatalf("bad checks: %#v", checks)
}
}
}
}
func TestStateStore_ChecksInState(t *testing.T) { func TestStateStore_ChecksInState(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -1585,6 +1643,88 @@ func TestStateStore_ChecksInState(t *testing.T) {
} }
} }
func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil
idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register a node with checks in varied states
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical)
testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"})
testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing)
cases := []struct {
filters map[string]string
state string
checks []string
}{
// Basic meta filter, any status
{
filters: map[string]string{"somekey": "somevalue"},
state: structs.HealthAny,
checks: []string{"check2", "check1"},
},
// Basic meta filter, only passing
{
filters: map[string]string{"somekey": "somevalue"},
state: structs.HealthPassing,
checks: []string{"check1"},
},
// Common meta filter, any status
{
filters: map[string]string{"common": "1"},
state: structs.HealthAny,
checks: []string{"check2", "check1", "check3"},
},
// Common meta filter, only passing
{
filters: map[string]string{"common": "1"},
state: structs.HealthPassing,
checks: []string{"check1", "check3"},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
checks: []string{},
},
// Multiple filters, any status
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
state: structs.HealthAny,
checks: []string{"check2", "check1"},
},
// Multiple filters, only passing
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
state: structs.HealthPassing,
checks: []string{"check1"},
},
}
// Try querying for all checks associated with service1
for _, tc := range cases {
_, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != len(tc.checks) {
t.Fatalf("bad checks: %#v", checks)
}
for i, check := range checks {
if check.CheckID != types.CheckID(tc.checks[i]) {
t.Fatalf("bad checks: %#v, %v", checks, tc.checks)
}
}
}
}
func TestStateStore_DeleteCheck(t *testing.T) { func TestStateStore_DeleteCheck(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)

View File

@ -222,6 +222,8 @@ func (s *StateStore) getWatchTables(method string) []string {
return []string{"nodes", "services"} return []string{"nodes", "services"}
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState": case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
return []string{"checks"} return []string{"checks"}
case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta":
return []string{"nodes", "checks"}
case "CheckServiceNodes", "NodeInfo", "NodeDump": case "CheckServiceNodes", "NodeInfo", "NodeDump":
return []string{"nodes", "services", "checks"} return []string{"nodes", "services", "checks"}
case "SessionGet", "SessionList", "NodeSessions": case "SessionGet", "SessionList", "NodeSessions":

View File

@ -35,7 +35,11 @@ func testStateStore(t *testing.T) *StateStore {
} }
func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) { func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) {
node := &structs.Node{Node: nodeID} testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
}
func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) {
node := &structs.Node{Node: nodeID, Meta: meta}
if err := s.EnsureNode(idx, node); err != nil { if err := s.EnsureNode(idx, node); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

View File

@ -242,6 +242,7 @@ func (r *DCSpecificRequest) RequestDatacenter() string {
// ServiceSpecificRequest is used to query about a specific service // ServiceSpecificRequest is used to query about a specific service
type ServiceSpecificRequest struct { type ServiceSpecificRequest struct {
Datacenter string Datacenter string
NodeMetaFilters map[string]string
ServiceName string ServiceName string
ServiceTag string ServiceTag string
TagFilter bool // Controls tag filtering TagFilter bool // Controls tag filtering
@ -267,6 +268,7 @@ func (r *NodeSpecificRequest) RequestDatacenter() string {
// ChecksInStateRequest is used to query for nodes in a state // ChecksInStateRequest is used to query for nodes in a state
type ChecksInStateRequest struct { type ChecksInStateRequest struct {
Datacenter string Datacenter string
NodeMetaFilters map[string]string
State string State string
Source QuerySource Source QuerySource
QueryOptions QueryOptions
@ -287,6 +289,15 @@ type Node struct {
} }
type Nodes []*Node type Nodes []*Node
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
for key, value := range filters {
if v, ok := meta[key]; !ok || v != value {
return false
}
}
return true
}
// Used to return information about a provided services. // Used to return information about a provided services.
// Maps service name to available tags // Maps service name to available tags
type Services map[string][]string type Services map[string][]string

View File

@ -276,6 +276,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's time from that node. Passing `?near=_agent` will use the agent's
node for the sort. node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
service entries on nodes with the specified key/value pair(s).
It returns a JSON body like this: It returns a JSON body like this:
```javascript ```javascript

View File

@ -75,6 +75,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's time from that node. Passing `?near=_agent` will use the agent's
node for the sort. node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
health checks on nodes with the specified key/value pair(s).
It returns a JSON body like this: It returns a JSON body like this:
```javascript ```javascript
@ -112,6 +117,11 @@ Providing the `?passing` query parameter, added in Consul 0.2, will
filter results to only nodes with all checks in the `passing` state. filter results to only nodes with all checks in the `passing` state.
This can be used to avoid extra filtering logic on the client side. This can be used to avoid extra filtering logic on the client side.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
nodes with the specified key/value pair(s).
This endpoint is very similar to the `/v1/catalog/service` endpoint; however, this This endpoint is very similar to the `/v1/catalog/service` endpoint; however, this
endpoint automatically returns the status of the associated health check endpoint automatically returns the status of the associated health check
as well as any system level health checks. This allows a client to avoid as well as any system level health checks. This allows a client to avoid
@ -182,6 +192,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's time from that node. Passing `?near=_agent` will use the agent's
node for the sort. node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
health checks on nodes with the specified key/value pair(s).
The supported states are `any`, `passing`, `warning`, or `critical`. The supported states are `any`, `passing`, `warning`, or `critical`.
The `any` state is a wildcard that can be used to return all checks. The `any` state is a wildcard that can be used to return all checks.