Merge pull request #2668 from hashicorp/f-prepared-query-nodemeta
Node metadata support in prepared queries
This commit is contained in:
commit
575f3d4086
|
@ -43,6 +43,11 @@ type ServiceQuery struct {
|
|||
// this list it must be present. If the tag is preceded with "!" then
|
||||
// it is disallowed.
|
||||
Tags []string
|
||||
|
||||
// NodeMeta is a map of required node metadata fields. If a key/value
|
||||
// pair is in this map it must be present on the node in order for the
|
||||
// service entry to be returned.
|
||||
NodeMeta map[string]string
|
||||
}
|
||||
|
||||
// QueryTemplate carries the arguments for creating a templated query.
|
||||
|
|
|
@ -20,6 +20,7 @@ func TestPreparedQuery(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.1",
|
||||
},
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
Service: &AgentService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
|
@ -47,7 +48,8 @@ func TestPreparedQuery(t *testing.T) {
|
|||
def := &PreparedQueryDefinition{
|
||||
Name: "test",
|
||||
Service: ServiceQuery{
|
||||
Service: "redis",
|
||||
Service: "redis",
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -42,26 +42,11 @@ const (
|
|||
"but no reason was provided. This is a default message."
|
||||
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
|
||||
"service, but no reason was provided. This is a default message."
|
||||
|
||||
// The meta key prefix reserved for Consul's internal use
|
||||
metaKeyReservedPrefix = "consul-"
|
||||
|
||||
// The maximum number of metadata key pairs allowed to be registered
|
||||
metaMaxKeyPairs = 64
|
||||
|
||||
// The maximum allowed length of a metadata key
|
||||
metaKeyMaxLength = 128
|
||||
|
||||
// The maximum allowed length of a metadata value
|
||||
metaValueMaxLength = 512
|
||||
)
|
||||
|
||||
var (
|
||||
// dnsNameRe checks if a name or tag is dns-compatible.
|
||||
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
|
||||
|
||||
// metaKeyFormat checks if a metadata key string is valid
|
||||
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -1789,41 +1774,6 @@ func parseMetaPair(raw string) (string, string) {
|
|||
}
|
||||
}
|
||||
|
||||
// validateMeta validates a set of key/value pairs from the agent config
|
||||
func validateMetadata(meta map[string]string) error {
|
||||
if len(meta) > metaMaxKeyPairs {
|
||||
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
|
||||
}
|
||||
|
||||
for key, value := range meta {
|
||||
if err := validateMetaPair(key, value); err != nil {
|
||||
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateMetaPair checks that the given key/value pair is in a valid format
|
||||
func validateMetaPair(key, value string) error {
|
||||
if key == "" {
|
||||
return fmt.Errorf("Key cannot be blank")
|
||||
}
|
||||
if !metaKeyFormat(key) {
|
||||
return fmt.Errorf("Key contains invalid characters")
|
||||
}
|
||||
if len(key) > metaKeyMaxLength {
|
||||
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
||||
}
|
||||
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
||||
}
|
||||
if len(value) > metaValueMaxLength {
|
||||
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// unloadMetadata resets the local metadata state
|
||||
func (a *Agent) unloadMetadata() {
|
||||
a.state.Lock()
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -21,7 +22,6 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/raft"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1919,69 +1919,6 @@ func TestAgent_purgeCheckState(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAgent_metadata(t *testing.T) {
|
||||
// Load a valid set of key/value pairs
|
||||
meta := map[string]string{
|
||||
"key1": "value1",
|
||||
"key2": "value2",
|
||||
}
|
||||
// Should succeed
|
||||
if err := validateMetadata(meta); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Should get error
|
||||
meta = map[string]string{
|
||||
"": "value1",
|
||||
}
|
||||
if err := validateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
|
||||
t.Fatalf("should have failed")
|
||||
}
|
||||
|
||||
// Should get error
|
||||
meta = make(map[string]string)
|
||||
for i := 0; i < metaMaxKeyPairs+1; i++ {
|
||||
meta[string(i)] = "value"
|
||||
}
|
||||
if err := validateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
|
||||
t.Fatalf("should have failed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_validateMetaPair(t *testing.T) {
|
||||
longKey := strings.Repeat("a", metaKeyMaxLength+1)
|
||||
longValue := strings.Repeat("b", metaValueMaxLength+1)
|
||||
pairs := []struct {
|
||||
Key string
|
||||
Value string
|
||||
Error string
|
||||
}{
|
||||
// valid pair
|
||||
{"key", "value", ""},
|
||||
// invalid, blank key
|
||||
{"", "value", "cannot be blank"},
|
||||
// allowed special chars in key name
|
||||
{"k_e-y", "value", ""},
|
||||
// disallowed special chars in key name
|
||||
{"(%key&)", "value", "invalid characters"},
|
||||
// key too long
|
||||
{longKey, "value", "Key is too long"},
|
||||
// reserved prefix
|
||||
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
|
||||
// value too long
|
||||
{"key", longValue, "Value is too long"},
|
||||
}
|
||||
|
||||
for _, pair := range pairs {
|
||||
err := validateMetaPair(pair.Key, pair.Value)
|
||||
if pair.Error == "" && err != nil {
|
||||
t.Fatalf("should have succeeded: %v, %v", pair, err)
|
||||
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {
|
||||
t.Fatalf("should have failed: %v, %v", pair, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_GetCoordinate(t *testing.T) {
|
||||
check := func(server bool) {
|
||||
config := nextConfig()
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/ec2"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logger"
|
||||
"github.com/hashicorp/consul/watch"
|
||||
|
@ -397,7 +398,7 @@ func (c *Command) readConfig() *Config {
|
|||
}
|
||||
|
||||
// Verify the node metadata entries are valid
|
||||
if err := validateMetadata(config.Meta); err != nil {
|
||||
if err := structs.ValidateMetadata(config.Meta); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -90,6 +90,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
|||
},
|
||||
OnlyPassing: true,
|
||||
Tags: []string{"foo", "bar"},
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
},
|
||||
DNS: structs.QueryDNSOptions{
|
||||
TTL: "10s",
|
||||
|
@ -120,6 +121,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
|||
},
|
||||
"OnlyPassing": true,
|
||||
"Tags": []string{"foo", "bar"},
|
||||
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
||||
},
|
||||
"DNS": map[string]interface{}{
|
||||
"TTL": "10s",
|
||||
|
@ -645,6 +647,7 @@ func TestPreparedQuery_Update(t *testing.T) {
|
|||
},
|
||||
OnlyPassing: true,
|
||||
Tags: []string{"foo", "bar"},
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
},
|
||||
DNS: structs.QueryDNSOptions{
|
||||
TTL: "10s",
|
||||
|
@ -676,6 +679,7 @@ func TestPreparedQuery_Update(t *testing.T) {
|
|||
},
|
||||
"OnlyPassing": true,
|
||||
"Tags": []string{"foo", "bar"},
|
||||
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
||||
},
|
||||
"DNS": map[string]interface{}{
|
||||
"TTL": "10s",
|
||||
|
|
|
@ -138,13 +138,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
|||
|
||||
reply.Index, reply.Nodes = index, nodes
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
var filtered structs.CheckServiceNodes
|
||||
for _, node := range nodes {
|
||||
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
}
|
||||
reply.Nodes = filtered
|
||||
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
|
||||
}
|
||||
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
|
|
|
@ -38,6 +38,11 @@ var (
|
|||
"${match(1)}",
|
||||
"${match(2)}",
|
||||
},
|
||||
NodeMeta: map[string]string{
|
||||
"foo": "${name.prefix}",
|
||||
"bar": "${match(0)}",
|
||||
"baz": "${match(1)}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -222,6 +227,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"${match(4)}",
|
||||
"${40 + 2}",
|
||||
},
|
||||
NodeMeta: map[string]string{"foo": "${match(1)}"},
|
||||
},
|
||||
}
|
||||
ct, err := Compile(query)
|
||||
|
@ -252,6 +258,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"",
|
||||
"42",
|
||||
},
|
||||
NodeMeta: map[string]string{"foo": "hello"},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
|
@ -282,6 +289,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"",
|
||||
"42",
|
||||
},
|
||||
NodeMeta: map[string]string{"foo": ""},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
|
|
|
@ -34,6 +34,20 @@ func visit(path string, v reflect.Value, t reflect.Type, fn visitor) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
case reflect.Map:
|
||||
for _, key := range v.MapKeys() {
|
||||
value := v.MapIndex(key)
|
||||
|
||||
newValue := reflect.New(value.Type()).Elem()
|
||||
newValue.SetString(value.String())
|
||||
|
||||
if err := visit(fmt.Sprintf("%s[%s]", path, key.String()), newValue, newValue.Type(), fn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// overwrite the entry in case it was modified by the callback
|
||||
v.SetMapIndex(key, newValue)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"sort"
|
||||
)
|
||||
|
||||
func TestWalk_ServiceQuery(t *testing.T) {
|
||||
|
@ -20,22 +21,26 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||
Failover: structs.QueryDatacenterOptions{
|
||||
Datacenters: []string{"dc1", "dc2"},
|
||||
},
|
||||
Near: "_agent",
|
||||
Tags: []string{"tag1", "tag2", "tag3"},
|
||||
Near: "_agent",
|
||||
Tags: []string{"tag1", "tag2", "tag3"},
|
||||
NodeMeta: map[string]string{"foo": "bar", "role": "server"},
|
||||
}
|
||||
if err := walk(service, fn); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
expected := []string{
|
||||
".Service:the-service",
|
||||
".Failover.Datacenters[0]:dc1",
|
||||
".Failover.Datacenters[1]:dc2",
|
||||
".Near:_agent",
|
||||
".NodeMeta[foo]:bar",
|
||||
".NodeMeta[role]:server",
|
||||
".Service:the-service",
|
||||
".Tags[0]:tag1",
|
||||
".Tags[1]:tag2",
|
||||
".Tags[2]:tag3",
|
||||
}
|
||||
sort.Strings(actual)
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("bad: %#v", actual)
|
||||
}
|
||||
|
|
|
@ -178,6 +178,11 @@ func parseService(svc *structs.ServiceQuery) error {
|
|||
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
|
||||
}
|
||||
|
||||
// Make sure the metadata filters are valid
|
||||
if err := structs.ValidateMetadata(svc.NodeMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We skip a few fields:
|
||||
// - There's no validation for Datacenters; we skip any unknown entries
|
||||
// at execution time.
|
||||
|
@ -492,6 +497,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
|||
// Filter out any unhealthy nodes.
|
||||
nodes = nodes.Filter(query.Service.OnlyPassing)
|
||||
|
||||
// Apply the node metadata filters, if any.
|
||||
if len(query.Service.NodeMeta) > 0 {
|
||||
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
||||
}
|
||||
|
||||
// Apply the tag filters, if any.
|
||||
if len(query.Service.Tags) > 0 {
|
||||
nodes = tagFilter(query.Service.Tags, nodes)
|
||||
|
@ -562,6 +572,18 @@ func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServ
|
|||
return nodes[:n]
|
||||
}
|
||||
|
||||
// nodeMetaFilter returns a list of the nodes who satisfy the given metadata filters. Nodes
|
||||
// must have ALL the given tags.
|
||||
func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
||||
var filtered structs.CheckServiceNodes
|
||||
for _, node := range nodes {
|
||||
if structs.SatisfiesMetaFilters(node.Node.Meta, filters) {
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
// queryServer is a wrapper that makes it easier to test the failover logic.
|
||||
type queryServer interface {
|
||||
GetLogger() *log.Logger
|
||||
|
|
|
@ -604,6 +604,17 @@ func TestPreparedQuery_parseQuery(t *testing.T) {
|
|||
if err := parseQuery(query, version8); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
query.Service.NodeMeta = map[string]string{"": "somevalue"}
|
||||
err = parseQuery(query, version8)
|
||||
if err == nil || !strings.Contains(err.Error(), "cannot be blank") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
query.Service.NodeMeta = map[string]string{"somekey": "somevalue"}
|
||||
if err := parseQuery(query, version8); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1482,6 +1493,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
Datacenter: dc,
|
||||
Node: fmt.Sprintf("node%d", i+1),
|
||||
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||
NodeMeta: map[string]string{
|
||||
"group": fmt.Sprintf("%d", i/5),
|
||||
"instance_type": "t2.micro",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "foo",
|
||||
Port: 8000,
|
||||
|
@ -1489,6 +1504,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if i == 0 {
|
||||
req.NodeMeta["unique"] = "true"
|
||||
}
|
||||
|
||||
var codec rpc.ClientCodec
|
||||
if dc == "dc1" {
|
||||
|
@ -1587,6 +1605,72 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Run various service queries with node metadata filters.
|
||||
if false {
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
numNodes int
|
||||
}{
|
||||
{
|
||||
filters: map[string]string{},
|
||||
numNodes: 10,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"instance_type": "t2.micro"},
|
||||
numNodes: 10,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"group": "1"},
|
||||
numNodes: 5,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"group": "0", "unique": "true"},
|
||||
numNodes: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
nodeMetaQuery := structs.PreparedQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: &structs.PreparedQuery{
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "foo",
|
||||
NodeMeta: tc.filters,
|
||||
},
|
||||
DNS: structs.QueryDNSOptions{
|
||||
TTL: "10s",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: nodeMetaQuery.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(reply.Nodes) != tc.numNodes {
|
||||
t.Fatalf("bad: %v, %v", len(reply.Nodes), tc.numNodes)
|
||||
}
|
||||
|
||||
for _, node := range reply.Nodes {
|
||||
if !structs.SatisfiesMetaFilters(node.Node.Meta, tc.filters) {
|
||||
t.Fatalf("bad: %v", node.Node.Meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
||||
// have to sleep a little while for the coordinate batch to get flushed.
|
||||
{
|
||||
|
@ -1690,9 +1774,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1725,10 +1808,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1759,9 +1841,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1792,9 +1873,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1821,12 +1901,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
// Expect the set to be shuffled since we have no coordinates
|
||||
// on the "foo" node.
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1861,10 +1940,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -44,6 +44,11 @@ type ServiceQuery struct {
|
|||
// this list it must be present. If the tag is preceded with "!" then
|
||||
// it is disallowed.
|
||||
Tags []string
|
||||
|
||||
// NodeMeta is a map of required node metadata fields. If a key/value
|
||||
// pair is in this map it must be present on the node in order for the
|
||||
// service entry to be returned.
|
||||
NodeMeta map[string]string
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"regexp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -67,6 +69,25 @@ const (
|
|||
ServiceMaintPrefix = "_service_maintenance:"
|
||||
)
|
||||
|
||||
const (
|
||||
// The meta key prefix reserved for Consul's internal use
|
||||
metaKeyReservedPrefix = "consul-"
|
||||
|
||||
// The maximum number of metadata key pairs allowed to be registered
|
||||
metaMaxKeyPairs = 64
|
||||
|
||||
// The maximum allowed length of a metadata key
|
||||
metaKeyMaxLength = 128
|
||||
|
||||
// The maximum allowed length of a metadata value
|
||||
metaValueMaxLength = 512
|
||||
)
|
||||
|
||||
var (
|
||||
// metaKeyFormat checks if a metadata key string is valid
|
||||
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
|
||||
)
|
||||
|
||||
func ValidStatus(s string) bool {
|
||||
return s == HealthPassing ||
|
||||
s == HealthWarning ||
|
||||
|
@ -292,6 +313,41 @@ type Node struct {
|
|||
}
|
||||
type Nodes []*Node
|
||||
|
||||
// ValidateMeta validates a set of key/value pairs from the agent config
|
||||
func ValidateMetadata(meta map[string]string) error {
|
||||
if len(meta) > metaMaxKeyPairs {
|
||||
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
|
||||
}
|
||||
|
||||
for key, value := range meta {
|
||||
if err := validateMetaPair(key, value); err != nil {
|
||||
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateMetaPair checks that the given key/value pair is in a valid format
|
||||
func validateMetaPair(key, value string) error {
|
||||
if key == "" {
|
||||
return fmt.Errorf("Key cannot be blank")
|
||||
}
|
||||
if !metaKeyFormat(key) {
|
||||
return fmt.Errorf("Key contains invalid characters")
|
||||
}
|
||||
if len(key) > metaKeyMaxLength {
|
||||
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
||||
}
|
||||
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
||||
}
|
||||
if len(value) > metaValueMaxLength {
|
||||
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SatisfiesMetaFilters returns true if the metadata map contains the given filters
|
||||
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
|
||||
for key, value := range filters {
|
||||
|
|
|
@ -501,3 +501,66 @@ func TestStructs_DirEntry_Clone(t *testing.T) {
|
|||
t.Fatalf("clone wasn't independent of the original")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructs_ValidateMetadata(t *testing.T) {
|
||||
// Load a valid set of key/value pairs
|
||||
meta := map[string]string{
|
||||
"key1": "value1",
|
||||
"key2": "value2",
|
||||
}
|
||||
// Should succeed
|
||||
if err := ValidateMetadata(meta); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Should get error
|
||||
meta = map[string]string{
|
||||
"": "value1",
|
||||
}
|
||||
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
|
||||
t.Fatalf("should have failed")
|
||||
}
|
||||
|
||||
// Should get error
|
||||
meta = make(map[string]string)
|
||||
for i := 0; i < metaMaxKeyPairs+1; i++ {
|
||||
meta[string(i)] = "value"
|
||||
}
|
||||
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
|
||||
t.Fatalf("should have failed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructs_validateMetaPair(t *testing.T) {
|
||||
longKey := strings.Repeat("a", metaKeyMaxLength+1)
|
||||
longValue := strings.Repeat("b", metaValueMaxLength+1)
|
||||
pairs := []struct {
|
||||
Key string
|
||||
Value string
|
||||
Error string
|
||||
}{
|
||||
// valid pair
|
||||
{"key", "value", ""},
|
||||
// invalid, blank key
|
||||
{"", "value", "cannot be blank"},
|
||||
// allowed special chars in key name
|
||||
{"k_e-y", "value", ""},
|
||||
// disallowed special chars in key name
|
||||
{"(%key&)", "value", "invalid characters"},
|
||||
// key too long
|
||||
{longKey, "value", "Key is too long"},
|
||||
// reserved prefix
|
||||
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
|
||||
// value too long
|
||||
{"key", longValue, "Value is too long"},
|
||||
}
|
||||
|
||||
for _, pair := range pairs {
|
||||
err := validateMetaPair(pair.Key, pair.Value)
|
||||
if pair.Error == "" && err != nil {
|
||||
t.Fatalf("should have succeeded: %v, %v", pair, err)
|
||||
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {
|
||||
t.Fatalf("should have failed: %v, %v", pair, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,8 @@ query, like this example:
|
|||
},
|
||||
"Near": "node1",
|
||||
"OnlyPassing": false,
|
||||
"Tags": ["primary", "!experimental"]
|
||||
"Tags": ["primary", "!experimental"],
|
||||
"NodeMeta": {"instance_type": "m3.large"}
|
||||
},
|
||||
"DNS": {
|
||||
"TTL": "10s"
|
||||
|
@ -162,6 +163,10 @@ to pass the tag filter it must have *all* of the required tags, and *none* of th
|
|||
excluded tags (prefixed with `!`). The default value is an empty list, which does
|
||||
no tag filtering.
|
||||
|
||||
`NodeMeta` provides a list of user-defined key/value pairs that will be used for
|
||||
filtering the query results to nodes with the given metadata values present. This
|
||||
was added in Consul 0.7.3.
|
||||
|
||||
`TTL` in the `DNS` structure is a duration string that can use `s` as a
|
||||
suffix for seconds. It controls how the TTL is set when query results are served
|
||||
over DNS. If this isn't specified, then the Consul agent configuration for the given
|
||||
|
@ -199,7 +204,8 @@ and features. Here's an example:
|
|||
"Datacenters": ["dc1", "dc2"]
|
||||
},
|
||||
"OnlyPassing": true,
|
||||
"Tags": ["${match(2)}"]
|
||||
"Tags": ["${match(2)}"],
|
||||
"NodeMeta": {"instance_type": "m3.large"}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
@ -303,7 +309,8 @@ This returns a JSON list of prepared queries, which looks like:
|
|||
"Datacenters": ["dc1", "dc2"]
|
||||
},
|
||||
"OnlyPassing": false,
|
||||
"Tags": ["primary", "!experimental"]
|
||||
"Tags": ["primary", "!experimental"],
|
||||
"NodeMeta": {"instance_type": "m3.large"}
|
||||
},
|
||||
"DNS": {
|
||||
"TTL": "10s"
|
||||
|
@ -408,7 +415,8 @@ a JSON body will be returned like this:
|
|||
"TaggedAddresses": {
|
||||
"lan": "10.1.10.12",
|
||||
"wan": "10.1.10.12"
|
||||
}
|
||||
},
|
||||
"NodeMeta": {"instance_type": "m3.large"}
|
||||
},
|
||||
"Service": {
|
||||
"ID": "redis",
|
||||
|
@ -500,7 +508,8 @@ a JSON body will be returned like this:
|
|||
"Datacenters": ["dc1", "dc2"]
|
||||
},
|
||||
"OnlyPassing": true,
|
||||
"Tags": ["primary"]
|
||||
"Tags": ["primary"],
|
||||
"NodeMeta": {"instance_type": "m3.large"}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
|
Loading…
Reference in New Issue