From fdf2a10835aea53a02b3cffcdbff77bb1498d73a Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 18 Jan 2017 16:23:33 -0500 Subject: [PATCH] First pass at adding node meta filter to prepared queries --- consul/health_endpoint.go | 8 +-- consul/prepared_query/template_test.go | 8 +++ consul/prepared_query/walk.go | 19 +++++++ consul/prepared_query/walk_test.go | 3 ++ consul/prepared_query_endpoint.go | 17 ++++++ consul/prepared_query_endpoint_test.go | 74 ++++++++++++++++++++++++++ consul/structs/prepared_query.go | 5 ++ 7 files changed, 127 insertions(+), 7 deletions(-) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 242309b29..7eb28ce2a 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -138,13 +138,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc reply.Index, reply.Nodes = index, nodes if len(args.NodeMetaFilters) > 0 { - var filtered structs.CheckServiceNodes - for _, node := range nodes { - if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) { - filtered = append(filtered, node) - } - } - reply.Nodes = filtered + reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes) } if err := h.srv.filterACL(args.Token, reply); err != nil { return err diff --git a/consul/prepared_query/template_test.go b/consul/prepared_query/template_test.go index 5d58d3798..24f32899c 100644 --- a/consul/prepared_query/template_test.go +++ b/consul/prepared_query/template_test.go @@ -38,6 +38,11 @@ var ( "${match(1)}", "${match(2)}", }, + NodeMeta: map[string]string{ + "${name.full}": "${name.prefix}", + "${name.suffix}": "${match(0)}", + "${match(1)}": "${match(2)}", + }, }, } @@ -222,6 +227,7 @@ func TestTemplate_Render(t *testing.T) { "${match(4)}", "${40 + 2}", }, + NodeMeta: map[string]string{"${match(1)}": "${match(2)}"}, }, } ct, err := Compile(query) @@ -252,6 +258,7 @@ func TestTemplate_Render(t *testing.T) { "", "42", }, + NodeMeta: map[string]string{"hello": "foo"}, }, } if !reflect.DeepEqual(actual, expected) { @@ -282,6 +289,7 @@ func TestTemplate_Render(t *testing.T) { "", "42", }, + NodeMeta: map[string]string{"": ""}, }, } if !reflect.DeepEqual(actual, expected) { diff --git a/consul/prepared_query/walk.go b/consul/prepared_query/walk.go index 11f6c14dc..21652edac 100644 --- a/consul/prepared_query/walk.go +++ b/consul/prepared_query/walk.go @@ -34,6 +34,25 @@ func visit(path string, v reflect.Value, t reflect.Type, fn visitor) error { return err } } + case reflect.Map: + for i, key := range v.MapKeys() { + value := v.MapIndex(key) + + newKey := reflect.New(key.Type()).Elem() + newKey.SetString(key.String()) + newValue := reflect.New(value.Type()).Elem() + newValue.SetString(value.String()) + + if err := visit(fmt.Sprintf("%s.keys[%d]", path, i), newKey, newKey.Type(), fn); err != nil { + return err + } + if err := visit(fmt.Sprintf("%s[%s]", path, key.String()), newValue, newValue.Type(), fn); err != nil { + return err + } + // delete the old entry and add the new one + v.SetMapIndex(key, reflect.Value{}) + v.SetMapIndex(newKey, newValue) + } } return nil } diff --git a/consul/prepared_query/walk_test.go b/consul/prepared_query/walk_test.go index 05294e3b6..06bb6ae2b 100644 --- a/consul/prepared_query/walk_test.go +++ b/consul/prepared_query/walk_test.go @@ -22,6 +22,7 @@ func TestWalk_ServiceQuery(t *testing.T) { }, Near: "_agent", Tags: []string{"tag1", "tag2", "tag3"}, + NodeMeta: map[string]string{"role": "server"}, } if err := walk(service, fn); err != nil { t.Fatalf("err: %v", err) @@ -35,6 +36,8 @@ func TestWalk_ServiceQuery(t *testing.T) { ".Tags[0]:tag1", ".Tags[1]:tag2", ".Tags[2]:tag3", + ".NodeMeta.keys[0]:role", + ".NodeMeta[role]:server", } if !reflect.DeepEqual(actual, expected) { t.Fatalf("bad: %#v", actual) diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index b73a4c848..cae1f9dcf 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -492,6 +492,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery, // Filter out any unhealthy nodes. nodes = nodes.Filter(query.Service.OnlyPassing) + // Apply the node metadata filters, if any. + if len(query.Service.NodeMeta) > 0 { + nodes = nodeMetaFilter(query.Service.NodeMeta, nodes) + } + // Apply the tag filters, if any. if len(query.Service.Tags) > 0 { nodes = tagFilter(query.Service.Tags, nodes) @@ -562,6 +567,18 @@ func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServ return nodes[:n] } +// nodeMetaFilter returns a list of the nodes who satisfy the given metadata filters. Nodes +// must have ALL the given tags. +func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes { + var filtered structs.CheckServiceNodes + for _, node := range nodes { + if structs.SatisfiesMetaFilters(node.Node.Meta, filters) { + filtered = append(filtered, node) + } + } + return filtered +} + // queryServer is a wrapper that makes it easier to test the failover logic. type queryServer interface { GetLogger() *log.Logger diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go index d6516c09d..54dbd95c3 100644 --- a/consul/prepared_query_endpoint_test.go +++ b/consul/prepared_query_endpoint_test.go @@ -1482,6 +1482,10 @@ func TestPreparedQuery_Execute(t *testing.T) { Datacenter: dc, Node: fmt.Sprintf("node%d", i+1), Address: fmt.Sprintf("127.0.0.%d", i+1), + NodeMeta: map[string]string{ + "group": fmt.Sprintf("%d", i/5), + "instance_type": "t2.micro", + }, Service: &structs.NodeService{ Service: "foo", Port: 8000, @@ -1489,6 +1493,9 @@ func TestPreparedQuery_Execute(t *testing.T) { }, WriteRequest: structs.WriteRequest{Token: "root"}, } + if i == 0 { + req.NodeMeta["unique"] = "true" + } var codec rpc.ClientCodec if dc == "dc1" { @@ -1587,6 +1594,73 @@ func TestPreparedQuery_Execute(t *testing.T) { } } + // Run various service queries with node metadata filters. + if false { + cases := []struct{ + filters map[string]string + numNodes int + }{ + { + filters: map[string]string{}, + numNodes: 10, + }, + { + filters: map[string]string{"instance_type": "t2.micro"}, + numNodes: 10, + }, + { + filters: map[string]string{"group": "1"}, + numNodes: 5, + }, + { + filters: map[string]string{"group": "0", "unique": "true"}, + numNodes: 1, + }, + } + + for _, tc := range cases { + nodeMetaQuery := structs.PreparedQueryRequest{ + Datacenter: "dc1", + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + Service: structs.ServiceQuery{ + Service: "foo", + NodeMeta: tc.filters, + }, + DNS: structs.QueryDNSOptions{ + TTL: "10s", + }, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID); err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: nodeMetaQuery.Query.ID, + QueryOptions: structs.QueryOptions{Token: execToken}, + } + + var reply structs.PreparedQueryExecuteResponse + if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + if len(reply.Nodes) != tc.numNodes { + t.Fatalf("bad: %v, %v", len(reply.Nodes), tc.numNodes) + } + + for _, node := range reply.Nodes { + if !structs.SatisfiesMetaFilters(node.Node.Meta, tc.filters) { + t.Fatalf("bad: %v", node.Node.Meta) + } + } + } + } + + // Push a coordinate for one of the nodes so we can try an RTT sort. We // have to sleep a little while for the coordinate batch to get flushed. { diff --git a/consul/structs/prepared_query.go b/consul/structs/prepared_query.go index 5e9c31847..af535f010 100644 --- a/consul/structs/prepared_query.go +++ b/consul/structs/prepared_query.go @@ -44,6 +44,11 @@ type ServiceQuery struct { // this list it must be present. If the tag is preceded with "!" then // it is disallowed. Tags []string + + // NodeMeta is a map of required node metadata fields. If a key/value + // pair is in this map it must be present on the node in order for the + // service entry to be returned. + NodeMeta map[string]string } const (