diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 3824095c0..c66d9de40 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1638,8 +1638,11 @@ func (s *Store) ServiceChecks(ws memdb.WatchSet, serviceName string, entMeta *st // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) - // Return the checks. - iter, err := catalogListChecksByService(tx, serviceName, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + iter, err := tx.Get(tableChecks, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1663,8 +1666,12 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, // Get the table index. idx := maxIndexForService(tx, serviceName, true, true, entMeta) - // Return the checks. - iter, err := catalogListChecksByService(tx, serviceName, entMeta) + + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + iter, err := tx.Get(tableChecks, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1709,13 +1716,18 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Query all checks if HealthAny is passed, otherwise use the index. var iter memdb.ResultIterator var err error if state == api.HealthAny { iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta) } else { - iter, err = catalogListChecksInState(tx, state, entMeta) + q := Query{Value: state, EnterpriseMeta: *entMeta} + iter, err = tx.Get(tableChecks, indexStatus, q) } if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) @@ -1857,7 +1869,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ } // Delete the check from the DB and update the index. - if err := tx.Delete("checks", hc); err != nil { + if err := tx.Delete(tableChecks, hc); err != nil { return fmt.Errorf("failed removing check: %s", err) } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 92f474579..038f8ea15 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -175,7 +175,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change} markService(newNodeServiceTupleFromServiceNode(sn), srvChange) - case "checks": + case tableChecks: // For health we only care about the scope for now to know if it's just // affecting a single service or every service on a node. There is a // subtle edge case where the check with same ID changes from being node diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 49b0f6d2d..842e0f397 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -107,46 +107,37 @@ func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (i func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexTxn(tx, "nodes", tableServices, "checks") + return maxIndexTxn(tx, "nodes", tableServices, tableChecks) } return maxIndexTxn(tx, "nodes", tableServices) } func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexWatchTxn(tx, ws, "nodes", tableServices, "checks") + return maxIndexWatchTxn(tx, ws, "nodes", tableServices, tableChecks) } return maxIndexWatchTxn(tx, ws, "nodes", tableServices) } func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // update the universal index entry - if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{tableChecks, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } return nil } func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { - return maxIndexTxn(tx, "checks") + return maxIndexTxn(tx, tableChecks) } func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) { return tx.Get(tableChecks, indexNode, q) } -func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "service", service) -} - -func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - // simpler than normal due to the use of the CompoundMultiIndex - return tx.Get("checks", "status", state) -} - func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { // Insert the check - if err := tx.Insert("checks", chk); err != nil { + if err := tx.Insert(tableChecks, chk); err != nil { return fmt.Errorf("failed inserting check: %s", err) } diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index c33b12df9..70d9be30c 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -8,9 +8,11 @@ import ( func testIndexerTableChecks() map[string]indexerTestCase { obj := &structs.HealthCheck{ - Node: "NoDe", - ServiceID: "SeRvIcE", - CheckID: "CheckID", + Node: "NoDe", + ServiceID: "SeRvIcE", + ServiceName: "ServiceName", + CheckID: "CheckID", + Status: "PASSING", } return map[string]indexerTestCase{ indexID: { @@ -36,6 +38,26 @@ func testIndexerTableChecks() map[string]indexerTestCase { }, }, }, + indexStatus: { + read: indexValue{ + source: Query{Value: "PASSING"}, + expected: []byte("passing\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("passing\x00"), + }, + }, + indexService: { + read: indexValue{ + source: Query{Value: "ServiceName"}, + expected: []byte("servicename\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("servicename\x00"), + }, + }, indexNodeService: { read: indexValue{ source: NodeServiceQuery{ diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index e9915d9c9..565b4bf1b 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -258,18 +258,18 @@ func checksTableSchema() *memdb.TableSchema { Name: indexStatus, AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Status", - Lowercase: false, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexStatusFromHealthCheck, }, }, indexService: { Name: indexService, AllowMissing: true, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "ServiceName", - Lowercase: true, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexServiceNameFromHealthCheck, }, }, indexNode: { @@ -342,6 +342,36 @@ func indexNodeServiceFromHealthCheck(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexStatusFromHealthCheck(raw interface{}) ([]byte, error) { + hc, ok := raw.(*structs.HealthCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw) + } + + if hc.Status == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Status)) + return b.Bytes(), nil +} + +func indexServiceNameFromHealthCheck(raw interface{}) ([]byte, error) { + hc, ok := raw.(*structs.HealthCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw) + } + + if hc.ServiceName == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.ServiceName)) + return b.Bytes(), nil +} + // gatewayServicesTableSchema returns a new table schema used to store information // about services associated with terminating gateways. func gatewayServicesTableSchema() *memdb.TableSchema { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index ee3f8db14..96f3122e1 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1316,7 +1316,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Indexes were updated. - for _, tbl := range []string{"nodes", tableServices, "checks"} { + for _, tbl := range []string{"nodes", tableServices, tableChecks} { if idx := s.maxIndex(tbl); idx != 3 { t.Fatalf("bad index: %d (%s)", idx, tbl) } @@ -2076,7 +2076,7 @@ func TestStateStore_DeleteService(t *testing.T) { if idx := s.maxIndex(tableServices); idx != 4 { t.Fatalf("bad index: %d", idx) } - if idx := s.maxIndex("checks"); idx != 4 { + if idx := s.maxIndex(tableChecks); idx != 4 { t.Fatalf("bad index: %d", idx) } @@ -2411,7 +2411,7 @@ func TestStateStore_EnsureCheck(t *testing.T) { testCheckOutput(t, 5, 5, "bbbmodified") // Index tables were updated - if idx := s.maxIndex("checks"); idx != 5 { + if idx := s.maxIndex(tableChecks); idx != 5 { t.Fatalf("bad index: %d", idx) } } @@ -2894,7 +2894,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { if idx, check, err := s.NodeCheck("node1", "check1", nil); idx != 3 || err != nil || check != nil { t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err) } - if idx := s.maxIndex("checks"); idx != 3 { + if idx := s.maxIndex(tableChecks); idx != 3 { t.Fatalf("bad index for checks: %d", idx) } if !watchFired(ws) { @@ -2914,7 +2914,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex("checks"); idx != 3 { + if idx := s.maxIndex(tableChecks); idx != 3 { t.Fatalf("bad index: %d", idx) } @@ -2923,7 +2923,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { if err := s.DeleteCheck(4, "node1", "check1", nil); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex("checks"); idx != 3 { + if idx := s.maxIndex(tableChecks); idx != 3 { t.Fatalf("bad index: %d", idx) } if watchFired(ws) {