Backport of watch: support -filter for consul watch: checks, services, nodes, service into release/1.16.x (#17965)
* backport to 1.16.x --------- Co-authored-by: cskh <hui.kang@hashicorp.com>
This commit is contained in:
parent
48e7fb1bfe
commit
1109d20262
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service.
|
||||
```
|
|
@ -1767,5 +1767,11 @@ func TestHealth_RPC_Filter(t *testing.T) {
|
|||
out = new(structs.IndexedHealthChecks)
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
|
||||
require.Len(t, out.HealthChecks, 1)
|
||||
|
||||
args.State = api.HealthAny
|
||||
args.Filter = "connect in ServiceTags and v2 in ServiceTags"
|
||||
out = new(structs.IndexedHealthChecks)
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
|
||||
require.Len(t, out.HealthChecks, 1)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
// servicesWatch is used to watch the list of available services
|
||||
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
stale := false
|
||||
filter := ""
|
||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assignValue(params, "filter", &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
||||
catalog := p.client.Catalog()
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
if filter != "" {
|
||||
opts.Filter = filter
|
||||
}
|
||||
defer p.cancelFunc()
|
||||
services, meta, err := catalog.Services(&opts)
|
||||
if err != nil {
|
||||
|
@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
// nodesWatch is used to watch the list of available nodes
|
||||
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
stale := false
|
||||
filter := ""
|
||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assignValue(params, "filter", &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
||||
catalog := p.client.Catalog()
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
if filter != "" {
|
||||
opts.Filter = filter
|
||||
}
|
||||
defer p.cancelFunc()
|
||||
nodes, meta, err := catalog.Nodes(&opts)
|
||||
if err != nil {
|
||||
|
@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
// serviceWatch is used to watch a specific service for changes
|
||||
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
stale := false
|
||||
filter := ""
|
||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assignValue(params, "filter", &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
service string
|
||||
|
@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
||||
health := p.client.Health()
|
||||
opts := makeQueryOptionsWithContext(p, stale)
|
||||
if filter != "" {
|
||||
opts.Filter = filter
|
||||
}
|
||||
defer p.cancelFunc()
|
||||
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
|
||||
if err != nil {
|
||||
|
@ -175,13 +196,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var service, state string
|
||||
var service, state, filter string
|
||||
if err := assignValue(params, "service", &service); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assignValue(params, "state", &state); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assignValue(params, "filter", &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if service != "" && state != "" {
|
||||
return nil, fmt.Errorf("Cannot specify service and state")
|
||||
}
|
||||
|
@ -196,6 +220,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
|||
var checks []*consulapi.HealthCheck
|
||||
var meta *consulapi.QueryMeta
|
||||
var err error
|
||||
if filter != "" {
|
||||
opts.Filter = filter
|
||||
}
|
||||
if state != "" {
|
||||
checks, meta, err = health.State(state, &opts)
|
||||
} else {
|
||||
|
|
|
@ -378,6 +378,82 @@ func TestServicesWatch(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestServicesWatch_Filter(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
s.WaitForSerfCheck(t)
|
||||
|
||||
var (
|
||||
wakeups []map[string][]string
|
||||
notifyCh = make(chan struct{})
|
||||
)
|
||||
|
||||
plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
return // ignore
|
||||
}
|
||||
v, ok := raw.(map[string][]string)
|
||||
if !ok {
|
||||
return // ignore
|
||||
}
|
||||
wakeups = append(wakeups, v)
|
||||
notifyCh <- struct{}{}
|
||||
}
|
||||
|
||||
// Register some services
|
||||
{
|
||||
agent := c.Agent()
|
||||
|
||||
// we don't want to find this
|
||||
reg := &api.AgentServiceRegistration{
|
||||
ID: "foo",
|
||||
Name: "foo",
|
||||
Tags: []string{"b"},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// // we want to find this
|
||||
reg = &api.AgentServiceRegistration{
|
||||
ID: "bar",
|
||||
Name: "bar",
|
||||
Tags: []string{"a", "b"},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := plan.Run(s.HTTPAddr); err != nil {
|
||||
t.Errorf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
defer plan.Stop()
|
||||
|
||||
// Wait for second wakeup.
|
||||
<-notifyCh
|
||||
|
||||
plan.Stop()
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, wakeups, 1)
|
||||
|
||||
{
|
||||
v := wakeups[0]
|
||||
require.Len(t, v, 1)
|
||||
_, ok := v["bar"]
|
||||
require.True(t, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodesWatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
@ -453,6 +529,82 @@ func TestNodesWatch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNodesWatch_Filter(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
s.WaitForSerfCheck(t) // wait for AE to sync
|
||||
|
||||
var (
|
||||
wakeups [][]*api.Node
|
||||
notifyCh = make(chan struct{})
|
||||
)
|
||||
|
||||
plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
return // ignore
|
||||
}
|
||||
v, ok := raw.([]*api.Node)
|
||||
if !ok {
|
||||
return // ignore
|
||||
}
|
||||
wakeups = append(wakeups, v)
|
||||
notifyCh <- struct{}{}
|
||||
}
|
||||
|
||||
// Register 2 nodes
|
||||
{
|
||||
catalog := c.Catalog()
|
||||
|
||||
// we want to find this node
|
||||
reg := &api.CatalogRegistration{
|
||||
Node: "foo",
|
||||
Address: "1.1.1.1",
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
if _, err := catalog.Register(reg, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// we don't want to find this node
|
||||
reg = &api.CatalogRegistration{
|
||||
Node: "bar",
|
||||
Address: "2.2.2.2",
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
if _, err := catalog.Register(reg, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// Start the watch nodes plan
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := plan.Run(s.HTTPAddr); err != nil {
|
||||
t.Errorf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
defer plan.Stop()
|
||||
|
||||
// Wait for first wakeup.
|
||||
<-notifyCh
|
||||
|
||||
plan.Stop()
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, wakeups, 1)
|
||||
|
||||
{
|
||||
v := wakeups[0]
|
||||
require.Len(t, v, 1)
|
||||
require.Equal(t, "foo", v[0].Node)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceWatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
@ -616,6 +768,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServiceWatch_Filter(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
s.WaitForSerfCheck(t)
|
||||
|
||||
var (
|
||||
wakeups [][]*api.ServiceEntry
|
||||
notifyCh = make(chan struct{})
|
||||
)
|
||||
|
||||
plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
return // ignore
|
||||
}
|
||||
v, ok := raw.([]*api.ServiceEntry)
|
||||
if !ok {
|
||||
return // ignore
|
||||
}
|
||||
|
||||
wakeups = append(wakeups, v)
|
||||
notifyCh <- struct{}{}
|
||||
}
|
||||
|
||||
// register some services
|
||||
{
|
||||
agent := c.Agent()
|
||||
|
||||
// we do not want to find this one.
|
||||
reg := &api.AgentServiceRegistration{
|
||||
ID: "foobarbiff",
|
||||
Name: "foo",
|
||||
Tags: []string{"bar", "biff"},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// we do not want to find this one.
|
||||
reg = &api.AgentServiceRegistration{
|
||||
ID: "foobuzzbiff",
|
||||
Name: "foo",
|
||||
Tags: []string{"buzz", "biff"},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// we want to find this one
|
||||
reg = &api.AgentServiceRegistration{
|
||||
ID: "foobarbuzzbiff",
|
||||
Name: "foo",
|
||||
Tags: []string{"bar", "buzz", "biff"},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := plan.Run(s.HTTPAddr); err != nil {
|
||||
t.Errorf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
defer plan.Stop()
|
||||
|
||||
// Wait for second wakeup.
|
||||
<-notifyCh
|
||||
|
||||
plan.Stop()
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, wakeups, 1)
|
||||
|
||||
{
|
||||
v := wakeups[0]
|
||||
require.Len(t, v, 1)
|
||||
|
||||
require.Equal(t, "foobarbuzzbiff", v[0].Service.ID)
|
||||
require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChecksWatch_State(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
@ -772,6 +1012,190 @@ func TestChecksWatch_Service(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestChecksWatch_Service_Filter(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
s.WaitForSerfCheck(t)
|
||||
|
||||
var (
|
||||
wakeups [][]*api.HealthCheck
|
||||
notifyCh = make(chan struct{})
|
||||
)
|
||||
|
||||
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
return // ignore
|
||||
}
|
||||
v, ok := raw.([]*api.HealthCheck)
|
||||
if !ok {
|
||||
return // ignore
|
||||
}
|
||||
wakeups = append(wakeups, v)
|
||||
notifyCh <- struct{}{}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := plan.Run(s.HTTPAddr); err != nil {
|
||||
t.Errorf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
defer plan.Stop()
|
||||
|
||||
// Wait for first wakeup.
|
||||
<-notifyCh
|
||||
{
|
||||
catalog := c.Catalog()
|
||||
reg := &api.CatalogRegistration{
|
||||
Node: "foobar",
|
||||
Address: "1.1.1.1",
|
||||
Datacenter: "dc1",
|
||||
Service: &api.AgentService{
|
||||
ID: "foobar",
|
||||
Service: "foobar",
|
||||
Tags: []string{"a", "b"},
|
||||
},
|
||||
Check: &api.AgentCheck{
|
||||
Node: "foobar",
|
||||
CheckID: "foobar",
|
||||
Name: "foobar",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "foobar",
|
||||
},
|
||||
}
|
||||
if _, err := catalog.Register(reg, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for second wakeup.
|
||||
<-notifyCh
|
||||
|
||||
plan.Stop()
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, wakeups, 2)
|
||||
|
||||
{
|
||||
v := wakeups[0]
|
||||
require.Len(t, v, 0)
|
||||
}
|
||||
{
|
||||
v := wakeups[1]
|
||||
require.Len(t, v, 1)
|
||||
require.Equal(t, "foobar", v[0].CheckID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChecksWatch_Filter(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
s.WaitForSerfCheck(t)
|
||||
|
||||
var (
|
||||
wakeups [][]*api.HealthCheck
|
||||
notifyCh = make(chan struct{})
|
||||
)
|
||||
|
||||
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
return // ignore
|
||||
}
|
||||
v, ok := raw.([]*api.HealthCheck)
|
||||
if !ok {
|
||||
return // ignore
|
||||
}
|
||||
wakeups = append(wakeups, v)
|
||||
notifyCh <- struct{}{}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := plan.Run(s.HTTPAddr); err != nil {
|
||||
t.Errorf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
defer plan.Stop()
|
||||
|
||||
// Wait for first wakeup.
|
||||
<-notifyCh
|
||||
{
|
||||
catalog := c.Catalog()
|
||||
|
||||
// we don't want to find this one
|
||||
reg := &api.CatalogRegistration{
|
||||
Node: "foo",
|
||||
Address: "1.1.1.1",
|
||||
Datacenter: "dc1",
|
||||
Service: &api.AgentService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
Tags: []string{"a"},
|
||||
},
|
||||
Check: &api.AgentCheck{
|
||||
Node: "foo",
|
||||
CheckID: "foo",
|
||||
Name: "foo",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "foo",
|
||||
},
|
||||
}
|
||||
if _, err := catalog.Register(reg, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// we want to find this one
|
||||
reg = &api.CatalogRegistration{
|
||||
Node: "bar",
|
||||
Address: "2.2.2.2",
|
||||
Datacenter: "dc1",
|
||||
Service: &api.AgentService{
|
||||
ID: "bar",
|
||||
Service: "bar",
|
||||
Tags: []string{"a", "b"},
|
||||
},
|
||||
Check: &api.AgentCheck{
|
||||
Node: "bar",
|
||||
CheckID: "bar",
|
||||
Name: "bar",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "bar",
|
||||
},
|
||||
}
|
||||
if _, err := catalog.Register(reg, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for second wakeup.
|
||||
<-notifyCh
|
||||
|
||||
plan.Stop()
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, wakeups, 2)
|
||||
|
||||
{
|
||||
v := wakeups[0]
|
||||
require.Len(t, v, 0)
|
||||
}
|
||||
{
|
||||
v := wakeups[1]
|
||||
require.Len(t, v, 1)
|
||||
require.Equal(t, "bar", v[0].CheckID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventWatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
|
|
@ -45,6 +45,7 @@ type cmd struct {
|
|||
state string
|
||||
name string
|
||||
shell bool
|
||||
filter string
|
||||
}
|
||||
|
||||
func (c *cmd) init() {
|
||||
|
@ -71,6 +72,7 @@ func (c *cmd) init() {
|
|||
"Specifies the states to watch. Optional for 'checks' type.")
|
||||
c.flags.StringVar(&c.name, "name", "",
|
||||
"Specifies an event name to watch. Only for 'event' type.")
|
||||
c.flags.StringVar(&c.filter, "filter", "", "Filter to use with the request")
|
||||
|
||||
c.http = &flags.HTTPFlags{}
|
||||
flags.Merge(c.flags, c.http.ClientFlags())
|
||||
|
@ -128,6 +130,9 @@ func (c *cmd) Run(args []string) int {
|
|||
if c.service != "" {
|
||||
params["service"] = c.service
|
||||
}
|
||||
if c.filter != "" {
|
||||
params["filter"] = c.filter
|
||||
}
|
||||
if len(c.tag) > 0 {
|
||||
params["tag"] = c.tag
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue