Add support for setting node metadata fields
This commit is contained in:
parent
ef1039bd21
commit
2a423c6e2c
|
@ -246,13 +246,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Load checks/services.
|
||||
// Load checks/services/metadata.
|
||||
if err := agent.loadServices(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := agent.loadChecks(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := agent.loadMetadata(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start watching for critical services to deregister, based on their
|
||||
// checks.
|
||||
|
@ -1677,6 +1680,37 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
|
|||
}
|
||||
}
|
||||
|
||||
// loadMetadata loads and validates node metadata fields from the config and
|
||||
// updates them on the local agent.
|
||||
func (a *Agent) loadMetadata(conf *Config) error {
|
||||
a.state.Lock()
|
||||
defer a.state.Unlock()
|
||||
|
||||
for key, value := range conf.Meta {
|
||||
if strings.Contains(key, ":") {
|
||||
return fmt.Errorf("Key name cannot contain ':' character: %s", key)
|
||||
}
|
||||
if strings.HasPrefix(key, "consul-") {
|
||||
return fmt.Errorf("Key prefix 'consul-' is reserved for internal use")
|
||||
}
|
||||
a.state.metadata[key] = value
|
||||
}
|
||||
|
||||
a.state.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// unloadMetadata resets the local metadata state
|
||||
func (a *Agent) unloadMetadata() error {
|
||||
a.state.Lock()
|
||||
defer a.state.Unlock()
|
||||
|
||||
a.state.metadata = make(map[string]string)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// serviceMaintCheckID returns the ID of a given service's maintenance check
|
||||
func serviceMaintCheckID(serviceID string) types.CheckID {
|
||||
return types.CheckID(structs.ServiceMaintPrefix + serviceID)
|
||||
|
|
|
@ -20,6 +20,7 @@ type AgentSelf struct {
|
|||
Coord *coordinate.Coordinate
|
||||
Member serf.Member
|
||||
Stats map[string]map[string]string
|
||||
Meta map[string]string
|
||||
}
|
||||
|
||||
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
|
|||
Coord: c,
|
||||
Member: s.agent.LocalMember(),
|
||||
Stats: s.agent.Stats(),
|
||||
Meta: s.agent.state.Metadata(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,14 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
|
|||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
// Try to parse node metadata filter params
|
||||
if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 {
|
||||
pair := strings.SplitN(filter[0], ":", 2)
|
||||
args.NodeMetaKey = pair[0]
|
||||
if len(pair) == 2 {
|
||||
args.NodeMetaValue = pair[1]
|
||||
}
|
||||
}
|
||||
|
||||
var out structs.IndexedNodes
|
||||
defer setMeta(resp, &out.QueryMeta)
|
||||
|
|
|
@ -1071,7 +1071,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
|
|||
snap := c.agent.snapshotCheckState()
|
||||
defer c.agent.restoreCheckState(snap)
|
||||
|
||||
// First unload all checks and services. This lets us begin the reload
|
||||
// First unload all checks, services, and metadata. This lets us begin the reload
|
||||
// with a clean slate.
|
||||
if err := c.agent.unloadServices(); err != nil {
|
||||
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
|
||||
|
@ -1081,8 +1081,12 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
|
|||
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
|
||||
return nil, errs
|
||||
}
|
||||
if err := c.agent.unloadMetadata(); err != nil {
|
||||
errs = multierror.Append(errs, fmt.Errorf("Failed unloading metadata: %s", err))
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
// Reload services and check definitions.
|
||||
// Reload service/check definitions and metadata.
|
||||
if err := c.agent.loadServices(newConf); err != nil {
|
||||
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
|
||||
return nil, errs
|
||||
|
@ -1091,6 +1095,10 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
|
|||
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
|
||||
return nil, errs
|
||||
}
|
||||
if err := c.agent.loadMetadata(newConf); err != nil {
|
||||
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
// Get the new client listener addr
|
||||
httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
|
||||
|
|
|
@ -342,6 +342,9 @@ type Config struct {
|
|||
// they are configured with TranslateWanAddrs set to true.
|
||||
TaggedAddresses map[string]string
|
||||
|
||||
// Node metadata
|
||||
Meta map[string]string `mapstructure:"node_meta" json:"-"`
|
||||
|
||||
// LeaveOnTerm controls if Serf does a graceful leave when receiving
|
||||
// the TERM signal. Defaults true on clients, false on servers. This can
|
||||
// be changed on reload.
|
||||
|
@ -1577,6 +1580,14 @@ func MergeConfig(a, b *Config) *Config {
|
|||
result.HTTPAPIResponseHeaders[field] = value
|
||||
}
|
||||
}
|
||||
if len(b.Meta) != 0 {
|
||||
if result.Meta == nil {
|
||||
result.Meta = make(map[string]string)
|
||||
}
|
||||
for field, value := range b.Meta {
|
||||
result.Meta[field] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Copy the start join addresses
|
||||
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
|
||||
|
|
|
@ -281,6 +281,19 @@ func TestDecodeConfig(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Node metadata fields
|
||||
input = `{"node_meta": {"thing1": "1", "thing2": "2"}}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if v, ok := config.Meta["thing1"]; !ok || v != "1" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
if v, ok := config.Meta["thing2"]; !ok || v != "2" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// leave_on_terminate
|
||||
input = `{"leave_on_terminate": true}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
|
@ -1519,6 +1532,9 @@ func TestMergeConfig(t *testing.T) {
|
|||
DogStatsdAddr: "nope",
|
||||
DogStatsdTags: []string{"nope"},
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"key": "value1",
|
||||
},
|
||||
}
|
||||
|
||||
b := &Config{
|
||||
|
@ -1620,6 +1636,9 @@ func TestMergeConfig(t *testing.T) {
|
|||
DogStatsdAddr: "127.0.0.1:7254",
|
||||
DogStatsdTags: []string{"tag_1:val_1", "tag_2:val_2"},
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"key": "value2",
|
||||
},
|
||||
DisableUpdateCheck: true,
|
||||
DisableAnonymousSignature: true,
|
||||
HTTPAPIResponseHeaders: map[string]string{
|
||||
|
|
|
@ -61,6 +61,9 @@ type localState struct {
|
|||
// Used to track checks that are being deferred
|
||||
deferCheck map[types.CheckID]*time.Timer
|
||||
|
||||
// metadata tracks the local metadata fields
|
||||
metadata map[string]string
|
||||
|
||||
// consulCh is used to inform of a change to the known
|
||||
// consul nodes. This may be used to retry a sync run
|
||||
consulCh chan struct{}
|
||||
|
@ -82,6 +85,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
|
|||
l.checkTokens = make(map[types.CheckID]string)
|
||||
l.checkCriticalTime = make(map[types.CheckID]time.Time)
|
||||
l.deferCheck = make(map[types.CheckID]*time.Timer)
|
||||
l.metadata = make(map[string]string)
|
||||
l.consulCh = make(chan struct{}, 1)
|
||||
l.triggerCh = make(chan struct{}, 1)
|
||||
}
|
||||
|
@ -339,6 +343,19 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
|
|||
return checks
|
||||
}
|
||||
|
||||
// Metadata returns the local node metadata fields that the
|
||||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *localState) Metadata() map[string]string {
|
||||
metadata := make(map[string]string)
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
for key, value := range l.metadata {
|
||||
metadata[key] = value
|
||||
}
|
||||
return metadata
|
||||
}
|
||||
|
||||
// antiEntropy is a long running method used to perform anti-entropy
|
||||
// between local and remote state.
|
||||
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
|
||||
|
@ -412,10 +429,10 @@ func (l *localState) setSyncState() error {
|
|||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// Check the node info (currently limited to tagged addresses since
|
||||
// everything else is managed by the Serf layer)
|
||||
// Check the node info
|
||||
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
|
||||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) {
|
||||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
|
||||
!reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) {
|
||||
l.nodeInfoInSync = false
|
||||
}
|
||||
|
||||
|
@ -619,6 +636,7 @@ func (l *localState) syncService(id string) error {
|
|||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
NodeMeta: l.metadata,
|
||||
Service: l.services[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||
}
|
||||
|
@ -680,6 +698,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
|
|||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
NodeMeta: l.metadata,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
|
@ -706,6 +725,7 @@ func (l *localState) syncNodeInfo() error {
|
|||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
NodeMeta: l.metadata,
|
||||
WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()},
|
||||
}
|
||||
var out struct{}
|
||||
|
|
|
@ -156,6 +156,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
|||
return err
|
||||
}
|
||||
|
||||
var metaFilter []interface{}
|
||||
if args.NodeMetaKey != "" {
|
||||
metaFilter = append(metaFilter, args.NodeMetaKey)
|
||||
if args.NodeMetaValue != "" {
|
||||
metaFilter = append(metaFilter, args.NodeMetaValue)
|
||||
}
|
||||
}
|
||||
|
||||
// Get the list of nodes.
|
||||
state := c.srv.fsm.State()
|
||||
return c.srv.blockingRPC(
|
||||
|
@ -163,7 +171,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
|||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("Nodes"),
|
||||
func() error {
|
||||
index, nodes, err := state.Nodes()
|
||||
index, nodes, err := state.Nodes(metaFilter...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -78,6 +78,15 @@ func nodesTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
"meta": &memdb.IndexSchema{
|
||||
Name: "meta",
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringMapFieldIndex{
|
||||
Field: "Meta",
|
||||
Lowercase: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -426,6 +426,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
|
|||
Node: req.Node,
|
||||
Address: req.Address,
|
||||
TaggedAddresses: req.TaggedAddresses,
|
||||
Meta: req.NodeMeta,
|
||||
}
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
|
@ -527,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
|
|||
}
|
||||
|
||||
// Nodes is used to return all of the known nodes.
|
||||
func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
|
||||
func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -535,7 +536,13 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
|
|||
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
|
||||
|
||||
// Retrieve all of the nodes
|
||||
nodes, err := tx.Get("nodes", "id")
|
||||
var nodes memdb.ResultIterator
|
||||
var err error
|
||||
if len(metaFilter) > 0 {
|
||||
nodes, err = tx.Get("nodes", "meta", metaFilter...)
|
||||
} else {
|
||||
nodes, err = tx.Get("nodes", "id")
|
||||
}
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -854,6 +861,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
|
|||
node := n.(*structs.Node)
|
||||
s.Address = node.Address
|
||||
s.TaggedAddresses = node.TaggedAddresses
|
||||
s.NodeMeta = node.Meta
|
||||
|
||||
results = append(results, s)
|
||||
}
|
||||
|
@ -1392,6 +1400,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
|||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
TaggedAddresses: node.TaggedAddresses,
|
||||
Meta: node.Meta,
|
||||
}
|
||||
|
||||
// Query the node services
|
||||
|
|
|
@ -173,6 +173,7 @@ type RegisterRequest struct {
|
|||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
NodeMeta map[string]string
|
||||
Service *NodeService
|
||||
Check *HealthCheck
|
||||
Checks HealthChecks
|
||||
|
@ -195,7 +196,8 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
|
|||
// Check if any of the node-level fields are being changed.
|
||||
if r.Node != node.Node ||
|
||||
r.Address != node.Address ||
|
||||
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) {
|
||||
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) ||
|
||||
!reflect.DeepEqual(r.NodeMeta, node.Meta) {
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -227,8 +229,10 @@ type QuerySource struct {
|
|||
|
||||
// DCSpecificRequest is used to query about a specific DC
|
||||
type DCSpecificRequest struct {
|
||||
Datacenter string
|
||||
Source QuerySource
|
||||
Datacenter string
|
||||
NodeMetaKey string
|
||||
NodeMetaValue string
|
||||
Source QuerySource
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
|
@ -278,6 +282,7 @@ type Node struct {
|
|||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
Meta map[string]string
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
|
@ -296,6 +301,7 @@ type ServiceNode struct {
|
|||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
NodeMeta map[string]string
|
||||
ServiceID string
|
||||
ServiceName string
|
||||
ServiceTags []string
|
||||
|
@ -488,6 +494,7 @@ type NodeInfo struct {
|
|||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
Meta map[string]string
|
||||
Services []*NodeService
|
||||
Checks HealthChecks
|
||||
}
|
||||
|
|
|
@ -151,6 +151,9 @@ func testServiceNode() *ServiceNode {
|
|||
TaggedAddresses: map[string]string{
|
||||
"hello": "world",
|
||||
},
|
||||
NodeMeta: map[string]string{
|
||||
"tag": "value",
|
||||
},
|
||||
ServiceID: "service1",
|
||||
ServiceName: "dogs",
|
||||
ServiceTags: []string{"prod", "v1"},
|
||||
|
@ -172,12 +175,13 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
|
|||
// Make sure the parts that weren't supposed to be cloned didn't get
|
||||
// copied over, then zero-value them out so we can do a DeepEqual() on
|
||||
// the rest of the contents.
|
||||
if clone.Address != "" || len(clone.TaggedAddresses) != 0 {
|
||||
if clone.Address != "" || len(clone.TaggedAddresses) != 0 || len(clone.NodeMeta) != 0 {
|
||||
t.Fatalf("bad: %v", clone)
|
||||
}
|
||||
|
||||
sn.Address = ""
|
||||
sn.TaggedAddresses = nil
|
||||
sn.NodeMeta = nil
|
||||
if !reflect.DeepEqual(sn, clone) {
|
||||
t.Fatalf("bad: %v", clone)
|
||||
}
|
||||
|
@ -197,6 +201,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) {
|
|||
// them out before we do the compare.
|
||||
sn.Address = ""
|
||||
sn.TaggedAddresses = nil
|
||||
sn.NodeMeta = nil
|
||||
if !reflect.DeepEqual(sn, sn2) {
|
||||
t.Fatalf("bad: %v", sn2)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue