Changes structs and state store for prepared queries.

This commit is contained in:
James Phillips 2015-11-06 16:59:32 -08:00
parent 3bc9764da8
commit 781f9611e8
8 changed files with 1245 additions and 9 deletions

241
consul/state/query.go Normal file
View File

@ -0,0 +1,241 @@
package state
import (
"fmt"
"strings"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// Queries is used to pull all the prepared queries from the snapshot.
func (s *StateSnapshot) Queries() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("queries", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Query is used when restoring from a snapshot. For general inserts, use
// QuerySet.
func (s *StateRestore) Query(query *structs.PreparedQuery) error {
if err := s.tx.Insert("queries", query); err != nil {
return fmt.Errorf("failed restoring query: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "queries"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("queries")
return nil
}
// QuerySet is used to create or update a prepared query.
func (s *StateStore) QuerySet(idx uint64, query *structs.PreparedQuery) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call set on the Query.
if err := s.querySetTxn(tx, idx, query); err != nil {
return err
}
tx.Commit()
return nil
}
// querySetTxn is the inner method used to insert a prepared query with the
// proper indexes into the state store.
func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set.
if query.ID == "" {
return ErrMissingQueryID
}
// Check for an existing query.
existing, err := tx.First("queries", "id", query.ID)
if err != nil {
return fmt.Errorf("failed query lookup: %s", err)
}
// Set the indexes.
if existing != nil {
query.CreateIndex = existing.(*structs.PreparedQuery).CreateIndex
query.ModifyIndex = idx
} else {
query.CreateIndex = idx
query.ModifyIndex = idx
}
// Verify that the name doesn't alias any existing ID. If we didn't do
// this then a bad actor could steal traffic away from an existing DNS
// entry.
if query.Name != "" {
existing, err := tx.First("queries", "id", query.Name)
// This is a little unfortunate but the UUID index will complain
// if the name isn't formatted like a UUID, so we can safely
// ignore any UUID format-related errors.
if err != nil && !strings.Contains(err.Error(), "UUID") {
return fmt.Errorf("failed query lookup: %s", err)
}
if existing != nil {
return fmt.Errorf("name '%s' aliases an existing query id", query.Name)
}
}
// Verify that the session exists.
if query.Session != "" {
sess, err := tx.First("sessions", "id", query.Session)
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return fmt.Errorf("invalid session %#v", query.Session)
}
}
// Verify that the service exists.
service, err := tx.First("services", "service", query.Service.Service)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return fmt.Errorf("invalid service %#v", query.Service.Service)
}
// Insert the query.
if err := tx.Insert("queries", query); err != nil {
return fmt.Errorf("failed inserting query: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["queries"].Notify() })
return nil
}
// QueryDelete deletes the given query by ID.
func (s *StateStore) QueryDelete(idx uint64, queryID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches)
if err := s.queryDeleteTxn(tx, idx, watches, queryID); err != nil {
return fmt.Errorf("failed query delete: %s", err)
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// queryDeleteTxn is the inner method used to delete a prepared query with the
// proper indexes into the state store.
func (s *StateStore) queryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
queryID string) error {
// Pull the query.
query, err := tx.First("queries", "id", queryID)
if err != nil {
return fmt.Errorf("failed query lookup: %s", err)
}
if query == nil {
return nil
}
// Delete the query and update the index.
if err := tx.Delete("queries", query); err != nil {
return fmt.Errorf("failed query delete: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("queries")
return nil
}
// QueryGet returns the given prepared query by ID.
func (s *StateStore) QueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryGet")...)
// Look up the query by its ID.
query, err := tx.First("queries", "id", queryID)
if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
return idx, nil, nil
}
// QueryLookup returns the given prepared query by looking up an ID or Name.
func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryLookup")...)
// Explicitly ban an empty query. This will never match an ID and the
// schema is set up so it will never match a query with an empty name,
// but we check it here to be explicit about it (we'd never want to
// return the results from the first query w/o a name).
if queryIDOrName == "" {
return idx, nil, ErrMissingQueryID
}
// Try first by ID.
query, err := tx.First("queries", "id", queryIDOrName)
// This is a little unfortunate but the UUID index will complain
// if the name isn't formatted like a UUID, so we can safely
// ignore any UUID format-related errors.
if err != nil && !strings.Contains(err.Error(), "UUID") {
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
// Then try by name.
query, err = tx.First("queries", "name", queryIDOrName)
if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
return idx, nil, nil
}
// QueryList returns all the prepared queries.
func (s *StateStore) QueryList() (uint64, structs.PreparedQueries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryList")...)
// Query all of the prepared queries in the state store.
queries, err := tx.Get("queries", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
}
// Go over all of the queries and build the response.
var result structs.PreparedQueries
for query := queries.Next(); query != nil; query = queries.Next() {
result = append(result, query.(*structs.PreparedQuery))
}
return idx, result, nil
}

580
consul/state/query_test.go Normal file
View File

@ -0,0 +1,580 @@
package state
import (
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil.
idx, res, err := s.QueryGet(testUUID())
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Inserting a query with empty ID is disallowed.
if err := s.QuerySet(1, &structs.PreparedQuery{}); err == nil {
t.Fatalf("expected %#v, got: %#v", ErrMissingQueryID, err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Build a legit-looking query with the most basic options.
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// The set will still fail because the service isn't registered yet.
err = s.QuerySet(1, query)
if err == nil || !strings.Contains(err.Error(), "invalid service") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Now register the service.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// This should go through.
if err := s.QuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err := s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Give it a name and set it again.
query.Name = "test-query"
if err := s.QuerySet(4, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Read it back and verify the data was updated as well as the index.
expected.Name = "test-query"
expected.ModifyIndex = 4
idx, actual, err = s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Try to tie it to a bogus session.
query.Session = testUUID()
err = s.QuerySet(5, query)
if err == nil || !strings.Contains(err.Error(), "invalid session") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Now make a session and try again.
session := &structs.Session{
ID: query.Session,
Node: "foo",
}
if err := s.SessionCreate(5, session); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.QuerySet(6, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Read it back and verify the data was updated as well as the index.
expected.Session = query.Session
expected.ModifyIndex = 6
idx, actual, err = s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Finally, try to abuse the system by trying to register a query whose
// name aliases a real query ID.
evil := &structs.PreparedQuery{
ID: testUUID(),
Name: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
}
err = s.QuerySet(7, evil)
if err == nil || !strings.Contains(err.Error(), "aliases an existing query") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Sanity check to make sure it's not there.
idx, actual, err = s.QueryGet(evil.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_Query_QueryDelete(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// Create a new query.
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Deleting a query that doesn't exist should be a no-op.
if err := s.QueryDelete(3, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Now add the query to the data store.
if err := s.QuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err := s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Now delete it.
if err := s.QueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Sanity check to make sure it's not there.
idx, actual, err = s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_Query_QueryLookup(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// Create a new query.
query := &structs.PreparedQuery{
ID: testUUID(),
Name: "my-test-query",
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Try to lookup a query that's not there using something that looks
// like a real ID.
idx, actual, err := s.QueryLookup(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
// Try to lookup a query that's not there using something that looks
// like a name
idx, actual, err = s.QueryLookup(query.Name)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
// Now actually insert the query.
if err := s.QuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out using the ID and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Name: "my-test-query",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err = s.QueryLookup(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Read it back using the name and verify it again.
idx, actual, err = s.QueryLookup(query.Name)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Make sure an empty lookup is well-behaved if there are actual queries
// in the state store.
if _, _, err = s.QueryLookup(""); err != ErrMissingQueryID {
t.Fatalf("bad: %v", err)
}
}
func TestStateStore_Query_QueryList(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
testRegisterService(t, s, 3, "foo", "mongodb")
// Create some queries.
queries := structs.PreparedQueries{
&structs.PreparedQuery{
ID: testUUID(),
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
},
&structs.PreparedQuery{
ID: testUUID(),
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
},
}
// Force the sort order of the UUIDs before we create them so the
// order is deterministic.
queries[0].ID = "a" + queries[0].ID[1:]
queries[1].ID = "b" + queries[1].ID[1:]
// Now create the queries.
for i, query := range queries {
if err := s.QuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err)
}
}
// Read it back and verify.
expected := structs.PreparedQueries{
&structs.PreparedQuery{
ID: queries[0].ID,
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
},
},
&structs.PreparedQuery{
ID: queries[1].ID,
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
}
idx, actual, err := s.QueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
testRegisterService(t, s, 3, "foo", "mongodb")
// Create some queries.
queries := structs.PreparedQueries{
&structs.PreparedQuery{
ID: testUUID(),
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
},
&structs.PreparedQuery{
ID: testUUID(),
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
},
}
// Force the sort order of the UUIDs before we create them so the
// order is deterministic.
queries[0].ID = "a" + queries[0].ID[1:]
queries[1].ID = "b" + queries[1].ID[1:]
// Now create the queries.
for i, query := range queries {
if err := s.QuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err)
}
}
// Snapshot the queries.
snap := s.Snapshot()
defer snap.Close()
// Alter the real state store.
if err := s.QueryDelete(6, queries[0].ID); err != nil {
t.Fatalf("err: %s", err)
}
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
expected := structs.PreparedQueries{
&structs.PreparedQuery{
ID: queries[0].ID,
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
},
},
&structs.PreparedQuery{
ID: queries[1].ID,
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
}
iter, err := snap.Queries()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.PreparedQueries
for query := iter.Next(); query != nil; query = iter.Next() {
dump = append(dump, query.(*structs.PreparedQuery))
}
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
}
// Restore the values into a new state store.
func() {
s := testStateStore(t)
restore := s.Restore()
for _, query := range dump {
if err := restore.Query(query); err != nil {
t.Fatalf("err: %s", err)
}
}
restore.Commit()
// Read the restored queries back out and verify that they
// match.
idx, actual, err := s.QueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
}()
}
func TestStateStore_Query_Watches(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Call functions that update the queries table and make sure a watch
// fires each time.
verifyWatch(t, s.getTableWatch("queries"), func() {
if err := s.QuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("queries"), func() {
if err := s.QueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("queries"), func() {
restore := s.Restore()
if err := restore.Query(query); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}

View File

@ -30,6 +30,7 @@ func stateStoreSchema() *memdb.DBSchema {
sessionChecksTableSchema,
aclsTableSchema,
coordinatesTableSchema,
queriesTableSchema,
}
// Add the tables to the root schema
@ -365,3 +366,38 @@ func coordinatesTableSchema() *memdb.TableSchema {
},
}
}
// queriesTableSchema returns a new table schema used for storing
// prepared queries.
func queriesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "queries",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.UUIDFieldIndex{
Field: "ID",
},
},
"name": &memdb.IndexSchema{
Name: "name",
AllowMissing: true,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Name",
Lowercase: true,
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: true,
Unique: false,
Indexer: &memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
}
}

View File

@ -24,9 +24,13 @@ var (
// is attempted with an empty session ID.
ErrMissingSessionID = errors.New("Missing session ID")
// ErrMissingACLID is returned when a session set is called on
// a session with an empty ID.
// ErrMissingACLID is returned when an ACL set is called on
// an ACL with an empty ID.
ErrMissingACLID = errors.New("Missing ACL ID")
// ErrMissingQueryID is returned when a Query set is called on
// a Query with an empty ID.
ErrMissingQueryID = errors.New("Missing Query ID")
)
// StateStore is where we store all of Consul's state, including
@ -409,6 +413,8 @@ func (s *StateStore) getWatchTables(method string) []string {
return []string{"acls"}
case "Coordinates":
return []string{"coordinates"}
case "QueryGet", "QueryLookup", "QueryList":
return []string{"queries"}
}
panic(fmt.Sprintf("Unknown method %s", method))
@ -2120,15 +2126,37 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
if err != nil {
return fmt.Errorf("failed session checks lookup: %s", err)
}
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
{
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
}
}
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
// Delete any prepared queries.
queries, err := tx.Get("queries", "session", sessionID)
if err != nil {
return fmt.Errorf("failed query lookup: %s", err)
}
{
var objs []interface{}
for query := queries.Next(); query != nil; query = queries.Next() {
objs = append(objs, query)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
q := obj.(*structs.PreparedQuery)
if err := s.queryDeleteTxn(tx, idx, watches, q.ID); err != nil {
return fmt.Errorf("failed query delete: %s", err)
}
}
}

View File

@ -4445,6 +4445,64 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
}
}
func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
session := &structs.Session{
ID: testUUID(),
Node: "foo",
}
if err := s.SessionCreate(3, session); err != nil {
t.Fatalf("err: %v", err)
}
query := &structs.PreparedQuery{
ID: testUUID(),
Session: session.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
}
if err := s.QuerySet(4, query); err != nil {
t.Fatalf("err: %s", err)
}
// Invalidate the session and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("queries"), func() {
if err := s.SessionDestroy(5, session.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Make sure the session is gone.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the query is gone and the index is updated.
idx, q2, err := s.QueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if q2 != nil {
t.Fatalf("bad: %v", q2)
}
}
func TestStateStore_ACLSet_ACLGet(t *testing.T) {
s := testStateStore(t)

151
consul/structs/query.go Normal file
View File

@ -0,0 +1,151 @@
package structs
import (
"time"
)
const (
QueryOrderShuffle = "shuffle"
QueryOrderSort = "near_agent"
)
const (
QueryTTLMax = 24 * time.Hour
QueryTTLMin = 10 * time.Second
)
// QueryDatacenterOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
// Datacenters is a fixed list of datacenters to try after NearestN. We
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
}
// QueryDNSOptions controls settings when query results are served over DNS.
type QueryDNSOptions struct {
// TTL is the time to live for the served DNS results.
TTL string
}
// ServiceQuery is used to query for a set of healthy nodes offering a specific
// service.
type ServiceQuery struct {
// Service is the service to query.
Service string
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
OnlyPassing bool
// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "~" then
// it is disallowed.
Tags []string
// Sort has one of the QueryOrder* options which control how the output
// is sorted. If this is left blank we default to "shuffle".
Sort string
}
// PreparedQuery defines a complete prepared query, and is the structure we
// maintain in the state store.
type PreparedQuery struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string
// Name is an optional friendly name for the query supplied by the
// user. NOTE - if this feature is used then it will reduce the security
// of any read ACL associated with this query/service since this name
// can be used to locate nodes with supplying any ACL.
Name string
// TTL is the time to live for the query itself. If this is omitted then
// the query will not expire (unless tied to a session).
TTL string
// Session is an optional session to tie this query's lifetime to. If
// this is omitted then the query will not expire (unless given a TTL).
Session string
// Token is the ACL token used when the query was created, and it is
// used when a query is subsequently executed. This token, or a token
// with management privileges, must be used to change the query later.
Token string
// Service defines a service query (leaving things open for other types
// later).
Service ServiceQuery
// DNS has options that control how the results of this query are
// served over DNS.
DNS QueryDNSOptions
RaftIndex
}
type PreparedQueries []*PreparedQuery
type QueryOp string
const (
QueryCreate QueryOp = "create"
QueryUpdate = "update"
QueryDelete = "delete"
)
// QueryRequest is used to create or change prepared queries.
type QueryRequest struct {
Datacenter string
Op QueryOp
Query PreparedQuery
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (q *QueryRequest) RequestDatacenter() string {
return q.Datacenter
}
// QuerySpecificRequest is used to execute a prepared query.
type QuerySpecificRequest struct {
Datacenter string
QueryIDOrName string
Source QuerySource
QueryOptions
}
// RequestDatacenter returns the datacenter for a given request.
func (q *QuerySpecificRequest) RequestDatacenter() string {
return q.Datacenter
}
// QueryRemoteRequest is used when running a local query in a remote
// datacenter. We have to ship the entire query over since it won't be
// present in the remote state store.
type QueryRemoteRequest struct {
Datacenter string
Query PreparedQuery
QueryOptions
}
// RequestDatacenter returns the datacenter for a given request.
func (q *QueryRemoteRequest) RequestDatacenter() string {
return q.Datacenter
}
// QueryExecutionResponse has the results of executing a query.
type QueryExecutionResponse struct {
Nodes CheckServiceNodes
DNS QueryDNSOptions
}

View File

@ -3,6 +3,7 @@ package structs
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"time"
@ -34,6 +35,7 @@ const (
ACLRequestType
TombstoneRequestType
CoordinateBatchUpdateType
QueryRequestType
)
const (
@ -403,6 +405,35 @@ type CheckServiceNode struct {
}
type CheckServiceNodes []CheckServiceNode
// Shuffle does an in-place random shuffle using the Fisher-Yates algorithm.
func (nodes CheckServiceNodes) Shuffle() {
for i := len(nodes) - 1; i > 0; i-- {
j := rand.Int31() % int32(i+1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
}
// Filter removes nodes that are failing health checks (and any non-passing
// check if that option is selected). Note that this returns the filtered
// results AND modifies the receiver for performance.
func (nodes CheckServiceNodes) Filter(onlyPassing bool) CheckServiceNodes {
n := len(nodes)
OUTER:
for i := 0; i < n; i++ {
node := nodes[i]
for _, check := range node.Checks {
if check.Status == HealthCritical ||
(onlyPassing && check.Status != HealthPassing) {
nodes[i], nodes[n-1] = nodes[n-1], CheckServiceNode{}
n--
i--
continue OUTER
}
}
}
return nodes[:n]
}
// NodeInfo is used to dump all associated information about
// a node. This is currently used for the UI only, as it is
// rather expensive to generate.

View File

@ -209,6 +209,117 @@ func TestStructs_HealthCheck_IsSame(t *testing.T) {
check(&other.ServiceName)
}
func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
nodes := CheckServiceNodes{
CheckServiceNode{
Node: &Node{
Node: "node1",
Address: "127.0.0.1",
},
},
CheckServiceNode{
Node: &Node{
Node: "node2",
Address: "127.0.0.2",
},
},
CheckServiceNode{
Node: &Node{
Node: "node3",
Address: "127.0.0.3",
},
},
}
// Make a copy to shuffle and make sure it matches initially.
twiddle := make(CheckServiceNodes, len(nodes))
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
if !reflect.DeepEqual(twiddle, nodes) {
t.Fatalf("bad: %v", twiddle)
}
// Give this lots of tries to randomize. If we find a case that's
// not equal we can end the test, otherwise we will call shenanigans.
for i := 0; i < 100; i++ {
twiddle.Shuffle()
if !reflect.DeepEqual(twiddle, nodes) {
return
}
}
t.Fatalf("shuffle is not working")
}
func TestStructs_CheckServiceNodes_Filter(t *testing.T) {
nodes := CheckServiceNodes{
CheckServiceNode{
Node: &Node{
Node: "node1",
Address: "127.0.0.1",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthWarning,
},
},
},
CheckServiceNode{
Node: &Node{
Node: "node2",
Address: "127.0.0.2",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthPassing,
},
},
},
CheckServiceNode{
Node: &Node{
Node: "node3",
Address: "127.0.0.3",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthCritical,
},
},
},
}
// Test the case where warnings are allowed.
{
twiddle := make(CheckServiceNodes, len(nodes))
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(false)
expected := CheckServiceNodes{
nodes[0],
nodes[1],
}
if !reflect.DeepEqual(filtered, expected) {
t.Fatalf("bad: %v", filtered)
}
}
// Limit to only passing checks.
{
twiddle := make(CheckServiceNodes, len(nodes))
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(true)
expected := CheckServiceNodes{
nodes[1],
}
if !reflect.DeepEqual(filtered, expected) {
t.Fatalf("bad: %v", filtered)
}
}
}
func TestStructs_DirEntry_Clone(t *testing.T) {
e := &DirEntry{
LockIndex: 5,