diff --git a/.changelog/17780.txt b/.changelog/17780.txt new file mode 100644 index 000000000..b90925a8b --- /dev/null +++ b/.changelog/17780.txt @@ -0,0 +1,3 @@ +```release-note:feature +cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service. +``` diff --git a/agent/consul/health_endpoint_test.go b/agent/consul/health_endpoint_test.go index cd37b5ec4..21a83ea90 100644 --- a/agent/consul/health_endpoint_test.go +++ b/agent/consul/health_endpoint_test.go @@ -1767,5 +1767,11 @@ func TestHealth_RPC_Filter(t *testing.T) { out = new(structs.IndexedHealthChecks) require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out)) require.Len(t, out.HealthChecks, 1) + + args.State = api.HealthAny + args.Filter = "connect in ServiceTags and v2 in ServiceTags" + out = new(structs.IndexedHealthChecks) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out)) + require.Len(t, out.HealthChecks, 1) }) } diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 3c057aa66..0d0f6e100 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { // servicesWatch is used to watch the list of available services func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { @@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { // nodesWatch is used to watch the list of available nodes func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { @@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { // serviceWatch is used to watch a specific service for changes func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } var ( service string @@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts) if err != nil { @@ -175,13 +196,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - var service, state string + var service, state, filter string if err := assignValue(params, "service", &service); err != nil { return nil, err } if err := assignValue(params, "state", &state); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } if service != "" && state != "" { return nil, fmt.Errorf("Cannot specify service and state") } @@ -196,6 +220,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error + if filter != "" { + opts.Filter = filter + } if state != "" { checks, meta, err = health.State(state, &opts) } else { diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index d972def6a..91318009c 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -378,6 +378,82 @@ func TestServicesWatch(t *testing.T) { } +func TestServicesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups []map[string][]string + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(map[string][]string) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register some services + { + agent := c.Agent() + + // we don't want to find this + reg := &api.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // // we want to find this + reg = &api.AgentServiceRegistration{ + ID: "bar", + Name: "bar", + Tags: []string{"a", "b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + _, ok := v["bar"] + require.True(t, ok) + } +} + func TestNodesWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -453,6 +529,82 @@ func TestNodesWatch(t *testing.T) { } } +func TestNodesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) // wait for AE to sync + + var ( + wakeups [][]*api.Node + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.Node) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register 2 nodes + { + catalog := c.Catalog() + + // we want to find this node + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we don't want to find this node + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + // Start the watch nodes plan + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + require.Equal(t, "foo", v[0].Node) + } +} + func TestServiceWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -616,6 +768,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) { } } +func TestServiceWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.ServiceEntry + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.ServiceEntry) + if !ok { + return // ignore + } + + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // register some services + { + agent := c.Agent() + + // we do not want to find this one. + reg := &api.AgentServiceRegistration{ + ID: "foobarbiff", + Name: "foo", + Tags: []string{"bar", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we do not want to find this one. + reg = &api.AgentServiceRegistration{ + ID: "foobuzzbiff", + Name: "foo", + Tags: []string{"buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we want to find this one + reg = &api.AgentServiceRegistration{ + ID: "foobarbuzzbiff", + Name: "foo", + Tags: []string{"bar", "buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + + require.Equal(t, "foobarbuzzbiff", v[0].Service.ID) + require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags) + } +} + func TestChecksWatch_State(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -772,6 +1012,190 @@ func TestChecksWatch_Service(t *testing.T) { } } +func TestChecksWatch_Service_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.HealthCheck + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.HealthCheck) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + { + catalog := c.Catalog() + reg := &api.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "foobar", + Service: "foobar", + Tags: []string{"a", "b"}, + }, + Check: &api.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: api.HealthPassing, + ServiceID: "foobar", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 2) + + { + v := wakeups[0] + require.Len(t, v, 0) + } + { + v := wakeups[1] + require.Len(t, v, 1) + require.Equal(t, "foobar", v[0].CheckID) + } +} + +func TestChecksWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.HealthCheck + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.HealthCheck) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + { + catalog := c.Catalog() + + // we don't want to find this one + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "foo", + Service: "foo", + Tags: []string{"a"}, + }, + Check: &api.AgentCheck{ + Node: "foo", + CheckID: "foo", + Name: "foo", + Status: api.HealthPassing, + ServiceID: "foo", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we want to find this one + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "bar", + Service: "bar", + Tags: []string{"a", "b"}, + }, + Check: &api.AgentCheck{ + Node: "bar", + CheckID: "bar", + Name: "bar", + Status: api.HealthPassing, + ServiceID: "bar", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 2) + + { + v := wakeups[0] + require.Len(t, v, 0) + } + { + v := wakeups[1] + require.Len(t, v, 1) + require.Equal(t, "bar", v[0].CheckID) + } +} + func TestEventWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/watch/watch.go b/command/watch/watch.go index 791f93a57..b187fa369 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -45,6 +45,7 @@ type cmd struct { state string name string shell bool + filter string } func (c *cmd) init() { @@ -71,6 +72,7 @@ func (c *cmd) init() { "Specifies the states to watch. Optional for 'checks' type.") c.flags.StringVar(&c.name, "name", "", "Specifies an event name to watch. Only for 'event' type.") + c.flags.StringVar(&c.filter, "filter", "", "Filter to use with the request") c.http = &flags.HTTPFlags{} flags.Merge(c.flags, c.http.ClientFlags()) @@ -128,6 +130,9 @@ func (c *cmd) Run(args []string) int { if c.service != "" { params["service"] = c.service } + if c.filter != "" { + params["filter"] = c.filter + } if len(c.tag) > 0 { params["tag"] = c.tag }