Adds in basic query template lookups and vendors newly-updated memdb as well as improved iradix tree.

This commit is contained in:
James Phillips 2016-03-01 12:43:24 -08:00
parent 142e69befe
commit 06087633f0
9 changed files with 219 additions and 88 deletions

4
Godeps/Godeps.json generated
View file

@ -121,11 +121,11 @@
}, },
{ {
"ImportPath": "github.com/hashicorp/go-immutable-radix", "ImportPath": "github.com/hashicorp/go-immutable-radix",
"Rev": "12e90058b2897552deea141eff51bb7a07a09e63" "Rev": "8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990"
}, },
{ {
"ImportPath": "github.com/hashicorp/go-memdb", "ImportPath": "github.com/hashicorp/go-memdb",
"Rev": "31949d523ade8a236956c6f1761e9dcf902d1638" "Rev": "98f52f52d7a476958fa9da671354d270c50661a7"
}, },
{ {
"ImportPath": "github.com/hashicorp/go-msgpack/codec", "ImportPath": "github.com/hashicorp/go-msgpack/codec",

View file

@ -33,18 +33,6 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery {
return wrapped.(*queryWrapper).PreparedQuery return wrapped.(*queryWrapper).PreparedQuery
} }
// isQueryWild returns the wild-ness of a query. See isWild for details.
func isQueryWild(query *structs.PreparedQuery) bool {
return query != nil && prepared_query.IsTemplate(query) && query.Name == ""
}
// isWrappedWild is used to determine if the given wrapped query is a wild one,
// which means it has an empty Name and it's a template. See the comments for
// "wild" in schema.go for more details and to see where this is used.
func isWrappedWild(obj interface{}) (bool, error) {
return isQueryWild(toPreparedQuery(obj)), nil
}
// PreparedQueries is used to pull all the prepared queries from the snapshot. // PreparedQueries is used to pull all the prepared queries from the snapshot.
func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) { func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
queries, err := s.tx.Get("prepared-queries", "id") queries, err := s.tx.Get("prepared-queries", "id")
@ -123,7 +111,9 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
} }
// Verify that the query name doesn't already exist, or that we are // Verify that the query name doesn't already exist, or that we are
// updating the same instance that has this name. // updating the same instance that has this name. If this is a template
// and the name is empty then we make sure there's not an empty template
// already registered.
if query.Name != "" { if query.Name != "" {
wrapped, err := tx.First("prepared-queries", "name", query.Name) wrapped, err := tx.First("prepared-queries", "name", query.Name)
if err != nil { if err != nil {
@ -133,18 +123,14 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
if other != nil && (existing == nil || existing.ID != other.ID) { if other != nil && (existing == nil || existing.ID != other.ID) {
return fmt.Errorf("name '%s' aliases an existing query name", query.Name) return fmt.Errorf("name '%s' aliases an existing query name", query.Name)
} }
} } else if prepared_query.IsTemplate(query) {
wrapped, err := tx.First("prepared-queries", "template", query.Name)
// Similarly, if this is the wild query make sure there isn't another
// one, or that we are updating the same one.
if isQueryWild(query) {
wrapped, err := tx.First("prepared-queries", "wild", true)
if err != nil { if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err) return fmt.Errorf("failed prepared query lookup: %s", err)
} }
other := toPreparedQuery(wrapped) other := toPreparedQuery(wrapped)
if other != nil && (existing == nil || existing.ID != other.ID) { if other != nil && (existing == nil || existing.ID != other.ID) {
return fmt.Errorf("a prepared query template already exists with an empty name") return fmt.Errorf("name '%s' aliases an existing query template name", query.Name)
} }
} }
@ -311,27 +297,22 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
} }
} }
// Then try by name. We use a prefix match but check to make sure that // Next, look for an exact name match. This is the common case for static
// the query's name matches the whole prefix for a non-template query. // prepared queries, and could also apply to templates.
// Templates are allowed to use the partial match. It's more efficient
// to combine the two lookups here, even though the logic is a little
// less clear.
{ {
wrapped, err := tx.First("prepared-queries", "name_prefix", queryIDOrName) wrapped, err := tx.First("prepared-queries", "name", queryIDOrName)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }
if wrapped != nil { if wrapped != nil {
query := toPreparedQuery(wrapped) return prep(wrapped)
if query.Name == queryIDOrName || prepared_query.IsTemplate(query) {
return prep(wrapped)
}
} }
} }
// Finally, see if there's a wild template we can use. // Next, look for the longest prefix match among the prepared query
// templates.
{ {
wrapped, err := tx.First("prepared-queries", "wild", true) wrapped, err := tx.LongestPrefix("prepared-queries", "template_prefix", queryIDOrName)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }

View file

@ -0,0 +1,51 @@
package state
import (
"fmt"
"strings"
"github.com/hashicorp/consul/consul/prepared_query"
)
// PreparedQueryIndex is a custom memdb indexer used to manage index prepared
// query templates. None of the built-in indexers do what we need, and our
// use case is pretty specific so it's better to put the logic here.
type PreparedQueryIndex struct {
}
// FromObject is used to compute the index key when inserting or updating an
// object.
func (*PreparedQueryIndex) FromObject(obj interface{}) (bool, []byte, error) {
wrapped, ok := obj.(*queryWrapper)
if !ok {
return false, nil, fmt.Errorf("invalid object given to index as prepared query")
}
query := toPreparedQuery(wrapped)
if !prepared_query.IsTemplate(query) {
return false, nil, nil
}
// Always prepend a null so that we can represent even an empty name.
out := "\x00" + strings.ToLower(query.Name)
return true, []byte(out), nil
}
// FromArgs is used when querying for an exact match. Since we don't add any
// suffix we can just call the prefix version.
func (p *PreparedQueryIndex) FromArgs(args ...interface{}) ([]byte, error) {
return p.PrefixFromArgs(args...)
}
// PrefixFromArgs is used when doing a prefix scan for an object.
func (*PreparedQueryIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
arg = "\x00" + strings.ToLower(arg)
return []byte(arg), nil
}

View file

@ -525,9 +525,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
}, },
&structs.PreparedQuery{ &structs.PreparedQuery{
ID: testUUID(), ID: testUUID(),
Name: "bob", Name: "bob-",
Template: structs.QueryTemplateOptions{
Type: structs.QueryTemplateTypeNamePrefixMatch,
},
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "mongodb", Service: "${name.suffix}",
}, },
}, },
} }
@ -571,9 +574,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
}, },
&structs.PreparedQuery{ &structs.PreparedQuery{
ID: queries[1].ID, ID: queries[1].ID,
Name: "bob", Name: "bob-",
Template: structs.QueryTemplateOptions{
Type: structs.QueryTemplateTypeNamePrefixMatch,
},
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "mongodb", Service: "${name.suffix}",
}, },
RaftIndex: structs.RaftIndex{ RaftIndex: structs.RaftIndex{
CreateIndex: 5, CreateIndex: 5,
@ -612,6 +618,19 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
if !reflect.DeepEqual(actual, expected) { if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual) t.Fatalf("bad: %v", actual)
} }
// Make sure the second query, which is a template, was compiled
// and can be resolved.
_, query, err := s.PreparedQueryResolve("bob-backwards-is-bob")
if err != nil {
t.Fatalf("err: %s", err)
}
if query == nil {
t.Fatalf("should have resolved the query")
}
if query.Service.Service != "backwards-is-bob" {
t.Fatalf("bad: %s", query.Service.Service)
}
}() }()
} }

