From 2a423c6e2c0461f0ea6c670965b17210e547f313 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 5 Jan 2017 14:10:26 -0800 Subject: [PATCH 1/9] Add support for setting node metadata fields --- command/agent/agent.go | 36 ++++++++++++++++++++++++++++++- command/agent/agent_endpoint.go | 2 ++ command/agent/catalog_endpoint.go | 8 +++++++ command/agent/command.go | 12 +++++++++-- command/agent/config.go | 11 ++++++++++ command/agent/config_test.go | 19 ++++++++++++++++ command/agent/local.go | 26 +++++++++++++++++++--- consul/catalog_endpoint.go | 10 ++++++++- consul/state/schema.go | 9 ++++++++ consul/state/state_store.go | 13 +++++++++-- consul/structs/structs.go | 13 ++++++++--- consul/structs/structs_test.go | 7 +++++- 12 files changed, 153 insertions(+), 13 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 789768a5b..1a0cf207a 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -246,13 +246,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, return nil, err } - // Load checks/services. + // Load checks/services/metadata. if err := agent.loadServices(config); err != nil { return nil, err } if err := agent.loadChecks(config); err != nil { return nil, err } + if err := agent.loadMetadata(config); err != nil { + return nil, err + } // Start watching for critical services to deregister, based on their // checks. @@ -1677,6 +1680,37 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { } } +// loadMetadata loads and validates node metadata fields from the config and +// updates them on the local agent. +func (a *Agent) loadMetadata(conf *Config) error { + a.state.Lock() + defer a.state.Unlock() + + for key, value := range conf.Meta { + if strings.Contains(key, ":") { + return fmt.Errorf("Key name cannot contain ':' character: %s", key) + } + if strings.HasPrefix(key, "consul-") { + return fmt.Errorf("Key prefix 'consul-' is reserved for internal use") + } + a.state.metadata[key] = value + } + + a.state.changeMade() + + return nil +} + +// unloadMetadata resets the local metadata state +func (a *Agent) unloadMetadata() error { + a.state.Lock() + defer a.state.Unlock() + + a.state.metadata = make(map[string]string) + + return nil +} + // serviceMaintCheckID returns the ID of a given service's maintenance check func serviceMaintCheckID(serviceID string) types.CheckID { return types.CheckID(structs.ServiceMaintPrefix + serviceID) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index e96218d86..48c0b27f4 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -20,6 +20,7 @@ type AgentSelf struct { Coord *coordinate.Coordinate Member serf.Member Stats map[string]map[string]string + Meta map[string]string } func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int Coord: c, Member: s.agent.LocalMember(), Stats: s.agent.Stats(), + Meta: s.agent.state.Metadata(), }, nil } diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index efccfba73..13d425044 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -67,6 +67,14 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } + // Try to parse node metadata filter params + if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { + pair := strings.SplitN(filter[0], ":", 2) + args.NodeMetaKey = pair[0] + if len(pair) == 2 { + args.NodeMetaValue = pair[1] + } + } var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) diff --git a/command/agent/command.go b/command/agent/command.go index f81291453..165839fdc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -1071,7 +1071,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) { snap := c.agent.snapshotCheckState() defer c.agent.restoreCheckState(snap) - // First unload all checks and services. This lets us begin the reload + // First unload all checks, services, and metadata. This lets us begin the reload // with a clean slate. if err := c.agent.unloadServices(); err != nil { errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err)) @@ -1081,8 +1081,12 @@ func (c *Command) handleReload(config *Config) (*Config, error) { errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err)) return nil, errs } + if err := c.agent.unloadMetadata(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed unloading metadata: %s", err)) + return nil, errs + } - // Reload services and check definitions. + // Reload service/check definitions and metadata. if err := c.agent.loadServices(newConf); err != nil { errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err)) return nil, errs @@ -1091,6 +1095,10 @@ func (c *Command) handleReload(config *Config) (*Config, error) { errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err)) return nil, errs } + if err := c.agent.loadMetadata(newConf); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err)) + return nil, errs + } // Get the new client listener addr httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) diff --git a/command/agent/config.go b/command/agent/config.go index 4d95d6244..c4f35f3d0 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -342,6 +342,9 @@ type Config struct { // they are configured with TranslateWanAddrs set to true. TaggedAddresses map[string]string + // Node metadata + Meta map[string]string `mapstructure:"node_meta" json:"-"` + // LeaveOnTerm controls if Serf does a graceful leave when receiving // the TERM signal. Defaults true on clients, false on servers. This can // be changed on reload. @@ -1577,6 +1580,14 @@ func MergeConfig(a, b *Config) *Config { result.HTTPAPIResponseHeaders[field] = value } } + if len(b.Meta) != 0 { + if result.Meta == nil { + result.Meta = make(map[string]string) + } + for field, value := range b.Meta { + result.Meta[field] = value + } + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index caef0ce6f..b5cc3639d 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -281,6 +281,19 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } + // Node metadata fields + input = `{"node_meta": {"thing1": "1", "thing2": "2"}}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + if v, ok := config.Meta["thing1"]; !ok || v != "1" { + t.Fatalf("bad: %#v", config) + } + if v, ok := config.Meta["thing2"]; !ok || v != "2" { + t.Fatalf("bad: %#v", config) + } + // leave_on_terminate input = `{"leave_on_terminate": true}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -1519,6 +1532,9 @@ func TestMergeConfig(t *testing.T) { DogStatsdAddr: "nope", DogStatsdTags: []string{"nope"}, }, + Meta: map[string]string{ + "key": "value1", + }, } b := &Config{ @@ -1620,6 +1636,9 @@ func TestMergeConfig(t *testing.T) { DogStatsdAddr: "127.0.0.1:7254", DogStatsdTags: []string{"tag_1:val_1", "tag_2:val_2"}, }, + Meta: map[string]string{ + "key": "value2", + }, DisableUpdateCheck: true, DisableAnonymousSignature: true, HTTPAPIResponseHeaders: map[string]string{ diff --git a/command/agent/local.go b/command/agent/local.go index fdd3d3788..853cdd5ad 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -61,6 +61,9 @@ type localState struct { // Used to track checks that are being deferred deferCheck map[types.CheckID]*time.Timer + // metadata tracks the local metadata fields + metadata map[string]string + // consulCh is used to inform of a change to the known // consul nodes. This may be used to retry a sync run consulCh chan struct{} @@ -82,6 +85,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) { l.checkTokens = make(map[types.CheckID]string) l.checkCriticalTime = make(map[types.CheckID]time.Time) l.deferCheck = make(map[types.CheckID]*time.Timer) + l.metadata = make(map[string]string) l.consulCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1) } @@ -339,6 +343,19 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { return checks } +// Metadata returns the local node metadata fields that the +// agent is aware of and are being kept in sync with the server +func (l *localState) Metadata() map[string]string { + metadata := make(map[string]string) + l.RLock() + defer l.RUnlock() + + for key, value := range l.metadata { + metadata[key] = value + } + return metadata +} + // antiEntropy is a long running method used to perform anti-entropy // between local and remote state. func (l *localState) antiEntropy(shutdownCh chan struct{}) { @@ -412,10 +429,10 @@ func (l *localState) setSyncState() error { l.Lock() defer l.Unlock() - // Check the node info (currently limited to tagged addresses since - // everything else is managed by the Serf layer) + // Check the node info if out1.NodeServices == nil || out1.NodeServices.Node == nil || - !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) { + !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) || + !reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) { l.nodeInfoInSync = false } @@ -619,6 +636,7 @@ func (l *localState) syncService(id string) error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, Service: l.services[id], WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } @@ -680,6 +698,7 @@ func (l *localState) syncCheck(id types.CheckID) error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, Service: service, Check: l.checks[id], WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, @@ -706,6 +725,7 @@ func (l *localState) syncNodeInfo() error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()}, } var out struct{} diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 1383b8916..565155899 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -156,6 +156,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } + var metaFilter []interface{} + if args.NodeMetaKey != "" { + metaFilter = append(metaFilter, args.NodeMetaKey) + if args.NodeMetaValue != "" { + metaFilter = append(metaFilter, args.NodeMetaValue) + } + } + // Get the list of nodes. state := c.srv.fsm.State() return c.srv.blockingRPC( @@ -163,7 +171,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &reply.QueryMeta, state.GetQueryWatch("Nodes"), func() error { - index, nodes, err := state.Nodes() + index, nodes, err := state.Nodes(metaFilter...) if err != nil { return err } diff --git a/consul/state/schema.go b/consul/state/schema.go index fca8a3cf2..3d3ed1450 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -78,6 +78,15 @@ func nodesTableSchema() *memdb.TableSchema { Lowercase: true, }, }, + "meta": &memdb.IndexSchema{ + Name: "meta", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringMapFieldIndex{ + Field: "Meta", + Lowercase: false, + }, + }, }, } } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 3316e4f77..c1bb2d0f6 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -426,6 +426,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D Node: req.Node, Address: req.Address, TaggedAddresses: req.TaggedAddresses, + Meta: req.NodeMeta, } if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { return fmt.Errorf("failed inserting node: %s", err) @@ -527,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { +func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -535,7 +536,13 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) // Retrieve all of the nodes - nodes, err := tx.Get("nodes", "id") + var nodes memdb.ResultIterator + var err error + if len(metaFilter) > 0 { + nodes, err = tx.Get("nodes", "meta", metaFilter...) + } else { + nodes, err = tx.Get("nodes", "id") + } if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -854,6 +861,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo node := n.(*structs.Node) s.Address = node.Address s.TaggedAddresses = node.TaggedAddresses + s.NodeMeta = node.Meta results = append(results, s) } @@ -1392,6 +1400,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64, Node: node.Node, Address: node.Address, TaggedAddresses: node.TaggedAddresses, + Meta: node.Meta, } // Query the node services diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 30b0be378..97030f285 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -173,6 +173,7 @@ type RegisterRequest struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string Service *NodeService Check *HealthCheck Checks HealthChecks @@ -195,7 +196,8 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool { // Check if any of the node-level fields are being changed. if r.Node != node.Node || r.Address != node.Address || - !reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) { + !reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) || + !reflect.DeepEqual(r.NodeMeta, node.Meta) { return true } @@ -227,8 +229,10 @@ type QuerySource struct { // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { - Datacenter string - Source QuerySource + Datacenter string + NodeMetaKey string + NodeMetaValue string + Source QuerySource QueryOptions } @@ -278,6 +282,7 @@ type Node struct { Node string Address string TaggedAddresses map[string]string + Meta map[string]string RaftIndex } @@ -296,6 +301,7 @@ type ServiceNode struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string ServiceID string ServiceName string ServiceTags []string @@ -488,6 +494,7 @@ type NodeInfo struct { Node string Address string TaggedAddresses map[string]string + Meta map[string]string Services []*NodeService Checks HealthChecks } diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index efceeace3..a66d4b176 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -151,6 +151,9 @@ func testServiceNode() *ServiceNode { TaggedAddresses: map[string]string{ "hello": "world", }, + NodeMeta: map[string]string{ + "tag": "value", + }, ServiceID: "service1", ServiceName: "dogs", ServiceTags: []string{"prod", "v1"}, @@ -172,12 +175,13 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) { // Make sure the parts that weren't supposed to be cloned didn't get // copied over, then zero-value them out so we can do a DeepEqual() on // the rest of the contents. - if clone.Address != "" || len(clone.TaggedAddresses) != 0 { + if clone.Address != "" || len(clone.TaggedAddresses) != 0 || len(clone.NodeMeta) != 0 { t.Fatalf("bad: %v", clone) } sn.Address = "" sn.TaggedAddresses = nil + sn.NodeMeta = nil if !reflect.DeepEqual(sn, clone) { t.Fatalf("bad: %v", clone) } @@ -197,6 +201,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) { // them out before we do the compare. sn.Address = "" sn.TaggedAddresses = nil + sn.NodeMeta = nil if !reflect.DeepEqual(sn, sn2) { t.Fatalf("bad: %v", sn2) } From b25f4c7d359624104ac348a2877d106ebac5b4fe Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 5 Jan 2017 17:21:56 -0800 Subject: [PATCH 2/9] Add tests for node metadata functionality --- command/agent/agent.go | 3 + command/agent/agent_endpoint_test.go | 10 ++- command/agent/agent_test.go | 44 ++++++++++ command/agent/catalog_endpoint_test.go | 47 +++++++++++ command/agent/config.go | 1 + command/agent/local_test.go | 7 +- consul/catalog_endpoint.go | 17 ++-- consul/catalog_endpoint_test.go | 66 +++++++++++++++ consul/state/state_store.go | 30 +++++-- consul/state/state_store_test.go | 111 +++++++++++++++++++++++++ 10 files changed, 317 insertions(+), 19 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 1a0cf207a..9b8c5be09 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1687,6 +1687,9 @@ func (a *Agent) loadMetadata(conf *Config) error { defer a.state.Unlock() for key, value := range conf.Meta { + if key == "" { + return fmt.Errorf("Key name cannot be blank") + } if strings.Contains(key, ":") { return fmt.Errorf("Key name cannot contain ':' character: %s", key) } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index b755f94d6..4c14608c6 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -201,7 +201,12 @@ func TestAgent_Checks_ACLFilter(t *testing.T) { } func TestAgent_Self(t *testing.T) { - dir, srv := makeHTTPServer(t) + meta := map[string]string{ + "somekey": "somevalue", + } + dir, srv := makeHTTPServerWithConfig(t, func(conf *Config) { + conf.Meta = meta + }) defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() @@ -232,6 +237,9 @@ func TestAgent_Self(t *testing.T) { if !reflect.DeepEqual(c, val.Coord) { t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord) } + if !reflect.DeepEqual(meta, val.Meta) { + t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta) + } srv.agent.config.DisableCoordinates = true obj, err = srv.AgentSelf(nil, req) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index c84a5b008..23a9ea006 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1851,6 +1851,50 @@ func TestAgent_purgeCheckState(t *testing.T) { } } +func TestAgent_metadata(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Load a valid set of key/value pairs + config.Meta = map[string]string{ + "key1": "value1", + "key2": "value2", + } + if err := agent.loadMetadata(config); err != nil { + t.Fatalf("err: %s", err) + } + agent.unloadMetadata() + + // Should fail, keys can't be blank + config.Meta = map[string]string{ + "": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() + + // Should fail, keys can't contain ':' + config.Meta = map[string]string{ + "key:123": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() + + // Should fail, keys can't begin with 'consul-' + config.Meta = map[string]string{ + "consul-key": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() +} + func TestAgent_GetCoordinate(t *testing.T) { check := func(server bool) { config := nextConfig() diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 1d1136e1f..97f706d8a 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -145,6 +145,53 @@ func TestCatalogNodes(t *testing.T) { } } +func TestCatalogNodes_metaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + // Register a node with a meta field + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/nodes?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Verify an index is set + assertIndex(t, resp) + + // Verify we only get the node with the correct meta field back + nodes := obj.(structs.Nodes) + if len(nodes) != 1 { + t.Fatalf("bad: %v", obj) + } + if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" { + t.Fatalf("bad: %v", nodes[0].Meta) + } +} + func TestCatalogNodes_WanTranslation(t *testing.T) { dir1, srv1 := makeHTTPServerWithConfig(t, func(c *Config) { diff --git a/command/agent/config.go b/command/agent/config.go index c4f35f3d0..9dfa083c6 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -713,6 +713,7 @@ func DefaultConfig() *Config { Telemetry: Telemetry{ StatsitePrefix: "consul", }, + Meta: make(map[string]string), SyslogFacility: "LOCAL0", Protocol: consul.ProtocolVersion2Compatible, CheckUpdateInterval: 5 * time.Minute, diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 089244287..79e8b41b2 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -985,6 +985,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { func TestAgentAntiEntropy_NodeInfo(t *testing.T) { conf := nextConfig() + conf.Meta["somekey"] = "somevalue" dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -1020,7 +1021,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { // Make sure we synced our node info - this should have ridden on the // "consul" service sync addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + meta := services.NodeServices.Node.Meta + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) { return false, fmt.Errorf("bad: %v", addrs) } @@ -1044,7 +1046,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { return false, fmt.Errorf("err: %v", err) } addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + meta := services.NodeServices.Node.Meta + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) { return false, fmt.Errorf("bad: %v", addrs) } diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 565155899..237d1b826 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -156,14 +156,6 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } - var metaFilter []interface{} - if args.NodeMetaKey != "" { - metaFilter = append(metaFilter, args.NodeMetaKey) - if args.NodeMetaValue != "" { - metaFilter = append(metaFilter, args.NodeMetaValue) - } - } - // Get the list of nodes. state := c.srv.fsm.State() return c.srv.blockingRPC( @@ -171,7 +163,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &reply.QueryMeta, state.GetQueryWatch("Nodes"), func() error { - index, nodes, err := state.Nodes(metaFilter...) + var index uint64 + var nodes structs.Nodes + var err error + if args.NodeMetaKey != "" { + index, nodes, err = state.NodesByMeta(args.NodeMetaKey, args.NodeMetaValue) + } else { + index, nodes, err = state.Nodes() + } if err != nil { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index c0a329af3..f0152eece 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -592,6 +592,72 @@ func TestCatalog_ListNodes(t *testing.T) { } } +func TestCatalog_ListNodes_MetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "somevalue", + } + var out structs.IndexedNodes + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Add a new node with the right meta k/v pair + node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}} + if err := s1.fsm.State().EnsureNode(1, node); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 1, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Verify that only the correct node was returned + if out.Nodes[0].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", out) + } + if v, ok := out.Nodes[0].Meta["somekey"]; !ok || v != "somevalue" { + t.Fatalf("bad: %v", out) + } + + // Now filter on a nonexistent meta k/v pair + args = structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "invalid", + } + out = structs.IndexedNodes{} + err = msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should get an empty list of nodes back + testutil.WaitForResult(func() (bool, error) { + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 0, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestCatalog_ListNodes_StaleRaad(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index c1bb2d0f6..88f811e81 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -528,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) { +func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -536,13 +536,29 @@ func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, er idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) // Retrieve all of the nodes - var nodes memdb.ResultIterator - var err error - if len(metaFilter) > 0 { - nodes, err = tx.Get("nodes", "meta", metaFilter...) - } else { - nodes, err = tx.Get("nodes", "id") + nodes, err := tx.Get("nodes", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } + + // Create and return the nodes list. + var results structs.Nodes + for node := nodes.Next(); node != nil; node = nodes.Next() { + results = append(results, node.(*structs.Node)) + } + return idx, results, nil +} + +// NodesByMeta is used to return all nodes with the given meta key/value pair. +func (s *StateStore) NodesByMeta(key, value string) (uint64, structs.Nodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) + + // Retrieve all of the nodes + nodes, err := tx.Get("nodes", "meta", key, value) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 1354d8920..d932b1b24 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -241,6 +241,9 @@ func TestStateStore_EnsureRegistration(t *testing.T) { TaggedAddresses: map[string]string{ "hello": "world", }, + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, } if err := s.EnsureRegistration(1, req); err != nil { t.Fatalf("err: %s", err) @@ -255,6 +258,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { if out.Node != "node1" || out.Address != "1.2.3.4" || len(out.TaggedAddresses) != 1 || out.TaggedAddresses["hello"] != "world" || + out.Meta["somekey"] != "somevalue" || out.CreateIndex != created || out.ModifyIndex != modified { t.Fatalf("bad node returned: %#v", out) } @@ -751,6 +755,113 @@ func BenchmarkGetNodes(b *testing.B) { } } +func TestStateStore_GetNodesByMeta(t *testing.T) { + s := testStateStore(t) + + // Listing with no results returns nil + idx, res, err := s.NodesByMeta("somekey", "somevalue") + if idx != 0 || res != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Create some nodes in the state store + node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} + if err := s.EnsureNode(0, node0); err != nil { + t.Fatalf("err: %v", err) + } + node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}} + if err := s.EnsureNode(1, node1); err != nil { + t.Fatalf("err: %v", err) + } + + // Retrieve the node with role=client + _, nodes, err := s.NodesByMeta("role", "client") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Only one node was returned + if n := len(nodes); n != 1 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the node is correct + if nodes[0].CreateIndex != 0 || nodes[0].ModifyIndex != 0 { + t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) + } + if nodes[0].Node != "node0" { + t.Fatalf("bad: %#v", nodes[0]) + } + if !reflect.DeepEqual(nodes[0].Meta, node0.Meta) { + t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta) + } + + // Retrieve the node with role=server + _, nodes, err = s.NodesByMeta("role", "server") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Only one node was returned + if n := len(nodes); n != 1 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the node is correct + if nodes[0].CreateIndex != 1 || nodes[0].ModifyIndex != 1 { + t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) + } + if nodes[0].Node != "node1" { + t.Fatalf("bad: %#v", nodes[0]) + } + if !reflect.DeepEqual(nodes[0].Meta, node1.Meta) { + t.Fatalf("bad: %v != %v", nodes[0].Meta, node1.Meta) + } + + // Retrieve both nodes via their common meta field + _, nodes, err = s.NodesByMeta("common", "1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // All nodes were returned + if n := len(nodes); n != 2 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the nodes match + for i, node := range nodes { + if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) { + t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex) + } + name := fmt.Sprintf("node%d", i) + if node.Node != name { + t.Fatalf("bad: %#v", node) + } + if v, ok := node.Meta["common"]; !ok || v != "1" { + t.Fatalf("bad: %v", node.Meta) + } + } +} + +func BenchmarkGetNodesByMeta(b *testing.B) { + s, err := NewStateStore(nil) + if err != nil { + b.Fatalf("err: %s", err) + } + + if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + b.Fatalf("err: %v", err) + } + if err := s.EnsureNode(101, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { + b.Fatalf("err: %v", err) + } + + for i := 0; i < b.N; i++ { + s.Nodes() + } +} + func TestStateStore_DeleteNode(t *testing.T) { s := testStateStore(t) From aee766baba31a1945e908da9e3aaf934f1c3ff52 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 9 Jan 2017 11:21:49 -0800 Subject: [PATCH 3/9] Add meta key validations and more tests --- command/agent/agent.go | 52 +++++++++++-- command/agent/agent_test.go | 48 +++++++++--- command/agent/catalog_endpoint.go | 10 +-- command/agent/catalog_endpoint_test.go | 50 +++++++++++- command/agent/http.go | 12 +++ consul/catalog_endpoint.go | 9 ++- consul/catalog_endpoint_test.go | 66 ++++++++++++++++ consul/state/state_store.go | 50 ++++++++++++ consul/state/state_store_test.go | 104 +++++++++++++++++++------ consul/structs/structs.go | 4 +- consul/structs/structs_test.go | 7 ++ 11 files changed, 357 insertions(+), 55 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 9b8c5be09..4b71fcce9 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -42,11 +42,26 @@ const ( "but no reason was provided. This is a default message." defaultServiceMaintReason = "Maintenance mode is enabled for this " + "service, but no reason was provided. This is a default message." + + // The meta key prefix reserved for Consul's internal use + metaKeyReservedPrefix = "consul-" + + // The maximum number of metadata key pairs allowed to be registered + metaMaxKeyPairs = 64 + + // The maximum allowed length of a metadata key + metaKeyMaxLength = 128 + + // The maximum allowed length of a metadata value + metaValueMaxLength = 512 ) var ( // dnsNameRe checks if a name or tag is dns-compatible. dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`) + + // metaKeyFormat checks if a metadata key string is valid + metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString ) /* @@ -1686,15 +1701,13 @@ func (a *Agent) loadMetadata(conf *Config) error { a.state.Lock() defer a.state.Unlock() + if len(conf.Meta) > metaMaxKeyPairs { + return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs) + } + for key, value := range conf.Meta { - if key == "" { - return fmt.Errorf("Key name cannot be blank") - } - if strings.Contains(key, ":") { - return fmt.Errorf("Key name cannot contain ':' character: %s", key) - } - if strings.HasPrefix(key, "consul-") { - return fmt.Errorf("Key prefix 'consul-' is reserved for internal use") + if err := validateMetaPair(key, value); err != nil { + return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err) } a.state.metadata[key] = value } @@ -1704,6 +1717,29 @@ func (a *Agent) loadMetadata(conf *Config) error { return nil } +// validateMetaPair checks that the given key/value pair is in a valid format +func validateMetaPair(key, value string) error { + if key == "" { + return fmt.Errorf("Key cannot be blank") + } + if !metaKeyFormat(key) { + return fmt.Errorf("Key contains invalid characters") + } + if len(key) > metaKeyMaxLength { + return fmt.Errorf("Key is longer than %d chars", metaKeyMaxLength) + } + if strings.Contains(key, ":") { + return fmt.Errorf("Key contains ':' character") + } + if strings.HasPrefix(key, metaKeyReservedPrefix) { + return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix) + } + if len(value) > metaValueMaxLength { + return fmt.Errorf("Value is longer than %d characters", metaValueMaxLength) + } + return nil +} + // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() error { a.state.Lock() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 23a9ea006..42e3269f7 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1862,12 +1862,13 @@ func TestAgent_metadata(t *testing.T) { "key1": "value1", "key2": "value2", } + // Should succeed if err := agent.loadMetadata(config); err != nil { t.Fatalf("err: %s", err) } agent.unloadMetadata() - // Should fail, keys can't be blank + // Should get error config.Meta = map[string]string{ "": "value1", } @@ -1876,23 +1877,48 @@ func TestAgent_metadata(t *testing.T) { } agent.unloadMetadata() - // Should fail, keys can't contain ':' - config.Meta = map[string]string{ - "key:123": "value1", + // Should get error + tooManyKeys := make(map[string]string) + for i := 0; i < metaMaxKeyPairs+1; i++ { + tooManyKeys[string(i)] = "value" } if err := agent.loadMetadata(config); err == nil { t.Fatalf("should have failed") } - agent.unloadMetadata() +} - // Should fail, keys can't begin with 'consul-' - config.Meta = map[string]string{ - "consul-key": "value1", +func TestAgent_validateMetaPair(t *testing.T) { + longKey := fmt.Sprintf(fmt.Sprintf("%%%ds", metaKeyMaxLength+1), "") + longValue := fmt.Sprintf(fmt.Sprintf("%%%ds", metaValueMaxLength+1), "") + pairs := []struct { + Key string + Value string + Success bool + }{ + // valid pair + {"key", "value", true}, + // invalid, blank key + {"", "value", false}, + // allowed special chars in key name + {"k_e-y", "value", true}, + // ':' in key name + {"k:ey", "value", false}, + // disallowed special chars in key name + {"(%key&)", "value", false}, + // key too long + {longKey, "value", false}, + // reserved prefix + {metaKeyReservedPrefix + "key", "value", false}, + // value too long + {"key", longValue, false}, } - if err := agent.loadMetadata(config); err == nil { - t.Fatalf("should have failed") + + for _, pair := range pairs { + err := validateMetaPair(pair.Key, pair.Value) + if pair.Success != (err == nil) { + t.Fatalf("bad: %v, %v", pair, err) + } } - agent.unloadMetadata() } func TestAgent_GetCoordinate(t *testing.T) { diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 13d425044..05b431a92 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -64,17 +64,10 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( // Setup the request args := structs.DCSpecificRequest{} s.parseSource(req, &args.Source) + s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } - // Try to parse node metadata filter params - if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { - pair := strings.SplitN(filter[0], ":", 2) - args.NodeMetaKey = pair[0] - if len(pair) == 2 { - args.NodeMetaValue = pair[1] - } - } var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) @@ -93,6 +86,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} + s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 97f706d8a..62cd3bd90 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -145,7 +145,7 @@ func TestCatalogNodes(t *testing.T) { } } -func TestCatalogNodes_metaFilter(t *testing.T) { +func TestCatalogNodes_MetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() @@ -496,6 +496,54 @@ func TestCatalogServices(t *testing.T) { } } +func TestCatalogServices_NodeMetaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + // Register node + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, + Service: &structs.NodeService{ + Service: "api", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/services?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogServices(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + services := obj.(structs.Services) + if len(services) != 1 { + t.Fatalf("bad: %v", obj) + } + if _, ok := services[args.Service.Service]; !ok { + t.Fatalf("bad: %v", services) + } +} + func TestCatalogServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) diff --git a/command/agent/http.go b/command/agent/http.go index 7070706f1..4c4a3ab2a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -584,6 +584,18 @@ func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) } } +// parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for +// filtering results to nodes with the given metadata key/value +func (s *HTTPServer) parseMetaFilter(req *http.Request, key *string, value *string) { + if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { + pair := strings.SplitN(filter[0], ":", 2) + *key = pair[0] + if len(pair) == 2 { + *value = pair[1] + } + } +} + // parse is a convenience method for endpoints that need // to use both parseWait and parseDC. func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 237d1b826..2887559ac 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -196,7 +196,14 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I &reply.QueryMeta, state.GetQueryWatch("Services"), func() error { - index, services, err := state.Services() + var index uint64 + var services structs.Services + var err error + if args.NodeMetaKey != "" { + index, services, err = state.ServicesByNodeMeta(args.NodeMetaKey, args.NodeMetaValue) + } else { + index, services, err = state.Services() + } if err != nil { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index f0152eece..604b4ee25 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -1062,6 +1062,72 @@ func TestCatalog_ListServices(t *testing.T) { } } +func TestCatalog_ListServices_MetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "somevalue", + } + var out structs.IndexedServices + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Add a new node with the right meta k/v pair + node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}} + if err := s1.fsm.State().EnsureNode(1, node); err != nil { + t.Fatalf("err: %v", err) + } + // Add a service to the new node + if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.Services) != 1 { + t.Fatalf("bad: %v", out) + } + if out.Services["db"] == nil { + t.Fatalf("bad: %v", out.Services["db"]) + } + if len(out.Services["db"]) != 1 { + t.Fatalf("bad: %v", out) + } + if out.Services["db"][0] != "primary" { + t.Fatalf("bad: %v", out) + } + + // Now filter on a nonexistent meta k/v pair + args = structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "invalid", + } + out = structs.IndexedServices{} + err = msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should get an empty list of nodes back + if len(out.Services) != 0 { + t.Fatalf("bad: %v", out.Services) + } +} + func TestCatalog_ListServices_Blocking(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 88f811e81..eb2fcb8ce 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -781,6 +781,56 @@ func (s *StateStore) Services() (uint64, structs.Services, error) { return idx, results, nil } +// Services returns all services, filtered by given node metadata. +func (s *StateStore) ServicesByNodeMeta(key, value string) (uint64, structs.Services, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) + + // Retrieve all of the nodes with the meta k/v pair + nodes, err := tx.Get("nodes", "meta", key, value) + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) + } + + // Populate the services map + unique := make(map[string]map[string]struct{}) + for node := nodes.Next(); node != nil; node = nodes.Next() { + n := node.(*structs.Node) + // List all the services on the node + services, err := tx.Get("services", "node", n.Node) + if err != nil { + return 0, nil, fmt.Errorf("failed querying services: %s", err) + } + + // Rip through the services and enumerate them and their unique set of + // tags. + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode) + tags, ok := unique[svc.ServiceName] + if !ok { + unique[svc.ServiceName] = make(map[string]struct{}) + tags = unique[svc.ServiceName] + } + for _, tag := range svc.ServiceTags { + tags[tag] = struct{}{} + } + } + } + + // Generate the output structure. + var results = make(structs.Services) + for service, tags := range unique { + results[service] = make([]string, 0) + for tag, _ := range tags { + results[service] = append(results[service], tag) + } + } + return idx, results, nil +} + // ServiceNodes returns the nodes associated with a given service name. func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index d932b1b24..3277cb391 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -775,10 +775,13 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } // Retrieve the node with role=client - _, nodes, err := s.NodesByMeta("role", "client") + idx, nodes, err := s.NodesByMeta("role", "client") if err != nil { t.Fatalf("err: %s", err) } + if idx != 1 { + t.Fatalf("bad index: %d", idx) + } // Only one node was returned if n := len(nodes); n != 1 { @@ -796,33 +799,14 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta) } - // Retrieve the node with role=server - _, nodes, err = s.NodesByMeta("role", "server") - if err != nil { - t.Fatalf("err: %s", err) - } - - // Only one node was returned - if n := len(nodes); n != 1 { - t.Fatalf("bad node count: %d", n) - } - - // Make sure the node is correct - if nodes[0].CreateIndex != 1 || nodes[0].ModifyIndex != 1 { - t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) - } - if nodes[0].Node != "node1" { - t.Fatalf("bad: %#v", nodes[0]) - } - if !reflect.DeepEqual(nodes[0].Meta, node1.Meta) { - t.Fatalf("bad: %v != %v", nodes[0].Meta, node1.Meta) - } - // Retrieve both nodes via their common meta field - _, nodes, err = s.NodesByMeta("common", "1") + idx, nodes, err = s.NodesByMeta("common", "1") if err != nil { t.Fatalf("err: %s", err) } + if idx != 1 { + t.Fatalf("bad index: %d", idx) + } // All nodes were returned if n := len(nodes); n != 2 { @@ -1172,6 +1156,78 @@ func TestStateStore_Services(t *testing.T) { } } +func TestStateStore_ServicesByNodeMeta(t *testing.T) { + s := testStateStore(t) + + // Listing with no results returns nil + idx, res, err := s.ServicesByNodeMeta("somekey", "somevalue") + if idx != 0 || len(res) != 0 || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Create some nodes and services in the state store + node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} + if err := s.EnsureNode(0, node0); err != nil { + t.Fatalf("err: %v", err) + } + node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}} + if err := s.EnsureNode(1, node1); err != nil { + t.Fatalf("err: %v", err) + } + ns1 := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod", "master"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(2, "node0", ns1); err != nil { + t.Fatalf("err: %s", err) + } + ns2 := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod", "slave"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(3, "node1", ns2); err != nil { + t.Fatalf("err: %s", err) + } + + // Filter the services by the first node's meta value + idx, res, err = s.ServicesByNodeMeta("role", "client") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + expected := structs.Services{ + "redis": []string{"prod", "master"}, + } + if !reflect.DeepEqual(res, expected) { + t.Fatalf("bad: %v %v", res, expected) + } + + // Get all services using the common meta value + idx, res, err = s.ServicesByNodeMeta("common", "1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + expected = structs.Services{ + "redis": []string{"prod", "master", "slave"}, + } + sort.Strings(res["redis"]) + sort.Strings(expected["redis"]) + if !reflect.DeepEqual(res, expected) { + t.Fatalf("bad: %v %v", res, expected) + } +} + func TestStateStore_ServiceNodes(t *testing.T) { s := testStateStore(t) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 97030f285..bb4f6472d 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -292,8 +292,8 @@ type Nodes []*Node // Maps service name to available tags type Services map[string][]string -// ServiceNode represents a node that is part of a service. Address and -// TaggedAddresses are node-related fields that are always empty in the state +// ServiceNode represents a node that is part of a service. Address, TaggedAddresses, +// and NodeMeta are node-related fields that are always empty in the state // store and are filled in on the way out by parseServiceNodes(). This is also // why PartialClone() skips them, because we know they are blank already so it // would be a waste of time to copy them. diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index a66d4b176..13714e993 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -110,12 +110,18 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { Node: "test", Address: "127.0.0.1", TaggedAddresses: make(map[string]string), + NodeMeta: map[string]string{ + "role": "server", + }, } node := &Node{ Node: "test", Address: "127.0.0.1", TaggedAddresses: make(map[string]string), + Meta: map[string]string{ + "role": "server", + }, } check := func(twiddle, restore func()) { @@ -137,6 +143,7 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { check(func() { req.Node = "nope" }, func() { req.Node = "test" }) check(func() { req.Address = "127.0.0.2" }, func() { req.Address = "127.0.0.1" }) check(func() { req.TaggedAddresses["wan"] = "nope" }, func() { delete(req.TaggedAddresses, "wan") }) + check(func() { req.NodeMeta["invalid"] = "nope" }, func() { delete(req.NodeMeta, "invalid")}) if !req.ChangesNode(nil) { t.Fatalf("should change") From 12da4521366bb4c22c6f0b430c7a4642a4561e98 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 9 Jan 2017 11:23:09 -0800 Subject: [PATCH 4/9] vendor: Update go-memdb dependency --- .../github.com/hashicorp/go-memdb/README.md | 2 +- vendor/github.com/hashicorp/go-memdb/index.go | 175 +++++++++++++++++- .../github.com/hashicorp/go-memdb/schema.go | 9 + vendor/github.com/hashicorp/go-memdb/txn.go | 88 ++++++--- vendor/vendor.json | 6 +- 5 files changed, 250 insertions(+), 30 deletions(-) diff --git a/vendor/github.com/hashicorp/go-memdb/README.md b/vendor/github.com/hashicorp/go-memdb/README.md index 203a0af14..675044beb 100644 --- a/vendor/github.com/hashicorp/go-memdb/README.md +++ b/vendor/github.com/hashicorp/go-memdb/README.md @@ -19,7 +19,7 @@ The database provides the following: * Rich Indexing - Tables can support any number of indexes, which can be simple like a single field index, or more advanced compound field indexes. Certain types like - UUID can be efficiently compressed from strings into byte indexes for reduces + UUID can be efficiently compressed from strings into byte indexes for reduced storage requirements. For the underlying immutable radix trees, see [go-immutable-radix](https://github.com/hashicorp/go-immutable-radix). diff --git a/vendor/github.com/hashicorp/go-memdb/index.go b/vendor/github.com/hashicorp/go-memdb/index.go index 7237f33e2..17aa02699 100644 --- a/vendor/github.com/hashicorp/go-memdb/index.go +++ b/vendor/github.com/hashicorp/go-memdb/index.go @@ -9,15 +9,27 @@ import ( // Indexer is an interface used for defining indexes type Indexer interface { - // FromObject is used to extract an index value from an - // object or to indicate that the index value is missing. - FromObject(raw interface{}) (bool, []byte, error) - // ExactFromArgs is used to build an exact index lookup // based on arguments FromArgs(args ...interface{}) ([]byte, error) } +// SingleIndexer is an interface used for defining indexes +// generating a single entry per object +type SingleIndexer interface { + // FromObject is used to extract an index value from an + // object or to indicate that the index value is missing. + FromObject(raw interface{}) (bool, []byte, error) +} + +// MultiIndexer is an interface used for defining indexes +// generating multiple entries per object +type MultiIndexer interface { + // FromObject is used to extract index values from an + // object or to indicate that the index value is missing. + FromObject(raw interface{}) (bool, [][]byte, error) +} + // PrefixIndexer can optionally be implemented for any // indexes that support prefix based iteration. This may // not apply to all indexes. @@ -88,6 +100,155 @@ func (s *StringFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { return val, nil } +// StringSliceFieldIndex is used to extract a field from an object +// using reflection and builds an index on that field. +type StringSliceFieldIndex struct { + Field string + Lowercase bool +} + +func (s *StringSliceFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fv := v.FieldByName(s.Field) + if !fv.IsValid() { + return false, nil, + fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj) + } + + if fv.Kind() != reflect.Slice || fv.Type().Elem().Kind() != reflect.String { + return false, nil, fmt.Errorf("field '%s' is not a string slice", s.Field) + } + + length := fv.Len() + vals := make([][]byte, 0, length) + for i := 0; i < fv.Len(); i++ { + val := fv.Index(i).String() + if val == "" { + continue + } + + if s.Lowercase { + val = strings.ToLower(val) + } + + // Add the null character as a terminator + val += "\x00" + vals = append(vals, []byte(val)) + } + if len(vals) == 0 { + return false, nil, nil + } + return true, vals, nil +} + +func (s *StringSliceFieldIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + if s.Lowercase { + arg = strings.ToLower(arg) + } + // Add the null character as a terminator + arg += "\x00" + return []byte(arg), nil +} + +func (s *StringSliceFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { + val, err := s.FromArgs(args...) + if err != nil { + return nil, err + } + + // Strip the null terminator, the rest is a prefix + n := len(val) + if n > 0 { + return val[:n-1], nil + } + return val, nil +} + +// StringMapFieldIndex is used to extract a field of type map[string]string +// from an object using reflection and builds an index on that field. +type StringMapFieldIndex struct { + Field string + Lowercase bool +} + +var MapType = reflect.MapOf(reflect.TypeOf(""), reflect.TypeOf("")).Kind() + +func (s *StringMapFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fv := v.FieldByName(s.Field) + if !fv.IsValid() { + return false, nil, fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj) + } + + if fv.Kind() != MapType { + return false, nil, fmt.Errorf("field '%s' is not a map[string]string", s.Field) + } + + length := fv.Len() + vals := make([][]byte, 0, length) + for _, key := range fv.MapKeys() { + k := key.String() + if k == "" { + continue + } + val := fv.MapIndex(key).String() + + if s.Lowercase { + k = strings.ToLower(k) + val = strings.ToLower(val) + } + + // Add the null character as a terminator + k += "\x00" + val + "\x00" + + vals = append(vals, []byte(k)) + } + if len(vals) == 0 { + return false, nil, nil + } + return true, vals, nil +} + +func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) > 2 || len(args) == 0 { + return nil, fmt.Errorf("must provide one or two arguments") + } + key, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + if s.Lowercase { + key = strings.ToLower(key) + } + // Add the null character as a terminator + key += "\x00" + + if len(args) == 2 { + val, ok := args[1].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[1]) + } + if s.Lowercase { + val = strings.ToLower(val) + } + // Add the null character as a terminator + key += val + "\x00" + } + + return []byte(key), nil +} + // UUIDFieldIndex is used to extract a field from an object // using reflection and builds an index on that field by treating // it as a UUID. This is an optimization to using a StringFieldIndex @@ -270,7 +431,11 @@ type CompoundIndex struct { func (c *CompoundIndex) FromObject(raw interface{}) (bool, []byte, error) { var out []byte - for i, idx := range c.Indexes { + for i, idxRaw := range c.Indexes { + idx, ok := idxRaw.(SingleIndexer) + if !ok { + return false, nil, fmt.Errorf("sub-index %d error: %s", i, "sub-index must be a SingleIndexer") + } ok, val, err := idx.FromObject(raw) if err != nil { return false, nil, fmt.Errorf("sub-index %d error: %v", i, err) diff --git a/vendor/github.com/hashicorp/go-memdb/schema.go b/vendor/github.com/hashicorp/go-memdb/schema.go index 2b8ffb476..26d0fcb99 100644 --- a/vendor/github.com/hashicorp/go-memdb/schema.go +++ b/vendor/github.com/hashicorp/go-memdb/schema.go @@ -46,6 +46,9 @@ func (s *TableSchema) Validate() error { if !s.Indexes["id"].Unique { return fmt.Errorf("id index must be unique") } + if _, ok := s.Indexes["id"].Indexer.(SingleIndexer); !ok { + return fmt.Errorf("id index must be a SingleIndexer") + } for name, index := range s.Indexes { if name != index.Name { return fmt.Errorf("index name mis-match for '%s'", name) @@ -72,5 +75,11 @@ func (s *IndexSchema) Validate() error { if s.Indexer == nil { return fmt.Errorf("missing index function for '%s'", s.Name) } + switch s.Indexer.(type) { + case SingleIndexer: + case MultiIndexer: + default: + return fmt.Errorf("indexer for '%s' must be a SingleIndexer or MultiIndexer", s.Name) + } return nil } diff --git a/vendor/github.com/hashicorp/go-memdb/txn.go b/vendor/github.com/hashicorp/go-memdb/txn.go index 6228677da..fa73c9a3f 100644 --- a/vendor/github.com/hashicorp/go-memdb/txn.go +++ b/vendor/github.com/hashicorp/go-memdb/txn.go @@ -148,7 +148,8 @@ func (txn *Txn) Insert(table string, obj interface{}) error { // Get the primary ID of the object idSchema := tableSchema.Indexes[id] - ok, idVal, err := idSchema.Indexer.FromObject(obj) + idIndexer := idSchema.Indexer.(SingleIndexer) + ok, idVal, err := idIndexer.FromObject(obj) if err != nil { return fmt.Errorf("failed to build primary index: %v", err) } @@ -167,7 +168,19 @@ func (txn *Txn) Insert(table string, obj interface{}) error { indexTxn := txn.writableIndex(table, name) // Determine the new index value - ok, val, err := indexSchema.Indexer.FromObject(obj) + var ( + ok bool + vals [][]byte + err error + ) + switch indexer := indexSchema.Indexer.(type) { + case SingleIndexer: + var val []byte + ok, val, err = indexer.FromObject(obj) + vals = [][]byte{val} + case MultiIndexer: + ok, vals, err = indexer.FromObject(obj) + } if err != nil { return fmt.Errorf("failed to build index '%s': %v", name, err) } @@ -176,28 +189,44 @@ func (txn *Txn) Insert(table string, obj interface{}) error { // This is done by appending the primary key which must // be unique anyways. if ok && !indexSchema.Unique { - val = append(val, idVal...) + for i := range vals { + vals[i] = append(vals[i], idVal...) + } } // Handle the update by deleting from the index first if update { - okExist, valExist, err := indexSchema.Indexer.FromObject(existing) + var ( + okExist bool + valsExist [][]byte + err error + ) + switch indexer := indexSchema.Indexer.(type) { + case SingleIndexer: + var valExist []byte + okExist, valExist, err = indexer.FromObject(existing) + valsExist = [][]byte{valExist} + case MultiIndexer: + okExist, valsExist, err = indexer.FromObject(existing) + } if err != nil { return fmt.Errorf("failed to build index '%s': %v", name, err) } if okExist { - // Handle non-unique index by computing a unique index. - // This is done by appending the primary key which must - // be unique anyways. - if !indexSchema.Unique { - valExist = append(valExist, idVal...) - } + for i, valExist := range valsExist { + // Handle non-unique index by computing a unique index. + // This is done by appending the primary key which must + // be unique anyways. + if !indexSchema.Unique { + valExist = append(valExist, idVal...) + } - // If we are writing to the same index with the same value, - // we can avoid the delete as the insert will overwrite the - // value anyways. - if !bytes.Equal(valExist, val) { - indexTxn.Delete(valExist) + // If we are writing to the same index with the same value, + // we can avoid the delete as the insert will overwrite the + // value anyways. + if i >= len(vals) || !bytes.Equal(valExist, vals[i]) { + indexTxn.Delete(valExist) + } } } } @@ -213,7 +242,9 @@ func (txn *Txn) Insert(table string, obj interface{}) error { } // Update the value of the index - indexTxn.Insert(val, obj) + for _, val := range vals { + indexTxn.Insert(val, obj) + } } return nil } @@ -233,7 +264,8 @@ func (txn *Txn) Delete(table string, obj interface{}) error { // Get the primary ID of the object idSchema := tableSchema.Indexes[id] - ok, idVal, err := idSchema.Indexer.FromObject(obj) + idIndexer := idSchema.Indexer.(SingleIndexer) + ok, idVal, err := idIndexer.FromObject(obj) if err != nil { return fmt.Errorf("failed to build primary index: %v", err) } @@ -253,7 +285,19 @@ func (txn *Txn) Delete(table string, obj interface{}) error { indexTxn := txn.writableIndex(table, name) // Handle the update by deleting from the index first - ok, val, err := indexSchema.Indexer.FromObject(existing) + var ( + ok bool + vals [][]byte + err error + ) + switch indexer := indexSchema.Indexer.(type) { + case SingleIndexer: + var val []byte + ok, val, err = indexer.FromObject(existing) + vals = [][]byte{val} + case MultiIndexer: + ok, vals, err = indexer.FromObject(existing) + } if err != nil { return fmt.Errorf("failed to build index '%s': %v", name, err) } @@ -261,10 +305,12 @@ func (txn *Txn) Delete(table string, obj interface{}) error { // Handle non-unique index by computing a unique index. // This is done by appending the primary key which must // be unique anyways. - if !indexSchema.Unique { - val = append(val, idVal...) + for _, val := range vals { + if !indexSchema.Unique { + val = append(val, idVal...) + } + indexTxn.Delete(val) } - indexTxn.Delete(val) } } return nil diff --git a/vendor/vendor.json b/vendor/vendor.json index b130ac271..668875419 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -396,10 +396,10 @@ "revisionTime": "2016-06-09T02:05:29Z" }, { - "checksumSHA1": "/V57CyN7x2NUlHoOzVL5GgGXX84=", + "checksumSHA1": "ZpTDFeRvXFwIvSHRD8eDYHxaj4Y=", "path": "github.com/hashicorp/go-memdb", - "revision": "98f52f52d7a476958fa9da671354d270c50661a7", - "revisionTime": "2016-03-01T23:01:42Z" + "revision": "d2d2b77acab85aa635614ac17ea865969f56009e", + "revisionTime": "2017-01-07T16:22:14Z" }, { "checksumSHA1": "TNlVzNR1OaajcNi3CbQ3bGbaLGU=", From 6b5cf20b1c1ab7a47345dd2a58fe7c5f7ff3eadf Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 9 Jan 2017 13:40:50 -0800 Subject: [PATCH 5/9] Fix formatting --- command/agent/agent.go | 9 +++------ command/agent/agent_test.go | 2 -- consul/structs/structs_test.go | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 4b71fcce9..2753d3159 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -47,13 +47,13 @@ const ( metaKeyReservedPrefix = "consul-" // The maximum number of metadata key pairs allowed to be registered - metaMaxKeyPairs = 64 + metaMaxKeyPairs = 64 // The maximum allowed length of a metadata key - metaKeyMaxLength = 128 + metaKeyMaxLength = 128 // The maximum allowed length of a metadata value - metaValueMaxLength = 512 + metaValueMaxLength = 512 ) var ( @@ -1728,9 +1728,6 @@ func validateMetaPair(key, value string) error { if len(key) > metaKeyMaxLength { return fmt.Errorf("Key is longer than %d chars", metaKeyMaxLength) } - if strings.Contains(key, ":") { - return fmt.Errorf("Key contains ':' character") - } if strings.HasPrefix(key, metaKeyReservedPrefix) { return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 42e3269f7..3306da285 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1901,8 +1901,6 @@ func TestAgent_validateMetaPair(t *testing.T) { {"", "value", false}, // allowed special chars in key name {"k_e-y", "value", true}, - // ':' in key name - {"k:ey", "value", false}, // disallowed special chars in key name {"(%key&)", "value", false}, // key too long diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index 13714e993..c1a39c4f9 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -143,7 +143,7 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { check(func() { req.Node = "nope" }, func() { req.Node = "test" }) check(func() { req.Address = "127.0.0.2" }, func() { req.Address = "127.0.0.1" }) check(func() { req.TaggedAddresses["wan"] = "nope" }, func() { delete(req.TaggedAddresses, "wan") }) - check(func() { req.NodeMeta["invalid"] = "nope" }, func() { delete(req.NodeMeta, "invalid")}) + check(func() { req.NodeMeta["invalid"] = "nope" }, func() { delete(req.NodeMeta, "invalid") }) if !req.ChangesNode(nil) { t.Fatalf("should change") From d77890a011e98e292de4625961750db3116d94e8 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 11 Jan 2017 14:41:12 -0500 Subject: [PATCH 6/9] Validate metadata config earlier and handle multiple filters --- command/agent/agent.go | 32 ++++++++++------- command/agent/agent_test.go | 52 +++++++++++++-------------- command/agent/catalog_endpoint.go | 4 +-- command/agent/command.go | 11 +++--- command/agent/config.go | 4 ++- command/agent/http.go | 18 ++++++---- consul/catalog_endpoint.go | 8 ++--- consul/catalog_endpoint_test.go | 59 ++++++++++++++----------------- consul/state/state_store.go | 24 ++++++++++--- consul/state/state_store_test.go | 12 +++---- consul/structs/structs.go | 13 ++++--- 11 files changed, 132 insertions(+), 105 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 2753d3159..582c62570 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1695,20 +1695,13 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { } } -// loadMetadata loads and validates node metadata fields from the config and +// loadMetadata loads node metadata fields from the agent config and // updates them on the local agent. func (a *Agent) loadMetadata(conf *Config) error { a.state.Lock() defer a.state.Unlock() - if len(conf.Meta) > metaMaxKeyPairs { - return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs) - } - for key, value := range conf.Meta { - if err := validateMetaPair(key, value); err != nil { - return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err) - } a.state.metadata[key] = value } @@ -1717,6 +1710,21 @@ func (a *Agent) loadMetadata(conf *Config) error { return nil } +// validateMeta validates a set of key/value pairs from the agent config +func validateMetadata(meta map[string]string) error { + if len(meta) > metaMaxKeyPairs { + return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs) + } + + for key, value := range meta { + if err := validateMetaPair(key, value); err != nil { + return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err) + } + } + + return nil +} + // validateMetaPair checks that the given key/value pair is in a valid format func validateMetaPair(key, value string) error { if key == "" { @@ -1726,25 +1734,23 @@ func validateMetaPair(key, value string) error { return fmt.Errorf("Key contains invalid characters") } if len(key) > metaKeyMaxLength { - return fmt.Errorf("Key is longer than %d chars", metaKeyMaxLength) + return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength) } if strings.HasPrefix(key, metaKeyReservedPrefix) { return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix) } if len(value) > metaValueMaxLength { - return fmt.Errorf("Value is longer than %d characters", metaValueMaxLength) + return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength) } return nil } // unloadMetadata resets the local metadata state -func (a *Agent) unloadMetadata() error { +func (a *Agent) unloadMetadata() { a.state.Lock() defer a.state.Unlock() a.state.metadata = make(map[string]string) - - return nil } // serviceMaintCheckID returns the ID of a given service's maintenance check diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 3306da285..4eedaba0b 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/raft" + "strings" ) const ( @@ -1852,69 +1853,64 @@ func TestAgent_purgeCheckState(t *testing.T) { } func TestAgent_metadata(t *testing.T) { - config := nextConfig() - dir, agent := makeAgent(t, config) - defer os.RemoveAll(dir) - defer agent.Shutdown() - // Load a valid set of key/value pairs - config.Meta = map[string]string{ + meta := map[string]string{ "key1": "value1", "key2": "value2", } // Should succeed - if err := agent.loadMetadata(config); err != nil { + if err := validateMetadata(meta); err != nil { t.Fatalf("err: %s", err) } - agent.unloadMetadata() // Should get error - config.Meta = map[string]string{ + meta = map[string]string{ "": "value1", } - if err := agent.loadMetadata(config); err == nil { + if err := validateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") { t.Fatalf("should have failed") } - agent.unloadMetadata() // Should get error - tooManyKeys := make(map[string]string) + meta = make(map[string]string) for i := 0; i < metaMaxKeyPairs+1; i++ { - tooManyKeys[string(i)] = "value" + meta[string(i)] = "value" } - if err := agent.loadMetadata(config); err == nil { + if err := validateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") { t.Fatalf("should have failed") } } func TestAgent_validateMetaPair(t *testing.T) { - longKey := fmt.Sprintf(fmt.Sprintf("%%%ds", metaKeyMaxLength+1), "") - longValue := fmt.Sprintf(fmt.Sprintf("%%%ds", metaValueMaxLength+1), "") + longKey := strings.Repeat("a", metaKeyMaxLength+1) + longValue := strings.Repeat("b", metaValueMaxLength+1) pairs := []struct { - Key string - Value string - Success bool + Key string + Value string + Error string }{ // valid pair - {"key", "value", true}, + {"key", "value", ""}, // invalid, blank key - {"", "value", false}, + {"", "value", "cannot be blank"}, // allowed special chars in key name - {"k_e-y", "value", true}, + {"k_e-y", "value", ""}, // disallowed special chars in key name - {"(%key&)", "value", false}, + {"(%key&)", "value", "invalid characters"}, // key too long - {longKey, "value", false}, + {longKey, "value", "Key is too long"}, // reserved prefix - {metaKeyReservedPrefix + "key", "value", false}, + {metaKeyReservedPrefix + "key", "value", "reserved for internal use"}, // value too long - {"key", longValue, false}, + {"key", longValue, "Value is too long"}, } for _, pair := range pairs { err := validateMetaPair(pair.Key, pair.Value) - if pair.Success != (err == nil) { - t.Fatalf("bad: %v, %v", pair, err) + if pair.Error == "" && err != nil { + t.Fatalf("should have succeeded: %v, %v", pair, err) + } else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) { + t.Fatalf("should have failed: %v, %v", pair, err) } } } diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 05b431a92..913b3af1e 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -64,7 +64,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( // Setup the request args := structs.DCSpecificRequest{} s.parseSource(req, &args.Source) - s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } @@ -86,7 +86,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} - s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/command.go b/command/agent/command.go index 165839fdc..03f3b25e1 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -364,6 +364,12 @@ func (c *Command) readConfig() *Config { } } + // Verify the node metadata entries are valid + if err := validateMetadata(config.Meta); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse node metadata: %v", err)) + return nil + } + // Set the version info config.Revision = c.Revision config.Version = c.Version @@ -1081,10 +1087,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) { errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err)) return nil, errs } - if err := c.agent.unloadMetadata(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed unloading metadata: %s", err)) - return nil, errs - } + c.agent.unloadMetadata() // Reload service/check definitions and metadata. if err := c.agent.loadServices(newConf); err != nil { diff --git a/command/agent/config.go b/command/agent/config.go index 9dfa083c6..266ad4eca 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -342,7 +342,9 @@ type Config struct { // they are configured with TranslateWanAddrs set to true. TaggedAddresses map[string]string - // Node metadata + // Node metadata key/value pairs. These are excluded from JSON output + // because they can be reloaded and might be stale when shown from the + // config instead of the local state. Meta map[string]string `mapstructure:"node_meta" json:"-"` // LeaveOnTerm controls if Serf does a graceful leave when receiving diff --git a/command/agent/http.go b/command/agent/http.go index 4c4a3ab2a..74f1f2531 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -586,14 +586,20 @@ func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) // parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for // filtering results to nodes with the given metadata key/value -func (s *HTTPServer) parseMetaFilter(req *http.Request, key *string, value *string) { - if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { - pair := strings.SplitN(filter[0], ":", 2) - *key = pair[0] - if len(pair) == 2 { - *value = pair[1] +func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string { + if filterList, ok := req.URL.Query()["node-meta"]; ok { + filters := make(map[string]string) + for _, filter := range filterList { + pair := strings.SplitN(filter, ":", 2) + if len(pair) == 2 { + filters[pair[0]] = pair[1] + } else { + filters[pair[0]] = "" + } } + return filters } + return nil } // parse is a convenience method for endpoints that need diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 2887559ac..c19082a8f 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -166,8 +166,8 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde var index uint64 var nodes structs.Nodes var err error - if args.NodeMetaKey != "" { - index, nodes, err = state.NodesByMeta(args.NodeMetaKey, args.NodeMetaValue) + if len(args.NodeMetaFilters) > 0 { + index, nodes, err = state.NodesByMeta(args.NodeMetaFilters) } else { index, nodes, err = state.Nodes() } @@ -199,8 +199,8 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I var index uint64 var services structs.Services var err error - if args.NodeMetaKey != "" { - index, services, err = state.ServicesByNodeMeta(args.NodeMetaKey, args.NodeMetaValue) + if len(args.NodeMetaFilters) > 0 { + index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters) } else { index, services, err = state.Services() } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 604b4ee25..b983eb59a 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -599,18 +599,6 @@ func TestCatalog_ListNodes_MetaFilter(t *testing.T) { codec := rpcClient(t, s1) defer codec.Close() - // Filter by a specific meta k/v pair - args := structs.DCSpecificRequest{ - Datacenter: "dc1", - NodeMetaKey: "somekey", - NodeMetaValue: "somevalue", - } - var out structs.IndexedNodes - err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) - if err != nil { - t.Fatalf("err: %v", err) - } - testutil.WaitForLeader(t, s1.RPC, "dc1") // Add a new node with the right meta k/v pair @@ -619,6 +607,15 @@ func TestCatalog_ListNodes_MetaFilter(t *testing.T) { t.Fatalf("err: %v", err) } + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaFilters: map[string]string{ + "somekey": "somevalue", + }, + } + var out structs.IndexedNodes + testutil.WaitForResult(func() (bool, error) { msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) return len(out.Nodes) == 1, nil @@ -639,12 +636,13 @@ func TestCatalog_ListNodes_MetaFilter(t *testing.T) { // Now filter on a nonexistent meta k/v pair args = structs.DCSpecificRequest{ - Datacenter: "dc1", - NodeMetaKey: "somekey", - NodeMetaValue: "invalid", + Datacenter: "dc1", + NodeMetaFilters: map[string]string{ + "somekey": "invalid", + }, } out = structs.IndexedNodes{} - err = msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) if err != nil { t.Fatalf("err: %v", err) } @@ -1069,18 +1067,6 @@ func TestCatalog_ListServices_MetaFilter(t *testing.T) { codec := rpcClient(t, s1) defer codec.Close() - // Filter by a specific meta k/v pair - args := structs.DCSpecificRequest{ - Datacenter: "dc1", - NodeMetaKey: "somekey", - NodeMetaValue: "somevalue", - } - var out structs.IndexedServices - err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) - if err != nil { - t.Fatalf("err: %v", err) - } - testutil.WaitForLeader(t, s1.RPC, "dc1") // Add a new node with the right meta k/v pair @@ -1093,6 +1079,14 @@ func TestCatalog_ListServices_MetaFilter(t *testing.T) { t.Fatalf("err: %v", err) } + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaFilters: map[string]string{ + "somekey": "somevalue", + }, + } + var out structs.IndexedServices if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -1112,12 +1106,13 @@ func TestCatalog_ListServices_MetaFilter(t *testing.T) { // Now filter on a nonexistent meta k/v pair args = structs.DCSpecificRequest{ - Datacenter: "dc1", - NodeMetaKey: "somekey", - NodeMetaValue: "invalid", + Datacenter: "dc1", + NodeMetaFilters: map[string]string{ + "somekey": "invalid", + }, } out = structs.IndexedServices{} - err = msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index eb2fcb8ce..b11ac91ee 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -550,7 +550,10 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { } // NodesByMeta is used to return all nodes with the given meta key/value pair. -func (s *StateStore) NodesByMeta(key, value string) (uint64, structs.Nodes, error) { +func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nodes, error) { + if len(filters) > 1 { + return 0, nil, fmt.Errorf("multiple meta filters not supported") + } tx := s.db.Txn(false) defer tx.Abort() @@ -558,7 +561,11 @@ func (s *StateStore) NodesByMeta(key, value string) (uint64, structs.Nodes, erro idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) // Retrieve all of the nodes - nodes, err := tx.Get("nodes", "meta", key, value) + var args []interface{} + for key, value := range filters { + args = append(args, key, value) + } + nodes, err := tx.Get("nodes", "meta", args...) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -781,8 +788,11 @@ func (s *StateStore) Services() (uint64, structs.Services, error) { return idx, results, nil } -// Services returns all services, filtered by given node metadata. -func (s *StateStore) ServicesByNodeMeta(key, value string) (uint64, structs.Services, error) { +// Services returns all services, filtered by the given node metadata. +func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, structs.Services, error) { + if len(filters) > 1 { + return 0, nil, fmt.Errorf("multiple meta filters not supported") + } tx := s.db.Txn(false) defer tx.Abort() @@ -790,7 +800,11 @@ func (s *StateStore) ServicesByNodeMeta(key, value string) (uint64, structs.Serv idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) // Retrieve all of the nodes with the meta k/v pair - nodes, err := tx.Get("nodes", "meta", key, value) + var args []interface{} + for key, value := range filters { + args = append(args, key, value) + } + nodes, err := tx.Get("nodes", "meta", args...) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 3277cb391..855423067 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -759,7 +759,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { s := testStateStore(t) // Listing with no results returns nil - idx, res, err := s.NodesByMeta("somekey", "somevalue") + idx, res, err := s.NodesByMeta(map[string]string{"somekey": "somevalue"}) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } @@ -775,7 +775,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } // Retrieve the node with role=client - idx, nodes, err := s.NodesByMeta("role", "client") + idx, nodes, err := s.NodesByMeta(map[string]string{"role": "client"}) if err != nil { t.Fatalf("err: %s", err) } @@ -800,7 +800,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } // Retrieve both nodes via their common meta field - idx, nodes, err = s.NodesByMeta("common", "1") + idx, nodes, err = s.NodesByMeta(map[string]string{"common": "1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -1160,7 +1160,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { s := testStateStore(t) // Listing with no results returns nil - idx, res, err := s.ServicesByNodeMeta("somekey", "somevalue") + idx, res, err := s.ServicesByNodeMeta(map[string]string{"somekey": "somevalue"}) if idx != 0 || len(res) != 0 || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } @@ -1196,7 +1196,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { } // Filter the services by the first node's meta value - idx, res, err = s.ServicesByNodeMeta("role", "client") + idx, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"}) if err != nil { t.Fatalf("err: %s", err) } @@ -1211,7 +1211,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { } // Get all services using the common meta value - idx, res, err = s.ServicesByNodeMeta("common", "1") + idx, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index bb4f6472d..e7dbbd068 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -229,10 +229,9 @@ type QuerySource struct { // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { - Datacenter string - NodeMetaKey string - NodeMetaValue string - Source QuerySource + Datacenter string + NodeMetaFilters map[string]string + Source QuerySource QueryOptions } @@ -288,6 +287,12 @@ type Node struct { } type Nodes []*Node +// Used for filtering nodes by metadata key/value pairs +type NodeMetaFilter struct { + Key string + Value string +} + // Used to return information about a provided services. // Maps service name to available tags type Services map[string][]string From 51a18346674f4a7b500222125215e1cea78f979f Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 11 Jan 2017 16:07:11 -0500 Subject: [PATCH 7/9] Add -node-meta to agent command line options --- command/agent/agent.go | 10 ++++++++++ command/agent/command.go | 12 ++++++++++++ command/agent/command_test.go | 26 ++++++++++++++++++++++++++ command/agent/http.go | 8 ++------ consul/structs/structs.go | 6 ------ 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 582c62570..850733a77 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1710,6 +1710,16 @@ func (a *Agent) loadMetadata(conf *Config) error { return nil } +// parseMetaPair parses a key/value pair of the form key:value +func parseMetaPair(raw string) (string, string) { + pair := strings.SplitN(raw, ":", 2) + if len(pair) == 2 { + return pair[0], pair[1] + } else { + return pair[0], "" + } +} + // validateMeta validates a set of key/value pairs from the agent config func validateMetadata(meta map[string]string) error { if len(meta) > metaMaxKeyPairs { diff --git a/command/agent/command.go b/command/agent/command.go index 03f3b25e1..10e62a77c 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -73,12 +73,14 @@ func (c *Command) readConfig() *Config { var dnsRecursors []string var dev bool var dcDeprecated string + var nodeMeta []string cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError) cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-file", "json file to read config from") cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-dir", "directory of json files to read") cmdFlags.Var((*AppendSliceValue)(&dnsRecursors), "recursor", "address of an upstream DNS server") + cmdFlags.Var((*AppendSliceValue)(&nodeMeta), "node-meta", "arbitrary metadata key/value pair") cmdFlags.BoolVar(&dev, "dev", false, "development server mode") cmdFlags.StringVar(&cmdConfig.LogLevel, "log-level", "", "log level") @@ -161,6 +163,14 @@ func (c *Command) readConfig() *Config { cmdConfig.RetryIntervalWan = dur } + if len(nodeMeta) > 0 { + cmdConfig.Meta = make(map[string]string) + for _, entry := range nodeMeta { + key, value := parseMetaPair(entry) + cmdConfig.Meta[key] = value + } + } + var config *Config if dev { config = DevConfig() @@ -1242,6 +1252,8 @@ Options: will retry indefinitely. -log-level=info Log level of the agent. -node=hostname Name of this node. Must be unique in the cluster + -node-meta=key:value An arbitrary metadata key/value pair for this node. + This can be specified multiple times. -protocol=N Sets the protocol version. Defaults to latest. -rejoin Ignores a previous leave and attempts to rejoin the cluster. -server Switches agent to server mode. diff --git a/command/agent/command_test.go b/command/agent/command_test.go index fc9a2c1e0..5e97406cc 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/mitchellh/cli" + "reflect" ) func TestCommand_implements(t *testing.T) { @@ -129,6 +130,7 @@ func TestReadCliConfig(t *testing.T) { "-advertise-wan", "1.2.3.4", "-serf-wan-bind", "4.3.2.1", "-serf-lan-bind", "4.3.2.2", + "-node-meta", "somekey:somevalue", }, ShutdownCh: shutdownCh, Ui: new(cli.MockUi), @@ -144,6 +146,30 @@ func TestReadCliConfig(t *testing.T) { if config.SerfLanBindAddr != "4.3.2.2" { t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr) } + if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" { + t.Fatalf("expected somekey=somevalue, got %v", config.Meta) + } + } + + // Test multiple node meta flags + { + cmd := &Command{ + args: []string{ + "-data-dir", tmpDir, + "-node-meta", "somekey:somevalue", + "-node-meta", "otherkey:othervalue", + }, + ShutdownCh: shutdownCh, + Ui: new(cli.MockUi), + } + expected := map[string]string{ + "somekey": "somevalue", + "otherkey": "othervalue", + } + config := cmd.readConfig() + if !reflect.DeepEqual(config.Meta, expected) { + t.Fatalf("bad: %v %v", config.Meta, expected) + } } // Test LeaveOnTerm and SkipLeaveOnInt defaults for server mode diff --git a/command/agent/http.go b/command/agent/http.go index 74f1f2531..b228b9c0e 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -590,12 +590,8 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string { if filterList, ok := req.URL.Query()["node-meta"]; ok { filters := make(map[string]string) for _, filter := range filterList { - pair := strings.SplitN(filter, ":", 2) - if len(pair) == 2 { - filters[pair[0]] = pair[1] - } else { - filters[pair[0]] = "" - } + key, value := parseMetaPair(filter) + filters[key] = value } return filters } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index e7dbbd068..e34a8b635 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -287,12 +287,6 @@ type Node struct { } type Nodes []*Node -// Used for filtering nodes by metadata key/value pairs -type NodeMetaFilter struct { - Key string - Value string -} - // Used to return information about a provided services. // Maps service name to available tags type Services map[string][]string From 561d6c71e0413b20d6256b30cca551e3876040eb Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 11 Jan 2017 17:52:31 -0500 Subject: [PATCH 8/9] Update website docs for node metadata --- .../source/docs/agent/http/catalog.html.markdown | 8 ++++++++ website/source/docs/agent/options.html.markdown | 13 +++++++++++++ 2 files changed, 21 insertions(+) diff --git a/website/source/docs/agent/http/catalog.html.markdown b/website/source/docs/agent/http/catalog.html.markdown index efb80b3b9..e5dcaab16 100644 --- a/website/source/docs/agent/http/catalog.html.markdown +++ b/website/source/docs/agent/http/catalog.html.markdown @@ -191,6 +191,10 @@ the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. +Adding the optional `?node-meta=` parameter with a desired node +metadata key/value pair of the form `key:value` will filter the +results to nodes with that pair present. + It returns a JSON body like this: ```javascript @@ -222,6 +226,10 @@ This endpoint is hit with a `GET` and returns the services registered in a given DC. By default, the datacenter of the agent is queried; however, the `dc` can be provided using the `?dc=` query parameter. +Adding the optional `?node-meta=` parameter with a desired node +metadata key/value pair of the form `key:value` will filter the +results to services with that pair present. + It returns a JSON body like this: ```javascript diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index f4145ceee..3a7d6f8c0 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -251,6 +251,15 @@ will exit with an error at startup. * `-node` - The name of this node in the cluster. This must be unique within the cluster. By default this is the hostname of the machine. +* `-node-meta` - An arbitrary metadata key/value pair + to associate with the node, of the form `key:value`. This can be specified multiple times. Node metadata + pairs have the following restrictions: + - A maximum of 64 key/value pairs can be registered per node. + - Metadata keys must be between 1 and 128 characters (inclusive) in length + - Metadata keys must contain only alphanumeric, `-`, and `_` characters. + - Metadata keys must not begin with the `consul-` prefix; that is reserved for internal use by Consul. + - Metadata values must be between 0 and 512 (inclusive) characters in length. + * `-pid-file` - This flag provides the file path for the agent to store its PID. This is useful for sending signals (for example, `SIGINT` to close the agent or `SIGHUP` to update check definite @@ -658,6 +667,10 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `node_name` Equivalent to the [`-node` command-line flag](#_node). +* `node_meta` This object allows associating arbitrary + metadata key/value pairs with the local node, which can then be used for filtering results from certain + catalog endpoints. See the [`-node-meta` command-line flag](#_node_meta) for more information. + * `performance` Available in Consul 0.7 and later, this is a nested object that allows tuning the performance of different subsystems in Consul. See the [Server Performance](/docs/guides/performance.html) guide for more details. The From 15f008b3e3b2288967de1e7e35aedcc1e1407282 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 11 Jan 2017 18:44:13 -0500 Subject: [PATCH 9/9] Update client api and docs for node metadata --- api/api.go | 10 ++ api/catalog.go | 3 + api/catalog_test.go | 113 ++++++++++++++++++ testutil/server.go | 1 + .../docs/agent/http/agent.html.markdown | 6 + .../docs/agent/http/catalog.html.markdown | 32 ++++- .../docs/agent/http/health.html.markdown | 3 + .../source/docs/agent/options.html.markdown | 21 +++- 8 files changed, 177 insertions(+), 12 deletions(-) diff --git a/api/api.go b/api/api.go index 9587043a4..9a59b724c 100644 --- a/api/api.go +++ b/api/api.go @@ -74,6 +74,11 @@ type QueryOptions struct { // that node. Setting this to "_agent" will use the agent's node // for the sort. Near string + + // NodeMeta is used to filter results by nodes with the given + // metadata key/value pairs. Currently, only one key/value pair can + // be provided for filtering. + NodeMeta map[string]string } // WriteOptions are used to parameterize a write @@ -386,6 +391,11 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.Near != "" { r.params.Set("near", q.Near) } + if len(q.NodeMeta) > 0 { + for key, value := range q.NodeMeta { + r.params.Add("node-meta", key+":"+value) + } + } } // durToMsec converts a duration to a millisecond specified string. If the diff --git a/api/catalog.go b/api/catalog.go index 56f0dbf69..10e93b42d 100644 --- a/api/catalog.go +++ b/api/catalog.go @@ -4,12 +4,14 @@ type Node struct { Node string Address string TaggedAddresses map[string]string + Meta map[string]string } type CatalogService struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string ServiceID string ServiceName string ServiceAddress string @@ -29,6 +31,7 @@ type CatalogRegistration struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string Datacenter string Service *AgentService Check *AgentCheck diff --git a/api/catalog_test.go b/api/catalog_test.go index e37d3dd50..527153b32 100644 --- a/api/catalog_test.go +++ b/api/catalog_test.go @@ -60,6 +60,64 @@ func TestCatalog_Nodes(t *testing.T) { }) } +func TestCatalog_Nodes_MetaFilter(t *testing.T) { + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + catalog := c.Catalog() + + // Make sure we get the node back when filtering by its metadata + testutil.WaitForResult(func() (bool, error) { + nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + + if meta.LastIndex == 0 { + return false, fmt.Errorf("Bad: %v", meta) + } + + if len(nodes) == 0 { + return false, fmt.Errorf("Bad: %v", nodes) + } + + if _, ok := nodes[0].TaggedAddresses["wan"]; !ok { + return false, fmt.Errorf("Bad: %v", nodes[0]) + } + + if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" { + return false, fmt.Errorf("Bad: %v", nodes[0].Meta) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Get nothing back when we use an invalid filter + testutil.WaitForResult(func() (bool, error) { + nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}}) + if err != nil { + return false, err + } + + if meta.LastIndex == 0 { + return false, fmt.Errorf("Bad: %v", meta) + } + + if len(nodes) != 0 { + return false, fmt.Errorf("Bad: %v", nodes) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + func TestCatalog_Services(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -87,6 +145,56 @@ func TestCatalog_Services(t *testing.T) { }) } +func TestCatalog_Services_NodeMetaFilter(t *testing.T) { + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + catalog := c.Catalog() + + // Make sure we get the service back when filtering by the node's metadata + testutil.WaitForResult(func() (bool, error) { + services, meta, err := catalog.Services(&QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + + if meta.LastIndex == 0 { + return false, fmt.Errorf("Bad: %v", meta) + } + + if len(services) == 0 { + return false, fmt.Errorf("Bad: %v", services) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Get nothing back when using an invalid filter + testutil.WaitForResult(func() (bool, error) { + services, meta, err := catalog.Services(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}}) + if err != nil { + return false, err + } + + if meta.LastIndex == 0 { + return false, fmt.Errorf("Bad: %v", meta) + } + + if len(services) != 0 { + return false, fmt.Errorf("Bad: %v", services) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + func TestCatalog_Service(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -173,6 +281,7 @@ func TestCatalog_Registration(t *testing.T) { Datacenter: "dc1", Node: "foobar", Address: "192.168.10.10", + NodeMeta: map[string]string{"somekey": "somevalue"}, Service: service, Check: check, } @@ -200,6 +309,10 @@ func TestCatalog_Registration(t *testing.T) { return false, fmt.Errorf("missing checkid service:redis1") } + if v, ok := node.Node.Meta["somekey"]; !ok || v != "somevalue" { + return false, fmt.Errorf("missing node meta pair somekey:somevalue") + } + return true, nil }, func(err error) { t.Fatalf("err: %s", err) diff --git a/testutil/server.go b/testutil/server.go index 8ab196eca..e90121781 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -53,6 +53,7 @@ type TestAddressConfig struct { // TestServerConfig is the main server configuration struct. type TestServerConfig struct { NodeName string `json:"node_name"` + NodeMeta map[string]string `json:"node_meta"` Performance *TestPerformanceConfig `json:"performance,omitempty"` Bootstrap bool `json:"bootstrap,omitempty"` Server bool `json:"server,omitempty"` diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index c1f6e4775..b01a30033 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -128,6 +128,8 @@ This endpoint is used to return the configuration and member information of the Consul 0.7.0 and later also includes a snapshot of various operating statistics under the `Stats` key. These statistics are intended to help human operators for debugging and may change over time, so this part of the interface should not be consumed programmatically. +Consul 0.7.3 and later also includes a block of user-defined node metadata values under the `Meta` key. These are arbitrary key/value pairs defined in the [node meta](/docs/agent/options.html#_node_meta) section of the agent configuration. + It returns a JSON body like this: ```javascript @@ -194,6 +196,10 @@ It returns a JSON body like this: "DelegateMin": 2, "DelegateMax": 4, "DelegateCur": 4 + }, + "Meta": { + "instance_type": "i2.xlarge", + "os_version": "ubuntu_16.04", } } ``` diff --git a/website/source/docs/agent/http/catalog.html.markdown b/website/source/docs/agent/http/catalog.html.markdown index e5dcaab16..0a7a06629 100644 --- a/website/source/docs/agent/http/catalog.html.markdown +++ b/website/source/docs/agent/http/catalog.html.markdown @@ -44,6 +44,9 @@ body must look something like: "lan": "192.168.10.10", "wan": "10.0.10.10" }, + "NodeMeta": { + "somekey": "somevalue" + }, "Service": { "ID": "redis1", "Service": "redis", @@ -73,6 +76,10 @@ the node with the catalog. `TaggedAddresses` can be used in conjunction with the option and the `wan` address. The `lan` address was added in Consul 0.7 to help find the LAN address if address translation is enabled. +The `Meta` block was added in Consul 0.7.3 to enable associating arbitrary metadata +key/value pairs with a node for filtering purposes. For more information on node metadata, +see the [node meta](/docs/agent/options.html#_node_meta) section of the configuration page. + If the `Service` key is provided, the service will also be registered. If `ID` is not provided, it will be defaulted to the value of the `Service.Service` property. Only one service with a given `ID` may be present per node. The service `Tags`, `Address`, @@ -191,9 +198,9 @@ the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. -Adding the optional `?node-meta=` parameter with a desired node -metadata key/value pair of the form `key:value` will filter the -results to nodes with that pair present. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This will filter the results to nodes with that pair present. It returns a JSON body like this: @@ -205,6 +212,9 @@ It returns a JSON body like this: "TaggedAddresses": { "lan": "10.1.10.11", "wan": "10.1.10.11" + }, + "Meta": { + "instance_type": "t2.medium" } }, { @@ -213,6 +223,9 @@ It returns a JSON body like this: "TaggedAddresses": { "lan": "10.1.10.11", "wan": "10.1.10.12" + }, + "Meta": { + "instance_type": "t2.large" } } ] @@ -226,9 +239,9 @@ This endpoint is hit with a `GET` and returns the services registered in a given DC. By default, the datacenter of the agent is queried; however, the `dc` can be provided using the `?dc=` query parameter. -Adding the optional `?node-meta=` parameter with a desired node -metadata key/value pair of the form `key:value` will filter the -results to services with that pair present. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This will filter the results to services with that pair present. It returns a JSON body like this: @@ -273,6 +286,9 @@ It returns a JSON body like this: "lan": "192.168.10.10", "wan": "10.0.10.10" }, + "Meta": { + "instance_type": "t2.medium" + } "CreateIndex": 51, "ModifyIndex": 51, "Node": "foobar", @@ -294,6 +310,7 @@ The returned fields are as follows: - `Address`: IP address of the Consul node on which the service is registered - `TaggedAddresses`: List of explicit LAN and WAN IP addresses for the agent +- `Meta`: Added in Consul 0.7.3, a list of user-defined metadata key/value pairs for the node - `CreateIndex`: Internal index value representing when the service was created - `ModifyIndex`: Last index that modified the service - `Node`: Node name of the Consul node on which the service is registered @@ -321,6 +338,9 @@ It returns a JSON body like this: "TaggedAddresses": { "lan": "10.1.10.12", "wan": "10.1.10.12" + }, + "Meta": { + "instance_type": "t2.medium" } }, "Services": { diff --git a/website/source/docs/agent/http/health.html.markdown b/website/source/docs/agent/http/health.html.markdown index 9c8767ef7..e9f18c641 100644 --- a/website/source/docs/agent/http/health.html.markdown +++ b/website/source/docs/agent/http/health.html.markdown @@ -131,6 +131,9 @@ It returns a JSON body like this: "TaggedAddresses": { "lan": "10.1.10.12", "wan": "10.1.10.12" + }, + "Meta": { + "instance_type": "t2.medium" } }, "Service": { diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 3a7d6f8c0..0f9196359 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -251,9 +251,9 @@ will exit with an error at startup. * `-node` - The name of this node in the cluster. This must be unique within the cluster. By default this is the hostname of the machine. -* `-node-meta` - An arbitrary metadata key/value pair - to associate with the node, of the form `key:value`. This can be specified multiple times. Node metadata - pairs have the following restrictions: +* `-node-meta` - Available in Consul 0.7.3 and later, + this specifies an arbitrary metadata key/value pair to associate with the node, of the form `key:value`. + This can be specified multiple times. Node metadata pairs have the following restrictions: - A maximum of 64 key/value pairs can be registered per node. - Metadata keys must be between 1 and 128 characters (inclusive) in length - Metadata keys must contain only alphanumeric, `-`, and `_` characters. @@ -667,9 +667,18 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `node_name` Equivalent to the [`-node` command-line flag](#_node). -* `node_meta` This object allows associating arbitrary - metadata key/value pairs with the local node, which can then be used for filtering results from certain - catalog endpoints. See the [`-node-meta` command-line flag](#_node_meta) for more information. +* `node_meta` Available in Consul 0.7.3 and later, + This object allows associating arbitrary metadata key/value pairs with the local node, which can + then be used for filtering results from certain catalog endpoints. See the + [`-node-meta` command-line flag](#_node_meta) for more information. + + ```javascript + { + "node_meta": { + "instance_type": "t2.medium" + } + } + ``` * `performance` Available in Consul 0.7 and later, this is a nested object that allows tuning the performance of different subsystems in