From 06087633f0d0f33fb0ee0259291e591c611c3521 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 1 Mar 2016 12:43:24 -0800 Subject: [PATCH] Adds in basic query template lookups and vendors newly-updated memdb as well as improved iradix tree. --- Godeps/Godeps.json | 4 +- consul/state/prepared_query.go | 45 +++----- consul/state/prepared_query_index.go | 51 +++++++++ consul/state/prepared_query_test.go | 27 ++++- consul/state/schema.go | 19 +--- .../hashicorp/go-immutable-radix/node.go | 9 +- vendor/github.com/hashicorp/go-memdb/index.go | 31 +++++- vendor/github.com/hashicorp/go-memdb/memdb.go | 20 +++- vendor/github.com/hashicorp/go-memdb/txn.go | 101 +++++++++++++----- 9 files changed, 219 insertions(+), 88 deletions(-) create mode 100644 consul/state/prepared_query_index.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0a246010c..dab74ce8b 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -121,11 +121,11 @@ }, { "ImportPath": "github.com/hashicorp/go-immutable-radix", - "Rev": "12e90058b2897552deea141eff51bb7a07a09e63" + "Rev": "8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990" }, { "ImportPath": "github.com/hashicorp/go-memdb", - "Rev": "31949d523ade8a236956c6f1761e9dcf902d1638" + "Rev": "98f52f52d7a476958fa9da671354d270c50661a7" }, { "ImportPath": "github.com/hashicorp/go-msgpack/codec", diff --git a/consul/state/prepared_query.go b/consul/state/prepared_query.go index 66476ecb3..052ae0cb5 100644 --- a/consul/state/prepared_query.go +++ b/consul/state/prepared_query.go @@ -33,18 +33,6 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery { return wrapped.(*queryWrapper).PreparedQuery } -// isQueryWild returns the wild-ness of a query. See isWild for details. -func isQueryWild(query *structs.PreparedQuery) bool { - return query != nil && prepared_query.IsTemplate(query) && query.Name == "" -} - -// isWrappedWild is used to determine if the given wrapped query is a wild one, -// which means it has an empty Name and it's a template. See the comments for -// "wild" in schema.go for more details and to see where this is used. -func isWrappedWild(obj interface{}) (bool, error) { - return isQueryWild(toPreparedQuery(obj)), nil -} - // PreparedQueries is used to pull all the prepared queries from the snapshot. func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) { queries, err := s.tx.Get("prepared-queries", "id") @@ -123,7 +111,9 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc } // Verify that the query name doesn't already exist, or that we are - // updating the same instance that has this name. + // updating the same instance that has this name. If this is a template + // and the name is empty then we make sure there's not an empty template + // already registered. if query.Name != "" { wrapped, err := tx.First("prepared-queries", "name", query.Name) if err != nil { @@ -133,18 +123,14 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc if other != nil && (existing == nil || existing.ID != other.ID) { return fmt.Errorf("name '%s' aliases an existing query name", query.Name) } - } - - // Similarly, if this is the wild query make sure there isn't another - // one, or that we are updating the same one. - if isQueryWild(query) { - wrapped, err := tx.First("prepared-queries", "wild", true) + } else if prepared_query.IsTemplate(query) { + wrapped, err := tx.First("prepared-queries", "template", query.Name) if err != nil { return fmt.Errorf("failed prepared query lookup: %s", err) } other := toPreparedQuery(wrapped) if other != nil && (existing == nil || existing.ID != other.ID) { - return fmt.Errorf("a prepared query template already exists with an empty name") + return fmt.Errorf("name '%s' aliases an existing query template name", query.Name) } } @@ -311,27 +297,22 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct } } - // Then try by name. We use a prefix match but check to make sure that - // the query's name matches the whole prefix for a non-template query. - // Templates are allowed to use the partial match. It's more efficient - // to combine the two lookups here, even though the logic is a little - // less clear. + // Next, look for an exact name match. This is the common case for static + // prepared queries, and could also apply to templates. { - wrapped, err := tx.First("prepared-queries", "name_prefix", queryIDOrName) + wrapped, err := tx.First("prepared-queries", "name", queryIDOrName) if err != nil { return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) } if wrapped != nil { - query := toPreparedQuery(wrapped) - if query.Name == queryIDOrName || prepared_query.IsTemplate(query) { - return prep(wrapped) - } + return prep(wrapped) } } - // Finally, see if there's a wild template we can use. + // Next, look for the longest prefix match among the prepared query + // templates. { - wrapped, err := tx.First("prepared-queries", "wild", true) + wrapped, err := tx.LongestPrefix("prepared-queries", "template_prefix", queryIDOrName) if err != nil { return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) } diff --git a/consul/state/prepared_query_index.go b/consul/state/prepared_query_index.go new file mode 100644 index 000000000..d0fef04ea --- /dev/null +++ b/consul/state/prepared_query_index.go @@ -0,0 +1,51 @@ +package state + +import ( + "fmt" + "strings" + + "github.com/hashicorp/consul/consul/prepared_query" +) + +// PreparedQueryIndex is a custom memdb indexer used to manage index prepared +// query templates. None of the built-in indexers do what we need, and our +// use case is pretty specific so it's better to put the logic here. +type PreparedQueryIndex struct { +} + +// FromObject is used to compute the index key when inserting or updating an +// object. +func (*PreparedQueryIndex) FromObject(obj interface{}) (bool, []byte, error) { + wrapped, ok := obj.(*queryWrapper) + if !ok { + return false, nil, fmt.Errorf("invalid object given to index as prepared query") + } + + query := toPreparedQuery(wrapped) + if !prepared_query.IsTemplate(query) { + return false, nil, nil + } + + // Always prepend a null so that we can represent even an empty name. + out := "\x00" + strings.ToLower(query.Name) + return true, []byte(out), nil +} + +// FromArgs is used when querying for an exact match. Since we don't add any +// suffix we can just call the prefix version. +func (p *PreparedQueryIndex) FromArgs(args ...interface{}) ([]byte, error) { + return p.PrefixFromArgs(args...) +} + +// PrefixFromArgs is used when doing a prefix scan for an object. +func (*PreparedQueryIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + arg = "\x00" + strings.ToLower(arg) + return []byte(arg), nil +} diff --git a/consul/state/prepared_query_test.go b/consul/state/prepared_query_test.go index de7196815..696af1130 100644 --- a/consul/state/prepared_query_test.go +++ b/consul/state/prepared_query_test.go @@ -525,9 +525,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) { }, &structs.PreparedQuery{ ID: testUUID(), - Name: "bob", + Name: "bob-", + Template: structs.QueryTemplateOptions{ + Type: structs.QueryTemplateTypeNamePrefixMatch, + }, Service: structs.ServiceQuery{ - Service: "mongodb", + Service: "${name.suffix}", }, }, } @@ -571,9 +574,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) { }, &structs.PreparedQuery{ ID: queries[1].ID, - Name: "bob", + Name: "bob-", + Template: structs.QueryTemplateOptions{ + Type: structs.QueryTemplateTypeNamePrefixMatch, + }, Service: structs.ServiceQuery{ - Service: "mongodb", + Service: "${name.suffix}", }, RaftIndex: structs.RaftIndex{ CreateIndex: 5, @@ -612,6 +618,19 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) { if !reflect.DeepEqual(actual, expected) { t.Fatalf("bad: %v", actual) } + + // Make sure the second query, which is a template, was compiled + // and can be resolved. + _, query, err := s.PreparedQueryResolve("bob-backwards-is-bob") + if err != nil { + t.Fatalf("err: %s", err) + } + if query == nil { + t.Fatalf("should have resolved the query") + } + if query.Service.Service != "backwards-is-bob" { + t.Fatalf("bad: %s", query.Service.Service) + } }() } diff --git a/consul/state/schema.go b/consul/state/schema.go index 53ff01f70..fca8a3cf2 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -390,20 +390,11 @@ func preparedQueriesTableSchema() *memdb.TableSchema { Lowercase: true, }, }, - // This is a bit of an oddball. It's an important feature - // of prepared query templates to be able to define a - // single template that matches any query. Unfortunately, - // we can't index an empty Name field. This index lets us - // keep track of whether there is any wild template in - // existence, so there will be one "true" in here if that - // exists, and everything else will be "false". - "wild": &memdb.IndexSchema{ - Name: "wild", - AllowMissing: false, - Unique: false, - Indexer: &memdb.ConditionalIndex{ - Conditional: isWrappedWild, - }, + "template": &memdb.IndexSchema{ + Name: "template", + AllowMissing: true, + Unique: true, + Indexer: &PreparedQueryIndex{}, }, "session": &memdb.IndexSchema{ Name: "session", diff --git a/vendor/github.com/hashicorp/go-immutable-radix/node.go b/vendor/github.com/hashicorp/go-immutable-radix/node.go index 245ecedf1..fea6f6343 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/node.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/node.go @@ -41,8 +41,15 @@ func (n *Node) isLeaf() bool { } func (n *Node) addEdge(e edge) { + num := len(n.edges) + idx := sort.Search(num, func(i int) bool { + return n.edges[i].label >= e.label + }) n.edges = append(n.edges, e) - n.edges.Sort() + if idx != num { + copy(n.edges[idx+1:], n.edges[idx:num]) + n.edges[idx] = e + } } func (n *Node) replaceEdge(e edge) { diff --git a/vendor/github.com/hashicorp/go-memdb/index.go b/vendor/github.com/hashicorp/go-memdb/index.go index 61bf444e7..7237f33e2 100644 --- a/vendor/github.com/hashicorp/go-memdb/index.go +++ b/vendor/github.com/hashicorp/go-memdb/index.go @@ -291,10 +291,6 @@ func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) { if len(args) != len(c.Indexes) { return nil, fmt.Errorf("less arguments than index fields") } - return c.PrefixFromArgs(args...) -} - -func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { var out []byte for i, arg := range args { val, err := c.Indexes[i].FromArgs(arg) @@ -305,3 +301,30 @@ func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { } return out, nil } + +func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { + if len(args) > len(c.Indexes) { + return nil, fmt.Errorf("more arguments than index fields") + } + var out []byte + for i, arg := range args { + if i+1 < len(args) { + val, err := c.Indexes[i].FromArgs(arg) + if err != nil { + return nil, fmt.Errorf("sub-index %d error: %v", i, err) + } + out = append(out, val...) + } else { + prefixIndexer, ok := c.Indexes[i].(PrefixIndexer) + if !ok { + return nil, fmt.Errorf("sub-index %d does not support prefix scanning", i) + } + val, err := prefixIndexer.PrefixFromArgs(arg) + if err != nil { + return nil, fmt.Errorf("sub-index %d error: %v", i, err) + } + out = append(out, val...) + } + } + return out, nil +} diff --git a/vendor/github.com/hashicorp/go-memdb/memdb.go b/vendor/github.com/hashicorp/go-memdb/memdb.go index ddb2cff72..1d708517d 100644 --- a/vendor/github.com/hashicorp/go-memdb/memdb.go +++ b/vendor/github.com/hashicorp/go-memdb/memdb.go @@ -2,6 +2,8 @@ package memdb import ( "sync" + "sync/atomic" + "unsafe" "github.com/hashicorp/go-immutable-radix" ) @@ -12,7 +14,7 @@ import ( // transactions and MVCC. type MemDB struct { schema *DBSchema - root *iradix.Tree + root unsafe.Pointer // *iradix.Tree underneath // There can only be a single writter at once writer sync.Mutex @@ -28,7 +30,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) { // Create the MemDB db := &MemDB{ schema: schema, - root: iradix.New(), + root: unsafe.Pointer(iradix.New()), } if err := db.initialize(); err != nil { return nil, err @@ -36,6 +38,12 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) { return db, nil } +// getRoot is used to do an atomic load of the root pointer +func (db *MemDB) getRoot() *iradix.Tree { + root := (*iradix.Tree)(atomic.LoadPointer(&db.root)) + return root +} + // Txn is used to start a new transaction, in either read or write mode. // There can only be a single concurrent writer, but any number of readers. func (db *MemDB) Txn(write bool) *Txn { @@ -45,7 +53,7 @@ func (db *MemDB) Txn(write bool) *Txn { txn := &Txn{ db: db, write: write, - rootTxn: db.root.Txn(), + rootTxn: db.getRoot().Txn(), } return txn } @@ -56,20 +64,22 @@ func (db *MemDB) Txn(write bool) *Txn { func (db *MemDB) Snapshot() *MemDB { clone := &MemDB{ schema: db.schema, - root: db.root, + root: unsafe.Pointer(db.getRoot()), } return clone } // initialize is used to setup the DB for use after creation func (db *MemDB) initialize() error { + root := db.getRoot() for tName, tableSchema := range db.schema.Tables { for iName, _ := range tableSchema.Indexes { index := iradix.New() path := indexPath(tName, iName) - db.root, _, _ = db.root.Insert(path, index) + root, _, _ = root.Insert(path, index) } } + db.root = unsafe.Pointer(root) return nil } diff --git a/vendor/github.com/hashicorp/go-memdb/txn.go b/vendor/github.com/hashicorp/go-memdb/txn.go index 5441d41d9..6228677da 100644 --- a/vendor/github.com/hashicorp/go-memdb/txn.go +++ b/vendor/github.com/hashicorp/go-memdb/txn.go @@ -1,8 +1,11 @@ package memdb import ( + "bytes" "fmt" "strings" + "sync/atomic" + "unsafe" "github.com/hashicorp/go-immutable-radix" ) @@ -10,6 +13,7 @@ import ( const ( id = "id" ) + // tableIndex is a tuple of (Table, Index) used for lookups type tableIndex struct { Table string @@ -113,7 +117,8 @@ func (txn *Txn) Commit() { } // Update the root of the DB - txn.db.root = txn.rootTxn.Commit() + newRoot := txn.rootTxn.Commit() + atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot)) // Clear the txn txn.rootTxn = nil @@ -161,28 +166,44 @@ func (txn *Txn) Insert(table string, obj interface{}) error { for name, indexSchema := range tableSchema.Indexes { indexTxn := txn.writableIndex(table, name) - // Handle the update by deleting from the index first - if update { - ok, val, err := indexSchema.Indexer.FromObject(existing) - if err != nil { - return fmt.Errorf("failed to build index '%s': %v", name, err) - } - if ok { - // Handle non-unique index by computing a unique index. - // This is done by appending the primary key which must - // be unique anyways. - if !indexSchema.Unique { - val = append(val, idVal...) - } - indexTxn.Delete(val) - } - } - - // Handle the insert after the update + // Determine the new index value ok, val, err := indexSchema.Indexer.FromObject(obj) if err != nil { return fmt.Errorf("failed to build index '%s': %v", name, err) } + + // Handle non-unique index by computing a unique index. + // This is done by appending the primary key which must + // be unique anyways. + if ok && !indexSchema.Unique { + val = append(val, idVal...) + } + + // Handle the update by deleting from the index first + if update { + okExist, valExist, err := indexSchema.Indexer.FromObject(existing) + if err != nil { + return fmt.Errorf("failed to build index '%s': %v", name, err) + } + if okExist { + // Handle non-unique index by computing a unique index. + // This is done by appending the primary key which must + // be unique anyways. + if !indexSchema.Unique { + valExist = append(valExist, idVal...) + } + + // If we are writing to the same index with the same value, + // we can avoid the delete as the insert will overwrite the + // value anyways. + if !bytes.Equal(valExist, val) { + indexTxn.Delete(valExist) + } + } + } + + // If there is no index value, either this is an error or an expected + // case and we can skip updating if !ok { if indexSchema.AllowMissing { continue @@ -191,12 +212,7 @@ func (txn *Txn) Insert(table string, obj interface{}) error { } } - // Handle non-unique index by computing a unique index. - // This is done by appending the primary key which must - // be unique anyways. - if !indexSchema.Unique { - val = append(val, idVal...) - } + // Update the value of the index indexTxn.Insert(val, obj) } return nil @@ -281,7 +297,7 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) // Do the deletes num := 0 - for _, obj := range(objs) { + for _, obj := range objs { if err := txn.Delete(table, obj); err != nil { return num, err } @@ -318,6 +334,39 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er return value, nil } +// LongestPrefix is used to fetch the longest prefix match for the given +// constraints on the index. Note that this will not work with the memdb +// StringFieldIndex because it adds null terminators which prevent the +// algorithm from correctly finding a match (it will get to right before the +// null and fail to find a leaf node). This should only be used where the prefix +// given is capable of matching indexed entries directly, which typically only +// applies to a custom indexer. See the unit test for an example. +func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) { + // Enforce that this only works on prefix indexes. + if !strings.HasSuffix(index, "_prefix") { + return nil, fmt.Errorf("must use '%s_prefix' on index", index) + } + + // Get the index value. + indexSchema, val, err := txn.getIndexValue(table, index, args...) + if err != nil { + return nil, err + } + + // This algorithm only makes sense against a unique index, otherwise the + // index keys will have the IDs appended to them. + if !indexSchema.Unique { + return nil, fmt.Errorf("index '%s' is not unique", index) + } + + // Find the longest prefix match with the given index. + indexTxn := txn.readableIndex(table, indexSchema.Name) + if _, value, ok := indexTxn.Root().LongestPrefix(val); ok { + return value, nil + } + return nil, nil +} + // getIndexValue is used to get the IndexSchema and the value // used to scan the index given the parameters. This handles prefix based // scans when the index has the "_prefix" suffix. The index must support