Adds prefix "prepared" to everything prepared query-related.
This commit is contained in:
parent
2183565d83
commit
09034a84bd
|
@ -91,8 +91,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applyTombstoneOperation(buf[1:], log.Index)
|
||||
case structs.CoordinateBatchUpdateType:
|
||||
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
|
||||
case structs.QueryRequestType:
|
||||
return c.applyQueryOperation(buf[1:], log.Index)
|
||||
case structs.PreparedQueryRequestType:
|
||||
return c.applyPreparedQueryOperation(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
|
@ -266,20 +266,22 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyQueryOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.QueryRequest
|
||||
// applyPreparedQueryOperation applies the given prepared query operation to the
|
||||
// state store.
|
||||
func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.PreparedQueryRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "query", string(req.Op)}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
|
||||
switch req.Op {
|
||||
case structs.QueryCreate, structs.QueryUpdate:
|
||||
return c.state.QuerySet(index, &req.Query)
|
||||
case structs.QueryDelete:
|
||||
return c.state.QueryDelete(index, req.Query.ID)
|
||||
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
||||
return c.state.PreparedQuerySet(index, &req.Query)
|
||||
case structs.PreparedQueryDelete:
|
||||
return c.state.PreparedQueryDelete(index, req.Query.ID)
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid Query operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid Query operation '%s'", req.Op)
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,35 +15,35 @@ var (
|
|||
ErrQueryNotFound = errors.New("Query not found")
|
||||
)
|
||||
|
||||
// Query manages the prepared query endpoint.
|
||||
type Query struct {
|
||||
// PreparedQuery manages the prepared query endpoint.
|
||||
type PreparedQuery struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// Apply is used to apply a modifying request to the data store. This should
|
||||
// only be used for operations that modify the data. The ID of the session is
|
||||
// returned in the reply.
|
||||
func (q *Query) Apply(args *structs.QueryRequest, reply *string) (err error) {
|
||||
if done, err := q.srv.forward("Query.Apply", args, args, reply); done {
|
||||
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
|
||||
if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "query", "apply"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"consul", "prepared-query", "apply"}, time.Now())
|
||||
|
||||
// Validate the ID. We must create new IDs before applying to the Raft
|
||||
// log since it's not deterministic.
|
||||
if args.Op == structs.QueryCreate {
|
||||
if args.Op == structs.PreparedQueryCreate {
|
||||
if args.Query.ID != "" {
|
||||
return fmt.Errorf("ID must be empty when creating a new query")
|
||||
return fmt.Errorf("ID must be empty when creating a new prepared query")
|
||||
}
|
||||
|
||||
// We are relying on the fact that UUIDs are random and unlikely
|
||||
// to collide since this isn't inside a write transaction.
|
||||
state := q.srv.fsm.State()
|
||||
state := p.srv.fsm.State()
|
||||
for {
|
||||
args.Query.ID = generateUUID()
|
||||
_, query, err := state.QueryGet(args.Query.ID)
|
||||
_, query, err := state.PreparedQueryGet(args.Query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Query lookup failed: %v", err)
|
||||
return fmt.Errorf("Prepared query lookup failed: %v", err)
|
||||
}
|
||||
if query == nil {
|
||||
break
|
||||
|
@ -53,55 +53,55 @@ func (q *Query) Apply(args *structs.QueryRequest, reply *string) (err error) {
|
|||
*reply = args.Query.ID
|
||||
|
||||
// Grab the ACL because we need it in several places below.
|
||||
acl, err := q.srv.resolveToken(args.Token)
|
||||
acl, err := p.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Enforce that any modify operation has the same token used when the
|
||||
// query was created, or a management token with sufficient rights.
|
||||
if args.Op != structs.QueryCreate {
|
||||
state := q.srv.fsm.State()
|
||||
_, query, err := state.QueryGet(args.Query.ID)
|
||||
if args.Op != structs.PreparedQueryCreate {
|
||||
state := p.srv.fsm.State()
|
||||
_, query, err := state.PreparedQueryGet(args.Query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Query lookup failed: %v", err)
|
||||
return fmt.Errorf("Prepared Query lookup failed: %v", err)
|
||||
}
|
||||
if query == nil {
|
||||
return fmt.Errorf("Cannot modify non-existent query: '%s'", args.Query.ID)
|
||||
return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
|
||||
}
|
||||
if (query.Token != args.Token) && (acl != nil && !acl.QueryModify()) {
|
||||
q.srv.logger.Printf("[WARN] consul.query: Operation on query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.Query.ID)
|
||||
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.Query.ID)
|
||||
return permissionDeniedErr
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the query and prep it for the state store.
|
||||
switch args.Op {
|
||||
case structs.QueryCreate, structs.QueryUpdate:
|
||||
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
||||
if err := parseQuery(&args.Query); err != nil {
|
||||
return fmt.Errorf("Invalid query: %v", err)
|
||||
return fmt.Errorf("Invalid prepared query: %v", err)
|
||||
}
|
||||
|
||||
if acl != nil && !acl.ServiceRead(args.Query.Service.Service) {
|
||||
q.srv.logger.Printf("[WARN] consul.query: Operation on query for service '%s' denied due to ACLs", args.Query.Service.Service)
|
||||
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query for service '%s' denied due to ACLs", args.Query.Service.Service)
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
case structs.QueryDelete:
|
||||
case structs.PreparedQueryDelete:
|
||||
// Nothing else to verify here, just do the delete (we only look
|
||||
// at the ID field for this op).
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unknown query operation: %s", args.Op)
|
||||
return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
|
||||
}
|
||||
|
||||
// At this point the token has been vetted, so make sure the token that
|
||||
// is stored in the state store matches what was supplied.
|
||||
args.Query.Token = args.Token
|
||||
|
||||
resp, err := q.srv.raftApply(structs.QueryRequestType, args)
|
||||
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
|
||||
if err != nil {
|
||||
q.srv.logger.Printf("[ERR] consul.query: Apply failed %v", err)
|
||||
p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
|
@ -194,22 +194,23 @@ func parseDNS(dns *structs.QueryDNSOptions) error {
|
|||
// Execute runs a prepared query and returns the results. This will perform the
|
||||
// failover logic if no local results are available. This is typically called as
|
||||
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
|
||||
func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryExecuteResponse) error {
|
||||
if done, err := q.srv.forward("Query.Execute", args, args, reply); done {
|
||||
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "query", "execute"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute"}, time.Now())
|
||||
|
||||
// We have to do this ourselves since we are not doing a blocking RPC.
|
||||
if args.RequireConsistent {
|
||||
if err := q.srv.consistentRead(); err != nil {
|
||||
if err := p.srv.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Try to locate the query.
|
||||
state := q.srv.fsm.State()
|
||||
_, query, err := state.QueryLookup(args.QueryIDOrName)
|
||||
state := p.srv.fsm.State()
|
||||
_, query, err := state.PreparedQueryLookup(args.QueryIDOrName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -218,7 +219,7 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
|
|||
}
|
||||
|
||||
// Execute the query for the local DC.
|
||||
if err := q.execute(query, reply); err != nil {
|
||||
if err := p.execute(query, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -226,7 +227,7 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
|
|||
// requested an RTT sort.
|
||||
reply.Nodes.Shuffle()
|
||||
if query.Service.Sort == structs.QueryOrderSort {
|
||||
if err := q.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
|
||||
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -235,8 +236,8 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
|
|||
// and bail out. Otherwise, we fail over and try remote DCs, as allowed
|
||||
// by the query setup.
|
||||
if len(reply.Nodes) == 0 {
|
||||
wrapper := &queryServerWrapper{q.srv}
|
||||
if err := queryFailover(wrapper, query, args, reply); err != nil {
|
||||
wrapper := &queryServerWrapper{p.srv}
|
||||
if err := queryFailover(wrapper, query, args.QueryOptions, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -249,22 +250,22 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
|
|||
// over since the remote side won't have it in its state store, and this doesn't
|
||||
// do the failover logic since that's already being run on the originating DC.
|
||||
// We don't want things to fan out further than one level.
|
||||
func (q *Query) ExecuteRemote(args *structs.QueryExecuteRemoteRequest,
|
||||
reply *structs.QueryExecuteResponse) error {
|
||||
if done, err := q.srv.forward("Query.ExecuteRemote", args, args, reply); done {
|
||||
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "query", "execute_remote"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute_remote"}, time.Now())
|
||||
|
||||
// We have to do this ourselves since we are not doing a blocking RPC.
|
||||
if args.RequireConsistent {
|
||||
if err := q.srv.consistentRead(); err != nil {
|
||||
if err := p.srv.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Run the query locally to see what we can find.
|
||||
if err := q.execute(&args.Query, reply); err != nil {
|
||||
if err := p.execute(&args.Query, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -278,8 +279,9 @@ func (q *Query) ExecuteRemote(args *structs.QueryExecuteRemoteRequest,
|
|||
|
||||
// execute runs a prepared query in the local DC without any failover. We don't
|
||||
// apply any sorting options at this level - it should be done up above.
|
||||
func (q *Query) execute(query *structs.PreparedQuery, reply *structs.QueryExecuteResponse) error {
|
||||
state := q.srv.fsm.State()
|
||||
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
state := p.srv.fsm.State()
|
||||
_, nodes, err := state.CheckServiceNodes(query.Service.Service)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -290,7 +292,7 @@ func (q *Query) execute(query *structs.PreparedQuery, reply *structs.QueryExecut
|
|||
// the token stored with the query, NOT the passed-in one, which is
|
||||
// critical to how queries work (the query becomes a proxy for a lookup
|
||||
// using the ACL it was created with).
|
||||
if err := q.srv.filterACL(query.Token, nodes); err != nil {
|
||||
if err := p.srv.filterACL(query.Token, nodes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -396,7 +398,8 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
|
|||
// queryFailover runs an algorithm to determine which DCs to try and then calls
|
||||
// them to try to locate alternative services.
|
||||
func queryFailover(q queryServer, query *structs.PreparedQuery,
|
||||
args *structs.QueryExecuteRequest, reply *structs.QueryExecuteResponse) error {
|
||||
options structs.QueryOptions,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
|
||||
// Build a candidate list of DCs, starting with the nearest N from RTTs.
|
||||
var dcs []string
|
||||
|
@ -427,12 +430,12 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
|
|||
|
||||
// Now try the selected DCs in priority order.
|
||||
for _, dc := range dcs {
|
||||
remote := &structs.QueryExecuteRemoteRequest{
|
||||
remote := &structs.PreparedQueryExecuteRemoteRequest{
|
||||
Datacenter: dc,
|
||||
Query: *query,
|
||||
QueryOptions: args.QueryOptions,
|
||||
QueryOptions: options,
|
||||
}
|
||||
if err := q.ForwardDC("Query.ExecuteRemote", dc, remote, reply); err != nil {
|
||||
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -9,7 +9,7 @@ import (
|
|||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestQuery_Apply(t *testing.T) {
|
||||
func TestPreparedQuery_Apply(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
@ -18,9 +18,9 @@ func TestQuery_Apply(t *testing.T) {
|
|||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.QueryRequest{
|
||||
arg := structs.PreparedQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.QueryCreate,
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: structs.PreparedQuery{
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "redis",
|
||||
|
@ -28,7 +28,7 @@ func TestQuery_Apply(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var reply string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Query.Apply", &arg, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
|
@ -153,15 +153,15 @@ type Server struct {
|
|||
|
||||
// Holds the RPC endpoints
|
||||
type endpoints struct {
|
||||
Catalog *Catalog
|
||||
Health *Health
|
||||
Status *Status
|
||||
KVS *KVS
|
||||
Session *Session
|
||||
Internal *Internal
|
||||
ACL *ACL
|
||||
Coordinate *Coordinate
|
||||
Query *Query
|
||||
Catalog *Catalog
|
||||
Health *Health
|
||||
Status *Status
|
||||
KVS *KVS
|
||||
Session *Session
|
||||
Internal *Internal
|
||||
ACL *ACL
|
||||
Coordinate *Coordinate
|
||||
PreparedQuery *PreparedQuery
|
||||
}
|
||||
|
||||
// NewServer is used to construct a new Consul server from the
|
||||
|
@ -412,7 +412,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
|||
s.endpoints.Internal = &Internal{s}
|
||||
s.endpoints.ACL = &ACL{s}
|
||||
s.endpoints.Coordinate = NewCoordinate(s)
|
||||
s.endpoints.Query = &Query{s}
|
||||
s.endpoints.PreparedQuery = &PreparedQuery{s}
|
||||
|
||||
// Register the handlers
|
||||
s.rpcServer.Register(s.endpoints.Status)
|
||||
|
@ -423,7 +423,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
|||
s.rpcServer.Register(s.endpoints.Internal)
|
||||
s.rpcServer.Register(s.endpoints.ACL)
|
||||
s.rpcServer.Register(s.endpoints.Coordinate)
|
||||
s.rpcServer.Register(s.endpoints.Query)
|
||||
s.rpcServer.Register(s.endpoints.PreparedQuery)
|
||||
|
||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
|
|
|
@ -8,37 +8,36 @@ import (
|
|||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// Queries is used to pull all the prepared queries from the snapshot.
|
||||
func (s *StateSnapshot) Queries() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get("queries", "id")
|
||||
// PreparedQueries is used to pull all the prepared queries from the snapshot.
|
||||
func (s *StateSnapshot) PreparedQueries() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get("prepared-queries", "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// Query is used when restoring from a snapshot. For general inserts, use
|
||||
// QuerySet.
|
||||
func (s *StateRestore) Query(query *structs.PreparedQuery) error {
|
||||
if err := s.tx.Insert("queries", query); err != nil {
|
||||
return fmt.Errorf("failed restoring query: %s", err)
|
||||
// PrepparedQuery is used when restoring from a snapshot. For general inserts,
|
||||
// use PreparedQuerySet.
|
||||
func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
|
||||
if err := s.tx.Insert("prepared-queries", query); err != nil {
|
||||
return fmt.Errorf("failed restoring prepared query: %s", err)
|
||||
}
|
||||
|
||||
if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "queries"); err != nil {
|
||||
if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "prepared-queries"); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
s.watches.Arm("queries")
|
||||
s.watches.Arm("prepared-queries")
|
||||
return nil
|
||||
}
|
||||
|
||||
// QuerySet is used to create or update a prepared query.
|
||||
func (s *StateStore) QuerySet(idx uint64, query *structs.PreparedQuery) error {
|
||||
// PreparedQuerySet is used to create or update a prepared query.
|
||||
func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Call set on the Query.
|
||||
if err := s.querySetTxn(tx, idx, query); err != nil {
|
||||
if err := s.preparedQuerySetTxn(tx, idx, query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -46,18 +45,18 @@ func (s *StateStore) QuerySet(idx uint64, query *structs.PreparedQuery) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// querySetTxn is the inner method used to insert a prepared query with the
|
||||
// proper indexes into the state store.
|
||||
func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
|
||||
// preparedQuerySetTxn is the inner method used to insert a prepared query with
|
||||
// the proper indexes into the state store.
|
||||
func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
|
||||
// Check that the ID is set.
|
||||
if query.ID == "" {
|
||||
return ErrMissingQueryID
|
||||
}
|
||||
|
||||
// Check for an existing query.
|
||||
existing, err := tx.First("queries", "id", query.ID)
|
||||
existing, err := tx.First("prepared-queries", "id", query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed query lookup: %s", err)
|
||||
return fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
|
||||
// Set the indexes.
|
||||
|
@ -73,7 +72,7 @@ func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.Prepa
|
|||
// this then a bad actor could steal traffic away from an existing DNS
|
||||
// entry.
|
||||
if query.Name != "" {
|
||||
existing, err := tx.First("queries", "id", query.Name)
|
||||
existing, err := tx.First("prepared-queries", "id", query.Name)
|
||||
|
||||
// This is a little unfortunate but the UUID index will complain
|
||||
// if the name isn't formatted like a UUID, so we can safely
|
||||
|
@ -107,25 +106,25 @@ func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.Prepa
|
|||
}
|
||||
|
||||
// Insert the query.
|
||||
if err := tx.Insert("queries", query); err != nil {
|
||||
return fmt.Errorf("failed inserting query: %s", err)
|
||||
if err := tx.Insert("prepared-queries", query); err != nil {
|
||||
return fmt.Errorf("failed inserting prepared query: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil {
|
||||
if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { s.tableWatches["queries"].Notify() })
|
||||
tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() })
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueryDelete deletes the given query by ID.
|
||||
func (s *StateStore) QueryDelete(idx uint64, queryID string) error {
|
||||
// PreparedQueryDelete deletes the given query by ID.
|
||||
func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
watches := NewDumbWatchManager(s.tableWatches)
|
||||
if err := s.queryDeleteTxn(tx, idx, watches, queryID); err != nil {
|
||||
return fmt.Errorf("failed query delete: %s", err)
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil {
|
||||
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||
}
|
||||
|
||||
tx.Defer(func() { watches.Notify() })
|
||||
|
@ -133,43 +132,43 @@ func (s *StateStore) QueryDelete(idx uint64, queryID string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// queryDeleteTxn is the inner method used to delete a prepared query with the
|
||||
// proper indexes into the state store.
|
||||
func (s *StateStore) queryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
|
||||
// with the proper indexes into the state store.
|
||||
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
queryID string) error {
|
||||
// Pull the query.
|
||||
query, err := tx.First("queries", "id", queryID)
|
||||
query, err := tx.First("prepared-queries", "id", queryID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed query lookup: %s", err)
|
||||
return fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
if query == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the query and update the index.
|
||||
if err := tx.Delete("queries", query); err != nil {
|
||||
return fmt.Errorf("failed query delete: %s", err)
|
||||
if err := tx.Delete("prepared-queries", query); err != nil {
|
||||
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil {
|
||||
if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
watches.Arm("queries")
|
||||
watches.Arm("prepared-queries")
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueryGet returns the given prepared query by ID.
|
||||
func (s *StateStore) QueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
|
||||
// PreparedQueryGet returns the given prepared query by ID.
|
||||
func (s *StateStore) PreparedQueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("QueryGet")...)
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryGet")...)
|
||||
|
||||
// Look up the query by its ID.
|
||||
query, err := tx.First("queries", "id", queryID)
|
||||
query, err := tx.First("prepared-queries", "id", queryID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
if query != nil {
|
||||
return idx, query.(*structs.PreparedQuery), nil
|
||||
|
@ -177,13 +176,14 @@ func (s *StateStore) QueryGet(queryID string) (uint64, *structs.PreparedQuery, e
|
|||
return idx, nil, nil
|
||||
}
|
||||
|
||||
// QueryLookup returns the given prepared query by looking up an ID or Name.
|
||||
func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
|
||||
// PreparedQueryLookup returns the given prepared query by looking up an ID or
|
||||
// Name.
|
||||
func (s *StateStore) PreparedQueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("QueryLookup")...)
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryLookup")...)
|
||||
|
||||
// Explicitly ban an empty query. This will never match an ID and the
|
||||
// schema is set up so it will never match a query with an empty name,
|
||||
|
@ -194,22 +194,22 @@ func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.Prepare
|
|||
}
|
||||
|
||||
// Try first by ID.
|
||||
query, err := tx.First("queries", "id", queryIDOrName)
|
||||
query, err := tx.First("prepared-queries", "id", queryIDOrName)
|
||||
|
||||
// This is a little unfortunate but the UUID index will complain
|
||||
// if the name isn't formatted like a UUID, so we can safely
|
||||
// ignore any UUID format-related errors.
|
||||
if err != nil && !strings.Contains(err.Error(), "UUID") {
|
||||
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
if query != nil {
|
||||
return idx, query.(*structs.PreparedQuery), nil
|
||||
}
|
||||
|
||||
// Then try by name.
|
||||
query, err = tx.First("queries", "name", queryIDOrName)
|
||||
query, err = tx.First("prepared-queries", "name", queryIDOrName)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
if query != nil {
|
||||
return idx, query.(*structs.PreparedQuery), nil
|
||||
|
@ -218,18 +218,18 @@ func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.Prepare
|
|||
return idx, nil, nil
|
||||
}
|
||||
|
||||
// QueryList returns all the prepared queries.
|
||||
func (s *StateStore) QueryList() (uint64, structs.PreparedQueries, error) {
|
||||
// PreparedQueryList returns all the prepared queries.
|
||||
func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("QueryList")...)
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...)
|
||||
|
||||
// Query all of the prepared queries in the state store.
|
||||
queries, err := tx.Get("queries", "id")
|
||||
queries, err := tx.Get("prepared-queries", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed query lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
|
||||
// Go over all of the queries and build the response.
|
|
@ -8,22 +8,22 @@ import (
|
|||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
||||
func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Querying with no results returns nil.
|
||||
idx, res, err := s.QueryGet(testUUID())
|
||||
idx, res, err := s.PreparedQueryGet(testUUID())
|
||||
if idx != 0 || res != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
|
||||
// Inserting a query with empty ID is disallowed.
|
||||
if err := s.QuerySet(1, &structs.PreparedQuery{}); err == nil {
|
||||
if err := s.PreparedQuerySet(1, &structs.PreparedQuery{}); err == nil {
|
||||
t.Fatalf("expected %#v, got: %#v", ErrMissingQueryID, err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex("queries"); idx != 0 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -36,13 +36,13 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// The set will still fail because the service isn't registered yet.
|
||||
err = s.QuerySet(1, query)
|
||||
err = s.PreparedQuerySet(1, query)
|
||||
if err == nil || !strings.Contains(err.Error(), "invalid service") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex("queries"); idx != 0 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -51,12 +51,12 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
testRegisterService(t, s, 2, "foo", "redis")
|
||||
|
||||
// This should go through.
|
||||
if err := s.QuerySet(3, query); err != nil {
|
||||
if err := s.PreparedQuerySet(3, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 3 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
idx, actual, err := s.QueryGet(query.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -84,19 +84,19 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
|
||||
// Give it a name and set it again.
|
||||
query.Name = "test-query"
|
||||
if err := s.QuerySet(4, query); err != nil {
|
||||
if err := s.PreparedQuerySet(4, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 4 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Name = "test-query"
|
||||
expected.ModifyIndex = 4
|
||||
idx, actual, err = s.QueryGet(query.ID)
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -109,13 +109,13 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
|
||||
// Try to tie it to a bogus session.
|
||||
query.Session = testUUID()
|
||||
err = s.QuerySet(5, query)
|
||||
err = s.PreparedQuerySet(5, query)
|
||||
if err == nil || !strings.Contains(err.Error(), "invalid session") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex("queries"); idx != 4 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -127,19 +127,19 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
if err := s.SessionCreate(5, session); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := s.QuerySet(6, query); err != nil {
|
||||
if err := s.PreparedQuerySet(6, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 6 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Session = query.Session
|
||||
expected.ModifyIndex = 6
|
||||
idx, actual, err = s.QueryGet(query.ID)
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -159,18 +159,18 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
Service: "redis",
|
||||
},
|
||||
}
|
||||
err = s.QuerySet(7, evil)
|
||||
err = s.PreparedQuerySet(7, evil)
|
||||
if err == nil || !strings.Contains(err.Error(), "aliases an existing query") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex("queries"); idx != 6 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err = s.QueryGet(evil.ID)
|
||||
idx, actual, err = s.PreparedQueryGet(evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Query_QueryDelete(t *testing.T) {
|
||||
func TestStateStore_PreparedQueryDelete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -198,22 +198,22 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Deleting a query that doesn't exist should be a no-op.
|
||||
if err := s.QueryDelete(3, query.ID); err != nil {
|
||||
if err := s.PreparedQueryDelete(3, query.ID); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex("queries"); idx != 0 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Now add the query to the data store.
|
||||
if err := s.QuerySet(3, query); err != nil {
|
||||
if err := s.PreparedQuerySet(3, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 3 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -228,7 +228,7 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
|
|||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
idx, actual, err := s.QueryGet(query.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -240,17 +240,17 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Now delete it.
|
||||
if err := s.QueryDelete(4, query.ID); err != nil {
|
||||
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 4 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err = s.QueryGet(query.ID)
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Query_QueryLookup(t *testing.T) {
|
||||
func TestStateStore_PreparedQueryLookup(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -280,7 +280,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
|
||||
// Try to lookup a query that's not there using something that looks
|
||||
// like a real ID.
|
||||
idx, actual, err := s.QueryLookup(query.ID)
|
||||
idx, actual, err := s.PreparedQueryLookup(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
|
||||
// Try to lookup a query that's not there using something that looks
|
||||
// like a name
|
||||
idx, actual, err = s.QueryLookup(query.Name)
|
||||
idx, actual, err = s.PreparedQueryLookup(query.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -305,12 +305,12 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
}
|
||||
|
||||
// Now actually insert the query.
|
||||
if err := s.QuerySet(3, query); err != nil {
|
||||
if err := s.PreparedQuerySet(3, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex("queries"); idx != 3 {
|
||||
if idx := s.maxIndex("prepared-queries"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -326,7 +326,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
idx, actual, err = s.QueryLookup(query.ID)
|
||||
idx, actual, err = s.PreparedQueryLookup(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -338,7 +338,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
}
|
||||
|
||||
// Read it back using the name and verify it again.
|
||||
idx, actual, err = s.QueryLookup(query.Name)
|
||||
idx, actual, err = s.PreparedQueryLookup(query.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -351,12 +351,12 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
|
|||
|
||||
// Make sure an empty lookup is well-behaved if there are actual queries
|
||||
// in the state store.
|
||||
if _, _, err = s.QueryLookup(""); err != ErrMissingQueryID {
|
||||
if _, _, err = s.PreparedQueryLookup(""); err != ErrMissingQueryID {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Query_QueryList(t *testing.T) {
|
||||
func TestStateStore_PreparedQueryList(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -389,7 +389,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
|
|||
|
||||
// Now create the queries.
|
||||
for i, query := range queries {
|
||||
if err := s.QuerySet(uint64(4+i), query); err != nil {
|
||||
if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -419,7 +419,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
idx, actual, err := s.QueryList()
|
||||
idx, actual, err := s.PreparedQueryList()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -431,7 +431,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
||||
func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -464,7 +464,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
|
||||
// Now create the queries.
|
||||
for i, query := range queries {
|
||||
if err := s.QuerySet(uint64(4+i), query); err != nil {
|
||||
if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -474,7 +474,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
defer snap.Close()
|
||||
|
||||
// Alter the real state store.
|
||||
if err := s.QueryDelete(6, queries[0].ID); err != nil {
|
||||
if err := s.PreparedQueryDelete(6, queries[0].ID); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
|
@ -506,7 +506,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
iter, err := snap.Queries()
|
||||
iter, err := snap.PreparedQueries()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -523,7 +523,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
restore := s.Restore()
|
||||
for _, query := range dump {
|
||||
if err := restore.Query(query); err != nil {
|
||||
if err := restore.PreparedQuery(query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -531,7 +531,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
|
||||
// Read the restored queries back out and verify that they
|
||||
// match.
|
||||
idx, actual, err := s.QueryList()
|
||||
idx, actual, err := s.PreparedQueryList()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -544,7 +544,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_Query_Watches(t *testing.T) {
|
||||
func TestStateStore_PreparedQuery_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -560,19 +560,19 @@ func TestStateStore_Query_Watches(t *testing.T) {
|
|||
|
||||
// Call functions that update the queries table and make sure a watch
|
||||
// fires each time.
|
||||
verifyWatch(t, s.getTableWatch("queries"), func() {
|
||||
if err := s.QuerySet(3, query); err != nil {
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
if err := s.PreparedQuerySet(3, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("queries"), func() {
|
||||
if err := s.QueryDelete(4, query.ID); err != nil {
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("queries"), func() {
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.Query(query); err != nil {
|
||||
if err := restore.PreparedQuery(query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
restore.Commit()
|
|
@ -30,7 +30,7 @@ func stateStoreSchema() *memdb.DBSchema {
|
|||
sessionChecksTableSchema,
|
||||
aclsTableSchema,
|
||||
coordinatesTableSchema,
|
||||
queriesTableSchema,
|
||||
preparedQueriesTableSchema,
|
||||
}
|
||||
|
||||
// Add the tables to the root schema
|
||||
|
@ -367,11 +367,11 @@ func coordinatesTableSchema() *memdb.TableSchema {
|
|||
}
|
||||
}
|
||||
|
||||
// queriesTableSchema returns a new table schema used for storing
|
||||
// preparedQueriesTableSchema returns a new table schema used for storing
|
||||
// prepared queries.
|
||||
func queriesTableSchema() *memdb.TableSchema {
|
||||
func preparedQueriesTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "queries",
|
||||
Name: "prepared-queries",
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
|
|
|
@ -413,8 +413,8 @@ func (s *StateStore) getWatchTables(method string) []string {
|
|||
return []string{"acls"}
|
||||
case "Coordinates":
|
||||
return []string{"coordinates"}
|
||||
case "QueryGet", "QueryLookup", "QueryList":
|
||||
return []string{"queries"}
|
||||
case "PreparedQueryGet", "PreparedQueryLookup", "PreparedQueryList":
|
||||
return []string{"prepared-queries"}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("Unknown method %s", method))
|
||||
|
@ -2141,9 +2141,9 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
}
|
||||
|
||||
// Delete any prepared queries.
|
||||
queries, err := tx.Get("queries", "session", sessionID)
|
||||
queries, err := tx.Get("prepared-queries", "session", sessionID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed query lookup: %s", err)
|
||||
return fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
{
|
||||
var objs []interface{}
|
||||
|
@ -2154,8 +2154,8 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
for _, obj := range objs {
|
||||
q := obj.(*structs.PreparedQuery)
|
||||
if err := s.queryDeleteTxn(tx, idx, watches, q.ID); err != nil {
|
||||
return fmt.Errorf("failed query delete: %s", err)
|
||||
if err := s.preparedQueryDeleteTxn(tx, idx, watches, q.ID); err != nil {
|
||||
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4445,7 +4445,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
|
||||
func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Set up our test environment.
|
||||
|
@ -4465,13 +4465,13 @@ func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
|
|||
Service: "redis",
|
||||
},
|
||||
}
|
||||
if err := s.QuerySet(4, query); err != nil {
|
||||
if err := s.PreparedQuerySet(4, query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Invalidate the session and make sure the watches fire.
|
||||
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||
verifyWatch(t, s.getTableWatch("queries"), func() {
|
||||
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||
if err := s.SessionDestroy(5, session.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -4491,7 +4491,7 @@ func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the query is gone and the index is updated.
|
||||
idx, q2, err := s.QueryGet(query.ID)
|
||||
idx, q2, err := s.PreparedQueryGet(query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -92,34 +92,29 @@ type PreparedQuery struct {
|
|||
|
||||
type PreparedQueries []*PreparedQuery
|
||||
|
||||
type QueryOp string
|
||||
type PreparedQueryOp string
|
||||
|
||||
const (
|
||||
QueryCreate QueryOp = "create"
|
||||
QueryUpdate = "update"
|
||||
QueryDelete = "delete"
|
||||
PreparedQueryCreate PreparedQueryOp = "create"
|
||||
PreparedQueryUpdate = "update"
|
||||
PreparedQueryDelete = "delete"
|
||||
)
|
||||
|
||||
// QueryRequest is used to create or change prepared queries.
|
||||
type QueryRequest struct {
|
||||
type PreparedQueryRequest struct {
|
||||
Datacenter string
|
||||
Op QueryOp
|
||||
Op PreparedQueryOp
|
||||
Query PreparedQuery
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (q *QueryRequest) RequestDatacenter() string {
|
||||
func (q *PreparedQueryRequest) RequestDatacenter() string {
|
||||
return q.Datacenter
|
||||
}
|
||||
|
||||
// QueryResponse is used to return the ID of an updated query.
|
||||
type QueryResponse struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
// QueryExecuteRequest is used to execute a prepared query.
|
||||
type QueryExecuteRequest struct {
|
||||
// PreparedQueryExecuteRequest is used to execute a prepared query.
|
||||
type PreparedQueryExecuteRequest struct {
|
||||
Datacenter string
|
||||
QueryIDOrName string
|
||||
Source QuerySource
|
||||
|
@ -127,26 +122,26 @@ type QueryExecuteRequest struct {
|
|||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (q *QueryExecuteRequest) RequestDatacenter() string {
|
||||
func (q *PreparedQueryExecuteRequest) RequestDatacenter() string {
|
||||
return q.Datacenter
|
||||
}
|
||||
|
||||
// QueryExecuteRemoteRequest is used when running a local query in a remote
|
||||
// datacenter. We have to ship the entire query over since it won't be
|
||||
// PreparedQueryExecuteRemoteRequest is used when running a local query in a
|
||||
// remote datacenter. We have to ship the entire query over since it won't be
|
||||
// present in the remote state store.
|
||||
type QueryExecuteRemoteRequest struct {
|
||||
type PreparedQueryExecuteRemoteRequest struct {
|
||||
Datacenter string
|
||||
Query PreparedQuery
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (q *QueryExecuteRemoteRequest) RequestDatacenter() string {
|
||||
func (q *PreparedQueryExecuteRemoteRequest) RequestDatacenter() string {
|
||||
return q.Datacenter
|
||||
}
|
||||
|
||||
// QueryExecuteResponse has the results of executing a query.
|
||||
type QueryExecuteResponse struct {
|
||||
// PreparedQueryExecuteResponse has the results of executing a query.
|
||||
type PreparedQueryExecuteResponse struct {
|
||||
Nodes CheckServiceNodes
|
||||
DNS QueryDNSOptions
|
||||
}
|
|
@ -35,7 +35,7 @@ const (
|
|||
ACLRequestType
|
||||
TombstoneRequestType
|
||||
CoordinateBatchUpdateType
|
||||
QueryRequestType
|
||||
PreparedQueryRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue