From f61892393f58dd995fb4523e11bdb7ac1009e631 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 8 Nov 2021 16:20:50 -0500 Subject: [PATCH] refactor session state store tables to use the new index pattern (#11525) * state: port KV and Tombstone tables to new pattern * go fmt'ed * handle wildcards for tombstones * Fix graveyard ent vs oss * fix oss compilation error * add partition to tombstones and kv state store indexes * refactor to use `indexWithEnterpriseIndexable` * Apply suggestions from code review Co-authored-by: Chris S. Kim Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com> * add `singleValueID` implementation assertions * partition `tableSessions` table * fix sessions to use UUID and fix prefix index * fix oss build * clean up unused functions * fix oss compilation * add a partition indexer for sessions * Fix oss to not have partition index * fix oss tests * remove unused func `prefixIndexFromServiceNameAsString` * fix test error check * remove unused operations_ent.go and operations_oss.go func * remove unused const Co-authored-by: Daniel Nephin Co-authored-by: Chris S. Kim Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com> --- agent/consul/state/graveyard_oss.go | 2 +- agent/consul/state/indexer.go | 1 + agent/consul/state/kvs.go | 6 +-- agent/consul/state/kvs_oss.go | 25 +++++++++-- agent/consul/state/operations_oss.go | 19 +------- agent/consul/state/prepared_query.go | 2 +- agent/consul/state/query_oss.go | 19 ++------ agent/consul/state/session.go | 66 +++++++++++++++------------- agent/consul/state/session_oss.go | 39 +++++++++++++--- agent/consul/state/session_test.go | 2 +- agent/structs/structs.go | 5 +++ agent/txn_endpoint_test.go | 2 +- 12 files changed, 108 insertions(+), 80 deletions(-) diff --git a/agent/consul/state/graveyard_oss.go b/agent/consul/state/graveyard_oss.go index ebd928992..71b6bd90b 100644 --- a/agent/consul/state/graveyard_oss.go +++ b/agent/consul/state/graveyard_oss.go @@ -30,7 +30,7 @@ func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombsto // given context, using a prefix match. func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, _ *structs.EnterpriseMeta) (uint64, error) { var lindex uint64 - q := Query{Value: prefix} + q := Query{Value: prefix, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()} stones, err := tx.Get(tableTombstones, indexID+"_prefix", q) if err != nil { return 0, fmt.Errorf("failed querying tombstones: %s", err) diff --git a/agent/consul/state/indexer.go b/agent/consul/state/indexer.go index 5ee3b0572..8f9c7bbe8 100644 --- a/agent/consul/state/indexer.go +++ b/agent/consul/state/indexer.go @@ -148,6 +148,7 @@ type singleValueID interface { var _ singleValueID = (*structs.DirEntry)(nil) var _ singleValueID = (*Tombstone)(nil) var _ singleValueID = (*Query)(nil) +var _ singleValueID = (*structs.Session)(nil) func (b *indexBuilder) Bool(v bool) { b.Raw([]byte{intFromBool(v)}) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 31520db41..34639ace0 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -39,8 +39,8 @@ func kvsTableSchema() *memdb.TableSchema { } } -// indexFromKVEntry creates an index key from any struct that implements singleValueID -func indexFromKVEntry(raw interface{}) ([]byte, error) { +// indexFromIDValue creates an index key from any struct that implements singleValueID +func indexFromIDValue(raw interface{}) ([]byte, error) { e, ok := raw.(singleValueID) if !ok { return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw) @@ -431,7 +431,7 @@ func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) } // Verify that the session exists. - sess, err := firstWithTxn(tx, "sessions", "id", entry.Session, &entry.EnterpriseMeta) + sess, err := tx.First(tableSessions, indexID, Query{Value: entry.Session, EnterpriseMeta: entry.EnterpriseMeta}) if err != nil { return false, fmt.Errorf("failed session lookup: %s", err) } diff --git a/agent/consul/state/kvs_oss.go b/agent/consul/state/kvs_oss.go index ac2a524ac..75eb217df 100644 --- a/agent/consul/state/kvs_oss.go +++ b/agent/consul/state/kvs_oss.go @@ -14,12 +14,31 @@ import ( func kvsIndexer() indexerSingleWithPrefix { return indexerSingleWithPrefix{ - readIndex: readIndex(indexFromKVEntry), - writeIndex: writeIndex(indexFromKVEntry), - prefixIndex: prefixIndex(prefixIndexForKVEntry), + readIndex: readIndex(indexFromIDValue), + writeIndex: writeIndex(indexFromIDValue), + prefixIndex: prefixIndex(prefixIndexForIDValue), } } +func prefixIndexForIDValue(arg interface{}) ([]byte, error) { + switch v := arg.(type) { + // DeletePrefix always uses a string, pass it along unmodified + case string: + return []byte(v), nil + case structs.EnterpriseMeta: + return nil, nil + case singleValueID: + var b indexBuilder + if v.IDValue() != "" { + // Omit null terminator, because we want to prefix match keys + b.String(v.IDValue()) + } + prefix := bytes.Trim(b.Bytes(), "\x00") + return prefix, nil + } + return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg) +} + func prefixIndexForKVEntry(arg interface{}) ([]byte, error) { var b indexBuilder switch v := arg.(type) { diff --git a/agent/consul/state/operations_oss.go b/agent/consul/state/operations_oss.go index a3a8f396d..7be71732d 100644 --- a/agent/consul/state/operations_oss.go +++ b/agent/consul/state/operations_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package state @@ -8,24 +9,6 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -func firstWithTxn(tx ReadTxn, - table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) { - - return tx.First(table, index, idxVal) -} - -func firstWatchWithTxn(tx ReadTxn, - table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { - - return tx.FirstWatch(table, index, idxVal) -} - -func getWithTxn(tx ReadTxn, - table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - - return tx.Get(table, index, idxVal) -} - func getCompoundWithTxn(tx ReadTxn, table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) { diff --git a/agent/consul/state/prepared_query.go b/agent/consul/state/prepared_query.go index 8d2364511..bf4aecb0f 100644 --- a/agent/consul/state/prepared_query.go +++ b/agent/consul/state/prepared_query.go @@ -206,7 +206,7 @@ func preparedQuerySetTxn(tx WriteTxn, idx uint64, query *structs.PreparedQuery) // Verify that the session exists. if query.Session != "" { - sess, err := firstWithTxn(tx, "sessions", "id", query.Session, nil) + sess, err := tx.First(tableSessions, indexID, Query{Value: query.Session, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()}) if err != nil { return fmt.Errorf("invalid session: %v", err) } diff --git a/agent/consul/state/query_oss.go b/agent/consul/state/query_oss.go index 0b36a461c..04fed3a6b 100644 --- a/agent/consul/state/query_oss.go +++ b/agent/consul/state/query_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package state @@ -17,6 +18,9 @@ func prefixIndexFromQuery(arg interface{}) ([]byte, error) { case structs.EnterpriseMeta: return nil, nil case Query: + if v.Value == "" { + return nil, nil + } b.String(strings.ToLower(v.Value)) return b.Bytes(), nil } @@ -28,21 +32,6 @@ func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) { return prefixIndexFromQuery(arg) } -func prefixIndexFromServiceNameAsString(arg interface{}) ([]byte, error) { - var b indexBuilder - switch v := arg.(type) { - case *structs.EnterpriseMeta: - return nil, nil - case structs.EnterpriseMeta: - return nil, nil - case structs.ServiceName: - b.String(strings.ToLower(v.String())) - return b.Bytes(), nil - } - - return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg) -} - // indexFromAuthMethodQuery builds an index key where Query.Value is lowercase, and is // a required value. func indexFromAuthMethodQuery(arg interface{}) ([]byte, error) { diff --git a/agent/consul/state/session.go b/agent/consul/state/session.go index 67c3e6eed..58e39f9c6 100644 --- a/agent/consul/state/session.go +++ b/agent/consul/state/session.go @@ -11,20 +11,40 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +const ( + tableSessions = "sessions" +) + +func indexFromSession(raw interface{}) ([]byte, error) { + e, ok := raw.(*structs.Session) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw) + } + + v := strings.ToLower(e.ID) + if v == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(v) + return b.Bytes(), nil +} + // sessionsTableSchema returns a new table schema used for storing session // information. func sessionsTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "sessions", + Name: tableSessions, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, Indexer: sessionIndexer(), }, - "node": { - Name: "node", + indexNode: { + Name: indexNode, AllowMissing: false, Unique: false, Indexer: nodeSessionsIndexer(), @@ -132,7 +152,7 @@ func (index *CheckIDIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { // Sessions is used to pull the full list of sessions for use during snapshots. func (s *Snapshot) Sessions() (memdb.ResultIterator, error) { - iter, err := s.tx.Get("sessions", "id") + iter, err := s.tx.Get(tableSessions, indexID) if err != nil { return nil, err } @@ -227,7 +247,11 @@ func (s *Store) SessionGet(ws memdb.WatchSet, idx := sessionMaxIndex(tx, entMeta) // Look up the session by its ID - watchCh, session, err := firstWatchWithTxn(tx, "sessions", "id", sessionID, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() + } + watchCh, session, err := tx.FirstWatch(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta}) + if err != nil { return 0, nil, fmt.Errorf("failed session lookup: %s", err) } @@ -239,29 +263,6 @@ func (s *Store) SessionGet(ws memdb.WatchSet, return idx, nil, nil } -// SessionList returns a slice containing all of the active sessions. -func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := sessionMaxIndex(tx, entMeta) - - // Query all of the active sessions. - sessions, err := getWithTxn(tx, "sessions", "id_prefix", "", entMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed session lookup: %s", err) - } - ws.Add(sessions.WatchCh()) - - // Go over the sessions and create a slice of them. - var result structs.Sessions - for session := sessions.Next(); session != nil; session = sessions.Next() { - result = append(result, session.(*structs.Session)) - } - return idx, result, nil -} - // NodeSessions returns a set of active sessions associated // with the given node ID. The returned index is the highest // index seen from the result set. @@ -299,7 +300,10 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En // session deletion and handle session invalidation, etc. func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error { // Look up the session. - sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() + } + sess, err := tx.First(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta}) if err != nil { return fmt.Errorf("failed session lookup: %s", err) } diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index 16eb026dd..5866190c6 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package state @@ -11,9 +12,11 @@ import ( "github.com/hashicorp/consul/api" ) -func sessionIndexer() *memdb.UUIDFieldIndex { - return &memdb.UUIDFieldIndex{ - Field: "ID", +func sessionIndexer() indexerSingleWithPrefix { + return indexerSingleWithPrefix{ + readIndex: readIndex(indexFromQuery), + writeIndex: writeIndex(indexFromSession), + prefixIndex: prefixIndex(prefixIndexFromQuery), } } @@ -37,7 +40,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex { } func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error { - if err := tx.Delete("sessions", session); err != nil { + if err := tx.Delete(tableSessions, session); err != nil { return fmt.Errorf("failed deleting session: %s", err) } @@ -50,7 +53,7 @@ func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) } func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateMax bool, _ bool) error { - if err := tx.Insert("sessions", session); err != nil { + if err := tx.Insert(tableSessions, session); err != nil { return err } @@ -88,7 +91,7 @@ func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) { func nodeSessionsTxn(tx ReadTxn, ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { - sessions, err := tx.Get("sessions", "node", node) + sessions, err := tx.Get(tableSessions, indexNode, node) if err != nil { return nil, fmt.Errorf("failed session lookup: %s", err) } @@ -124,3 +127,27 @@ func validateSessionChecksTxn(tx ReadTxn, session *structs.Session) error { } return nil } + +// SessionList returns a slice containing all of the active sessions. +func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := sessionMaxIndex(tx, entMeta) + + var result structs.Sessions + + // Query all of the active sessions. + sessions, err := tx.Get(tableSessions, indexID+"_prefix", Query{}) + if err != nil { + return 0, nil, fmt.Errorf("failed session lookup: %s", err) + } + ws.Add(sessions.WatchCh()) + // Go over the sessions and create a slice of them. + for session := sessions.Next(); session != nil; session = sessions.Next() { + result = append(result, session.(*structs.Session)) + } + + return idx, result, nil +} diff --git a/agent/consul/state/session_test.go b/agent/consul/state/session_test.go index 1ef85e802..c81d7dc55 100644 --- a/agent/consul/state/session_test.go +++ b/agent/consul/state/session_test.go @@ -385,7 +385,7 @@ func TestStateStore_SessionDestroy(t *testing.T) { // Make sure the session is really gone. tx := s.db.Txn(false) - sessions, err := tx.Get("sessions", "id") + sessions, err := tx.Get(tableSessions, indexID) if err != nil || sessions.Next() != nil { t.Fatalf("session should not exist") } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index b7e15b4e5..386280e7d 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2325,6 +2325,11 @@ type ServiceCheck struct { Namespace string } +// IDValue implements the state.singleValueID interface for indexing. +func (s *Session) IDValue() string { + return s.ID +} + func (s *Session) UnmarshalJSON(data []byte) (err error) { type Alias Session aux := &struct { diff --git a/agent/txn_endpoint_test.go b/agent/txn_endpoint_test.go index b3770fb16..1d6d3f01b 100644 --- a/agent/txn_endpoint_test.go +++ b/agent/txn_endpoint_test.go @@ -495,7 +495,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) { if resp.Code != 409 { t.Fatalf("expected 409, got %d", resp.Code) } - if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) { + if !bytes.Contains(resp.Body.Bytes(), []byte("invalid session")) { t.Fatalf("bad: %s", resp.Body.String()) } })