refactor session state store tables to use the new index pattern (#11525)

* state: port KV and Tombstone tables to new pattern

* go fmt'ed

* handle wildcards for tombstones

* Fix graveyard ent vs oss

* fix oss compilation error

* add partition to tombstones and kv state store indexes

* refactor to use `indexWithEnterpriseIndexable`

* Apply suggestions from code review

Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* add `singleValueID` implementation assertions

* partition `tableSessions` table

* fix sessions to use UUID and fix prefix index

* fix oss build

* clean up unused functions

* fix oss compilation

* add a partition indexer for sessions

* Fix oss to not have partition index

* fix oss tests

* remove unused func `prefixIndexFromServiceNameAsString`

* fix test error check

* remove unused operations_ent.go and operations_oss.go func

* remove unused const

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
This commit is contained in:
Dhia Ayachi 2021-11-08 16:20:50 -05:00 committed by GitHub
parent dfafd4e38c
commit f61892393f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 108 additions and 80 deletions

View File

@ -30,7 +30,7 @@ func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombsto
// given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, _ *structs.EnterpriseMeta) (uint64, error) {
var lindex uint64
q := Query{Value: prefix}
q := Query{Value: prefix, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()}
stones, err := tx.Get(tableTombstones, indexID+"_prefix", q)
if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err)

View File

@ -148,6 +148,7 @@ type singleValueID interface {
var _ singleValueID = (*structs.DirEntry)(nil)
var _ singleValueID = (*Tombstone)(nil)
var _ singleValueID = (*Query)(nil)
var _ singleValueID = (*structs.Session)(nil)
func (b *indexBuilder) Bool(v bool) {
b.Raw([]byte{intFromBool(v)})

View File

@ -39,8 +39,8 @@ func kvsTableSchema() *memdb.TableSchema {
}
}
// indexFromKVEntry creates an index key from any struct that implements singleValueID
func indexFromKVEntry(raw interface{}) ([]byte, error) {
// indexFromIDValue creates an index key from any struct that implements singleValueID
func indexFromIDValue(raw interface{}) ([]byte, error) {
e, ok := raw.(singleValueID)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw)
@ -431,7 +431,7 @@ func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error)
}
// Verify that the session exists.
sess, err := firstWithTxn(tx, "sessions", "id", entry.Session, &entry.EnterpriseMeta)
sess, err := tx.First(tableSessions, indexID, Query{Value: entry.Session, EnterpriseMeta: entry.EnterpriseMeta})
if err != nil {
return false, fmt.Errorf("failed session lookup: %s", err)
}

View File

