diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 496416d37..36cfc4623 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -146,7 +146,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { } // Verify check - _, checks, err := fsm.state.NodeChecks("foo") + _, checks, err := fsm.state.NodeChecks(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -268,7 +268,7 @@ func TestFSM_DeregisterCheck(t *testing.T) { } // Verify check not registered - _, checks, err := fsm.state.NodeChecks("foo") + _, checks, err := fsm.state.NodeChecks(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -344,7 +344,7 @@ func TestFSM_DeregisterNode(t *testing.T) { } // Verify checks not registered - _, checks, err := fsm.state.NodeChecks("foo") + _, checks, err := fsm.state.NodeChecks(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -482,7 +482,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { t.Fatalf("Bad: %v", fooSrv) } - _, checks, err := fsm2.state.NodeChecks("foo") + _, checks, err := fsm2.state.NodeChecks(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 7eb28ce2a..0e33151ab 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" ) // Health endpoint is used to query the health information @@ -20,18 +21,17 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // Get the state specific checks state := h.srv.fsm.State() - return h.srv.blockingRPC( + return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("ChecksInState"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var checks structs.HealthChecks var err error if len(args.NodeMetaFilters) > 0 { - index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters) + index, checks, err = state.ChecksInStateByNodeMeta(ws, args.State, args.NodeMetaFilters) } else { - index, checks, err = state.ChecksInState(args.State) + index, checks, err = state.ChecksInState(ws, args.State) } if err != nil { return err @@ -53,12 +53,11 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, // Get the node checks state := h.srv.fsm.State() - return h.srv.blockingRPC( + return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("NodeChecks"), - func() error { - index, checks, err := state.NodeChecks(args.Node) + func(ws memdb.WatchSet) error { + index, checks, err := state.NodeChecks(ws, args.Node) if err != nil { return err } @@ -82,18 +81,17 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() - return h.srv.blockingRPC( + return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("ServiceChecks"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var checks structs.HealthChecks var err error if len(args.NodeMetaFilters) > 0 { - index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters) + index, checks, err = state.ServiceChecksByNodeMeta(ws, args.ServiceName, args.NodeMetaFilters) } else { - index, checks, err = state.ServiceChecks(args.ServiceName) + index, checks, err = state.ServiceChecks(ws, args.ServiceName) } if err != nil { return err @@ -119,18 +117,17 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() - err := h.srv.blockingRPC( + err := h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("CheckServiceNodes"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var nodes structs.CheckServiceNodes var err error if args.TagFilter { - index, nodes, err = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) + index, nodes, err = state.CheckServiceTagNodes(ws, args.ServiceName, args.ServiceTag) } else { - index, nodes, err = state.CheckServiceNodes(args.ServiceName) + index, nodes, err = state.CheckServiceNodes(ws, args.ServiceName) } if err != nil { return err diff --git a/consul/issue_test.go b/consul/issue_test.go index 45f9e91c6..e2fdf9470 100644 --- a/consul/issue_test.go +++ b/consul/issue_test.go @@ -45,7 +45,7 @@ func TestHealthCheckRace(t *testing.T) { } // Verify the index - idx, out1, err := state.CheckServiceNodes("db") + idx, out1, err := state.CheckServiceNodes(nil, "db") if err != nil { t.Fatalf("err: %s", err) } @@ -68,7 +68,7 @@ func TestHealthCheckRace(t *testing.T) { } // Verify the index changed - idx, out2, err := state.CheckServiceNodes("db") + idx, out2, err := state.CheckServiceNodes(nil, "db") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/leader.go b/consul/leader.go index 93505753a..2340cdaa4 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -262,7 +262,7 @@ func (s *Server) reconcile() (err error) { // a "reap" event to cause the node to be cleaned up. func (s *Server) reconcileReaped(known map[string]struct{}) error { state := s.fsm.State() - _, checks, err := state.ChecksInState(structs.HealthAny) + _, checks, err := state.ChecksInState(nil, structs.HealthAny) if err != nil { return err } @@ -402,7 +402,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { } // Check if the serfCheck is in the passing state - _, checks, err := state.NodeChecks(member.Name) + _, checks, err := state.NodeChecks(nil, member.Name) if err != nil { return err } @@ -446,7 +446,7 @@ func (s *Server) handleFailedMember(member serf.Member) error { } if node != nil && node.Address == member.Addr.String() { // Check if the serfCheck is in the critical state - _, checks, err := state.NodeChecks(member.Name) + _, checks, err := state.NodeChecks(nil, member.Name) if err != nil { return err } diff --git a/consul/leader_test.go b/consul/leader_test.go index 3ff724090..c4312dfba 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -44,7 +44,7 @@ func TestLeader_RegisterMember(t *testing.T) { }) // Should have a check - _, checks, err := state.NodeChecks(c1.config.NodeName) + _, checks, err := state.NodeChecks(nil, c1.config.NodeName) if err != nil { t.Fatalf("err: %v", err) } @@ -114,7 +114,7 @@ func TestLeader_FailedMember(t *testing.T) { }) // Should have a check - _, checks, err := state.NodeChecks(c1.config.NodeName) + _, checks, err := state.NodeChecks(nil, c1.config.NodeName) if err != nil { t.Fatalf("err: %v", err) } @@ -129,7 +129,7 @@ func TestLeader_FailedMember(t *testing.T) { } testutil.WaitForResult(func() (bool, error) { - _, checks, err = state.NodeChecks(c1.config.NodeName) + _, checks, err = state.NodeChecks(nil, c1.config.NodeName) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index d53d8fc02..6a1c5a760 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -489,7 +489,7 @@ func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRe func (p *PreparedQuery) execute(query *structs.PreparedQuery, reply *structs.PreparedQueryExecuteResponse) error { state := p.srv.fsm.State() - _, nodes, err := state.CheckServiceNodes(query.Service.Service) + _, nodes, err := state.CheckServiceNodes(nil, query.Service.Service) if err != nil { return err } diff --git a/consul/state/catalog.go b/consul/state/catalog.go index 3c5638243..cdc403845 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -499,6 +499,7 @@ func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]st if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) { continue } + // List all the services on the node services, err := tx.Get("services", "node", n.Node) if err != nil { @@ -552,7 +553,7 @@ func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64 results = append(results, service.(*structs.ServiceNode)) } - // Fill in the address details. + // Fill in the node details. results, err = s.parseServiceNodes(tx, ws, results) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) @@ -585,7 +586,7 @@ func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag stri } } - // Fill in the address details. + // Fill in the node details. results, err = s.parseServiceNodes(tx, ws, results) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) @@ -882,13 +883,14 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("NodeCheck")...) + idx := maxIndexTxn(tx, "checks") // Return the check. check, err := tx.First("checks", "id", nodeName, string(checkID)) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } + if check != nil { return idx, check.(*structs.HealthCheck), nil } else { @@ -898,115 +900,20 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, // NodeChecks is used to retrieve checks associated with the // given node from the state store. -func (s *StateStore) NodeChecks(nodeName string) (uint64, structs.HealthChecks, error) { +func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("NodeChecks")...) + idx := maxIndexTxn(tx, "checks") // Return the checks. - checks, err := tx.Get("checks", "node", nodeName) + iter, err := tx.Get("checks", "node", nodeName) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } - return s.parseChecks(idx, checks) -} + ws.Add(iter.WatchCh()) -// ServiceChecks is used to get all checks associated with a -// given service ID. The query is performed against a service -// _name_ instead of a service ID. -func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthChecks, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecks")...) - - // Return the checks. - checks, err := tx.Get("checks", "service", serviceName) - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - return s.parseChecks(idx, checks) -} - -// ServiceChecksByNodeMeta is used to get all checks associated with a -// given service ID, filtered by the given node metadata values. The query -// is performed against a service _name_ instead of a service ID. -func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...) - - // Return the checks. - checks, err := tx.Get("checks", "service", serviceName) - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - return s.parseChecksByNodeMeta(idx, checks, tx, filters) -} - -// ChecksInState is used to query the state store for all checks -// which are in the provided state. -func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ChecksInState")...) - - // Query all checks if HealthAny is passed - if state == structs.HealthAny { - checks, err := tx.Get("checks", "status") - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - return s.parseChecks(idx, checks) - } - - // Any other state we need to query for explicitly - checks, err := tx.Get("checks", "status", state) - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - return s.parseChecks(idx, checks) -} - -// ChecksInStateByNodeMeta is used to query the state store for all checks -// which are in the provided state, filtered by the given node metadata values. -func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...) - - // Query all checks if HealthAny is passed - var checks memdb.ResultIterator - var err error - if state == structs.HealthAny { - checks, err = tx.Get("checks", "status") - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - } else { - // Any other state we need to query for explicitly - checks, err = tx.Get("checks", "status", state) - if err != nil { - return 0, nil, fmt.Errorf("failed check lookup: %s", err) - } - } - - return s.parseChecksByNodeMeta(idx, checks, tx, filters) -} - -// parseChecks is a helper function used to deduplicate some -// repetitive code for returning health checks. -func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) { - // Gather the health checks and return them properly type casted. var results structs.HealthChecks for check := iter.Next(); check != nil; check = iter.Next() { results = append(results, check.(*structs.HealthCheck)) @@ -1014,20 +921,140 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, return idx, results, nil } +// ServiceChecks is used to get all checks associated with a +// given service ID. The query is performed against a service +// _name_ instead of a service ID. +func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, "checks") + + // Return the checks. + iter, err := tx.Get("checks", "service", serviceName) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + ws.Add(iter.WatchCh()) + + var results structs.HealthChecks + for check := iter.Next(); check != nil; check = iter.Next() { + results = append(results, check.(*structs.HealthCheck)) + } + return idx, results, nil +} + +// ServiceChecksByNodeMeta is used to get all checks associated with a +// given service ID, filtered by the given node metadata values. The query +// is performed against a service _name_ instead of a service ID. +func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, + filters map[string]string) (uint64, structs.HealthChecks, error) { + + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, "nodes", "checks") + + // Return the checks. + iter, err := tx.Get("checks", "service", serviceName) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + ws.Add(iter.WatchCh()) + + return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters) +} + +// ChecksInState is used to query the state store for all checks +// which are in the provided state. +func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, "checks") + + // Query all checks if HealthAny is passed, otherwise use the index. + var iter memdb.ResultIterator + var err error + if state == structs.HealthAny { + iter, err = tx.Get("checks", "status") + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } else { + iter, err = tx.Get("checks", "status", state) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } + ws.Add(iter.WatchCh()) + + var results structs.HealthChecks + for check := iter.Next(); check != nil; check = iter.Next() { + results = append(results, check.(*structs.HealthCheck)) + } + return idx, results, nil +} + +// ChecksInStateByNodeMeta is used to query the state store for all checks +// which are in the provided state, filtered by the given node metadata values. +func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, "nodes", "checks") + + // Query all checks if HealthAny is passed, otherwise use the index. + var iter memdb.ResultIterator + var err error + if state == structs.HealthAny { + iter, err = tx.Get("checks", "status") + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } else { + iter, err = tx.Get("checks", "status", state) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } + ws.Add(iter.WatchCh()) + + return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters) +} + // parseChecksByNodeMeta is a helper function used to deduplicate some // repetitive code for returning health checks filtered by node metadata fields. -func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn, - filters map[string]string) (uint64, structs.HealthChecks, error) { +func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet, + idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { + + // We don't want to track an unlimited number of nodes, so we pull a + // top-level watch to use as a fallback. + allNodes, err := tx.Get("nodes", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) + } + allNodesCh := allNodes.WatchCh() + + // Only take results for nodes that satisfy the node metadata filters. var results structs.HealthChecks for check := iter.Next(); check != nil; check = iter.Next() { healthCheck := check.(*structs.HealthCheck) - node, err := tx.First("nodes", "id", healthCheck.Node) + watchCh, node, err := tx.FirstWatch("nodes", "id", healthCheck.Node) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } if node == nil { return 0, nil, ErrMissingNode } + + // Add even the filtered nodes so we wake up if the node metadata + // changes. + ws.AddWithLimit(watchLimit, watchCh, allNodesCh) if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) { results = append(results, healthCheck) } @@ -1093,58 +1120,61 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc } // CheckServiceNodes is used to query all nodes and checks for a given service. -func (s *StateStore) CheckServiceNodes(serviceName string) (uint64, structs.CheckServiceNodes, error) { +func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...) + idx := maxIndexTxn(tx, "nodes", "services", "checks") // Query the state store for the service. - services, err := tx.Get("services", "service", serviceName) + iter, err := tx.Get("services", "service", serviceName) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } + ws.Add(iter.WatchCh()) // Return the results. var results structs.ServiceNodes - for service := services.Next(); service != nil; service = services.Next() { + for service := iter.Next(); service != nil; service = iter.Next() { results = append(results, service.(*structs.ServiceNode)) } - return s.parseCheckServiceNodes(tx, idx, results, err) + return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } // CheckServiceTagNodes is used to query all nodes and checks for a given // service, filtering out services that don't contain the given tag. -func (s *StateStore) CheckServiceTagNodes(serviceName, tag string) (uint64, structs.CheckServiceNodes, error) { +func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...) + idx := maxIndexTxn(tx, "nodes", "services", "checks") // Query the state store for the service. - services, err := tx.Get("services", "service", serviceName) + iter, err := tx.Get("services", "service", serviceName) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } + ws.Add(iter.WatchCh()) // Return the results, filtering by tag. var results structs.ServiceNodes - for service := services.Next(); service != nil; service = services.Next() { + for service := iter.Next(); service != nil; service = iter.Next() { svc := service.(*structs.ServiceNode) if !serviceTagFilter(svc, tag) { results = append(results, svc) } } - return s.parseCheckServiceNodes(tx, idx, results, err) + return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } // parseCheckServiceNodes is used to parse through a given set of services, // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. func (s *StateStore) parseCheckServiceNodes( - tx *memdb.Txn, idx uint64, services structs.ServiceNodes, + tx *memdb.Txn, ws memdb.WatchSet, idx uint64, + serviceName string, services structs.ServiceNodes, err error) (uint64, structs.CheckServiceNodes, error) { if err != nil { return 0, nil, err @@ -1156,32 +1186,57 @@ func (s *StateStore) parseCheckServiceNodes( return idx, nil, nil } + // We don't want to track an unlimited number of nodes, so we pull a + // top-level watch to use as a fallback. + allNodes, err := tx.Get("nodes", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) + } + allNodesCh := allNodes.WatchCh() + + // We need a similar fallback for checks. Since services need the + // status of node + service-specific checks, we pull in a top-level + // watch over all checks. + allChecks, err := tx.Get("checks", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed checks lookup: %s", err) + } + allChecksCh := allChecks.WatchCh() + results := make(structs.CheckServiceNodes, 0, len(services)) for _, sn := range services { // Retrieve the node. - n, err := tx.First("nodes", "id", sn.Node) + watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } + ws.AddWithLimit(watchLimit, watchCh, allNodesCh) + if n == nil { return 0, nil, ErrMissingNode } node := n.(*structs.Node) - // We need to return the checks specific to the given service - // as well as the node itself. Unfortunately, memdb won't let - // us use the index to do the latter query so we have to pull - // them all and filter. + // First add the node-level checks. These always apply to any + // service on the node. var checks structs.HealthChecks - iter, err := tx.Get("checks", "node", sn.Node) + iter, err := tx.Get("checks", "node_service_check", sn.Node, false) if err != nil { return 0, nil, err } + ws.AddWithLimit(watchLimit, iter.WatchCh(), allChecksCh) for check := iter.Next(); check != nil; check = iter.Next() { - hc := check.(*structs.HealthCheck) - if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID { - checks = append(checks, hc) - } + checks = append(checks, check.(*structs.HealthCheck)) + } + + // Now add the service-specific checks. + iter, err = tx.Get("checks", "node_service", sn.Node, sn.ServiceID) + if err != nil { + return 0, nil, err + } + ws.AddWithLimit(watchLimit, iter.WatchCh(), allChecksCh) + for check := iter.Next(); check != nil; check = iter.Next() { + checks = append(checks, check.(*structs.HealthCheck)) } // Append to the results. diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index f4793efe3..b66a755db 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -106,7 +106,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { // Verify that the check got registered. verifyCheck := func() { - idx, out, err := s.NodeChecks("node1") + idx, out, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -154,7 +154,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { verifyNode() verifyService() { - idx, out, err := s.NodeChecks("node1") + idx, out, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -252,7 +252,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { // Verify that the check got registered. verifyCheck := func() { - idx, out, err := s.NodeChecks("node1") + idx, out, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -290,7 +290,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { verifyNode() verifyService() func() { - idx, out, err := s.NodeChecks("node1") + idx, out, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1620,7 +1620,7 @@ func TestStateStore_EnsureCheck(t *testing.T) { } // Retrieve the check and make sure it matches - idx, checks, err := s.NodeChecks("node1") + idx, checks, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1641,7 +1641,7 @@ func TestStateStore_EnsureCheck(t *testing.T) { } // Check that we successfully updated - idx, checks, err = s.NodeChecks("node1") + idx, checks, err = s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1681,7 +1681,7 @@ func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) { } // Get the check again - _, result, err := s.NodeChecks("node1") + _, result, err := s.NodeChecks(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1695,19 +1695,34 @@ func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) { func TestStateStore_NodeChecks(t *testing.T) { s := testStateStore(t) - // Create the first node and service with some checks + // Do an initial query for a node that doesn't exist. + ws := memdb.NewWatchSet() + idx, checks, err := s.NodeChecks(ws, "node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(checks) != 0 { + t.Fatalf("bad: %#v", checks) + } + + // Create some nodes and checks. testRegisterNode(t, s, 0, "node1") testRegisterService(t, s, 1, "node1", "service1") testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing) testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing) - - // Create a second node/service with a different set of checks testRegisterNode(t, s, 4, "node2") testRegisterService(t, s, 5, "node2", "service2") testRegisterCheck(t, s, 6, "node2", "service2", "check3", structs.HealthPassing) + if !watchFired(ws) { + t.Fatalf("bad") + } // Try querying for all checks associated with node1 - idx, checks, err := s.NodeChecks("node1") + ws = memdb.NewWatchSet() + idx, checks, err = s.NodeChecks(ws, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1718,35 +1733,64 @@ func TestStateStore_NodeChecks(t *testing.T) { t.Fatalf("bad checks: %#v", checks) } + // Creating some unrelated node should not fire the watch. + testRegisterNode(t, s, 7, "node3") + testRegisterCheck(t, s, 8, "node3", "", "check1", structs.HealthPassing) + if watchFired(ws) { + t.Fatalf("bad") + } + // Try querying for all checks associated with node2 - idx, checks, err = s.NodeChecks("node2") + ws = memdb.NewWatchSet() + idx, checks, err = s.NodeChecks(ws, "node2") if err != nil { t.Fatalf("err: %s", err) } - if idx != 6 { + if idx != 8 { t.Fatalf("bad index: %d", idx) } if len(checks) != 1 || checks[0].CheckID != "check3" { t.Fatalf("bad checks: %#v", checks) } + + // Changing node2 should fire the watch. + testRegisterCheck(t, s, 9, "node2", "service2", "check3", structs.HealthCritical) + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServiceChecks(t *testing.T) { s := testStateStore(t) - // Create the first node and service with some checks + // Do an initial query for a service that doesn't exist. + ws := memdb.NewWatchSet() + idx, checks, err := s.ServiceChecks(ws, "service1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(checks) != 0 { + t.Fatalf("bad: %#v", checks) + } + + // Create some nodes and checks. testRegisterNode(t, s, 0, "node1") testRegisterService(t, s, 1, "node1", "service1") testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing) testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing) - - // Create a second node/service with a different set of checks testRegisterNode(t, s, 4, "node2") testRegisterService(t, s, 5, "node2", "service2") testRegisterCheck(t, s, 6, "node2", "service2", "check3", structs.HealthPassing) + if !watchFired(ws) { + t.Fatalf("bad") + } - // Try querying for all checks associated with service1 - idx, checks, err := s.ServiceChecks("service1") + // Try querying for all checks associated with service1. + ws = memdb.NewWatchSet() + idx, checks, err = s.ServiceChecks(ws, "service1") if err != nil { t.Fatalf("err: %s", err) } @@ -1756,21 +1800,48 @@ func TestStateStore_ServiceChecks(t *testing.T) { if len(checks) != 2 || checks[0].CheckID != "check1" || checks[1].CheckID != "check2" { t.Fatalf("bad checks: %#v", checks) } + + // Adding some unrelated service + check should not fire the watch. + testRegisterService(t, s, 7, "node1", "service3") + testRegisterCheck(t, s, 8, "node1", "service3", "check3", structs.HealthPassing) + if watchFired(ws) { + t.Fatalf("bad") + } + + // Updating a related check should fire the watch. + testRegisterCheck(t, s, 9, "node1", "service1", "check2", structs.HealthCritical) + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) { s := testStateStore(t) - // Create the first node and service with some checks + // Querying with no results returns nil. + ws := memdb.NewWatchSet() + idx, checks, err := s.ServiceChecksByNodeMeta(ws, "service1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(checks) != 0 { + t.Fatalf("bad: %#v", checks) + } + + // Create some nodes and checks. testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"}) testRegisterService(t, s, 1, "node1", "service1") testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing) testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing) - - // Create a second node/service with a different set of checks testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"}) testRegisterService(t, s, 5, "node2", "service1") testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing) + if !watchFired(ws) { + t.Fatalf("bad") + } cases := []struct { filters map[string]string @@ -1798,9 +1869,11 @@ func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) { }, } - // Try querying for all checks associated with service1 + // Try querying for all checks associated with service1. + idx = 7 for _, tc := range cases { - _, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters) + ws = memdb.NewWatchSet() + _, checks, err := s.ServiceChecksByNodeMeta(ws, "service1", tc.filters) if err != nil { t.Fatalf("err: %s", err) } @@ -1812,6 +1885,39 @@ func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) { t.Fatalf("bad checks: %#v", checks) } } + + // Registering some unrelated node should not fire the watch. + testRegisterNode(t, s, idx, fmt.Sprintf("nope%d", idx)) + idx++ + if watchFired(ws) { + t.Fatalf("bad") + } + } + + // Overwhelm the node tracking. + for i := 0; i < 2*watchLimit; i++ { + node := fmt.Sprintf("many%d", idx) + testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) + idx++ + testRegisterService(t, s, idx, node, "service1") + idx++ + testRegisterCheck(t, s, idx, node, "service1", "check1", structs.HealthPassing) + idx++ + } + + // Now get a fresh watch, which will be forced to watch the whole + // node table. + ws = memdb.NewWatchSet() + _, _, err = s.ServiceChecksByNodeMeta(ws, "service1", + map[string]string{"common": "1"}) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Registering some unrelated node should now fire the watch. + testRegisterNode(t, s, idx, "nope") + if !watchFired(ws) { + t.Fatalf("bad") } } @@ -1819,7 +1925,8 @@ func TestStateStore_ChecksInState(t *testing.T) { s := testStateStore(t) // Querying with no results returns nil - idx, res, err := s.ChecksInState(structs.HealthPassing) + ws := memdb.NewWatchSet() + idx, res, err := s.ChecksInState(ws, structs.HealthPassing) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } @@ -1829,9 +1936,13 @@ func TestStateStore_ChecksInState(t *testing.T) { testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing) testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical) testRegisterCheck(t, s, 3, "node1", "", "check3", structs.HealthPassing) + if !watchFired(ws) { + t.Fatalf("bad") + } // Query the state store for passing checks. - _, checks, err := s.ChecksInState(structs.HealthPassing) + ws = memdb.NewWatchSet() + _, checks, err := s.ChecksInState(ws, structs.HealthPassing) if err != nil { t.Fatalf("err: %s", err) } @@ -1843,33 +1954,55 @@ func TestStateStore_ChecksInState(t *testing.T) { if checks[0].CheckID != "check1" || checks[1].CheckID != "check3" { t.Fatalf("bad: %#v", checks) } + if watchFired(ws) { + t.Fatalf("bad") + } + + // Changing the state of a check should fire the watch. + testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthCritical) + if !watchFired(ws) { + t.Fatalf("bad") + } // HealthAny just returns everything. - _, checks, err = s.ChecksInState(structs.HealthAny) + ws = memdb.NewWatchSet() + _, checks, err = s.ChecksInState(ws, structs.HealthAny) if err != nil { t.Fatalf("err: %s", err) } if n := len(checks); n != 3 { t.Fatalf("expected 3 checks, got: %d", n) } + if watchFired(ws) { + t.Fatalf("bad") + } + + // Adding a new check should fire the watch. + testRegisterCheck(t, s, 5, "node1", "", "check4", structs.HealthCritical) + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) { s := testStateStore(t) - // Querying with no results returns nil - idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil) + // Querying with no results returns nil. + ws := memdb.NewWatchSet() + idx, res, err := s.ChecksInStateByNodeMeta(ws, structs.HealthPassing, nil) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Register a node with checks in varied states + // Register a node with checks in varied states. testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"}) testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing) testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical) - testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"}) testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing) + if !watchFired(ws) { + t.Fatalf("bad") + } cases := []struct { filters map[string]string @@ -1919,9 +2052,11 @@ func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) { }, } - // Try querying for all checks associated with service1 + // Try querying for all checks associated with service1. + idx = 5 for _, tc := range cases { - _, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters) + ws = memdb.NewWatchSet() + _, checks, err := s.ChecksInStateByNodeMeta(ws, tc.state, tc.filters) if err != nil { t.Fatalf("err: %s", err) } @@ -1933,23 +2068,70 @@ func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) { t.Fatalf("bad checks: %#v, %v", checks, tc.checks) } } + + // Registering some unrelated node should not fire the watch. + testRegisterNode(t, s, idx, fmt.Sprintf("nope%d", idx)) + idx++ + if watchFired(ws) { + t.Fatalf("bad") + } + } + + // Overwhelm the node tracking. + for i := 0; i < 2*watchLimit; i++ { + node := fmt.Sprintf("many%d", idx) + testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) + idx++ + testRegisterService(t, s, idx, node, "service1") + idx++ + testRegisterCheck(t, s, idx, node, "service1", "check1", structs.HealthPassing) + idx++ + } + + // Now get a fresh watch, which will be forced to watch the whole + // node table. + ws = memdb.NewWatchSet() + _, _, err = s.ChecksInStateByNodeMeta(ws, structs.HealthPassing, + map[string]string{"common": "1"}) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Registering some unrelated node should now fire the watch. + testRegisterNode(t, s, idx, "nope") + if !watchFired(ws) { + t.Fatalf("bad") } } func TestStateStore_DeleteCheck(t *testing.T) { s := testStateStore(t) - // Register a node and a node-level health check + // Register a node and a node-level health check. testRegisterNode(t, s, 1, "node1") testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing) - // Delete the check + // Make sure the check is there. + ws := memdb.NewWatchSet() + _, checks, err := s.NodeChecks(ws, "node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != 1 { + t.Fatalf("bad: %#v", checks) + } + + // Delete the check. if err := s.DeleteCheck(3, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Check is gone - _, checks, err := s.NodeChecks("node1") + ws = memdb.NewWatchSet() + _, checks, err = s.NodeChecks(ws, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -1957,50 +2139,59 @@ func TestStateStore_DeleteCheck(t *testing.T) { t.Fatalf("bad: %#v", checks) } - // Index tables were updated + // Index tables were updated. if idx := s.maxIndex("checks"); idx != 3 { t.Fatalf("bad index: %d", idx) } // Deleting a nonexistent check should be idempotent and not return an - // error + // error. if err := s.DeleteCheck(4, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) } if idx := s.maxIndex("checks"); idx != 3 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_CheckServiceNodes(t *testing.T) { s := testStateStore(t) // Querying with no matches gives an empty response - idx, res, err := s.CheckServiceNodes("service1") + ws := memdb.NewWatchSet() + idx, res, err := s.CheckServiceNodes(ws, "service1") if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Register some nodes + // Register some nodes. testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 1, "node2") - // Register node-level checks. These should not be returned - // in the final result. + // Register node-level checks. These should be the final result. testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing) testRegisterCheck(t, s, 3, "node2", "", "check2", structs.HealthPassing) - // Register a service against the nodes + // Register a service against the nodes. testRegisterService(t, s, 4, "node1", "service1") testRegisterService(t, s, 5, "node2", "service2") - // Register checks against the services + // Register checks against the services. testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing) testRegisterCheck(t, s, 7, "node2", "service2", "check4", structs.HealthPassing) - // Query the state store for nodes and checks which - // have been registered with a specific service. - idx, results, err := s.CheckServiceNodes("service1") + // At this point all the changes should have fired the watch. + if !watchFired(ws) { + t.Fatalf("bad") + } + + // Query the state store for nodes and checks which have been registered + // with a specific service. + ws = memdb.NewWatchSet() + idx, results, err := s.CheckServiceNodes(ws, "service1") if err != nil { t.Fatalf("err: %s", err) } @@ -2008,18 +2199,24 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Make sure we get the expected result (service check + node check) + // Make sure we get the expected result (service check + node check). if n := len(results); n != 1 { t.Fatalf("expected 1 result, got: %d", n) } csn := results[0] - if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 { + if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 || + csn.Checks[0].ServiceID != "" || csn.Checks[0].CheckID != "check1" || + csn.Checks[1].ServiceID != "service1" || csn.Checks[1].CheckID != "check3" { t.Fatalf("bad output: %#v", csn) } - // Node updates alter the returned index + // Node updates alter the returned index and fire the watch. testRegisterNode(t, s, 8, "node1") - idx, results, err = s.CheckServiceNodes("service1") + if !watchFired(ws) { + t.Fatalf("bad") + } + ws = memdb.NewWatchSet() + idx, results, err = s.CheckServiceNodes(ws, "service1") if err != nil { t.Fatalf("err: %s", err) } @@ -2027,9 +2224,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Service updates alter the returned index + // Service updates alter the returned index and fire the watch. testRegisterService(t, s, 9, "node1", "service1") - idx, results, err = s.CheckServiceNodes("service1") + if !watchFired(ws) { + t.Fatalf("bad") + } + ws = memdb.NewWatchSet() + idx, results, err = s.CheckServiceNodes(ws, "service1") if err != nil { t.Fatalf("err: %s", err) } @@ -2037,15 +2238,64 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Check updates alter the returned index + // Check updates alter the returned index and fire the watch. testRegisterCheck(t, s, 10, "node1", "service1", "check1", structs.HealthCritical) - idx, results, err = s.CheckServiceNodes("service1") + if !watchFired(ws) { + t.Fatalf("bad") + } + ws = memdb.NewWatchSet() + idx, results, err = s.CheckServiceNodes(ws, "service1") if err != nil { t.Fatalf("err: %s", err) } if idx != 10 { t.Fatalf("bad index: %d", idx) } + + // Registering some unrelated node + service should not fire the watch. + testRegisterNode(t, s, 11, "nope") + testRegisterService(t, s, 12, "nope", "nope") + if watchFired(ws) { + t.Fatalf("bad") + } + + // Overwhelm node and check tracking. + idx = 13 + for i := 0; i < 2*watchLimit; i++ { + node := fmt.Sprintf("many%d", i) + testRegisterNode(t, s, idx, node) + idx++ + testRegisterCheck(t, s, idx, node, "", "check1", structs.HealthPassing) + idx++ + testRegisterService(t, s, idx, node, "service1") + idx++ + testRegisterCheck(t, s, idx, node, "service1", "check2", structs.HealthPassing) + idx++ + } + + // Now registering an unrelated node will fire the watch. + ws = memdb.NewWatchSet() + idx, results, err = s.CheckServiceNodes(ws, "service1") + if err != nil { + t.Fatalf("err: %s", err) + } + testRegisterNode(t, s, idx, "more-nope") + idx++ + if !watchFired(ws) { + t.Fatalf("bad") + } + + // Also, registering an unrelated check will fire the watch. + ws = memdb.NewWatchSet() + idx, results, err = s.CheckServiceNodes(ws, "service1") + if err != nil { + t.Fatalf("err: %s", err) + } + testRegisterCheck(t, s, idx, "more-nope", "", "check1", structs.HealthPassing) + idx++ + if !watchFired(ws) { + t.Fatalf("bad") + } } func BenchmarkCheckServiceNodes(b *testing.B) { @@ -2080,8 +2330,9 @@ func BenchmarkCheckServiceNodes(b *testing.B) { b.Fatalf("err: %v", err) } + ws := memdb.NewWatchSet() for i := 0; i < b.N; i++ { - s.CheckServiceNodes("db") + s.CheckServiceNodes(ws, "db") } } @@ -2114,7 +2365,8 @@ func TestStateStore_CheckServiceTagNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, nodes, err := s.CheckServiceTagNodes("db", "master") + ws := memdb.NewWatchSet() + idx, nodes, err := s.CheckServiceTagNodes(ws, "db", "master") if err != nil { t.Fatalf("err: %s", err) } @@ -2139,6 +2391,14 @@ func TestStateStore_CheckServiceTagNodes(t *testing.T) { if nodes[0].Checks[1].CheckID != "db" { t.Fatalf("Bad: %v", nodes[0]) } + + // Changing a tag should fire the watch. + if err := s.EnsureService(4, "foo", &structs.NodeService{ID: "db1", Service: "db", Tags: []string{"nope"}, Address: "", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_Check_Snapshot(t *testing.T) { diff --git a/consul/state/schema.go b/consul/state/schema.go index 3d3ed1450..cf701d64e 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -188,6 +188,22 @@ func checksTableSchema() *memdb.TableSchema { Lowercase: true, }, }, + "node_service_check": &memdb.IndexSchema{ + Name: "node_service_check", + AllowMissing: true, + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Node", + Lowercase: true, + }, + &memdb.FieldSetIndex{ + Field: "ServiceID", + }, + }, + }, + }, "node_service": &memdb.IndexSchema{ Name: "node_service", AllowMissing: true,