View file

@ -390,20 +390,11 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
Lowercase: true, Lowercase: true,
}, },
}, },
// This is a bit of an oddball. It's an important feature "template": &memdb.IndexSchema{
// of prepared query templates to be able to define a Name: "template",
// single template that matches any query. Unfortunately, AllowMissing: true,
// we can't index an empty Name field. This index lets us Unique: true,
// keep track of whether there is any wild template in Indexer: &PreparedQueryIndex{},
// existence, so there will be one "true" in here if that
// exists, and everything else will be "false".
"wild": &memdb.IndexSchema{
Name: "wild",
AllowMissing: false,
Unique: false,
Indexer: &memdb.ConditionalIndex{
Conditional: isWrappedWild,
},
}, },
"session": &memdb.IndexSchema{ "session": &memdb.IndexSchema{
Name: "session", Name: "session",

View file

@ -41,8 +41,15 @@ func (n *Node) isLeaf() bool {
} }
func (n *Node) addEdge(e edge) { func (n *Node) addEdge(e edge) {
num := len(n.edges)
idx := sort.Search(num, func(i int) bool {
return n.edges[i].label >= e.label
})
n.edges = append(n.edges, e) n.edges = append(n.edges, e)
n.edges.Sort() if idx != num {
copy(n.edges[idx+1:], n.edges[idx:num])
n.edges[idx] = e
}
} }
func (n *Node) replaceEdge(e edge) { func (n *Node) replaceEdge(e edge) {

View file

@ -291,10 +291,6 @@ func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != len(c.Indexes) { if len(args) != len(c.Indexes) {
return nil, fmt.Errorf("less arguments than index fields") return nil, fmt.Errorf("less arguments than index fields")
} }
return c.PrefixFromArgs(args...)
}
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
var out []byte var out []byte
for i, arg := range args { for i, arg := range args {
val, err := c.Indexes[i].FromArgs(arg) val, err := c.Indexes[i].FromArgs(arg)
@ -305,3 +301,30 @@ func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
} }
return out, nil return out, nil
} }
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
if len(args) > len(c.Indexes) {
return nil, fmt.Errorf("more arguments than index fields")
}
var out []byte
for i, arg := range args {
if i+1 < len(args) {
val, err := c.Indexes[i].FromArgs(arg)
if err != nil {
return nil, fmt.Errorf("sub-index %d error: %v", i, err)
}
out = append(out, val...)
} else {
prefixIndexer, ok := c.Indexes[i].(PrefixIndexer)
if !ok {
return nil, fmt.Errorf("sub-index %d does not support prefix scanning", i)
}
val, err := prefixIndexer.PrefixFromArgs(arg)
if err != nil {
return nil, fmt.Errorf("sub-index %d error: %v", i, err)
}
out = append(out, val...)
}
}
return out, nil
}

