ac78c23021
Fixes: #4222

# Data Filtering

This PR implements filtering for the following endpoints:

## Supported HTTP Endpoints

- `/agent/checks`
- `/agent/services`
- `/catalog/nodes`
- `/catalog/service/:service`
- `/catalog/connect/:service`
- `/catalog/node/:node`
- `/health/node/:node`
- `/health/checks/:service`
- `/health/service/:service`
- `/health/connect/:service`
- `/health/state/:state`
- `/internal/ui/nodes`
- `/internal/ui/services`

More can be added going forward; any endpoint that lists data is a good candidate.

## Usage

When using the HTTP API, a `filter` query parameter can be used to pass a filter expression to Consul (a concrete request is shown below). Filter expressions take the general form of:

```
<selector> == <value>
<selector> != <value>
<value> in <selector>
<value> not in <selector>
<selector> contains <value>
<selector> not contains <value>
<selector> is empty
<selector> is not empty
not <other expression>
<expression 1> and <expression 2>
<expression 1> or <expression 2>
```

Normal boolean logic and precedence are supported. All of the actual filtering and evaluation logic comes from the [go-bexpr](https://github.com/hashicorp/go-bexpr) library.

## Other changes

Adds the `Internal.ServiceDump` RPC endpoint. This will allow the UI to filter services better.
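As a concrete example of the `filter` query parameter, the following request narrows the node dump to nodes that have a service tagged `master`. This is an illustrative sketch, not part of the PR: it assumes a local agent on the default `127.0.0.1:8500` address and reuses the expression from the `TestInternal_NodeDump_Filter` test below.

```
curl --get http://127.0.0.1:8500/v1/internal/ui/nodes \
    --data-urlencode 'filter=master in Services.Tags'
```

Selectors are paths into the returned objects and therefore vary by endpoint; the tests below use `Services.Tags` against the node dump and `Node.Node`, `Service.Service`, and `Service.Meta.version` against the service dump.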
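Since all of the expression parsing and matching comes from the go-bexpr library named above, the behavior can be sanity-checked in isolation. Here is a minimal sketch based on go-bexpr's documented `CreateEvaluator`/`Evaluate` API (signatures may differ between library versions); the flat `Service` struct is a stand-in rather than one of Consul's structs, so the selector is `Tags` instead of `Services.Tags`:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-bexpr"
)

// Service is a stand-in value for the filter to walk over.
type Service struct {
	Service string
	Tags    []string
}

func main() {
	// Compile the expression once...
	eval, err := bexpr.CreateEvaluator(`master in Tags`)
	if err != nil {
		panic(err)
	}

	// ...then evaluate it against any value via reflection.
	match, err := eval.Evaluate(Service{Service: "db", Tags: []string{"master"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(match) // true
}
```

Compiling the evaluator once and reusing it across values mirrors how a server can apply one filter to many catalog entries. The server-side tests below register catalog entries and assert that `Internal.NodeDump` and `Internal.ServiceDump` respect the `Filter` query option.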
```go
package consul

import (
	"encoding/base64"
	"os"
	"testing"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/testrpc"
	"github.com/hashicorp/net-rpc-msgpackrpc"

	"github.com/stretchr/testify/require"
)

func TestInternal_NodeInfo(t *testing.T) {
	t.Parallel()
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
		Service: &structs.NodeService{
			ID:      "db",
			Service: "db",
			Tags:    []string{"master"},
		},
		Check: &structs.HealthCheck{
			Name:      "db connect",
			Status:    api.HealthPassing,
			ServiceID: "db",
		},
	}
	var out struct{}
	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	var out2 structs.IndexedNodeDump
	req := structs.NodeSpecificRequest{
		Datacenter: "dc1",
		Node:       "foo",
	}
	if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil {
		t.Fatalf("err: %v", err)
	}

	nodes := out2.Dump
	if len(nodes) != 1 {
		t.Fatalf("Bad: %v", nodes)
	}
	if nodes[0].Node != "foo" {
		t.Fatalf("Bad: %v", nodes[0])
	}
	if !lib.StrContains(nodes[0].Services[0].Tags, "master") {
		t.Fatalf("Bad: %v", nodes[0])
	}
	if nodes[0].Checks[0].Status != api.HealthPassing {
		t.Fatalf("Bad: %v", nodes[0])
	}
}

func TestInternal_NodeDump(t *testing.T) {
	t.Parallel()
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
		Service: &structs.NodeService{
			ID:      "db",
			Service: "db",
			Tags:    []string{"master"},
		},
		Check: &structs.HealthCheck{
			Name:      "db connect",
			Status:    api.HealthPassing,
			ServiceID: "db",
		},
	}
	var out struct{}
	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	arg = structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "bar",
		Address:    "127.0.0.2",
		Service: &structs.NodeService{
			ID:      "db",
			Service: "db",
			Tags:    []string{"slave"},
		},
		Check: &structs.HealthCheck{
			Name:      "db connect",
			Status:    api.HealthWarning,
			ServiceID: "db",
		},
	}
	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	var out2 structs.IndexedNodeDump
	req := structs.DCSpecificRequest{
		Datacenter: "dc1",
	}
	if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2); err != nil {
		t.Fatalf("err: %v", err)
	}

	nodes := out2.Dump
	if len(nodes) != 3 {
		t.Fatalf("Bad: %v", nodes)
	}

	var foundFoo, foundBar bool
	for _, node := range nodes {
		switch node.Node {
		case "foo":
			foundFoo = true
			if !lib.StrContains(node.Services[0].Tags, "master") {
				t.Fatalf("Bad: %v", nodes[0])
			}
			if node.Checks[0].Status != api.HealthPassing {
				t.Fatalf("Bad: %v", nodes[0])
			}

		case "bar":
			foundBar = true
			if !lib.StrContains(node.Services[0].Tags, "slave") {
				t.Fatalf("Bad: %v", nodes[1])
			}
			if node.Checks[0].Status != api.HealthWarning {
				t.Fatalf("Bad: %v", nodes[1])
			}

		default:
			continue
		}
	}
	if !foundFoo || !foundBar {
		t.Fatalf("missing foo or bar")
	}
}

func TestInternal_NodeDump_Filter(t *testing.T) {
	t.Parallel()
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
		Service: &structs.NodeService{
			ID:      "db",
			Service: "db",
			Tags:    []string{"master"},
		},
		Check: &structs.HealthCheck{
			Name:      "db connect",
			Status:    api.HealthPassing,
			ServiceID: "db",
		},
	}
	var out struct{}
	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))

	arg = structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "bar",
		Address:    "127.0.0.2",
		Service: &structs.NodeService{
			ID:      "db",
			Service: "db",
			Tags:    []string{"slave"},
		},
		Check: &structs.HealthCheck{
			Name:      "db connect",
			Status:    api.HealthWarning,
			ServiceID: "db",
		},
	}

	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))

	var out2 structs.IndexedNodeDump
	req := structs.DCSpecificRequest{
		Datacenter:   "dc1",
		QueryOptions: structs.QueryOptions{Filter: "master in Services.Tags"},
	}
	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2))

	nodes := out2.Dump
	require.Len(t, nodes, 1)
	require.Equal(t, "foo", nodes[0].Node)
}

func TestInternal_KeyringOperation(t *testing.T) {
	t.Parallel()
	key1 := "H1dfkSZOVnP/JUnaBfTzXg=="
	keyBytes1, err := base64.StdEncoding.DecodeString(key1)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.SerfLANConfig.MemberlistConfig.SecretKey = keyBytes1
		c.SerfWANConfig.MemberlistConfig.SecretKey = keyBytes1
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	var out structs.KeyringResponses
	req := structs.KeyringRequest{
		Operation:  structs.KeyringList,
		Datacenter: "dc1",
	}
	if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Two responses (local lan/wan pools) from single-node cluster
	if len(out.Responses) != 2 {
		t.Fatalf("bad: %#v", out)
	}
	if _, ok := out.Responses[0].Keys[key1]; !ok {
		t.Fatalf("bad: %#v", out)
	}
	wanResp, lanResp := 0, 0
	for _, resp := range out.Responses {
		if resp.WAN {
			wanResp++
		} else {
			lanResp++
		}
	}
	if lanResp != 1 || wanResp != 1 {
		t.Fatalf("should have one lan and one wan response")
	}

	// Start a second agent to test cross-dc queries
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.SerfLANConfig.MemberlistConfig.SecretKey = keyBytes1
		c.SerfWANConfig.MemberlistConfig.SecretKey = keyBytes1
		c.Datacenter = "dc2"
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	// Try to join
	joinWAN(t, s2, s1)

	var out2 structs.KeyringResponses
	req2 := structs.KeyringRequest{
		Operation: structs.KeyringList,
	}
	if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req2, &out2); err != nil {
		t.Fatalf("err: %v", err)
	}

	// 3 responses (one from each DC LAN, one from WAN) in two-node cluster
	if len(out2.Responses) != 3 {
		t.Fatalf("bad: %#v", out)
	}
	wanResp, lanResp = 0, 0
	for _, resp := range out2.Responses {
		if resp.WAN {
			wanResp++
		} else {
			lanResp++
		}
	}
	if lanResp != 2 || wanResp != 1 {
		t.Fatalf("should have two lan and one wan response")
	}
}

func TestInternal_NodeInfo_FilterACL(t *testing.T) {
	t.Parallel()
	dir, token, srv, codec := testACLFilterServer(t)
	defer os.RemoveAll(dir)
	defer srv.Shutdown()
	defer codec.Close()

	opt := structs.NodeSpecificRequest{
		Datacenter:   "dc1",
		Node:         srv.config.NodeName,
		QueryOptions: structs.QueryOptions{Token: token},
	}
	reply := structs.IndexedNodeDump{}
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
for _, info := range reply.Dump {
|
|
found := false
|
|
for _, chk := range info.Checks {
|
|
if chk.ServiceName == "foo" {
|
|
found = true
|
|
}
|
|
if chk.ServiceName == "bar" {
|
|
t.Fatalf("bad: %#v", info.Checks)
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("bad: %#v", info.Checks)
|
|
}
|
|
|
|
found = false
|
|
for _, svc := range info.Services {
|
|
if svc.Service == "foo" {
|
|
found = true
|
|
}
|
|
if svc.Service == "bar" {
|
|
t.Fatalf("bad: %#v", info.Services)
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("bad: %#v", info.Services)
|
|
}
|
|
}
|
|
|
|
// We've already proven that we call the ACL filtering function so we
|
|
// test node filtering down in acl.go for node cases. This also proves
|
|
// that we respect the version 8 ACL flag, since the test server sets
|
|
// that to false (the regression value of *not* changing this is better
|
|
// for now until we change the sense of the version 8 ACL flag).
|
|
}
|
|
|
|
func TestInternal_NodeDump_FilterACL(t *testing.T) {
|
|
t.Parallel()
|
|
dir, token, srv, codec := testACLFilterServer(t)
|
|
defer os.RemoveAll(dir)
|
|
defer srv.Shutdown()
|
|
defer codec.Close()
|
|
|
|
opt := structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: token},
|
|
}
|
|
reply := structs.IndexedNodeDump{}
|
|
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
for _, info := range reply.Dump {
|
|
found := false
|
|
for _, chk := range info.Checks {
|
|
if chk.ServiceName == "foo" {
|
|
found = true
|
|
}
|
|
if chk.ServiceName == "bar" {
|
|
t.Fatalf("bad: %#v", info.Checks)
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("bad: %#v", info.Checks)
|
|
}
|
|
|
|
found = false
|
|
for _, svc := range info.Services {
|
|
if svc.Service == "foo" {
|
|
found = true
|
|
}
|
|
if svc.Service == "bar" {
|
|
t.Fatalf("bad: %#v", info.Services)
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("bad: %#v", info.Services)
|
|
}
|
|
}
|
|
|
|
// We've already proven that we call the ACL filtering function so we
|
|
// test node filtering down in acl.go for node cases. This also proves
|
|
// that we respect the version 8 ACL flag, since the test server sets
|
|
// that to false (the regression value of *not* changing this is better
|
|
// for now until we change the sense of the version 8 ACL flag).
|
|
}
|
|
|
|
func TestInternal_EventFire_Token(t *testing.T) {
|
|
t.Parallel()
|
|
dir, srv := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLsEnabled = true
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDownPolicy = "deny"
|
|
c.ACLDefaultPolicy = "deny"
|
|
})
|
|
defer os.RemoveAll(dir)
|
|
defer srv.Shutdown()
|
|
|
|
codec := rpcClient(t, srv)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, srv.RPC, "dc1")
|
|
|
|
// No token is rejected
|
|
event := structs.EventFireRequest{
|
|
Name: "foo",
|
|
Datacenter: "dc1",
|
|
Payload: []byte("nope"),
|
|
}
|
|
err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
|
|
if !acl.IsErrPermissionDenied(err) {
|
|
t.Fatalf("bad: %s", err)
|
|
}
|
|
|
|
// Root token is allowed to fire
|
|
event.Token = "root"
|
|
err = msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
}
|
|
|
|
func TestInternal_ServiceDump(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// prep the cluster with some data we can use in our filters
|
|
registerTestCatalogEntries(t, codec)
|
|
|
|
doRequest := func(t *testing.T, filter string) structs.CheckServiceNodes {
|
|
t.Helper()
|
|
args := structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Filter: filter},
|
|
}
|
|
|
|
var out structs.IndexedCheckServiceNodes
|
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out))
|
|
return out.Nodes
|
|
}
|
|
|
|
// Run the tests against the test server
|
|
t.Run("No Filter", func(t *testing.T) {
|
|
nodes := doRequest(t, "")
|
|
// redis (3), web (3), critical (1), warning (1) and consul (1)
|
|
require.Len(t, nodes, 9)
|
|
})
|
|
|
|
t.Run("Filter Node foo and service version 1", func(t *testing.T) {
|
|
nodes := doRequest(t, "Node.Node == foo and Service.Meta.version == 1")
|
|
require.Len(t, nodes, 1)
|
|
require.Equal(t, "redis", nodes[0].Service.Service)
|
|
require.Equal(t, "redisV1", nodes[0].Service.ID)
|
|
})
|
|
|
|
t.Run("Filter service web", func(t *testing.T) {
|
|
nodes := doRequest(t, "Service.Service == web")
|
|
require.Len(t, nodes, 3)
|
|
})
|
|
}
|
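Assuming this file lives under `agent/consul` in the Consul repository (consistent with its `package consul` clause and imports), the new filter tests can be run on their own:

```
go test ./agent/consul -run 'TestInternal_NodeDump_Filter|TestInternal_ServiceDump'
```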