diff --git a/command/agent/agent.go b/command/agent/agent.go index 789768a5b..1a0cf207a 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -246,13 +246,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, return nil, err } - // Load checks/services. + // Load checks/services/metadata. if err := agent.loadServices(config); err != nil { return nil, err } if err := agent.loadChecks(config); err != nil { return nil, err } + if err := agent.loadMetadata(config); err != nil { + return nil, err + } // Start watching for critical services to deregister, based on their // checks. @@ -1677,6 +1680,37 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { } } +// loadMetadata loads and validates node metadata fields from the config and +// updates them on the local agent. +func (a *Agent) loadMetadata(conf *Config) error { + a.state.Lock() + defer a.state.Unlock() + + for key, value := range conf.Meta { + if strings.Contains(key, ":") { + return fmt.Errorf("Key name cannot contain ':' character: %s", key) + } + if strings.HasPrefix(key, "consul-") { + return fmt.Errorf("Key prefix 'consul-' is reserved for internal use") + } + a.state.metadata[key] = value + } + + a.state.changeMade() + + return nil +} + +// unloadMetadata resets the local metadata state +func (a *Agent) unloadMetadata() error { + a.state.Lock() + defer a.state.Unlock() + + a.state.metadata = make(map[string]string) + + return nil +} + // serviceMaintCheckID returns the ID of a given service's maintenance check func serviceMaintCheckID(serviceID string) types.CheckID { return types.CheckID(structs.ServiceMaintPrefix + serviceID) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index e96218d86..48c0b27f4 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -20,6 +20,7 @@ type AgentSelf struct { Coord *coordinate.Coordinate Member serf.Member Stats map[string]map[string]string + Meta map[string]string } func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int Coord: c, Member: s.agent.LocalMember(), Stats: s.agent.Stats(), + Meta: s.agent.state.Metadata(), }, nil } diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index efccfba73..13d425044 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -67,6 +67,14 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } + // Try to parse node metadata filter params + if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { + pair := strings.SplitN(filter[0], ":", 2) + args.NodeMetaKey = pair[0] + if len(pair) == 2 { + args.NodeMetaValue = pair[1] + } + } var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) diff --git a/command/agent/command.go b/command/agent/command.go index f81291453..165839fdc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -1071,7 +1071,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) { snap := c.agent.snapshotCheckState() defer c.agent.restoreCheckState(snap) - // First unload all checks and services. This lets us begin the reload + // First unload all checks, services, and metadata. This lets us begin the reload // with a clean slate. if err := c.agent.unloadServices(); err != nil { errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err)) @@ -1081,8 +1081,12 @@ func (c *Command) handleReload(config *Config) (*Config, error) { errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err)) return nil, errs } + if err := c.agent.unloadMetadata(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed unloading metadata: %s", err)) + return nil, errs + } - // Reload services and check definitions. + // Reload service/check definitions and metadata. if err := c.agent.loadServices(newConf); err != nil { errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err)) return nil, errs @@ -1091,6 +1095,10 @@ func (c *Command) handleReload(config *Config) (*Config, error) { errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err)) return nil, errs } + if err := c.agent.loadMetadata(newConf); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err)) + return nil, errs + } // Get the new client listener addr httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) diff --git a/command/agent/config.go b/command/agent/config.go index 4d95d6244..c4f35f3d0 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -342,6 +342,9 @@ type Config struct { // they are configured with TranslateWanAddrs set to true. TaggedAddresses map[string]string + // Node metadata + Meta map[string]string `mapstructure:"node_meta" json:"-"` + // LeaveOnTerm controls if Serf does a graceful leave when receiving // the TERM signal. Defaults true on clients, false on servers. This can // be changed on reload. @@ -1577,6 +1580,14 @@ func MergeConfig(a, b *Config) *Config { result.HTTPAPIResponseHeaders[field] = value } } + if len(b.Meta) != 0 { + if result.Meta == nil { + result.Meta = make(map[string]string) + } + for field, value := range b.Meta { + result.Meta[field] = value + } + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index caef0ce6f..b5cc3639d 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -281,6 +281,19 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } + // Node metadata fields + input = `{"node_meta": {"thing1": "1", "thing2": "2"}}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + if v, ok := config.Meta["thing1"]; !ok || v != "1" { + t.Fatalf("bad: %#v", config) + } + if v, ok := config.Meta["thing2"]; !ok || v != "2" { + t.Fatalf("bad: %#v", config) + } + // leave_on_terminate input = `{"leave_on_terminate": true}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -1519,6 +1532,9 @@ func TestMergeConfig(t *testing.T) { DogStatsdAddr: "nope", DogStatsdTags: []string{"nope"}, }, + Meta: map[string]string{ + "key": "value1", + }, } b := &Config{ @@ -1620,6 +1636,9 @@ func TestMergeConfig(t *testing.T) { DogStatsdAddr: "127.0.0.1:7254", DogStatsdTags: []string{"tag_1:val_1", "tag_2:val_2"}, }, + Meta: map[string]string{ + "key": "value2", + }, DisableUpdateCheck: true, DisableAnonymousSignature: true, HTTPAPIResponseHeaders: map[string]string{ diff --git a/command/agent/local.go b/command/agent/local.go index fdd3d3788..853cdd5ad 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -61,6 +61,9 @@ type localState struct { // Used to track checks that are being deferred deferCheck map[types.CheckID]*time.Timer + // metadata tracks the local metadata fields + metadata map[string]string + // consulCh is used to inform of a change to the known // consul nodes. This may be used to retry a sync run consulCh chan struct{} @@ -82,6 +85,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) { l.checkTokens = make(map[types.CheckID]string) l.checkCriticalTime = make(map[types.CheckID]time.Time) l.deferCheck = make(map[types.CheckID]*time.Timer) + l.metadata = make(map[string]string) l.consulCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1) } @@ -339,6 +343,19 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { return checks } +// Metadata returns the local node metadata fields that the +// agent is aware of and are being kept in sync with the server +func (l *localState) Metadata() map[string]string { + metadata := make(map[string]string) + l.RLock() + defer l.RUnlock() + + for key, value := range l.metadata { + metadata[key] = value + } + return metadata +} + // antiEntropy is a long running method used to perform anti-entropy // between local and remote state. func (l *localState) antiEntropy(shutdownCh chan struct{}) { @@ -412,10 +429,10 @@ func (l *localState) setSyncState() error { l.Lock() defer l.Unlock() - // Check the node info (currently limited to tagged addresses since - // everything else is managed by the Serf layer) + // Check the node info if out1.NodeServices == nil || out1.NodeServices.Node == nil || - !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) { + !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) || + !reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) { l.nodeInfoInSync = false } @@ -619,6 +636,7 @@ func (l *localState) syncService(id string) error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, Service: l.services[id], WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } @@ -680,6 +698,7 @@ func (l *localState) syncCheck(id types.CheckID) error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, Service: service, Check: l.checks[id], WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, @@ -706,6 +725,7 @@ func (l *localState) syncNodeInfo() error { Node: l.config.NodeName, Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, + NodeMeta: l.metadata, WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()}, } var out struct{} diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 1383b8916..565155899 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -156,6 +156,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return err } + var metaFilter []interface{} + if args.NodeMetaKey != "" { + metaFilter = append(metaFilter, args.NodeMetaKey) + if args.NodeMetaValue != "" { + metaFilter = append(metaFilter, args.NodeMetaValue) + } + } + // Get the list of nodes. state := c.srv.fsm.State() return c.srv.blockingRPC( @@ -163,7 +171,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &reply.QueryMeta, state.GetQueryWatch("Nodes"), func() error { - index, nodes, err := state.Nodes() + index, nodes, err := state.Nodes(metaFilter...) if err != nil { return err } diff --git a/consul/state/schema.go b/consul/state/schema.go index fca8a3cf2..3d3ed1450 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -78,6 +78,15 @@ func nodesTableSchema() *memdb.TableSchema { Lowercase: true, }, }, + "meta": &memdb.IndexSchema{ + Name: "meta", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringMapFieldIndex{ + Field: "Meta", + Lowercase: false, + }, + }, }, } } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 3316e4f77..c1bb2d0f6 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -426,6 +426,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D Node: req.Node, Address: req.Address, TaggedAddresses: req.TaggedAddresses, + Meta: req.NodeMeta, } if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { return fmt.Errorf("failed inserting node: %s", err) @@ -527,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { +func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -535,7 +536,13 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) // Retrieve all of the nodes - nodes, err := tx.Get("nodes", "id") + var nodes memdb.ResultIterator + var err error + if len(metaFilter) > 0 { + nodes, err = tx.Get("nodes", "meta", metaFilter...) + } else { + nodes, err = tx.Get("nodes", "id") + } if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -854,6 +861,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo node := n.(*structs.Node) s.Address = node.Address s.TaggedAddresses = node.TaggedAddresses + s.NodeMeta = node.Meta results = append(results, s) } @@ -1392,6 +1400,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64, Node: node.Node, Address: node.Address, TaggedAddresses: node.TaggedAddresses, + Meta: node.Meta, } // Query the node services diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 30b0be378..97030f285 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -173,6 +173,7 @@ type RegisterRequest struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string Service *NodeService Check *HealthCheck Checks HealthChecks @@ -195,7 +196,8 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool { // Check if any of the node-level fields are being changed. if r.Node != node.Node || r.Address != node.Address || - !reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) { + !reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) || + !reflect.DeepEqual(r.NodeMeta, node.Meta) { return true } @@ -227,8 +229,10 @@ type QuerySource struct { // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { - Datacenter string - Source QuerySource + Datacenter string + NodeMetaKey string + NodeMetaValue string + Source QuerySource QueryOptions } @@ -278,6 +282,7 @@ type Node struct { Node string Address string TaggedAddresses map[string]string + Meta map[string]string RaftIndex } @@ -296,6 +301,7 @@ type ServiceNode struct { Node string Address string TaggedAddresses map[string]string + NodeMeta map[string]string ServiceID string ServiceName string ServiceTags []string @@ -488,6 +494,7 @@ type NodeInfo struct { Node string Address string TaggedAddresses map[string]string + Meta map[string]string Services []*NodeService Checks HealthChecks } diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index efceeace3..a66d4b176 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -151,6 +151,9 @@ func testServiceNode() *ServiceNode { TaggedAddresses: map[string]string{ "hello": "world", }, + NodeMeta: map[string]string{ + "tag": "value", + }, ServiceID: "service1", ServiceName: "dogs", ServiceTags: []string{"prod", "v1"}, @@ -172,12 +175,13 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) { // Make sure the parts that weren't supposed to be cloned didn't get // copied over, then zero-value them out so we can do a DeepEqual() on // the rest of the contents. - if clone.Address != "" || len(clone.TaggedAddresses) != 0 { + if clone.Address != "" || len(clone.TaggedAddresses) != 0 || len(clone.NodeMeta) != 0 { t.Fatalf("bad: %v", clone) } sn.Address = "" sn.TaggedAddresses = nil + sn.NodeMeta = nil if !reflect.DeepEqual(sn, clone) { t.Fatalf("bad: %v", clone) } @@ -197,6 +201,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) { // them out before we do the compare. sn.Address = "" sn.TaggedAddresses = nil + sn.NodeMeta = nil if !reflect.DeepEqual(sn, sn2) { t.Fatalf("bad: %v", sn2) }