View file

@ -2,6 +2,8 @@ package memdb
import ( import (
"sync" "sync"
"sync/atomic"
"unsafe"
"github.com/hashicorp/go-immutable-radix" "github.com/hashicorp/go-immutable-radix"
) )
@ -12,7 +14,7 @@ import (
// transactions and MVCC. // transactions and MVCC.
type MemDB struct { type MemDB struct {
schema *DBSchema schema *DBSchema
root *iradix.Tree root unsafe.Pointer // *iradix.Tree underneath
// There can only be a single writter at once // There can only be a single writter at once
writer sync.Mutex writer sync.Mutex
@ -28,7 +30,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
// Create the MemDB // Create the MemDB
db := &MemDB{ db := &MemDB{
schema: schema, schema: schema,
root: iradix.New(), root: unsafe.Pointer(iradix.New()),
} }
if err := db.initialize(); err != nil { if err := db.initialize(); err != nil {
return nil, err return nil, err
@ -36,6 +38,12 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
return db, nil return db, nil
} }
// getRoot is used to do an atomic load of the root pointer
func (db *MemDB) getRoot() *iradix.Tree {
root := (*iradix.Tree)(atomic.LoadPointer(&db.root))
return root
}
// Txn is used to start a new transaction, in either read or write mode. // Txn is used to start a new transaction, in either read or write mode.
// There can only be a single concurrent writer, but any number of readers. // There can only be a single concurrent writer, but any number of readers.
func (db *MemDB) Txn(write bool) *Txn { func (db *MemDB) Txn(write bool) *Txn {
@ -45,7 +53,7 @@ func (db *MemDB) Txn(write bool) *Txn {
txn := &Txn{ txn := &Txn{
db: db, db: db,
write: write, write: write,
rootTxn: db.root.Txn(), rootTxn: db.getRoot().Txn(),
} }
return txn return txn
} }
@ -56,20 +64,22 @@ func (db *MemDB) Txn(write bool) *Txn {
func (db *MemDB) Snapshot() *MemDB { func (db *MemDB) Snapshot() *MemDB {
clone := &MemDB{ clone := &MemDB{
schema: db.schema, schema: db.schema,
root: db.root, root: unsafe.Pointer(db.getRoot()),
} }
return clone return clone
} }
// initialize is used to setup the DB for use after creation // initialize is used to setup the DB for use after creation
func (db *MemDB) initialize() error { func (db *MemDB) initialize() error {
root := db.getRoot()
for tName, tableSchema := range db.schema.Tables { for tName, tableSchema := range db.schema.Tables {
for iName, _ := range tableSchema.Indexes { for iName, _ := range tableSchema.Indexes {
index := iradix.New() index := iradix.New()
path := indexPath(tName, iName) path := indexPath(tName, iName)
db.root, _, _ = db.root.Insert(path, index) root, _, _ = root.Insert(path, index)
} }
} }
db.root = unsafe.Pointer(root)
return nil return nil
} }

View file

@ -1,8 +1,11 @@
package memdb package memdb
import ( import (
"bytes"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"unsafe"
"github.com/hashicorp/go-immutable-radix" "github.com/hashicorp/go-immutable-radix"
) )
@ -10,6 +13,7 @@ import (
const ( const (
id = "id" id = "id"
) )
// tableIndex is a tuple of (Table, Index) used for lookups // tableIndex is a tuple of (Table, Index) used for lookups
type tableIndex struct { type tableIndex struct {
Table string Table string
@ -113,7 +117,8 @@ func (txn *Txn) Commit() {
} }
// Update the root of the DB // Update the root of the DB
txn.db.root = txn.rootTxn.Commit() newRoot := txn.rootTxn.Commit()
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
// Clear the txn // Clear the txn
txn.rootTxn = nil txn.rootTxn = nil
@ -161,28 +166,44 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
for name, indexSchema := range tableSchema.Indexes { for name, indexSchema := range tableSchema.Indexes {
indexTxn := txn.writableIndex(table, name) indexTxn := txn.writableIndex(table, name)
// Handle the update by deleting from the index first // Determine the new index value
if update {
ok, val, err := indexSchema.Indexer.FromObject(existing)
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Delete(val)
}
}
// Handle the insert after the update
ok, val, err := indexSchema.Indexer.FromObject(obj) ok, val, err := indexSchema.Indexer.FromObject(obj)
if err != nil { if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err) return fmt.Errorf("failed to build index '%s': %v", name, err)
} }
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if ok && !indexSchema.Unique {
val = append(val, idVal...)
}
// Handle the update by deleting from the index first
if update {
okExist, valExist, err := indexSchema.Indexer.FromObject(existing)
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
if okExist {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
valExist = append(valExist, idVal...)
}
// If we are writing to the same index with the same value,
// we can avoid the delete as the insert will overwrite the
// value anyways.
if !bytes.Equal(valExist, val) {
indexTxn.Delete(valExist)
}
}
}
// If there is no index value, either this is an error or an expected
// case and we can skip updating
if !ok { if !ok {
if indexSchema.AllowMissing { if indexSchema.AllowMissing {
continue continue
@ -191,12 +212,7 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
} }
} }
// Handle non-unique index by computing a unique index. // Update the value of the index
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Insert(val, obj) indexTxn.Insert(val, obj)
} }
return nil return nil
@ -281,7 +297,7 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error)
// Do the deletes // Do the deletes
num := 0 num := 0
for _, obj := range(objs) { for _, obj := range objs {
if err := txn.Delete(table, obj); err != nil { if err := txn.Delete(table, obj); err != nil {
return num, err return num, err
} }
@ -318,6 +334,39 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er
return value, nil return value, nil
} }
// LongestPrefix is used to fetch the longest prefix match for the given
// constraints on the index. Note that this will not work with the memdb
// StringFieldIndex because it adds null terminators which prevent the
// algorithm from correctly finding a match (it will get to right before the
// null and fail to find a leaf node). This should only be used where the prefix
// given is capable of matching indexed entries directly, which typically only
// applies to a custom indexer. See the unit test for an example.
func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
// Enforce that this only works on prefix indexes.
if !strings.HasSuffix(index, "_prefix") {
return nil, fmt.Errorf("must use '%s_prefix' on index", index)
}
// Get the index value.
indexSchema, val, err := txn.getIndexValue(table, index, args...)
if err != nil {
return nil, err
}
// This algorithm only makes sense against a unique index, otherwise the
// index keys will have the IDs appended to them.
if !indexSchema.Unique {
return nil, fmt.Errorf("index '%s' is not unique", index)
}
// Find the longest prefix match with the given index.
indexTxn := txn.readableIndex(table, indexSchema.Name)
if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
return value, nil
}
return nil, nil
}
// getIndexValue is used to get the IndexSchema and the value // getIndexValue is used to get the IndexSchema and the value
// used to scan the index given the parameters. This handles prefix based // used to scan the index given the parameters. This handles prefix based
// scans when the index has the "_prefix" suffix. The index must support // scans when the index has the "_prefix" suffix. The index must support