@ -14,12 +14,31 @@ import (
func kvsIndexer() indexerSingleWithPrefix {
return indexerSingleWithPrefix{
readIndex: readIndex(indexFromKVEntry),
writeIndex: writeIndex(indexFromKVEntry),
prefixIndex: prefixIndex(prefixIndexForKVEntry),
readIndex: readIndex(indexFromIDValue),
writeIndex: writeIndex(indexFromIDValue),
prefixIndex: prefixIndex(prefixIndexForIDValue),
}
}
func prefixIndexForIDValue(arg interface{}) ([]byte, error) {
switch v := arg.(type) {
// DeletePrefix always uses a string, pass it along unmodified
case string:
return []byte(v), nil
case structs.EnterpriseMeta:
return nil, nil
case singleValueID:
var b indexBuilder
if v.IDValue() != "" {
// Omit null terminator, because we want to prefix match keys
b.String(v.IDValue())
}
prefix := bytes.Trim(b.Bytes(), "\x00")
return prefix, nil
}
return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg)
}
func prefixIndexForKVEntry(arg interface{}) ([]byte, error) {
var b indexBuilder
switch v := arg.(type) {

View File

@ -1,3 +1,4 @@
//go:build !consulent
// +build !consulent
package state
@ -8,24 +9,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
func firstWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
return tx.First(table, index, idxVal)
}
func firstWatchWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVal)
}
func getWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(table, index, idxVal)
}
func getCompoundWithTxn(tx ReadTxn, table, index string,
_ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) {

View File

@ -206,7 +206,7 @@ func preparedQuerySetTxn(tx WriteTxn, idx uint64, query *structs.PreparedQuery)
// Verify that the session exists.
if query.Session != "" {
sess, err := firstWithTxn(tx, "sessions", "id", query.Session, nil)
sess, err := tx.First(tableSessions, indexID, Query{Value: query.Session, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()})
if err != nil {
return fmt.Errorf("invalid session: %v", err)
}

View File

@ -1,3 +1,4 @@
//go:build !consulent
// +build !consulent
package state
@ -17,6 +18,9 @@ func prefixIndexFromQuery(arg interface{}) ([]byte, error) {
case structs.EnterpriseMeta:
return nil, nil
case Query:
if v.Value == "" {
return nil, nil
}
b.String(strings.ToLower(v.Value))
return b.Bytes(), nil
}
@ -28,21 +32,6 @@ func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
return prefixIndexFromQuery(arg)
}
func prefixIndexFromServiceNameAsString(arg interface{}) ([]byte, error) {
var b indexBuilder
switch v := arg.(type) {
case *structs.EnterpriseMeta:
return nil, nil
case structs.EnterpriseMeta:
return nil, nil
case structs.ServiceName:
b.String(strings.ToLower(v.String()))
return b.Bytes(), nil
}
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
}
// indexFromAuthMethodQuery builds an index key where Query.Value is lowercase, and is
// a required value.
func indexFromAuthMethodQuery(arg interface{}) ([]byte, error) {

View File

@ -11,20 +11,40 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
const (
tableSessions = "sessions"
)
func indexFromSession(raw interface{}) ([]byte, error) {
e, ok := raw.(*structs.Session)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw)
}
v := strings.ToLower(e.ID)
if v == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(v)
return b.Bytes(), nil
}
// sessionsTableSchema returns a new table schema used for storing session
// information.
func sessionsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "sessions",
Name: tableSessions,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: sessionIndexer(),
},
"node": {
Name: "node",
indexNode: {
Name: indexNode,
AllowMissing: false,
Unique: false,
Indexer: nodeSessionsIndexer(),
@ -132,7 +152,7 @@ func (index *CheckIDIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
// Sessions is used to pull the full list of sessions for use during snapshots.
func (s *Snapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
iter, err := s.tx.Get(tableSessions, indexID)
if err != nil {
return nil, err
}
@ -227,7 +247,11 @@ func (s *Store) SessionGet(ws memdb.WatchSet,
idx := sessionMaxIndex(tx, entMeta)
// Look up the session by its ID
watchCh, session, err := firstWatchWithTxn(tx, "sessions", "id", sessionID, entMeta)
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
watchCh, session, err := tx.FirstWatch(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta})
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
@ -239,29 +263,6 @@ func (s *Store) SessionGet(ws memdb.WatchSet,
return idx, nil, nil
}
// SessionList returns a slice containing all of the active sessions.
func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := sessionMaxIndex(tx, entMeta)
// Query all of the active sessions.
sessions, err := getWithTxn(tx, "sessions", "id_prefix", "", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(sessions.WatchCh())
// Go over the sessions and create a slice of them.
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}
// NodeSessions returns a set of active sessions associated
// with the given node ID. The returned index is the highest
// index seen from the result set.
@ -299,7 +300,10 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
// session deletion and handle session invalidation, etc.
func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
// Look up the session.
sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta)
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
sess, err := tx.First(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta})
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}

View File

@ -1,3 +1,4 @@
//go:build !consulent
// +build !consulent
package state
@ -11,9 +12,11 @@ import (
"github.com/hashicorp/consul/api"
)
func sessionIndexer() *memdb.UUIDFieldIndex {
return &memdb.UUIDFieldIndex{
Field: "ID",
func sessionIndexer() indexerSingleWithPrefix {
return indexerSingleWithPrefix{
readIndex: readIndex(indexFromQuery),
writeIndex: writeIndex(indexFromSession),
prefixIndex: prefixIndex(prefixIndexFromQuery),
}
}
@ -37,7 +40,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex {
}
func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error {
if err := tx.Delete("sessions", session); err != nil {
if err := tx.Delete(tableSessions, session); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
@ -50,7 +53,7 @@ func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64)
}
func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateMax bool, _ bool) error {
if err := tx.Insert("sessions", session); err != nil {
if err := tx.Insert(tableSessions, session); err != nil {
return err
}
@ -88,7 +91,7 @@ func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) {
func nodeSessionsTxn(tx ReadTxn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get("sessions", "node", node)
sessions, err := tx.Get(tableSessions, indexNode, node)
if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err)
}
@ -124,3 +127,27 @@ func validateSessionChecksTxn(tx ReadTxn, session *structs.Session) error {
}
return nil
}
// SessionList returns a slice containing all of the active sessions.
func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := sessionMaxIndex(tx, entMeta)
var result structs.Sessions
// Query all of the active sessions.
sessions, err := tx.Get(tableSessions, indexID+"_prefix", Query{})
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(sessions.WatchCh())
// Go over the sessions and create a slice of them.
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}

View File

@ -385,7 +385,7 @@ func TestStateStore_SessionDestroy(t *testing.T) {
// Make sure the session is really gone.
tx := s.db.Txn(false)
sessions, err := tx.Get("sessions", "id")
sessions, err := tx.Get(tableSessions, indexID)
if err != nil || sessions.Next() != nil {
t.Fatalf("session should not exist")
}

View File

@ -2325,6 +2325,11 @@ type ServiceCheck struct {
Namespace string
}
// IDValue implements the state.singleValueID interface for indexing.
func (s *Session) IDValue() string {
return s.ID
}
func (s *Session) UnmarshalJSON(data []byte) (err error) {
type Alias Session
aux := &struct {

View File

@ -495,7 +495,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
if resp.Code != 409 {
t.Fatalf("expected 409, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) {
if !bytes.Contains(resp.Body.Bytes(), []byte("invalid session")) {
t.Fatalf("bad: %s", resp.Body.String())
}
})