Merge pull request #2643 from hashicorp/f-node-metadata

Node metadata
This commit is contained in:
Kyle Havlovitz 2017-01-11 20:29:24 -05:00 committed by GitHub
commit 821735d175
33 changed files with 1253 additions and 47 deletions

View file

@ -74,6 +74,11 @@ type QueryOptions struct {
// that node. Setting this to "_agent" will use the agent's node
// for the sort.
Near string
// NodeMeta is used to filter results by nodes with the given
// metadata key/value pairs. Currently, only one key/value pair can
// be provided for filtering.
NodeMeta map[string]string
}
// WriteOptions are used to parameterize a write
@ -386,6 +391,11 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Near != "" {
r.params.Set("near", q.Near)
}
if len(q.NodeMeta) > 0 {
for key, value := range q.NodeMeta {
r.params.Add("node-meta", key+":"+value)
}
}
}
// durToMsec converts a duration to a millisecond specified string. If the

View file

@ -4,12 +4,14 @@ type Node struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
}
type CatalogService struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
ServiceName string
ServiceAddress string
@ -29,6 +31,7 @@ type CatalogRegistration struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
Datacenter string
Service *AgentService
Check *AgentCheck

View file

@ -60,6 +60,64 @@ func TestCatalog_Nodes(t *testing.T) {
})
}
func TestCatalog_Nodes_MetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
catalog := c.Catalog()
// Make sure we get the node back when filtering by its metadata
testutil.WaitForResult(func() (bool, error) {
nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(nodes) == 0 {
return false, fmt.Errorf("Bad: %v", nodes)
}
if _, ok := nodes[0].TaggedAddresses["wan"]; !ok {
return false, fmt.Errorf("Bad: %v", nodes[0])
}
if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" {
return false, fmt.Errorf("Bad: %v", nodes[0].Meta)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Get nothing back when we use an invalid filter
testutil.WaitForResult(func() (bool, error) {
nodes, meta, err := catalog.Nodes(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(nodes) != 0 {
return false, fmt.Errorf("Bad: %v", nodes)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Services(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -87,6 +145,56 @@ func TestCatalog_Services(t *testing.T) {
})
}
func TestCatalog_Services_NodeMetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
catalog := c.Catalog()
// Make sure we get the service back when filtering by the node's metadata
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Services(&QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Get nothing back when using an invalid filter
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Services(&QueryOptions{NodeMeta: map[string]string{"nope":"nope"}})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) != 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Service(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -173,6 +281,7 @@ func TestCatalog_Registration(t *testing.T) {
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: service,
Check: check,
}
@ -200,6 +309,10 @@ func TestCatalog_Registration(t *testing.T) {
return false, fmt.Errorf("missing checkid service:redis1")
}
if v, ok := node.Node.Meta["somekey"]; !ok || v != "somevalue" {
return false, fmt.Errorf("missing node meta pair somekey:somevalue")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)

View file

@ -42,11 +42,26 @@ const (
"but no reason was provided. This is a default message."
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
"service, but no reason was provided. This is a default message."
// The meta key prefix reserved for Consul's internal use
metaKeyReservedPrefix = "consul-"
// The maximum number of metadata key pairs allowed to be registered
metaMaxKeyPairs = 64
// The maximum allowed length of a metadata key
metaKeyMaxLength = 128
// The maximum allowed length of a metadata value
metaValueMaxLength = 512
)
var (
// dnsNameRe checks if a name or tag is dns-compatible.
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
// metaKeyFormat checks if a metadata key string is valid
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
)
/*
@ -246,13 +261,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
return nil, err
}
// Load checks/services.
// Load checks/services/metadata.
if err := agent.loadServices(config); err != nil {
return nil, err
}
if err := agent.loadChecks(config); err != nil {
return nil, err
}
if err := agent.loadMetadata(config); err != nil {
return nil, err
}
// Start watching for critical services to deregister, based on their
// checks.
@ -1677,6 +1695,74 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
}
}
// loadMetadata loads node metadata fields from the agent config and
// updates them on the local agent.
func (a *Agent) loadMetadata(conf *Config) error {
a.state.Lock()
defer a.state.Unlock()
for key, value := range conf.Meta {
a.state.metadata[key] = value
}
a.state.changeMade()
return nil
}
// parseMetaPair parses a key/value pair of the form key:value
func parseMetaPair(raw string) (string, string) {
pair := strings.SplitN(raw, ":", 2)
if len(pair) == 2 {
return pair[0], pair[1]
} else {
return pair[0], ""
}
}
// validateMeta validates a set of key/value pairs from the agent config
func validateMetadata(meta map[string]string) error {
if len(meta) > metaMaxKeyPairs {
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
}
for key, value := range meta {
if err := validateMetaPair(key, value); err != nil {
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
}
}
return nil
}
// validateMetaPair checks that the given key/value pair is in a valid format
func validateMetaPair(key, value string) error {
if key == "" {
return fmt.Errorf("Key cannot be blank")
}
if !metaKeyFormat(key) {
return fmt.Errorf("Key contains invalid characters")
}
if len(key) > metaKeyMaxLength {
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
}
if strings.HasPrefix(key, metaKeyReservedPrefix) {
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
}
if len(value) > metaValueMaxLength {
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
}
return nil
}
// unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() {
a.state.Lock()
defer a.state.Unlock()
a.state.metadata = make(map[string]string)
}
// serviceMaintCheckID returns the ID of a given service's maintenance check
func serviceMaintCheckID(serviceID string) types.CheckID {
return types.CheckID(structs.ServiceMaintPrefix + serviceID)

View file

@ -20,6 +20,7 @@ type AgentSelf struct {
Coord *coordinate.Coordinate
Member serf.Member
Stats map[string]map[string]string
Meta map[string]string
}
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
Coord: c,
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
}, nil
}

View file

@ -201,7 +201,12 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
}
func TestAgent_Self(t *testing.T) {
dir, srv := makeHTTPServer(t)
meta := map[string]string{
"somekey": "somevalue",
}
dir, srv := makeHTTPServerWithConfig(t, func(conf *Config) {
conf.Meta = meta
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
@ -232,6 +237,9 @@ func TestAgent_Self(t *testing.T) {
if !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}
if !reflect.DeepEqual(meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
}
srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)

View file

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/raft"
"strings"
)
const (
@ -1851,6 +1852,69 @@ func TestAgent_purgeCheckState(t *testing.T) {
}
}
func TestAgent_metadata(t *testing.T) {
// Load a valid set of key/value pairs
meta := map[string]string{
"key1": "value1",
"key2": "value2",
}
// Should succeed
if err := validateMetadata(meta); err != nil {
t.Fatalf("err: %s", err)
}
// Should get error
meta = map[string]string{
"": "value1",
}
if err := validateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
t.Fatalf("should have failed")
}
// Should get error
meta = make(map[string]string)
for i := 0; i < metaMaxKeyPairs+1; i++ {
meta[string(i)] = "value"
}
if err := validateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
t.Fatalf("should have failed")
}
}
func TestAgent_validateMetaPair(t *testing.T) {
longKey := strings.Repeat("a", metaKeyMaxLength+1)
longValue := strings.Repeat("b", metaValueMaxLength+1)
pairs := []struct {
Key string
Value string
Error string
}{
// valid pair
{"key", "value", ""},
// invalid, blank key
{"", "value", "cannot be blank"},
// allowed special chars in key name
{"k_e-y", "value", ""},
// disallowed special chars in key name
{"(%key&)", "value", "invalid characters"},
// key too long
{longKey, "value", "Key is too long"},
// reserved prefix
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
// value too long
{"key", longValue, "Value is too long"},
}
for _, pair := range pairs {
err := validateMetaPair(pair.Key, pair.Value)
if pair.Error == "" && err != nil {
t.Fatalf("should have succeeded: %v, %v", pair, err)
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {
t.Fatalf("should have failed: %v, %v", pair, err)
}
}
}
func TestAgent_GetCoordinate(t *testing.T) {
check := func(server bool) {
config := nextConfig()

View file

@ -64,6 +64,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
// Setup the request
args := structs.DCSpecificRequest{}
s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
@ -85,6 +86,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.DCSpecificRequest{}
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

View file

@ -145,6 +145,53 @@ func TestCatalogNodes(t *testing.T) {
}
}
func TestCatalogNodes_MetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
// Register a node with a meta field
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/catalog/nodes?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Verify an index is set
assertIndex(t, resp)
// Verify we only get the node with the correct meta field back
nodes := obj.(structs.Nodes)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" {
t.Fatalf("bad: %v", nodes[0].Meta)
}
}
func TestCatalogNodes_WanTranslation(t *testing.T) {
dir1, srv1 := makeHTTPServerWithConfig(t,
func(c *Config) {
@ -449,6 +496,54 @@ func TestCatalogServices(t *testing.T) {
}
}
func TestCatalogServices_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
// Register node
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
},
Service: &structs.NodeService{
Service: "api",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/catalog/services?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogServices(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
services := obj.(structs.Services)
if len(services) != 1 {
t.Fatalf("bad: %v", obj)
}
if _, ok := services[args.Service.Service]; !ok {
t.Fatalf("bad: %v", services)
}
}
func TestCatalogServiceNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)

View file

@ -73,12 +73,14 @@ func (c *Command) readConfig() *Config {
var dnsRecursors []string
var dev bool
var dcDeprecated string
var nodeMeta []string
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-file", "json file to read config from")
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-dir", "directory of json files to read")
cmdFlags.Var((*AppendSliceValue)(&dnsRecursors), "recursor", "address of an upstream DNS server")
cmdFlags.Var((*AppendSliceValue)(&nodeMeta), "node-meta", "arbitrary metadata key/value pair")
cmdFlags.BoolVar(&dev, "dev", false, "development server mode")
cmdFlags.StringVar(&cmdConfig.LogLevel, "log-level", "", "log level")
@ -161,6 +163,14 @@ func (c *Command) readConfig() *Config {
cmdConfig.RetryIntervalWan = dur
}
if len(nodeMeta) > 0 {
cmdConfig.Meta = make(map[string]string)
for _, entry := range nodeMeta {
key, value := parseMetaPair(entry)
cmdConfig.Meta[key] = value
}
}
var config *Config
if dev {
config = DevConfig()
@ -364,6 +374,12 @@ func (c *Command) readConfig() *Config {
}
}
// Verify the node metadata entries are valid
if err := validateMetadata(config.Meta); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
return nil
}
// Set the version info
config.Revision = c.Revision
config.Version = c.Version
@ -1071,7 +1087,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
snap := c.agent.snapshotCheckState()
defer c.agent.restoreCheckState(snap)
// First unload all checks and services. This lets us begin the reload
// First unload all checks, services, and metadata. This lets us begin the reload
// with a clean slate.
if err := c.agent.unloadServices(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
@ -1081,8 +1097,9 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return nil, errs
}
c.agent.unloadMetadata()
// Reload services and check definitions.
// Reload service/check definitions and metadata.
if err := c.agent.loadServices(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return nil, errs
@ -1091,6 +1108,10 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return nil, errs
}
if err := c.agent.loadMetadata(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
return nil, errs
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
@ -1231,6 +1252,8 @@ Options:
will retry indefinitely.
-log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster
-node-meta=key:value An arbitrary metadata key/value pair for this node.
This can be specified multiple times.
-protocol=N Sets the protocol version. Defaults to latest.
-rejoin Ignores a previous leave and attempts to rejoin the cluster.
-server Switches agent to server mode.

View file

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil"
"github.com/mitchellh/cli"
"reflect"
)
func TestCommand_implements(t *testing.T) {
@ -129,6 +130,7 @@ func TestReadCliConfig(t *testing.T) {
"-advertise-wan", "1.2.3.4",
"-serf-wan-bind", "4.3.2.1",
"-serf-lan-bind", "4.3.2.2",
"-node-meta", "somekey:somevalue",
},
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
@ -144,6 +146,30 @@ func TestReadCliConfig(t *testing.T) {
if config.SerfLanBindAddr != "4.3.2.2" {
t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr)
}
if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" {
t.Fatalf("expected somekey=somevalue, got %v", config.Meta)
}
}
// Test multiple node meta flags
{
cmd := &Command{
args: []string{
"-data-dir", tmpDir,
"-node-meta", "somekey:somevalue",
"-node-meta", "otherkey:othervalue",
},
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
expected := map[string]string{
"somekey": "somevalue",
"otherkey": "othervalue",
}
config := cmd.readConfig()
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
}
}
// Test LeaveOnTerm and SkipLeaveOnInt defaults for server mode

View file

@ -342,6 +342,11 @@ type Config struct {
// they are configured with TranslateWanAddrs set to true.
TaggedAddresses map[string]string
// Node metadata key/value pairs. These are excluded from JSON output
// because they can be reloaded and might be stale when shown from the
// config instead of the local state.
Meta map[string]string `mapstructure:"node_meta" json:"-"`
// LeaveOnTerm controls if Serf does a graceful leave when receiving
// the TERM signal. Defaults true on clients, false on servers. This can
// be changed on reload.
@ -710,6 +715,7 @@ func DefaultConfig() *Config {
Telemetry: Telemetry{
StatsitePrefix: "consul",
},
Meta: make(map[string]string),
SyslogFacility: "LOCAL0",
Protocol: consul.ProtocolVersion2Compatible,
CheckUpdateInterval: 5 * time.Minute,
@ -1577,6 +1583,14 @@ func MergeConfig(a, b *Config) *Config {
result.HTTPAPIResponseHeaders[field] = value
}
}
if len(b.Meta) != 0 {
if result.Meta == nil {
result.Meta = make(map[string]string)
}
for field, value := range b.Meta {
result.Meta[field] = value
}
}
// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))

View file

@ -281,6 +281,19 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
// Node metadata fields
input = `{"node_meta": {"thing1": "1", "thing2": "2"}}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if v, ok := config.Meta["thing1"]; !ok || v != "1" {
t.Fatalf("bad: %#v", config)
}
if v, ok := config.Meta["thing2"]; !ok || v != "2" {
t.Fatalf("bad: %#v", config)
}
// leave_on_terminate
input = `{"leave_on_terminate": true}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -1519,6 +1532,9 @@ func TestMergeConfig(t *testing.T) {
DogStatsdAddr: "nope",
DogStatsdTags: []string{"nope"},
},
Meta: map[string]string{
"key": "value1",
},
}
b := &Config{
@ -1620,6 +1636,9 @@ func TestMergeConfig(t *testing.T) {
DogStatsdAddr: "127.0.0.1:7254",
DogStatsdTags: []string{"tag_1:val_1", "tag_2:val_2"},
},
Meta: map[string]string{
"key": "value2",
},
DisableUpdateCheck: true,
DisableAnonymousSignature: true,
HTTPAPIResponseHeaders: map[string]string{

View file

@ -584,6 +584,20 @@ func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource)
}
}
// parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for
// filtering results to nodes with the given metadata key/value
func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string {
if filterList, ok := req.URL.Query()["node-meta"]; ok {
filters := make(map[string]string)
for _, filter := range filterList {
key, value := parseMetaPair(filter)
filters[key] = value
}
return filters
}
return nil
}
// parse is a convenience method for endpoints that need
// to use both parseWait and parseDC.
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {

View file

@ -61,6 +61,9 @@ type localState struct {
// Used to track checks that are being deferred
deferCheck map[types.CheckID]*time.Timer
// metadata tracks the local metadata fields
metadata map[string]string
// consulCh is used to inform of a change to the known
// consul nodes. This may be used to retry a sync run
consulCh chan struct{}
@ -82,6 +85,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
l.checkTokens = make(map[types.CheckID]string)
l.checkCriticalTime = make(map[types.CheckID]time.Time)
l.deferCheck = make(map[types.CheckID]*time.Timer)
l.metadata = make(map[string]string)
l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1)
}
@ -339,6 +343,19 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
return checks
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Metadata() map[string]string {
metadata := make(map[string]string)
l.RLock()
defer l.RUnlock()
for key, value := range l.metadata {
metadata[key] = value
}
return metadata
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
@ -412,10 +429,10 @@ func (l *localState) setSyncState() error {
l.Lock()
defer l.Unlock()
// Check the node info (currently limited to tagged addresses since
// everything else is managed by the Serf layer)
// Check the node info
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) {
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
!reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) {
l.nodeInfoInSync = false
}
@ -619,6 +636,7 @@ func (l *localState) syncService(id string) error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
@ -680,6 +698,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
@ -706,6 +725,7 @@ func (l *localState) syncNodeInfo() error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()},
}
var out struct{}

View file

@ -985,6 +985,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
conf := nextConfig()
conf.Meta["somekey"] = "somevalue"
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
@ -1020,7 +1021,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
// Make sure we synced our node info - this should have ridden on the
// "consul" service sync
addrs := services.NodeServices.Node.TaggedAddresses
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
meta := services.NodeServices.Node.Meta
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) {
return false, fmt.Errorf("bad: %v", addrs)
}
@ -1044,7 +1046,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
return false, fmt.Errorf("err: %v", err)
}
addrs := services.NodeServices.Node.TaggedAddresses
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
meta := services.NodeServices.Node.Meta
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) || !reflect.DeepEqual(meta, conf.Meta) {
return false, fmt.Errorf("bad: %v", addrs)
}

View file

@ -163,7 +163,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&reply.QueryMeta,
state.GetQueryWatch("Nodes"),
func() error {
index, nodes, err := state.Nodes()
var index uint64
var nodes structs.Nodes
var err error
if len(args.NodeMetaFilters) > 0 {
index, nodes, err = state.NodesByMeta(args.NodeMetaFilters)
} else {
index, nodes, err = state.Nodes()
}
if err != nil {
return err
}
@ -189,7 +196,14 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
&reply.QueryMeta,
state.GetQueryWatch("Services"),
func() error {
index, services, err := state.Services()
var index uint64
var services structs.Services
var err error
if len(args.NodeMetaFilters) > 0 {
index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters)
} else {
index, services, err = state.Services()
}
if err != nil {
return err
}

View file

@ -592,6 +592,70 @@ func TestCatalog_ListNodes(t *testing.T) {
}
}
func TestCatalog_ListNodes_MetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Add a new node with the right meta k/v pair
node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}}
if err := s1.fsm.State().EnsureNode(1, node); err != nil {
t.Fatalf("err: %v", err)
}
// Filter by a specific meta k/v pair
args := structs.DCSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: map[string]string{
"somekey": "somevalue",
},
}
var out structs.IndexedNodes
testutil.WaitForResult(func() (bool, error) {
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
return len(out.Nodes) == 1, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Verify that only the correct node was returned
if out.Nodes[0].Node != "foo" {
t.Fatalf("bad: %v", out)
}
if out.Nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", out)
}
if v, ok := out.Nodes[0].Meta["somekey"]; !ok || v != "somevalue" {
t.Fatalf("bad: %v", out)
}
// Now filter on a nonexistent meta k/v pair
args = structs.DCSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: map[string]string{
"somekey": "invalid",
},
}
out = structs.IndexedNodes{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should get an empty list of nodes back
testutil.WaitForResult(func() (bool, error) {
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
return len(out.Nodes) == 0, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestCatalog_ListNodes_StaleRaad(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -996,6 +1060,69 @@ func TestCatalog_ListServices(t *testing.T) {
}
}
func TestCatalog_ListServices_MetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Add a new node with the right meta k/v pair
node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}}
if err := s1.fsm.State().EnsureNode(1, node); err != nil {
t.Fatalf("err: %v", err)
}
// Add a service to the new node
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
// Filter by a specific meta k/v pair
args := structs.DCSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: map[string]string{
"somekey": "somevalue",
},
}
var out structs.IndexedServices
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Services) != 1 {
t.Fatalf("bad: %v", out)
}
if out.Services["db"] == nil {
t.Fatalf("bad: %v", out.Services["db"])
}
if len(out.Services["db"]) != 1 {
t.Fatalf("bad: %v", out)
}
if out.Services["db"][0] != "primary" {
t.Fatalf("bad: %v", out)
}
// Now filter on a nonexistent meta k/v pair
args = structs.DCSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: map[string]string{
"somekey": "invalid",
},
}
out = structs.IndexedServices{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should get an empty list of nodes back
if len(out.Services) != 0 {
t.Fatalf("bad: %v", out.Services)
}
}
func TestCatalog_ListServices_Blocking(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

View file

@ -78,6 +78,15 @@ func nodesTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"meta": &memdb.IndexSchema{
Name: "meta",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringMapFieldIndex{
Field: "Meta",
Lowercase: false,
},
},
},
}
}

View file

@ -426,6 +426,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
Node: req.Node,
Address: req.Address,
TaggedAddresses: req.TaggedAddresses,
Meta: req.NodeMeta,
}
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
@ -548,6 +549,35 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
return idx, results, nil
}
// NodesByMeta is used to return all nodes with the given meta key/value pair.
func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nodes, error) {
if len(filters) > 1 {
return 0, nil, fmt.Errorf("multiple meta filters not supported")
}
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
// Retrieve all of the nodes
var args []interface{}
for key, value := range filters {
args = append(args, key, value)
}
nodes, err := tx.Get("nodes", "meta", args...)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
// Create and return the nodes list.
var results structs.Nodes
for node := nodes.Next(); node != nil; node = nodes.Next() {
results = append(results, node.(*structs.Node))
}
return idx, results, nil
}
// DeleteNode is used to delete a given node by its ID.
func (s *StateStore) DeleteNode(idx uint64, nodeID string) error {
tx := s.db.Txn(true)
@ -758,6 +788,63 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
return idx, results, nil
}
// Services returns all services, filtered by the given node metadata.
func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, structs.Services, error) {
if len(filters) > 1 {
return 0, nil, fmt.Errorf("multiple meta filters not supported")
}
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
// Retrieve all of the nodes with the meta k/v pair
var args []interface{}
for key, value := range filters {
args = append(args, key, value)
}
nodes, err := tx.Get("nodes", "meta", args...)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
// Populate the services map
unique := make(map[string]map[string]struct{})
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
// List all the services on the node
services, err := tx.Get("services", "node", n.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed querying services: %s", err)
}
// Rip through the services and enumerate them and their unique set of
// tags.
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0)
for tag, _ := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
}
// ServiceNodes returns the nodes associated with a given service name.
func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
@ -854,6 +941,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
node := n.(*structs.Node)
s.Address = node.Address
s.TaggedAddresses = node.TaggedAddresses
s.NodeMeta = node.Meta
results = append(results, s)
}
@ -1392,6 +1480,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
Node: node.Node,
Address: node.Address,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
}
// Query the node services

View file

@ -241,6 +241,9 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
TaggedAddresses: map[string]string{
"hello": "world",
},
NodeMeta: map[string]string{
"somekey": "somevalue",
},
}
if err := s.EnsureRegistration(1, req); err != nil {
t.Fatalf("err: %s", err)
@ -255,6 +258,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
if out.Node != "node1" || out.Address != "1.2.3.4" ||
len(out.TaggedAddresses) != 1 ||
out.TaggedAddresses["hello"] != "world" ||
out.Meta["somekey"] != "somevalue" ||
out.CreateIndex != created || out.ModifyIndex != modified {
t.Fatalf("bad node returned: %#v", out)
}
@ -751,6 +755,97 @@ func BenchmarkGetNodes(b *testing.B) {
}
}
func TestStateStore_GetNodesByMeta(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns nil
idx, res, err := s.NodesByMeta(map[string]string{"somekey": "somevalue"})
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create some nodes in the state store
node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}}
if err := s.EnsureNode(0, node0); err != nil {
t.Fatalf("err: %v", err)
}
node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}}
if err := s.EnsureNode(1, node1); err != nil {
t.Fatalf("err: %v", err)
}
// Retrieve the node with role=client
idx, nodes, err := s.NodesByMeta(map[string]string{"role": "client"})
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Only one node was returned
if n := len(nodes); n != 1 {
t.Fatalf("bad node count: %d", n)
}
// Make sure the node is correct
if nodes[0].CreateIndex != 0 || nodes[0].ModifyIndex != 0 {
t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex)
}
if nodes[0].Node != "node0" {
t.Fatalf("bad: %#v", nodes[0])
}
if !reflect.DeepEqual(nodes[0].Meta, node0.Meta) {
t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta)
}
// Retrieve both nodes via their common meta field
idx, nodes, err = s.NodesByMeta(map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// All nodes were returned
if n := len(nodes); n != 2 {
t.Fatalf("bad node count: %d", n)
}
// Make sure the nodes match
for i, node := range nodes {
if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) {
t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex)
}
name := fmt.Sprintf("node%d", i)
if node.Node != name {
t.Fatalf("bad: %#v", node)
}
if v, ok := node.Meta["common"]; !ok || v != "1" {
t.Fatalf("bad: %v", node.Meta)
}
}
}
func BenchmarkGetNodesByMeta(b *testing.B) {
s, err := NewStateStore(nil)
if err != nil {
b.Fatalf("err: %s", err)
}
if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
b.Fatalf("err: %v", err)
}
if err := s.EnsureNode(101, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
b.Fatalf("err: %v", err)
}
for i := 0; i < b.N; i++ {
s.Nodes()
}
}
func TestStateStore_DeleteNode(t *testing.T) {
s := testStateStore(t)
@ -1061,6 +1156,78 @@ func TestStateStore_Services(t *testing.T) {
}
}
func TestStateStore_ServicesByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns nil
idx, res, err := s.ServicesByNodeMeta(map[string]string{"somekey": "somevalue"})
if idx != 0 || len(res) != 0 || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create some nodes and services in the state store
node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}}
if err := s.EnsureNode(0, node0); err != nil {
t.Fatalf("err: %v", err)
}
node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}}
if err := s.EnsureNode(1, node1); err != nil {
t.Fatalf("err: %v", err)
}
ns1 := &structs.NodeService{
ID: "service1",
Service: "redis",
Tags: []string{"prod", "master"},
Address: "1.1.1.1",
Port: 1111,
}
if err := s.EnsureService(2, "node0", ns1); err != nil {
t.Fatalf("err: %s", err)
}
ns2 := &structs.NodeService{
ID: "service1",
Service: "redis",
Tags: []string{"prod", "slave"},
Address: "1.1.1.1",
Port: 1111,
}
if err := s.EnsureService(3, "node1", ns2); err != nil {
t.Fatalf("err: %s", err)
}
// Filter the services by the first node's meta value
idx, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"})
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
expected := structs.Services{
"redis": []string{"prod", "master"},
}
if !reflect.DeepEqual(res, expected) {
t.Fatalf("bad: %v %v", res, expected)
}
// Get all services using the common meta value
idx, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
expected = structs.Services{
"redis": []string{"prod", "master", "slave"},
}
sort.Strings(res["redis"])
sort.Strings(expected["redis"])
if !reflect.DeepEqual(res, expected) {
t.Fatalf("bad: %v %v", res, expected)
}
}
func TestStateStore_ServiceNodes(t *testing.T) {
s := testStateStore(t)

View file

@ -173,6 +173,7 @@ type RegisterRequest struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
Service *NodeService
Check *HealthCheck
Checks HealthChecks
@ -195,7 +196,8 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
// Check if any of the node-level fields are being changed.
if r.Node != node.Node ||
r.Address != node.Address ||
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) {
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) ||
!reflect.DeepEqual(r.NodeMeta, node.Meta) {
return true
}
@ -227,8 +229,9 @@ type QuerySource struct {
// DCSpecificRequest is used to query about a specific DC
type DCSpecificRequest struct {
Datacenter string
Source QuerySource
Datacenter string
NodeMetaFilters map[string]string
Source QuerySource
QueryOptions
}
@ -278,6 +281,7 @@ type Node struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
RaftIndex
}
@ -287,8 +291,8 @@ type Nodes []*Node
// Maps service name to available tags
type Services map[string][]string
// ServiceNode represents a node that is part of a service. Address and
// TaggedAddresses are node-related fields that are always empty in the state
// ServiceNode represents a node that is part of a service. Address, TaggedAddresses,
// and NodeMeta are node-related fields that are always empty in the state
// store and are filled in on the way out by parseServiceNodes(). This is also
// why PartialClone() skips them, because we know they are blank already so it
// would be a waste of time to copy them.
@ -296,6 +300,7 @@ type ServiceNode struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
ServiceName string
ServiceTags []string
@ -488,6 +493,7 @@ type NodeInfo struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
Services []*NodeService
Checks HealthChecks
}

View file

@ -110,12 +110,18 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) {
Node: "test",
Address: "127.0.0.1",
TaggedAddresses: make(map[string]string),
NodeMeta: map[string]string{
"role": "server",
},
}
node := &Node{
Node: "test",
Address: "127.0.0.1",
TaggedAddresses: make(map[string]string),
Meta: map[string]string{
"role": "server",
},
}
check := func(twiddle, restore func()) {
@ -137,6 +143,7 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) {
check(func() { req.Node = "nope" }, func() { req.Node = "test" })
check(func() { req.Address = "127.0.0.2" }, func() { req.Address = "127.0.0.1" })
check(func() { req.TaggedAddresses["wan"] = "nope" }, func() { delete(req.TaggedAddresses, "wan") })
check(func() { req.NodeMeta["invalid"] = "nope" }, func() { delete(req.NodeMeta, "invalid") })
if !req.ChangesNode(nil) {
t.Fatalf("should change")
@ -151,6 +158,9 @@ func testServiceNode() *ServiceNode {
TaggedAddresses: map[string]string{
"hello": "world",
},
NodeMeta: map[string]string{
"tag": "value",
},
ServiceID: "service1",
ServiceName: "dogs",
ServiceTags: []string{"prod", "v1"},
@ -172,12 +182,13 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
// Make sure the parts that weren't supposed to be cloned didn't get
// copied over, then zero-value them out so we can do a DeepEqual() on
// the rest of the contents.
if clone.Address != "" || len(clone.TaggedAddresses) != 0 {
if clone.Address != "" || len(clone.TaggedAddresses) != 0 || len(clone.NodeMeta) != 0 {
t.Fatalf("bad: %v", clone)
}
sn.Address = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
if !reflect.DeepEqual(sn, clone) {
t.Fatalf("bad: %v", clone)
}
@ -197,6 +208,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) {
// them out before we do the compare.
sn.Address = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %v", sn2)
}

View file

@ -53,6 +53,7 @@ type TestAddressConfig struct {
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"node_name"`
NodeMeta map[string]string `json:"node_meta"`
Performance *TestPerformanceConfig `json:"performance,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"`
Server bool `json:"server,omitempty"`

View file

@ -19,7 +19,7 @@ The database provides the following:
* Rich Indexing - Tables can support any number of indexes, which can be simple like
a single field index, or more advanced compound field indexes. Certain types like
UUID can be efficiently compressed from strings into byte indexes for reduces
UUID can be efficiently compressed from strings into byte indexes for reduced
storage requirements.
For the underlying immutable radix trees, see [go-immutable-radix](https://github.com/hashicorp/go-immutable-radix).

View file

@ -9,15 +9,27 @@ import (
// Indexer is an interface used for defining indexes
type Indexer interface {
// FromObject is used to extract an index value from an
// object or to indicate that the index value is missing.
FromObject(raw interface{}) (bool, []byte, error)
// ExactFromArgs is used to build an exact index lookup
// based on arguments
FromArgs(args ...interface{}) ([]byte, error)
}
// SingleIndexer is an interface used for defining indexes
// generating a single entry per object
type SingleIndexer interface {
// FromObject is used to extract an index value from an
// object or to indicate that the index value is missing.
FromObject(raw interface{}) (bool, []byte, error)
}
// MultiIndexer is an interface used for defining indexes
// generating multiple entries per object
type MultiIndexer interface {
// FromObject is used to extract index values from an
// object or to indicate that the index value is missing.
FromObject(raw interface{}) (bool, [][]byte, error)
}
// PrefixIndexer can optionally be implemented for any
// indexes that support prefix based iteration. This may
// not apply to all indexes.
@ -88,6 +100,155 @@ func (s *StringFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
return val, nil
}
// StringSliceFieldIndex is used to extract a field from an object
// using reflection and builds an index on that field.
type StringSliceFieldIndex struct {
Field string
Lowercase bool
}
func (s *StringSliceFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(s.Field)
if !fv.IsValid() {
return false, nil,
fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj)
}
if fv.Kind() != reflect.Slice || fv.Type().Elem().Kind() != reflect.String {
return false, nil, fmt.Errorf("field '%s' is not a string slice", s.Field)
}
length := fv.Len()
vals := make([][]byte, 0, length)
for i := 0; i < fv.Len(); i++ {
val := fv.Index(i).String()
if val == "" {
continue
}
if s.Lowercase {
val = strings.ToLower(val)
}
// Add the null character as a terminator
val += "\x00"
vals = append(vals, []byte(val))
}
if len(vals) == 0 {
return false, nil, nil
}
return true, vals, nil
}
func (s *StringSliceFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
if s.Lowercase {
arg = strings.ToLower(arg)
}
// Add the null character as a terminator
arg += "\x00"
return []byte(arg), nil
}
func (s *StringSliceFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
val, err := s.FromArgs(args...)
if err != nil {
return nil, err
}
// Strip the null terminator, the rest is a prefix
n := len(val)
if n > 0 {
return val[:n-1], nil
}
return val, nil
}
// StringMapFieldIndex is used to extract a field of type map[string]string
// from an object using reflection and builds an index on that field.
type StringMapFieldIndex struct {
Field string
Lowercase bool
}
var MapType = reflect.MapOf(reflect.TypeOf(""), reflect.TypeOf("")).Kind()
func (s *StringMapFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(s.Field)
if !fv.IsValid() {
return false, nil, fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj)
}
if fv.Kind() != MapType {
return false, nil, fmt.Errorf("field '%s' is not a map[string]string", s.Field)
}
length := fv.Len()
vals := make([][]byte, 0, length)
for _, key := range fv.MapKeys() {
k := key.String()
if k == "" {
continue
}
val := fv.MapIndex(key).String()
if s.Lowercase {
k = strings.ToLower(k)
val = strings.ToLower(val)
}
// Add the null character as a terminator
k += "\x00" + val + "\x00"
vals = append(vals, []byte(k))
}
if len(vals) == 0 {
return false, nil, nil
}
return true, vals, nil
}
func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) > 2 || len(args) == 0 {
return nil, fmt.Errorf("must provide one or two arguments")
}
key, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
if s.Lowercase {
key = strings.ToLower(key)
}
// Add the null character as a terminator
key += "\x00"
if len(args) == 2 {
val, ok := args[1].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[1])
}
if s.Lowercase {
val = strings.ToLower(val)
}
// Add the null character as a terminator
key += val + "\x00"
}
return []byte(key), nil
}
// UUIDFieldIndex is used to extract a field from an object
// using reflection and builds an index on that field by treating
// it as a UUID. This is an optimization to using a StringFieldIndex
@ -270,7 +431,11 @@ type CompoundIndex struct {
func (c *CompoundIndex) FromObject(raw interface{}) (bool, []byte, error) {
var out []byte
for i, idx := range c.Indexes {
for i, idxRaw := range c.Indexes {
idx, ok := idxRaw.(SingleIndexer)
if !ok {
return false, nil, fmt.Errorf("sub-index %d error: %s", i, "sub-index must be a SingleIndexer")
}
ok, val, err := idx.FromObject(raw)
if err != nil {
return false, nil, fmt.Errorf("sub-index %d error: %v", i, err)

View file

@ -46,6 +46,9 @@ func (s *TableSchema) Validate() error {
if !s.Indexes["id"].Unique {
return fmt.Errorf("id index must be unique")
}
if _, ok := s.Indexes["id"].Indexer.(SingleIndexer); !ok {
return fmt.Errorf("id index must be a SingleIndexer")
}
for name, index := range s.Indexes {
if name != index.Name {
return fmt.Errorf("index name mis-match for '%s'", name)
@ -72,5 +75,11 @@ func (s *IndexSchema) Validate() error {
if s.Indexer == nil {
return fmt.Errorf("missing index function for '%s'", s.Name)
}
switch s.Indexer.(type) {
case SingleIndexer:
case MultiIndexer:
default:
return fmt.Errorf("indexer for '%s' must be a SingleIndexer or MultiIndexer", s.Name)
}
return nil
}

View file

@ -148,7 +148,8 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
ok, idVal, err := idSchema.Indexer.FromObject(obj)
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(obj)
if err != nil {
return fmt.Errorf("failed to build primary index: %v", err)
}
@ -167,7 +168,19 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
indexTxn := txn.writableIndex(table, name)
// Determine the new index value
ok, val, err := indexSchema.Indexer.FromObject(obj)
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(obj)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(obj)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
@ -176,28 +189,44 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
// This is done by appending the primary key which must
// be unique anyways.
if ok && !indexSchema.Unique {
val = append(val, idVal...)
for i := range vals {
vals[i] = append(vals[i], idVal...)
}
}
// Handle the update by deleting from the index first
if update {
okExist, valExist, err := indexSchema.Indexer.FromObject(existing)
var (
okExist bool
valsExist [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var valExist []byte
okExist, valExist, err = indexer.FromObject(existing)
valsExist = [][]byte{valExist}
case MultiIndexer:
okExist, valsExist, err = indexer.FromObject(existing)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
if okExist {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
valExist = append(valExist, idVal...)
}
for i, valExist := range valsExist {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
valExist = append(valExist, idVal...)
}
// If we are writing to the same index with the same value,
// we can avoid the delete as the insert will overwrite the
// value anyways.
if !bytes.Equal(valExist, val) {
indexTxn.Delete(valExist)
// If we are writing to the same index with the same value,
// we can avoid the delete as the insert will overwrite the
// value anyways.
if i >= len(vals) || !bytes.Equal(valExist, vals[i]) {
indexTxn.Delete(valExist)
}
}
}
}
@ -213,7 +242,9 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
}
// Update the value of the index
indexTxn.Insert(val, obj)
for _, val := range vals {
indexTxn.Insert(val, obj)
}
}
return nil
}
@ -233,7 +264,8 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
ok, idVal, err := idSchema.Indexer.FromObject(obj)
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(obj)
if err != nil {
return fmt.Errorf("failed to build primary index: %v", err)
}
@ -253,7 +285,19 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
indexTxn := txn.writableIndex(table, name)
// Handle the update by deleting from the index first
ok, val, err := indexSchema.Indexer.FromObject(existing)
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(existing)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(existing)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
@ -261,10 +305,12 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
val = append(val, idVal...)
for _, val := range vals {
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Delete(val)
}
indexTxn.Delete(val)
}
}
return nil

6
vendor/vendor.json vendored
View file

@ -396,10 +396,10 @@
"revisionTime": "2016-06-09T02:05:29Z"
},
{
"checksumSHA1": "/V57CyN7x2NUlHoOzVL5GgGXX84=",
"checksumSHA1": "ZpTDFeRvXFwIvSHRD8eDYHxaj4Y=",
"path": "github.com/hashicorp/go-memdb",
"revision": "98f52f52d7a476958fa9da671354d270c50661a7",
"revisionTime": "2016-03-01T23:01:42Z"
"revision": "d2d2b77acab85aa635614ac17ea865969f56009e",
"revisionTime": "2017-01-07T16:22:14Z"
},
{
"checksumSHA1": "TNlVzNR1OaajcNi3CbQ3bGbaLGU=",

View file

@ -128,6 +128,8 @@ This endpoint is used to return the configuration and member information of the
Consul 0.7.0 and later also includes a snapshot of various operating statistics under the `Stats` key. These statistics are intended to help human operators for debugging and may change over time, so this part of the interface should not be consumed programmatically.
Consul 0.7.3 and later also includes a block of user-defined node metadata values under the `Meta` key. These are arbitrary key/value pairs defined in the [node meta](/docs/agent/options.html#_node_meta) section of the agent configuration.
It returns a JSON body like this:
```javascript
@ -194,6 +196,10 @@ It returns a JSON body like this:
"DelegateMin": 2,
"DelegateMax": 4,
"DelegateCur": 4
},
"Meta": {
"instance_type": "i2.xlarge",
"os_version": "ubuntu_16.04",
}
}
```

View file

@ -44,6 +44,9 @@ body must look something like:
"lan": "192.168.10.10",
"wan": "10.0.10.10"
},
"NodeMeta": {
"somekey": "somevalue"
},
"Service": {
"ID": "redis1",
"Service": "redis",
@ -73,6 +76,10 @@ the node with the catalog. `TaggedAddresses` can be used in conjunction with the
option and the `wan` address. The `lan` address was added in Consul 0.7 to help find
the LAN address if address translation is enabled.
The `Meta` block was added in Consul 0.7.3 to enable associating arbitrary metadata
key/value pairs with a node for filtering purposes. For more information on node metadata,
see the [node meta](/docs/agent/options.html#_node_meta) section of the configuration page.
If the `Service` key is provided, the service will also be registered. If
`ID` is not provided, it will be defaulted to the value of the `Service.Service` property.
Only one service with a given `ID` may be present per node. The service `Tags`, `Address`,
@ -191,6 +198,10 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's
node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This will filter the results to nodes with that pair present.
It returns a JSON body like this:
```javascript
@ -201,6 +212,9 @@ It returns a JSON body like this:
"TaggedAddresses": {
"lan": "10.1.10.11",
"wan": "10.1.10.11"
},
"Meta": {
"instance_type": "t2.medium"
}
},
{
@ -209,6 +223,9 @@ It returns a JSON body like this:
"TaggedAddresses": {
"lan": "10.1.10.11",
"wan": "10.1.10.12"
},
"Meta": {
"instance_type": "t2.large"
}
}
]
@ -222,6 +239,10 @@ This endpoint is hit with a `GET` and returns the services registered
in a given DC. By default, the datacenter of the agent is queried;
however, the `dc` can be provided using the `?dc=` query parameter.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This will filter the results to services with that pair present.
It returns a JSON body like this:
```javascript
@ -265,6 +286,9 @@ It returns a JSON body like this:
"lan": "192.168.10.10",
"wan": "10.0.10.10"
},
"Meta": {
"instance_type": "t2.medium"
}
"CreateIndex": 51,
"ModifyIndex": 51,
"Node": "foobar",
@ -286,6 +310,7 @@ The returned fields are as follows:
- `Address`: IP address of the Consul node on which the service is registered
- `TaggedAddresses`: List of explicit LAN and WAN IP addresses for the agent
- `Meta`: Added in Consul 0.7.3, a list of user-defined metadata key/value pairs for the node
- `CreateIndex`: Internal index value representing when the service was created
- `ModifyIndex`: Last index that modified the service
- `Node`: Node name of the Consul node on which the service is registered
@ -313,6 +338,9 @@ It returns a JSON body like this:
"TaggedAddresses": {
"lan": "10.1.10.12",
"wan": "10.1.10.12"
},
"Meta": {
"instance_type": "t2.medium"
}
},
"Services": {

View file

@ -131,6 +131,9 @@ It returns a JSON body like this:
"TaggedAddresses": {
"lan": "10.1.10.12",
"wan": "10.1.10.12"
},
"Meta": {
"instance_type": "t2.medium"
}
},
"Service": {

View file

@ -251,6 +251,15 @@ will exit with an error at startup.
* <a name="_node"></a><a href="#_node">`-node`</a> - The name of this node in the cluster.
This must be unique within the cluster. By default this is the hostname of the machine.
* <a name="_node_meta"></a><a href="#_node_meta">`-node-meta`</a> - Available in Consul 0.7.3 and later,
this specifies an arbitrary metadata key/value pair to associate with the node, of the form `key:value`.
This can be specified multiple times. Node metadata pairs have the following restrictions:
- A maximum of 64 key/value pairs can be registered per node.
- Metadata keys must be between 1 and 128 characters (inclusive) in length
- Metadata keys must contain only alphanumeric, `-`, and `_` characters.
- Metadata keys must not begin with the `consul-` prefix; that is reserved for internal use by Consul.
- Metadata values must be between 0 and 512 (inclusive) characters in length.
* <a name="_pid_file"></a><a href="#_pid_file">`-pid-file`</a> - This flag provides the file
path for the agent to store its PID. This is useful for sending signals (for example, `SIGINT`
to close the agent or `SIGHUP` to update check definite
@ -658,6 +667,19 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="node_name"></a><a href="#node_name">`node_name`</a> Equivalent to the
[`-node` command-line flag](#_node).
* <a name="node_meta"></a><a href="#node_meta">`node_meta`</a> Available in Consul 0.7.3 and later,
This object allows associating arbitrary metadata key/value pairs with the local node, which can
then be used for filtering results from certain catalog endpoints. See the
[`-node-meta` command-line flag](#_node_meta) for more information.
```javascript
{
"node_meta": {
"instance_type": "t2.medium"
}
}
```
* <a name="performance"></a><a href="#performance">`performance`</a> Available in Consul 0.7 and
later, this is a nested object that allows tuning the performance of different subsystems in
Consul. See the [Server Performance](/docs/guides/performance.html) guide for more details. The