diff --git a/command/agent/agent.go b/command/agent/agent.go index 1a0cf207a..9b8c5be09 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1687,6 +1687,9 @@ func (a *Agent) loadMetadata(conf *Config) error { defer a.state.Unlock() for key, value := range conf.Meta { + if key == "" { + return fmt.Errorf("Key name cannot be blank") + } if strings.Contains(key, ":") { return fmt.Errorf("Key name cannot contain ':' character: %s", key) } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index b755f94d6..4c14608c6 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -201,7 +201,12 @@ func TestAgent_Checks_ACLFilter(t *testing.T) { } func TestAgent_Self(t *testing.T) { - dir, srv := makeHTTPServer(t) + meta := map[string]string{ + "somekey": "somevalue", + } + dir, srv := makeHTTPServerWithConfig(t, func(conf *Config) { + conf.Meta = meta + }) defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() @@ -232,6 +237,9 @@ func TestAgent_Self(t *testing.T) { if !reflect.DeepEqual(c, val.Coord) { t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord) } + if !reflect.DeepEqual(meta, val.Meta) { + t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta) + } srv.agent.config.DisableCoordinates = true obj, err = srv.AgentSelf(nil, req) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index c84a5b008..23a9ea006 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1851,6 +1851,50 @@ func TestAgent_purgeCheckState(t *testing.T) { } } +func TestAgent_metadata(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Load a valid set of key/value pairs + config.Meta = map[string]string{ + "key1": "value1", + "key2": "value2", + } + if err := agent.loadMetadata(config); err != nil { + t.Fatalf("err: %s", err) + } + agent.unloadMetadata() + + // Should fail, keys can't be blank + config.Meta = map[string]string{ + "": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() + + // Should fail, keys can't contain ':' + config.Meta = map[string]string{ + "key:123": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() + + // Should fail, keys can't begin with 'consul-' + config.Meta = map[string]string{ + "consul-key": "value1", + } + if err := agent.loadMetadata(config); err == nil { + t.Fatalf("should have failed") + } + agent.unloadMetadata() +} + func TestAgent_GetCoordinate(t *testing.T) { check := func(server bool) { config := nextConfig() diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 1d1136e1f..97f706d8a 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -145,6 +145,53 @@ func TestCatalogNodes(t *testing.T) { } } +func TestCatalogNodes_metaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + // Register a node with a meta field + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/nodes?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Verify an index is set + assertIndex(t, resp) + + // Verify we only get the node with the correct meta field back + nodes := obj.(structs.Nodes) + if len(nodes) != 1 { + t.Fatalf("bad: %v", obj) + } + if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" { + t.Fatalf("bad: %v", nodes[0].Meta) + } +} + func TestCatalogNodes_WanTranslation(t *testing.T) { dir1, srv1 := makeHTTPServerWithConfig(t, func(c *Config) { diff --git a/command/agent/config.go b/command/agent/config.go index c4f35f3d0..9dfa083c6 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -713,6 +713,7 @@ func DefaultConfig() *Config { Telemetry: Telemetry{ StatsitePrefix: "consul", }, + Meta: make(map[string]string), SyslogFacility: "LOCAL0", Protocol: consul.ProtocolVersion2Compatible, CheckUpdateInterval: 5 * time.Minute, diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 089244287..79e8b41b2 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -985,6 +985,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { func TestAgentAntiEntropy_NodeInfo(t *testing.T) { conf := nextConfig() + conf.Meta["somekey"] = "somevalue" dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -1020,7 +1021,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { // Make sure we synced our node info - this should have ridden on the // "consul" service sync addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + meta := services.NodeServices.Node.Meta + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) { return false, fmt.Errorf("bad: %v", addrs) } @@ -1044,7 +1046,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { return false, fmt.Errorf("err: %v", err) } addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + meta := services.NodeServices.Node.Meta + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) { return false, fmt.Errorf("bad: %v", addrs) } diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 565155899..237d1b826 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -156,14 +156,6 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } - var metaFilter []interface{} - if args.NodeMetaKey != "" { - metaFilter = append(metaFilter, args.NodeMetaKey) - if args.NodeMetaValue != "" { - metaFilter = append(metaFilter, args.NodeMetaValue) - } - } - // Get the list of nodes. state := c.srv.fsm.State() return c.srv.blockingRPC( @@ -171,7 +163,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &reply.QueryMeta, state.GetQueryWatch("Nodes"), func() error { - index, nodes, err := state.Nodes(metaFilter...) + var index uint64 + var nodes structs.Nodes + var err error + if args.NodeMetaKey != "" { + index, nodes, err = state.NodesByMeta(args.NodeMetaKey, args.NodeMetaValue) + } else { + index, nodes, err = state.Nodes() + } if err != nil { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index c0a329af3..f0152eece 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -592,6 +592,72 @@ func TestCatalog_ListNodes(t *testing.T) { } } +func TestCatalog_ListNodes_MetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "somevalue", + } + var out structs.IndexedNodes + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Add a new node with the right meta k/v pair + node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}} + if err := s1.fsm.State().EnsureNode(1, node); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 1, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Verify that only the correct node was returned + if out.Nodes[0].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", out) + } + if v, ok := out.Nodes[0].Meta["somekey"]; !ok || v != "somevalue" { + t.Fatalf("bad: %v", out) + } + + // Now filter on a nonexistent meta k/v pair + args = structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "invalid", + } + out = structs.IndexedNodes{} + err = msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should get an empty list of nodes back + testutil.WaitForResult(func() (bool, error) { + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 0, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestCatalog_ListNodes_StaleRaad(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index c1bb2d0f6..88f811e81 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -528,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) { +func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -536,13 +536,29 @@ func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, er idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) // Retrieve all of the nodes - var nodes memdb.ResultIterator - var err error - if len(metaFilter) > 0 { - nodes, err = tx.Get("nodes", "meta", metaFilter...) - } else { - nodes, err = tx.Get("nodes", "id") + nodes, err := tx.Get("nodes", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } + + // Create and return the nodes list. + var results structs.Nodes + for node := nodes.Next(); node != nil; node = nodes.Next() { + results = append(results, node.(*structs.Node)) + } + return idx, results, nil +} + +// NodesByMeta is used to return all nodes with the given meta key/value pair. +func (s *StateStore) NodesByMeta(key, value string) (uint64, structs.Nodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) + + // Retrieve all of the nodes + nodes, err := tx.Get("nodes", "meta", key, value) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 1354d8920..d932b1b24 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -241,6 +241,9 @@ func TestStateStore_EnsureRegistration(t *testing.T) { TaggedAddresses: map[string]string{ "hello": "world", }, + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, } if err := s.EnsureRegistration(1, req); err != nil { t.Fatalf("err: %s", err) @@ -255,6 +258,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { if out.Node != "node1" || out.Address != "1.2.3.4" || len(out.TaggedAddresses) != 1 || out.TaggedAddresses["hello"] != "world" || + out.Meta["somekey"] != "somevalue" || out.CreateIndex != created || out.ModifyIndex != modified { t.Fatalf("bad node returned: %#v", out) } @@ -751,6 +755,113 @@ func BenchmarkGetNodes(b *testing.B) { } } +func TestStateStore_GetNodesByMeta(t *testing.T) { + s := testStateStore(t) + + // Listing with no results returns nil + idx, res, err := s.NodesByMeta("somekey", "somevalue") + if idx != 0 || res != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Create some nodes in the state store + node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} + if err := s.EnsureNode(0, node0); err != nil { + t.Fatalf("err: %v", err) + } + node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}} + if err := s.EnsureNode(1, node1); err != nil { + t.Fatalf("err: %v", err) + } + + // Retrieve the node with role=client + _, nodes, err := s.NodesByMeta("role", "client") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Only one node was returned + if n := len(nodes); n != 1 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the node is correct + if nodes[0].CreateIndex != 0 || nodes[0].ModifyIndex != 0 { + t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) + } + if nodes[0].Node != "node0" { + t.Fatalf("bad: %#v", nodes[0]) + } + if !reflect.DeepEqual(nodes[0].Meta, node0.Meta) { + t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta) + } + + // Retrieve the node with role=server + _, nodes, err = s.NodesByMeta("role", "server") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Only one node was returned + if n := len(nodes); n != 1 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the node is correct + if nodes[0].CreateIndex != 1 || nodes[0].ModifyIndex != 1 { + t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) + } + if nodes[0].Node != "node1" { + t.Fatalf("bad: %#v", nodes[0]) + } + if !reflect.DeepEqual(nodes[0].Meta, node1.Meta) { + t.Fatalf("bad: %v != %v", nodes[0].Meta, node1.Meta) + } + + // Retrieve both nodes via their common meta field + _, nodes, err = s.NodesByMeta("common", "1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // All nodes were returned + if n := len(nodes); n != 2 { + t.Fatalf("bad node count: %d", n) + } + + // Make sure the nodes match + for i, node := range nodes { + if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) { + t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex) + } + name := fmt.Sprintf("node%d", i) + if node.Node != name { + t.Fatalf("bad: %#v", node) + } + if v, ok := node.Meta["common"]; !ok || v != "1" { + t.Fatalf("bad: %v", node.Meta) + } + } +} + +func BenchmarkGetNodesByMeta(b *testing.B) { + s, err := NewStateStore(nil) + if err != nil { + b.Fatalf("err: %s", err) + } + + if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + b.Fatalf("err: %v", err) + } + if err := s.EnsureNode(101, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { + b.Fatalf("err: %v", err) + } + + for i := 0; i < b.N; i++ { + s.Nodes() + } +} + func TestStateStore_DeleteNode(t *testing.T) { s := testStateStore(t)