Merge pull request #1389 from hashicorp/f-prepared-queries

Adds prepared queries.
This commit is contained in:
James Phillips 2015-11-17 11:36:28 -08:00
commit 80e7a0de46
34 changed files with 7750 additions and 842 deletions

View File

@ -26,6 +26,10 @@ FEATURES:
* Consul now builds under Go 1.5.1 by default [GH-1345]
* Added built-in support for running health checks inside Docker containers
[GH-1343]
* Added prepared queries which support service health queries with rich
features such as filters for multiple tags and failover to remote datacenters
based on network coordinates; these are available via HTTP as well as the
DNS interface [GH-1389]
BUG FIXES:

View File

@ -70,6 +70,12 @@ type ACL interface {
// ACLModify checks for permission to manipulate ACLs
ACLModify() bool
// QueryList checks for permission to list all the prepared queries.
QueryList() bool
// QueryModify checks for permission to modify any prepared query.
QueryModify() bool
}
// StaticACL is used to implement a base ACL policy. It either
@ -124,6 +130,14 @@ func (s *StaticACL) ACLModify() bool {
return s.allowManage
}
func (s *StaticACL) QueryList() bool {
return s.allowManage
}
func (s *StaticACL) QueryModify() bool {
return s.allowManage
}
// AllowAll returns an ACL rule that allows all operations
func AllowAll() ACL {
return allowAll
@ -374,3 +388,13 @@ func (p *PolicyACL) ACLList() bool {
func (p *PolicyACL) ACLModify() bool {
return p.parent.ACLModify()
}
// QueryList checks if listing of all prepared queries is allowed.
func (p *PolicyACL) QueryList() bool {
return p.parent.QueryList()
}
// QueryModify checks if modifying of any prepared query is allowed.
func (p *PolicyACL) QueryModify() bool {
return p.parent.QueryModify()
}

View File

@ -65,6 +65,12 @@ func TestStaticACL(t *testing.T) {
if all.ACLModify() {
t.Fatalf("should not allow")
}
if all.QueryList() {
t.Fatalf("should not allow")
}
if all.QueryModify() {
t.Fatalf("should not allow")
}
if none.KeyRead("foobar") {
t.Fatalf("should not allow")
@ -102,6 +108,12 @@ func TestStaticACL(t *testing.T) {
if none.ACLModify() {
t.Fatalf("should not allow")
}
if none.QueryList() {
t.Fatalf("should not allow")
}
if none.QueryModify() {
t.Fatalf("should not allow")
}
if !manage.KeyRead("foobar") {
t.Fatalf("should allow")
@ -133,6 +145,12 @@ func TestStaticACL(t *testing.T) {
if !manage.ACLModify() {
t.Fatalf("should allow")
}
if !manage.QueryList() {
t.Fatalf("should allow")
}
if !manage.QueryModify() {
t.Fatalf("should allow")
}
}
func TestPolicyACL(t *testing.T) {
@ -369,6 +387,20 @@ func TestPolicyACL_Parent(t *testing.T) {
t.Fatalf("Write fail: %#v", c)
}
}
// Check some management functions that chain up
if acl.ACLList() {
t.Fatalf("should not allow")
}
if acl.ACLModify() {
t.Fatalf("should not allow")
}
if acl.QueryList() {
t.Fatalf("should not allow")
}
if acl.QueryModify() {
t.Fatalf("should not allow")
}
}
func TestPolicyACL_Keyring(t *testing.T) {

173
api/prepared_query.go Normal file
View File

@ -0,0 +1,173 @@
package api
// QueryDatacenterOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
// Datacenters is a fixed list of datacenters to try after NearestN. We
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
}
// QueryDNSOptions controls settings when query results are served over DNS.
type QueryDNSOptions struct {
// TTL is the time to live for the served DNS results.
TTL string
}
// ServiceQuery is used to query for a set of healthy nodes offering a specific
// service.
type ServiceQuery struct {
// Service is the service to query.
Service string
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
OnlyPassing bool
// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "!" then
// it is disallowed.
Tags []string
}
// PrepatedQueryDefinition defines a complete prepared query.
type PreparedQueryDefinition struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string
// Name is an optional friendly name for the query supplied by the
// user. NOTE - if this feature is used then it will reduce the security
// of any read ACL associated with this query/service since this name
// can be used to locate nodes with supplying any ACL.
Name string
// Session is an optional session to tie this query's lifetime to. If
// this is omitted then the query will not expire.
Session string
// Token is the ACL token used when the query was created, and it is
// used when a query is subsequently executed. This token, or a token
// with management privileges, must be used to change the query later.
Token string
// Service defines a service query (leaving things open for other types
// later).
Service ServiceQuery
// DNS has options that control how the results of this query are
// served over DNS.
DNS QueryDNSOptions
}
// PreparedQueryExecuteResponse has the results of executing a query.
type PreparedQueryExecuteResponse struct {
// Service is the service that was queried.
Service string
// Nodes has the nodes that were output by the query.
Nodes []ServiceEntry
// DNS has the options for serving these results over DNS.
DNS QueryDNSOptions
// Datacenter is the datacenter that these results came from.
Datacenter string
// Failovers is a count of how many times we had to query a remote
// datacenter.
Failovers int
}
// PreparedQuery can be used to query the prepared query endpoints.
type PreparedQuery struct {
c *Client
}
// PreparedQuery returns a handle to the prepared query endpoints.
func (c *Client) PreparedQuery() *PreparedQuery {
return &PreparedQuery{c}
}
// Create makes a new prepared query. The ID of the new query is returned.
func (c *PreparedQuery) Create(query *PreparedQueryDefinition, q *WriteOptions) (string, *WriteMeta, error) {
r := c.c.newRequest("POST", "/v1/query")
r.setWriteOptions(q)
r.obj = query
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Update makes updates to an existing prepared query.
func (c *PreparedQuery) Update(query *PreparedQueryDefinition, q *WriteOptions) (*WriteMeta, error) {
return c.c.write("/v1/query/"+query.ID, query, nil, q)
}
// List is used to fetch all the prepared queries (always requires a management
// token).
func (c *PreparedQuery) List(q *QueryOptions) ([]*PreparedQueryDefinition, *QueryMeta, error) {
var out []*PreparedQueryDefinition
qm, err := c.c.query("/v1/query", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Get is used to fetch a specific prepared query.
func (c *PreparedQuery) Get(queryID string, q *QueryOptions) ([]*PreparedQueryDefinition, *QueryMeta, error) {
var out []*PreparedQueryDefinition
qm, err := c.c.query("/v1/query/"+queryID, &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Delete is used to delete a specific prepared query.
func (c *PreparedQuery) Delete(queryID string, q *QueryOptions) (*QueryMeta, error) {
r := c.c.newRequest("DELETE", "/v1/query/"+queryID)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
return qm, nil
}
// Execute is used to execute a specific prepared query. You can execute using
// a query ID or name.
func (c *PreparedQuery) Execute(queryIDOrName string, q *QueryOptions) (*PreparedQueryExecuteResponse, *QueryMeta, error) {
var out *PreparedQueryExecuteResponse
qm, err := c.c.query("/v1/query/"+queryIDOrName+"/execute", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

123
api/prepared_query_test.go Normal file
View File

@ -0,0 +1,123 @@
package api
import (
"reflect"
"testing"
"github.com/hashicorp/consul/testutil"
)
func TestPreparedQuery(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// Set up a node and a service.
reg := &CatalogRegistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
Service: &AgentService{
ID: "redis1",
Service: "redis",
Tags: []string{"master", "v1"},
Port: 8000,
},
}
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
if _, err := catalog.Register(reg, nil); err != nil {
return false, err
}
if _, _, err := catalog.Node("foobar", nil); err != nil {
return false, err
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Create a simple prepared query.
def := &PreparedQueryDefinition{
Service: ServiceQuery{
Service: "redis",
},
}
query := c.PreparedQuery()
var err error
def.ID, _, err = query.Create(def, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
// Read it back.
defs, _, err := query.Get(def.ID, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(defs) != 1 || !reflect.DeepEqual(defs[0], def) {
t.Fatalf("bad: %v", defs)
}
// List them all.
defs, _, err = query.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(defs) != 1 || !reflect.DeepEqual(defs[0], def) {
t.Fatalf("bad: %v", defs)
}
// Make an update.
def.Name = "my-query"
_, err = query.Update(def, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
// Read it back again to verify the update worked.
defs, _, err = query.Get(def.ID, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(defs) != 1 || !reflect.DeepEqual(defs[0], def) {
t.Fatalf("bad: %v", defs)
}
// Execute by ID.
results, _, err := query.Execute(def.ID, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(results.Nodes) != 1 || results.Nodes[0].Node.Node != "foobar" {
t.Fatalf("bad: %v", results)
}
// Execute by name.
results, _, err = query.Execute("my-query", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(results.Nodes) != 1 || results.Nodes[0].Node.Node != "foobar" {
t.Fatalf("bad: %v", results)
}
// Delete it.
_, err = query.Delete(def.ID, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
// Make sure there are no longer any queries.
defs, _, err = query.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(defs) != 0 {
t.Fatalf("bad: %v", defs)
}
}

View File

@ -9,6 +9,7 @@ import (
"net"
"os"
"path/filepath"
"reflect"
"regexp"
"strconv"
"sync"
@ -104,6 +105,11 @@ type Agent struct {
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
// endpoints lets you override RPC endpoints for testing. Not all
// agent methods use this, so use with care and never override
// outside of a unit test.
endpoints map[string]string
}
// Create is used to create a new Agent. Returns
@ -158,6 +164,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}
// Initialize the local state
@ -1456,3 +1463,30 @@ func (a *Agent) DisableNodeMaintenance() {
a.RemoveCheck(nodeMaintCheckID, true)
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}
// InjectEndpoint overrides the given endpoint with a substitute one. Note
// that not all agent methods use this mechanism, and that is should only
// be used for testing.
func (a *Agent) InjectEndpoint(endpoint string, handler interface{}) error {
if a.server == nil {
return fmt.Errorf("agent must be a server")
}
if err := a.server.InjectEndpoint(handler); err != nil {
return err
}
name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name()
a.endpoints[endpoint] = name
a.logger.Printf("[WARN] agent: endpoint injected; this should only be used for testing")
return nil
}
// getEndpoint returns the endpoint name to use for the given endpoint,
// which may be overridden.
func (a *Agent) getEndpoint(endpoint string) string {
if override, ok := a.endpoints[endpoint]; ok {
return override
}
return endpoint
}

View File

@ -4,12 +4,12 @@ import (
"fmt"
"io"
"log"
"math/rand"
"net"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/miekg/dns"
)
@ -290,7 +290,7 @@ func (d *DNSServer) dispatch(network string, req, resp *dns.Msg) {
// Split into the label parts
labels := dns.SplitDomainName(qName)
// The last label is either "node", "service" or a datacenter name
// The last label is either "node", "service", "query", or a datacenter name
PARSE:
n := len(labels)
if n == 0 {
@ -330,13 +330,23 @@ PARSE:
}
case "node":
if len(labels) == 1 {
if n == 1 {
goto INVALID
}
// Allow a "." in the node name, just join all the parts
node := strings.Join(labels[:n-1], ".")
d.nodeLookup(network, datacenter, node, req, resp)
case "query":
if n == 1 {
goto INVALID
}
// Allow a "." in the query name, just join all the parts.
query := strings.Join(labels[:n-1], ".")
d.preparedQueryLookup(network, datacenter, query, req, resp)
default:
// Store the DC, and re-parse
datacenter = labels[n-1]
@ -499,7 +509,7 @@ RPC:
}
// Filter out any service nodes due to health checks
out.Nodes = d.filterServiceNodes(out.Nodes)
out.Nodes = out.Nodes.Filter(d.config.OnlyPassing)
// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
@ -509,7 +519,7 @@ RPC:
}
// Perform a random shuffle
shuffleServiceNodes(out.Nodes)
out.Nodes.Shuffle()
// Add various responses depending on the request
qType := req.Question[0].Qtype
@ -536,33 +546,99 @@ RPC:
}
}
// filterServiceNodes is used to filter out nodes that are failing
// health checks to prevent routing to unhealthy nodes
func (d *DNSServer) filterServiceNodes(nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
n := len(nodes)
OUTER:
for i := 0; i < n; i++ {
node := nodes[i]
for _, check := range node.Checks {
if check.Status == structs.HealthCritical ||
(d.config.OnlyPassing && check.Status != structs.HealthPassing) {
d.logger.Printf("[WARN] dns: node '%s' failing health check '%s: %s', dropping from service '%s'",
node.Node.Node, check.CheckID, check.Name, node.Service.Service)
nodes[i], nodes[n-1] = nodes[n-1], structs.CheckServiceNode{}
n--
i--
continue OUTER
}
// preparedQueryLookup is used to handle a prepared query.
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) {
// Execute the prepared query.
args := structs.PreparedQueryExecuteRequest{
Datacenter: datacenter,
QueryIDOrName: query,
QueryOptions: structs.QueryOptions{
Token: d.agent.config.ACLToken,
AllowStale: d.config.AllowStale,
},
}
// TODO (slackpad) - What's a safe limit we can set here? It seems like
// with dup filtering done at this level we need to get everything to
// match the previous behavior. We can optimize by pushing more filtering
// into the query execution, but for now I think we need to get the full
// response. We could also choose a large arbitrary number that will
// likely work in practice, like 10*maxServiceResponses which should help
// reduce bandwidth if there are thousands of nodes available.
endpoint := d.agent.getEndpoint(preparedQueryEndpoint)
var out structs.PreparedQueryExecuteResponse
RPC:
if err := d.agent.RPC(endpoint+".Execute", &args, &out); err != nil {
// If they give a bogus query name, treat that as a name error,
// not a full on server error. We have to use a string compare
// here since the RPC layer loses the type information.
if err.Error() == consul.ErrQueryNotFound.Error() {
d.addSOA(d.domain, resp)
resp.SetRcode(req, dns.RcodeNameError)
return
}
d.logger.Printf("[ERR] dns: rpc error: %v", err)
resp.SetRcode(req, dns.RcodeServerFailure)
return
}
// Verify that request is not too stale, redo the request.
if args.AllowStale && out.LastContact > d.config.MaxStale {
args.AllowStale = false
d.logger.Printf("[WARN] dns: Query results too stale, re-requesting")
goto RPC
}
// Determine the TTL. The parse should never fail since we vet it when
// the query is created, but we check anyway. If the query didn't
// specify a TTL then we will try to use the agent's service-specific
// TTL configs.
var ttl time.Duration
if out.DNS.TTL != "" {
var err error
ttl, err = time.ParseDuration(out.DNS.TTL)
if err != nil {
d.logger.Printf("[WARN] dns: Failed to parse TTL '%s' for prepared query '%s', ignoring", out.DNS.TTL, query)
}
} else if d.config.ServiceTTL != nil {
var ok bool
ttl, ok = d.config.ServiceTTL[out.Service]
if !ok {
ttl = d.config.ServiceTTL["*"]
}
}
return nodes[:n]
}
// shuffleServiceNodes does an in-place random shuffle using the Fisher-Yates algorithm
func shuffleServiceNodes(nodes structs.CheckServiceNodes) {
for i := len(nodes) - 1; i > 0; i-- {
j := rand.Int31() % int32(i+1)
nodes[i], nodes[j] = nodes[j], nodes[i]
// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
d.addSOA(d.domain, resp)
resp.SetRcode(req, dns.RcodeNameError)
return
}
// Add various responses depending on the request.
qType := req.Question[0].Qtype
d.serviceNodeRecords(out.Nodes, req, resp, ttl)
if qType == dns.TypeSRV {
d.serviceSRVRecords(datacenter, out.Nodes, req, resp, ttl)
}
// If the network is not TCP, restrict the number of responses.
if network != "tcp" && len(resp.Answer) > maxServiceResponses {
resp.Answer = resp.Answer[:maxServiceResponses]
// Flag that there are more records to return in the UDP
// response.
if d.config.EnableTruncate {
resp.Truncated = true
}
}
// If the answer is empty, return not found.
if len(resp.Answer) == 0 {
d.addSOA(d.domain, resp)
return
}
}

File diff suppressed because it is too large Load Diff

View File

@ -265,6 +265,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled))
}
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@ -307,6 +310,15 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
}
}
// TODO (slackpad) We may want to consider redacting prepared
// query names/IDs here since they are proxies for tokens. But,
// knowing one only gives you read access to service listings
// which is pretty trivial, so it's probably not worth the code
// complexity and overhead of filtering them out. You can't
// recover the token it's a proxy for with just the query info;
// you'd need the actual token (or a management token) to read
// that back.
// Invoke the handler
start := time.Now()
defer func() {

View File

@ -0,0 +1,224 @@
package agent
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
)
const (
preparedQueryEndpoint = "PreparedQuery"
preparedQueryExecuteSuffix = "/execute"
)
// preparedQueryCreateResponse is used to wrap the query ID.
type preparedQueryCreateResponse struct {
ID string
}
// preparedQueryCreate makes a new prepared query.
func (s *HTTPServer) preparedQueryCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
if req.ContentLength > 0 {
if err := decodeBody(req, &args.Query, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
return nil, nil
}
}
var reply string
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".Apply", &args, &reply); err != nil {
return nil, err
}
return preparedQueryCreateResponse{reply}, nil
}
// preparedQueryList returns all the prepared queries.
func (s *HTTPServer) preparedQueryList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.IndexedPreparedQueries
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".List", &args, &reply); err != nil {
return nil, err
}
// Use empty list instead of nil.
if reply.Queries == nil {
reply.Queries = make(structs.PreparedQueries, 0)
}
return reply.Queries, nil
}
// PreparedQueryGeneral handles all the general prepared query requests.
func (s *HTTPServer) PreparedQueryGeneral(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case "POST":
return s.preparedQueryCreate(resp, req)
case "GET":
return s.preparedQueryList(resp, req)
default:
resp.WriteHeader(405)
return nil, nil
}
}
// parseLimit parses the optional limit parameter for a prepared query execution.
func parseLimit(req *http.Request, limit *int) error {
*limit = 0
if arg := req.URL.Query().Get("limit"); arg != "" {
if i, err := strconv.Atoi(arg); err != nil {
return err
} else {
*limit = i
}
}
return nil
}
// preparedQueryExecute executes a prepared query.
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
if err := parseLimit(req, &args.Limit); err != nil {
return nil, fmt.Errorf("Bad limit: %s", err)
}
var reply structs.PreparedQueryExecuteResponse
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".Execute", &args, &reply); err != nil {
// We have to check the string since the RPC sheds
// the specific error type.
if err.Error() == consul.ErrQueryNotFound.Error() {
resp.WriteHeader(404)
resp.Write([]byte(err.Error()))
return nil, nil
}
return nil, err
}
// Use empty list instead of nil.
if reply.Nodes == nil {
reply.Nodes = make(structs.CheckServiceNodes, 0)
}
return reply, nil
}
// preparedQueryGet returns a single prepared query.
func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQuerySpecificRequest{
QueryID: id,
}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.IndexedPreparedQueries
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".Get", &args, &reply); err != nil {
// We have to check the string since the RPC sheds
// the specific error type.
if err.Error() == consul.ErrQueryNotFound.Error() {
resp.WriteHeader(404)
resp.Write([]byte(err.Error()))
return nil, nil
}
return nil, err
}
return reply.Queries, nil
}
// preparedQueryUpdate updates a prepared query.
func (s *HTTPServer) preparedQueryUpdate(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryRequest{
Op: structs.PreparedQueryUpdate,
}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
if req.ContentLength > 0 {
if err := decodeBody(req, &args.Query, nil); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
return nil, nil
}
}
// Take the ID from the URL, not the embedded one.
args.Query.ID = id
var reply string
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".Apply", &args, &reply); err != nil {
return nil, err
}
return nil, nil
}
// preparedQueryDelete deletes prepared query.
func (s *HTTPServer) preparedQueryDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryRequest{
Op: structs.PreparedQueryDelete,
Query: &structs.PreparedQuery{
ID: id,
},
}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
var reply string
endpoint := s.agent.getEndpoint(preparedQueryEndpoint)
if err := s.agent.RPC(endpoint+".Apply", &args, &reply); err != nil {
return nil, err
}
return nil, nil
}
// PreparedQuerySpecific handles all the prepared query requests specific to a
// particular query.
func (s *HTTPServer) PreparedQuerySpecific(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
id := strings.TrimPrefix(req.URL.Path, "/v1/query/")
execute := false
if strings.HasSuffix(id, preparedQueryExecuteSuffix) {
execute = true
id = strings.TrimSuffix(id, preparedQueryExecuteSuffix)
}
switch req.Method {
case "GET":
if execute {
return s.preparedQueryExecute(id, resp, req)
} else {
return s.preparedQueryGet(id, resp, req)
}
case "PUT":
return s.preparedQueryUpdate(id, resp, req)
case "DELETE":
return s.preparedQueryDelete(id, resp, req)
default:
resp.WriteHeader(405)
return nil, nil
}
}

View File

@ -0,0 +1,783 @@
package agent
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
// MockPreparedQuery is a fake endpoint that we inject into the Consul server
// in order to observe the RPC calls made by these HTTP endpoints. This lets
// us make sure that the request is being formed properly without having to
// set up a realistic environment for prepared queries, which is a huge task and
// already done in detail inside the prepared query endpoint's unit tests. If we
// can prove this formats proper requests into that then we should be good to
// go. We will do a single set of end-to-end tests in here to make sure that the
// server is wired up to the right endpoint when not "injected".
type MockPreparedQuery struct {
applyFn func(*structs.PreparedQueryRequest, *string) error
getFn func(*structs.PreparedQuerySpecificRequest, *structs.IndexedPreparedQueries) error
listFn func(*structs.DCSpecificRequest, *structs.IndexedPreparedQueries) error
executeFn func(*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse) error
}
func (m *MockPreparedQuery) Apply(args *structs.PreparedQueryRequest,
reply *string) (err error) {
if m.applyFn != nil {
return m.applyFn(args, reply)
}
return fmt.Errorf("should not have called Apply")
}
func (m *MockPreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
reply *structs.IndexedPreparedQueries) error {
if m.getFn != nil {
return m.getFn(args, reply)
}
return fmt.Errorf("should not have called Get")
}
func (m *MockPreparedQuery) List(args *structs.DCSpecificRequest,
reply *structs.IndexedPreparedQueries) error {
if m.listFn != nil {
return m.listFn(args, reply)
}
return fmt.Errorf("should not have called List")
}
func (m *MockPreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
reply *structs.PreparedQueryExecuteResponse) error {
if m.executeFn != nil {
return m.executeFn(args, reply)
}
return fmt.Errorf("should not have called Execute")
}
func TestPreparedQuery_Create(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.applyFn = func(args *structs.PreparedQueryRequest, reply *string) error {
expected := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "my-query",
Session: "my-session",
Service: structs.ServiceQuery{
Service: "my-service",
Failover: structs.QueryDatacenterOptions{
NearestN: 4,
Datacenters: []string{"dc1", "dc2"},
},
OnlyPassing: true,
Tags: []string{"foo", "bar"},
},
DNS: structs.QueryDNSOptions{
TTL: "10s",
},
},
WriteRequest: structs.WriteRequest{
Token: "my-token",
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
*reply = "my-id"
return nil
}
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Name": "my-query",
"Session": "my-session",
"Service": map[string]interface{}{
"Service": "my-service",
"Failover": map[string]interface{}{
"NearestN": 4,
"Datacenters": []string{"dc1", "dc2"},
},
"OnlyPassing": true,
"Tags": []string{"foo", "bar"},
},
"DNS": map[string]interface{}{
"TTL": "10s",
},
}
if err := enc.Encode(raw); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("POST", "/v1/query?token=my-token", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(preparedQueryCreateResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.ID != "my-id" {
t.Fatalf("bad ID: %s", r.ID)
}
})
}
func TestPreparedQuery_List(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.listFn = func(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
// Return an empty response.
return nil
}
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueries)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r == nil || len(r) != 0 {
t.Fatalf("bad: %v", r)
}
})
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.listFn = func(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
expected := &structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
query := &structs.PreparedQuery{
ID: "my-id",
}
reply.Queries = append(reply.Queries, query)
return nil
}
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query?token=my-token&consistent=true", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueries)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(r) != 1 || r[0].ID != "my-id" {
t.Fatalf("bad: %v", r)
}
})
}
func TestPreparedQuery_Execute(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
// Just return an empty response.
return nil
}
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/my-id/execute", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.Nodes == nil || len(r.Nodes) != 0 {
t.Fatalf("bad: %v", r)
}
})
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
expected := &structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: "my-id",
Limit: 5,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "my-node",
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
// Just set something so we can tell this is returned.
reply.Failovers = 99
return nil
}
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=my-node&limit=5", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.Failovers != 99 {
t.Fatalf("bad: %v", r)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 404 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}
func TestPreparedQuery_Get(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.getFn = func(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQueries) error {
expected := &structs.PreparedQuerySpecificRequest{
Datacenter: "dc1",
QueryID: "my-id",
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
query := &structs.PreparedQuery{
ID: "my-id",
}
reply.Queries = append(reply.Queries, query)
return nil
}
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/my-id?token=my-token&consistent=true", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueries)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(r) != 1 || r[0].ID != "my-id" {
t.Fatalf("bad: %v", r)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/f004177f-2c28-83b7-4229-eacc25fe55d1", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 404 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}
func TestPreparedQuery_Update(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.applyFn = func(args *structs.PreparedQueryRequest, reply *string) error {
expected := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryUpdate,
Query: &structs.PreparedQuery{
ID: "my-id",
Name: "my-query",
Session: "my-session",
Service: structs.ServiceQuery{
Service: "my-service",
Failover: structs.QueryDatacenterOptions{
NearestN: 4,
Datacenters: []string{"dc1", "dc2"},
},
OnlyPassing: true,
Tags: []string{"foo", "bar"},
},
DNS: structs.QueryDNSOptions{
TTL: "10s",
},
},
WriteRequest: structs.WriteRequest{
Token: "my-token",
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
*reply = "don't care"
return nil
}
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"ID": "this should get ignored",
"Name": "my-query",
"Session": "my-session",
"Service": map[string]interface{}{
"Service": "my-service",
"Failover": map[string]interface{}{
"NearestN": 4,
"Datacenters": []string{"dc1", "dc2"},
},
"OnlyPassing": true,
"Tags": []string{"foo", "bar"},
},
"DNS": map[string]interface{}{
"TTL": "10s",
},
}
if err := enc.Encode(raw); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("PUT", "/v1/query/my-id?token=my-token", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}
func TestPreparedQuery_Delete(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
m := MockPreparedQuery{}
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
m.applyFn = func(args *structs.PreparedQueryRequest, reply *string) error {
expected := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryDelete,
Query: &structs.PreparedQuery{
ID: "my-id",
},
WriteRequest: structs.WriteRequest{
Token: "my-token",
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
*reply = "don't care"
return nil
}
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"ID": "this should get ignored",
}
if err := enc.Encode(raw); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("DELETE", "/v1/query/my-id?token=my-token", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}
func TestPreparedQuery_BadMethods(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/query", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("bad code: %d", resp.Code)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("POST", "/v1/query/my-id", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}
func TestPreparedQuery_parseLimit(t *testing.T) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query", body)
if err != nil {
t.Fatalf("err: %v", err)
}
limit := 99
if err := parseLimit(req, &limit); err != nil {
t.Fatalf("err: %v", err)
}
if limit != 0 {
t.Fatalf("bad limit: %d", limit)
}
req, err = http.NewRequest("GET", "/v1/query?limit=11", body)
if err != nil {
t.Fatalf("err: %v", err)
}
if err := parseLimit(req, &limit); err != nil {
t.Fatalf("err: %v", err)
}
if limit != 11 {
t.Fatalf("bad limit: %d", limit)
}
req, err = http.NewRequest("GET", "/v1/query?limit=bob", body)
if err != nil {
t.Fatalf("err: %v", err)
}
if err := parseLimit(req, &limit); err == nil {
t.Fatalf("bad: %v", err)
}
}
// Since we've done exhaustive testing of the calls into the endpoints above
// this is just a basic end-to-end sanity check to make sure things are wired
// correctly when calling through to the real endpoints.
func TestPreparedQuery_Integration(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Register a node and a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "my-service",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Create a query.
var id string
{
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Name": "my-query",
"Service": map[string]interface{}{
"Service": "my-service",
},
}
if err := enc.Encode(raw); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("POST", "/v1/query", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(preparedQueryCreateResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
id = r.ID
}
// List them all.
{
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query?token=root", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQueryGeneral(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueries)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(r) != 1 {
t.Fatalf("bad: %v", r)
}
}
// Execute it.
{
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/"+id+"/execute", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(r.Nodes) != 1 {
t.Fatalf("bad: %v", r)
}
}
// Read it back.
{
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/"+id, body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueries)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if len(r) != 1 {
t.Fatalf("bad: %v", r)
}
}
// Make an update to it.
{
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Name": "my-query",
"Service": map[string]interface{}{
"Service": "my-service",
"OnlyPassing": true,
},
}
if err := enc.Encode(raw); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("PUT", "/v1/query/"+id, body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
}
// Delete it.
{
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/query/"+id, body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
}
})
}

View File

@ -2,7 +2,6 @@ package consul
import (
"fmt"
"sort"
"time"
"github.com/armon/go-metrics"
@ -96,24 +95,11 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
// ListDatacenters is used to query for the list of known datacenters
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
c.srv.remoteLock.RLock()
defer c.srv.remoteLock.RUnlock()
// Read the known DCs
var dcs []string
for dc := range c.srv.remoteConsuls {
dcs = append(dcs, dc)
}
// TODO - do we want to control the sort behavior with an argument?
// Sort the DCs by name first, then apply a stable sort by distance.
sort.Strings(dcs)
if err := c.srv.sortDatacentersByDistance(dcs); err != nil {
dcs, err := c.srv.getDatacentersByDistance()
if err != nil {
return err
}
// Return
*reply = dcs
return nil
}

View File

@ -91,6 +91,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyTombstoneOperation(buf[1:], log.Index)
case structs.CoordinateBatchUpdateType:
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
case structs.PreparedQueryRequestType:
return c.applyPreparedQueryOperation(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -264,6 +266,26 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
return nil
}
// applyPreparedQueryOperation applies the given prepared query operation to the
// state store.
func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.PreparedQuerySet(index, req.Query)
case structs.PreparedQueryDelete:
return c.state.PreparedQueryDelete(index, req.Query.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op)
return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op)
}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
@ -371,6 +393,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}
case structs.PreparedQueryRequestType:
var req structs.PreparedQuery
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.PreparedQuery(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -419,6 +450,12 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistPreparedQueries(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@ -565,6 +602,22 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
return nil
}
func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
queries, err := s.state.PreparedQueries()
if err != nil {
return err
}
for query := queries.Next(); query != nil; query = queries.Next() {
sink.Write([]byte{byte(structs.PreparedQueryRequestType)})
if err := encoder.Encode(query.(*structs.PreparedQuery)); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) Release() {
s.state.Close()
}

View File

@ -397,6 +397,20 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("err: %s", err)
}
query := structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 14,
ModifyIndex: 14,
},
}
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
t.Fatalf("err: %s", err)
}
// Snapshot
snap, err := fsm.Snapshot()
if err != nil {
@ -514,6 +528,18 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
// Verify queries are restored.
_, queries, err := fsm2.state.PreparedQueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(queries) != 1 {
t.Fatalf("bad: %#v", queries)
}
if !reflect.DeepEqual(queries[0], &query) {
t.Fatalf("bad: %#v", queries[0])
}
}
func TestFSM_KVSSet(t *testing.T) {
@ -1049,6 +1075,103 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
}
}
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register a service to query on.
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
// Create a new query.
query := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
},
}
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify it's in the state store.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Make an update to the query.
query.Op = structs.PreparedQueryUpdate
query.Query.Name = "my-query"
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify the update.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Delete the query.
query.Op = structs.PreparedQueryDelete
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Make sure it's gone.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
}
func TestFSM_TombstoneReap(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {

View File

@ -0,0 +1,581 @@
package consul
import (
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
)
var (
// ErrQueryNotFound is returned if the query lookup failed.
ErrQueryNotFound = errors.New("Query not found")
)
// PreparedQuery manages the prepared query endpoint.
type PreparedQuery struct {
srv *Server
}
// Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data. The ID of the session is
// returned in the reply.
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "apply"}, time.Now())
// Validate the ID. We must create new IDs before applying to the Raft
// log since it's not deterministic.
if args.Op == structs.PreparedQueryCreate {
if args.Query.ID != "" {
return fmt.Errorf("ID must be empty when creating a new prepared query")
}
// We are relying on the fact that UUIDs are random and unlikely
// to collide since this isn't inside a write transaction.
state := p.srv.fsm.State()
for {
args.Query.ID = generateUUID()
_, query, err := state.PreparedQueryGet(args.Query.ID)
if err != nil {
return fmt.Errorf("Prepared query lookup failed: %v", err)
}
if query == nil {
break
}
}
}
*reply = args.Query.ID
// Grab the ACL because we need it in several places below.
acl, err := p.srv.resolveToken(args.Token)
if err != nil {
return err
}
// Enforce that any modify operation has the same token used when the
// query was created, or a management token with sufficient rights.
if args.Op != structs.PreparedQueryCreate {
state := p.srv.fsm.State()
_, query, err := state.PreparedQueryGet(args.Query.ID)
if err != nil {
return fmt.Errorf("Prepared Query lookup failed: %v", err)
}
if query == nil {
return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
}
if (query.Token != args.Token) && (acl != nil && !acl.QueryModify()) {
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.Query.ID)
return permissionDeniedErr
}
}
// Parse the query and prep it for the state store.
switch args.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
if err := parseQuery(args.Query); err != nil {
return fmt.Errorf("Invalid prepared query: %v", err)
}
if acl != nil && !acl.ServiceRead(args.Query.Service.Service) {
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query for service '%s' denied due to ACLs", args.Query.Service.Service)
return permissionDeniedErr
}
case structs.PreparedQueryDelete:
// Nothing else to verify here, just do the delete (we only look
// at the ID field for this op).
default:
return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
}
// At this point the token has been vetted, so make sure the token that
// is stored in the state store matches what was supplied.
args.Query.Token = args.Token
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
if err != nil {
p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// parseQuery makes sure the entries of a query are valid for a create or
// update operation. Some of the fields are not checked or are partially
// checked, as noted in the comments below. This also updates all the parsed
// fields of the query.
func parseQuery(query *structs.PreparedQuery) error {
// We skip a few fields:
// - ID is checked outside this fn.
// - Name is optional with no restrictions, except for uniqueness which
// is checked for integrity during the transaction. We also make sure
// names do not overlap with IDs, which is also checked during the
// transaction. Otherwise, people could "steal" queries that they don't
// have proper ACL rights to change.
// - Session is optional and checked for integrity during the transaction.
// - Token is checked outside this fn.
// Parse the service query sub-structure.
if err := parseService(&query.Service); err != nil {
return err
}
// Parse the DNS options sub-structure.
if err := parseDNS(&query.DNS); err != nil {
return err
}
return nil
}
// parseService makes sure the entries of a query are valid for a create or
// update operation. Some of the fields are not checked or are partially
// checked, as noted in the comments below. This also updates all the parsed
// fields of the query.
func parseService(svc *structs.ServiceQuery) error {
// Service is required. We check integrity during the transaction.
if svc.Service == "" {
return fmt.Errorf("Must provide a service name to query")
}
// NearestN can be 0 which means "don't fail over by RTT".
if svc.Failover.NearestN < 0 {
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
}
// We skip a few fields:
// - There's no validation for Datacenters; we skip any unknown entries
// at execution time.
// - OnlyPassing is just a boolean so doesn't need further validation.
// - Tags is a free-form list of tags and doesn't need further validation.
return nil
}
// parseDNS makes sure the entries of a query are valid for a create or
// update operation. This also updates all the parsed fields of the query.
func parseDNS(dns *structs.QueryDNSOptions) error {
if dns.TTL != "" {
ttl, err := time.ParseDuration(dns.TTL)
if err != nil {
return fmt.Errorf("Bad DNS TTL '%s': %v", dns.TTL, err)
}
if ttl < 0 {
return fmt.Errorf("DNS TTL '%d', must be >=0", ttl)
}
}
return nil
}
// Get returns a single prepared query by ID.
func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
reply *structs.IndexedPreparedQueries) error {
if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done {
return err
}
// We will use this in the loop to see if the caller is allowed to see
// the query.
acl, err := p.srv.resolveToken(args.Token)
if err != nil {
return err
}
// Get the requested query.
state := p.srv.fsm.State()
return p.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("PreparedQueryGet"),
func() error {
index, query, err := state.PreparedQueryGet(args.QueryID)
if err != nil {
return err
}
if query == nil {
return ErrQueryNotFound
}
if (query.Token != args.Token) && (acl != nil && !acl.QueryList()) {
p.srv.logger.Printf("[WARN] consul.prepared_query: Request to get prepared query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.QueryID)
return permissionDeniedErr
}
reply.Index = index
reply.Queries = structs.PreparedQueries{query}
return nil
})
return nil
}
// List returns all the prepared queries.
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
return err
}
// This always requires a management token.
acl, err := p.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.QueryList() {
p.srv.logger.Printf("[WARN] consul.prepared_query: Request to list prepared queries denied due to ACLs")
return permissionDeniedErr
}
// Get the list of queries.
state := p.srv.fsm.State()
return p.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("PreparedQueryList"),
func() error {
index, queries, err := state.PreparedQueryList()
if err != nil {
return err
}
reply.Index, reply.Queries = index, queries
return nil
})
}
// Execute runs a prepared query and returns the results. This will perform the
// failover logic if no local results are available. This is typically called as
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
reply *structs.PreparedQueryExecuteResponse) error {
if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
p.srv.setQueryMeta(&reply.QueryMeta)
if args.RequireConsistent {
if err := p.srv.consistentRead(); err != nil {
return err
}
}
// Try to locate the query.
state := p.srv.fsm.State()
_, query, err := state.PreparedQueryLookup(args.QueryIDOrName)
if err != nil {
return err
}
if query == nil {
return ErrQueryNotFound
}
// Execute the query for the local DC.
if err := p.execute(query, reply); err != nil {
return err
}
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
return err
}
// Apply the limit if given.
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
reply.Nodes = reply.Nodes[:args.Limit]
}
// In the happy path where we found some healthy nodes we go with that
// and bail out. Otherwise, we fail over and try remote DCs, as allowed
// by the query setup.
if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{p.srv}
if err := queryFailover(wrapper, query, args.Limit, args.QueryOptions, reply); err != nil {
return err
}
}
return nil
}
// ExecuteRemote is used when a local node doesn't have any instances of a
// service available and needs to probe remote DCs. This sends the full query
// over since the remote side won't have it in its state store, and this doesn't
// do the failover logic since that's already being run on the originating DC.
// We don't want things to fan out further than one level.
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
reply *structs.PreparedQueryExecuteResponse) error {
if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute_remote"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
p.srv.setQueryMeta(&reply.QueryMeta)
if args.RequireConsistent {
if err := p.srv.consistentRead(); err != nil {
return err
}
}
// Run the query locally to see what we can find.
if err := p.execute(&args.Query, reply); err != nil {
return err
}
// We don't bother trying to do an RTT sort here since we are by
// definition in another DC. We just shuffle to make sure that we
// balance the load across the results.
reply.Nodes.Shuffle()
// Apply the limit if given.
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
reply.Nodes = reply.Nodes[:args.Limit]
}
return nil
}
// execute runs a prepared query in the local DC without any failover. We don't
// apply any sorting options at this level - it should be done up above.
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
reply *structs.PreparedQueryExecuteResponse) error {
state := p.srv.fsm.State()
_, nodes, err := state.CheckServiceNodes(query.Service.Service)
if err != nil {
return err
}
// This is kind of a paranoia ACL check, in case something changed with
// the token from the time the query was registered. Note that we use
// the token stored with the query, NOT the passed-in one, which is
// critical to how queries work (the query becomes a proxy for a lookup
// using the ACL it was created with).
acl, err := p.srv.resolveToken(query.Token)
if err != nil {
return err
}
if acl != nil && !acl.ServiceRead(query.Service.Service) {
p.srv.logger.Printf("[WARN] consul.prepared_query: Execute of prepared query for service '%s' denied due to ACLs", query.Service.Service)
return permissionDeniedErr
}
// Filter out any unhealthy nodes.
nodes = nodes.Filter(query.Service.OnlyPassing)
// Apply the tag filters, if any.
if len(query.Service.Tags) > 0 {
nodes = tagFilter(query.Service.Tags, nodes)
}
// Capture the nodes and pass the DNS information through to the reply.
reply.Service = query.Service.Service
reply.Nodes = nodes
reply.DNS = query.DNS
// Stamp the result for this datacenter.
reply.Datacenter = p.srv.config.Datacenter
return nil
}
// tagFilter returns a list of nodes who satisfy the given tags. Nodes must have
// ALL the given tags, and NONE of the forbidden tags (prefixed with !). Note
// for performance this modifies the original slice.
func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
// Build up lists of required and disallowed tags.
must, not := make([]string, 0), make([]string, 0)
for _, tag := range tags {
tag = strings.ToLower(tag)
if strings.HasPrefix(tag, "!") {
tag = tag[1:]
not = append(not, tag)
} else {
must = append(must, tag)
}
}
n := len(nodes)
for i := 0; i < n; i++ {
node := nodes[i]
// Index the tags so lookups this way are cheaper.
index := make(map[string]struct{})
if node.Service != nil {
for _, tag := range node.Service.Tags {
tag = strings.ToLower(tag)
index[tag] = struct{}{}
}
}
// Bail if any of the required tags are missing.
for _, tag := range must {
if _, ok := index[tag]; !ok {
goto DELETE
}
}
// Bail if any of the disallowed tags are present.
for _, tag := range not {
if _, ok := index[tag]; ok {
goto DELETE
}
}
// At this point, the service is ok to leave in the list.
continue
DELETE:
nodes[i], nodes[n-1] = nodes[n-1], structs.CheckServiceNode{}
n--
i--
}
return nodes[:n]
}
// queryServer is a wrapper that makes it easier to test the failover logic.
type queryServer interface {
GetLogger() *log.Logger
GetOtherDatacentersByDistance() ([]string, error)
ForwardDC(method, dc string, args interface{}, reply interface{}) error
}
// queryServerWrapper applies the queryServer interface to a Server.
type queryServerWrapper struct {
srv *Server
}
// GetLogger returns the server's logger.
func (q *queryServerWrapper) GetLogger() *log.Logger {
return q.srv.logger
}
// GetOtherDatacentersByDistance calls into the server's fn and filters out the
// server's own DC.
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
dcs, err := q.srv.getDatacentersByDistance()
if err != nil {
return nil, err
}
var result []string
for _, dc := range dcs {
if dc != q.srv.config.Datacenter {
result = append(result, dc)
}
}
return result, nil
}
// ForwardDC calls into the server's RPC forwarder.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
return q.srv.forwardDC(method, dc, args, reply)
}
// queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery,
limit int, options structs.QueryOptions,
reply *structs.PreparedQueryExecuteResponse) error {
// Pull the list of other DCs. This is sorted by RTT in case the user
// has selected that.
nearest, err := q.GetOtherDatacentersByDistance()
if err != nil {
return err
}
// This will help us filter unknown DCs supplied by the user.
known := make(map[string]struct{})
for _, dc := range nearest {
known[dc] = struct{}{}
}
// Build a candidate list of DCs to try, starting with the nearest N
// from RTTs.
var dcs []string
index := make(map[string]struct{})
if query.Service.Failover.NearestN > 0 {
for i, dc := range nearest {
if !(i < query.Service.Failover.NearestN) {
break
}
dcs = append(dcs, dc)
index[dc] = struct{}{}
}
}
// Then add any DCs explicitly listed that weren't selected above.
for _, dc := range query.Service.Failover.Datacenters {
// This will prevent a log of other log spammage if we do not
// attempt to talk to datacenters we don't know about.
if _, ok := known[dc]; !ok {
q.GetLogger().Printf("[DEBUG] consul.prepared_query: Skipping unknown datacenter '%s' in prepared query", dc)
continue
}
// This will make sure we don't re-try something that fails
// from the NearestN list.
if _, ok := index[dc]; !ok {
dcs = append(dcs, dc)
}
}
// Now try the selected DCs in priority order.
failovers := 0
for _, dc := range dcs {
// This keeps track of how many iterations we actually run.
failovers++
// Be super paranoid and set the nodes slice to nil since it's
// the same slice we used before. We know there's nothing in
// there, but the underlying msgpack library has a policy of
// updating the slice when it's non-nil, and that feels dirty.
// Let's just set it to nil so there's no way to communicate
// through this slice across successive RPC calls.
reply.Nodes = nil
// Note that we pass along the limit since it can be applied
// remotely to save bandwidth. We also pass along the consistency
// mode information we were given, so that applies to the remote
// query as well.
remote := &structs.PreparedQueryExecuteRemoteRequest{
Datacenter: dc,
Query: *query,
Limit: limit,
QueryOptions: options,
}
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
continue
}
// We can stop if we found some nodes.
if len(reply.Nodes) > 0 {
break
}
}
// Set this at the end because the response from the remote doesn't have
// this information.
reply.Failovers = failovers
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -276,23 +276,6 @@ func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
return nodes
}
// sortDatacentersByDistance will sort the given list of DCs based on the
// median RTT to all nodes we know about from the WAN gossip pool). DCs with
// missing coordinates will be stable sorted to the end of the list.
//
// If coordinates are disabled this will be a no-op.
func (s *Server) sortDatacentersByDistance(dcs []string) error {
// Make it safe to call this without having to check if coordinates are
// disabled first.
if s.config.DisableCoordinates {
return nil
}
// Do the sort!
serfer := serverSerfer{s}
return sortDatacentersByDistance(&serfer, dcs)
}
// getDatacenterDistance will return the median round trip time estimate for
// the given DC from the given serfer, in seconds. This will return positive
// infinity if no coordinates are available.
@ -396,3 +379,34 @@ func getDatacenterMaps(s serfer, dcs []string) []structs.DatacenterMap {
}
return maps
}
// getDatacentersByDistance will return the list of DCs, sorted in order
// of increasing distance based on the median distance to that DC from all
// servers we know about in the WAN gossip pool. This will sort by name all
// other things being equal (or if coordinates are disabled).
func (s *Server) getDatacentersByDistance() ([]string, error) {
s.remoteLock.RLock()
defer s.remoteLock.RUnlock()
var dcs []string
for dc := range s.remoteConsuls {
dcs = append(dcs, dc)
}
// Sort by name first, since the coordinate sort is stable.
sort.Strings(dcs)
// Make it safe to call this without having to check if coordinates are
// disabled first.
if s.config.DisableCoordinates {
return dcs, nil
}
// Do the sort!
serfer := serverSerfer{s}
if err := sortDatacentersByDistance(&serfer, dcs); err != nil {
return nil, err
}
return dcs, nil
}

View File

@ -141,7 +141,7 @@ func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) {
time.Sleep(2 * server.config.CoordinateUpdatePeriod)
}
func TestRtt_sortNodesByDistanceFrom(t *testing.T) {
func TestRTT_sortNodesByDistanceFrom(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -202,7 +202,7 @@ func TestRtt_sortNodesByDistanceFrom(t *testing.T) {
verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
}
func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) {
func TestRTT_sortNodesByDistanceFrom_Nodes(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -251,7 +251,7 @@ func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) {
verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
}
func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
func TestRTT_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -300,7 +300,7 @@ func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
}
func TestRtt_sortNodesByDistanceFrom_HealthChecks(t *testing.T) {
func TestRTT_sortNodesByDistanceFrom_HealthChecks(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -349,7 +349,7 @@ func TestRtt_sortNodesByDistanceFrom_HealthChecks(t *testing.T) {
verifyHealthCheckSort(t, checks, "node2,node3,node5,node4,node1,apple")
}
func TestRtt_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
func TestRTT_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -473,7 +473,7 @@ func (s *mockServer) GetNodesForDatacenter(dc string) []string {
return nodes
}
func TestRtt_getDatacenterDistance(t *testing.T) {
func TestRTT_getDatacenterDistance(t *testing.T) {
s := newMockServer()
// The serfer's own DC is always 0 ms away.
@ -508,7 +508,7 @@ func TestRtt_getDatacenterDistance(t *testing.T) {
}
}
func TestRtt_sortDatacentersByDistance(t *testing.T) {
func TestRTT_sortDatacentersByDistance(t *testing.T) {
s := newMockServer()
dcs := []string{"acdc", "dc0", "dc1", "dc2", "dcX"}
@ -533,7 +533,7 @@ func TestRtt_sortDatacentersByDistance(t *testing.T) {
}
}
func TestRtt_getDatacenterMaps(t *testing.T) {
func TestRTT_getDatacenterMaps(t *testing.T) {
s := newMockServer()
dcs := []string{"dc0", "acdc", "dc1", "dc2", "dcX"}
@ -578,3 +578,71 @@ func TestRtt_getDatacenterMaps(t *testing.T) {
t.Fatalf("bad: %v", maps[4])
}
}
func TestRTT_getDatacentersByDistance(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "xxx"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
codec3 := rpcClient(t, s3)
defer codec3.Close()
testutil.WaitForLeader(t, s1.RPC, "xxx")
testutil.WaitForLeader(t, s2.RPC, "dc1")
testutil.WaitForLeader(t, s3.RPC, "dc2")
// Do the WAN joins.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(
func() (bool, error) {
return len(s1.WANMembers()) > 2, nil
},
func(err error) {
t.Fatalf("Failed waiting for WAN join: %v", err)
})
// Get the DCs by distance. We don't have coordinate updates yet, but
// having xxx show up first proves we are calling the distance sort,
// since it would normally do a string sort.
dcs, err := s1.getDatacentersByDistance()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dcs) != 3 || dcs[0] != "xxx" {
t.Fatalf("bad: %v", dcs)
}
// Let's disable coordinates just to be sure.
s1.config.DisableCoordinates = true
dcs, err = s1.getDatacentersByDistance()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dcs) != 3 || dcs[0] != "dc1" {
t.Fatalf("bad: %v", dcs)
}
}

View File

@ -153,14 +153,15 @@ type Server struct {
// Holds the RPC endpoints
type endpoints struct {
Catalog *Catalog
Health *Health
Status *Status
KVS *KVS
Session *Session
Internal *Internal
ACL *ACL
Coordinate *Coordinate
Catalog *Catalog
Health *Health
Status *Status
KVS *KVS
Session *Session
Internal *Internal
ACL *ACL
Coordinate *Coordinate
PreparedQuery *PreparedQuery
}
// NewServer is used to construct a new Consul server from the
@ -411,6 +412,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.PreparedQuery = &PreparedQuery{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
@ -421,6 +423,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.PreparedQuery)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {
@ -686,6 +689,12 @@ func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
return codec.err
}
// InjectEndpoint is used to substitute an endpoint for testing.
func (s *Server) InjectEndpoint(endpoint interface{}) error {
s.logger.Printf("[WARN] consul: endpoint injected; this should only be used for testing")
return s.rpcServer.Register(endpoint)
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {

View File

@ -0,0 +1,251 @@
package state
import (
"fmt"
"regexp"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// validUUID is used to check if a given string looks like a UUID
var validUUID = regexp.MustCompile(`(?i)^[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}$`)
// isUUID returns true if the given string is a valid UUID.
func isUUID(str string) bool {
return validUUID.MatchString(str)
}
// PreparedQueries is used to pull all the prepared queries from the snapshot.
func (s *StateSnapshot) PreparedQueries() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("prepared-queries", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// PrepparedQuery is used when restoring from a snapshot. For general inserts,
// use PreparedQuerySet.
func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
if err := s.tx.Insert("prepared-queries", query); err != nil {
return fmt.Errorf("failed restoring prepared query: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "prepared-queries"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("prepared-queries")
return nil
}
// PreparedQuerySet is used to create or update a prepared query.
func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.preparedQuerySetTxn(tx, idx, query); err != nil {
return err
}
tx.Commit()
return nil
}
// preparedQuerySetTxn is the inner method used to insert a prepared query with
// the proper indexes into the state store.
func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set.
if query.ID == "" {
return ErrMissingQueryID
}
// Check for an existing query.
existing, err := tx.First("prepared-queries", "id", query.ID)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
// Set the indexes.
if existing != nil {
query.CreateIndex = existing.(*structs.PreparedQuery).CreateIndex
query.ModifyIndex = idx
} else {
query.CreateIndex = idx
query.ModifyIndex = idx
}
// Verify that the name doesn't alias any existing ID. We allow queries
// to be looked up by ID *or* name so we don't want anyone to try to
// register a query with a name equal to some other query's ID in an
// attempt to hijack it. We also look up by ID *then* name in order to
// prevent this, but it seems prudent to prevent these types of rogue
// queries from ever making it into the state store. Note that we have
// to see if the name looks like a UUID before checking since the UUID
// index will complain if we look up something that's not formatted
// like one.
if isUUID(query.Name) {
existing, err := tx.First("prepared-queries", "id", query.Name)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
if existing != nil {
return fmt.Errorf("name '%s' aliases an existing query id", query.Name)
}
}
// Verify that the session exists.
if query.Session != "" {
sess, err := tx.First("sessions", "id", query.Session)
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return fmt.Errorf("invalid session %#v", query.Session)
}
}
// Verify that the service exists.
service, err := tx.First("services", "service", query.Service.Service)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return fmt.Errorf("invalid service %#v", query.Service.Service)
}
// Insert the query.
if err := tx.Insert("prepared-queries", query); err != nil {
return fmt.Errorf("failed inserting prepared query: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() })
return nil
}
// PreparedQueryDelete deletes the given query by ID.
func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches)
if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store.
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
queryID string) error {
// Pull the query.
query, err := tx.First("prepared-queries", "id", queryID)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
if query == nil {
return nil
}
// Delete the query and update the index.
if err := tx.Delete("prepared-queries", query); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
watches.Arm("prepared-queries")
return nil
}
// PreparedQueryGet returns the given prepared query by ID.
func (s *StateStore) PreparedQueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryGet")...)
// Look up the query by its ID.
query, err := tx.First("prepared-queries", "id", queryID)
if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
return idx, nil, nil
}
// PreparedQueryLookup returns the given prepared query by looking up an ID or
// Name.
func (s *StateStore) PreparedQueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryLookup")...)
// Explicitly ban an empty query. This will never match an ID and the
// schema is set up so it will never match a query with an empty name,
// but we check it here to be explicit about it (we'd never want to
// return the results from the first query w/o a name).
if queryIDOrName == "" {
return 0, nil, ErrMissingQueryID
}
// Try first by ID if it looks like they gave us an ID. We check the
// format before trying this because the UUID index will complain if
// we look up something that's not formatted like one.
if isUUID(queryIDOrName) {
query, err := tx.First("prepared-queries", "id", queryIDOrName)
if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
}
// Then try by name.
query, err := tx.First("prepared-queries", "name", queryIDOrName)
if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
}
if query != nil {
return idx, query.(*structs.PreparedQuery), nil
}
return idx, nil, nil
}
// PreparedQueryList returns all the prepared queries.
func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...)
// Query all of the prepared queries in the state store.
queries, err := tx.Get("prepared-queries", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
}
// Go over all of the queries and build the response.
var result structs.PreparedQueries
for query := queries.Next(); query != nil; query = queries.Next() {
result = append(result, query.(*structs.PreparedQuery))
}
return idx, result, nil
}

View File

@ -0,0 +1,624 @@
package state
import (
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_PreparedQuery_isUUID(t *testing.T) {
cases := map[string]bool{
"": false,
"nope": false,
"f004177f-2c28-83b7-4229-eacc25fe55d1": true,
"F004177F-2C28-83B7-4229-EACC25FE55D1": true,
"x004177f-2c28-83b7-4229-eacc25fe55d1": false, // Bad hex
"f004177f-xc28-83b7-4229-eacc25fe55d1": false, // Bad hex
"f004177f-2c28-x3b7-4229-eacc25fe55d1": false, // Bad hex
"f004177f-2c28-83b7-x229-eacc25fe55d1": false, // Bad hex
"f004177f-2c28-83b7-4229-xacc25fe55d1": false, // Bad hex
" f004177f-2c28-83b7-4229-eacc25fe55d1": false, // Leading whitespace
"f004177f-2c28-83b7-4229-eacc25fe55d1 ": false, // Trailing whitespace
}
for i := 0; i < 100; i++ {
cases[testUUID()] = true
}
for str, expected := range cases {
if actual := isUUID(str); actual != expected {
t.Fatalf("bad: '%s'", str)
}
}
}
func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil.
idx, res, err := s.PreparedQueryGet(testUUID())
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Inserting a query with empty ID is disallowed.
if err := s.PreparedQuerySet(1, &structs.PreparedQuery{}); err == nil {
t.Fatalf("expected %#v, got: %#v", ErrMissingQueryID, err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Build a legit-looking query with the most basic options.
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// The set will still fail because the service isn't registered yet.
err = s.PreparedQuerySet(1, query)
if err == nil || !strings.Contains(err.Error(), "invalid service") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Now register the service.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// This should go through.
if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err := s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Give it a name and set it again.
query.Name = "test-query"
if err := s.PreparedQuerySet(4, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Read it back and verify the data was updated as well as the index.
expected.Name = "test-query"
expected.ModifyIndex = 4
idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Try to tie it to a bogus session.
query.Session = testUUID()
err = s.PreparedQuerySet(5, query)
if err == nil || !strings.Contains(err.Error(), "invalid session") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Now make a session and try again.
session := &structs.Session{
ID: query.Session,
Node: "foo",
}
if err := s.SessionCreate(5, session); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.PreparedQuerySet(6, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Read it back and verify the data was updated as well as the index.
expected.Session = query.Session
expected.ModifyIndex = 6
idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Finally, try to abuse the system by trying to register a query whose
// name aliases a real query ID.
evil := &structs.PreparedQuery{
ID: testUUID(),
Name: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
}
err = s.PreparedQuerySet(7, evil)
if err == nil || !strings.Contains(err.Error(), "aliases an existing query") {
t.Fatalf("bad: %v", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("prepared-queries"); idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Sanity check to make sure it's not there.
idx, actual, err = s.PreparedQueryGet(evil.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_PreparedQueryDelete(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// Create a new query.
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Deleting a query that doesn't exist should be a no-op.
if err := s.PreparedQueryDelete(3, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
// Now add the query to the data store.
if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err := s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Now delete it.
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Sanity check to make sure it's not there.
idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_PreparedQueryLookup(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
// Create a new query.
query := &structs.PreparedQuery{
ID: testUUID(),
Name: "my-test-query",
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Try to lookup a query that's not there using something that looks
// like a real ID.
idx, actual, err := s.PreparedQueryLookup(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
// Try to lookup a query that's not there using something that looks
// like a name
idx, actual, err = s.PreparedQueryLookup(query.Name)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
// Now actually insert the query.
if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Read it back out using the ID and verify it.
expected := &structs.PreparedQuery{
ID: query.ID,
Name: "my-test-query",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
}
idx, actual, err = s.PreparedQueryLookup(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Read it back using the name and verify it again.
idx, actual, err = s.PreparedQueryLookup(query.Name)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Make sure an empty lookup is well-behaved if there are actual queries
// in the state store.
idx, actual, err = s.PreparedQueryLookup("")
if err != ErrMissingQueryID {
t.Fatalf("bad: %v ", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_PreparedQueryList(t *testing.T) {
s := testStateStore(t)
// Make sure nothing is returned for an empty query
idx, actual, err := s.PreparedQueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if len(actual) != 0 {
t.Fatalf("bad: %v", actual)
}
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
testRegisterService(t, s, 3, "foo", "mongodb")
// Create some queries.
queries := structs.PreparedQueries{
&structs.PreparedQuery{
ID: testUUID(),
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
},
&structs.PreparedQuery{
ID: testUUID(),
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
},
}
// Force the sort order of the UUIDs before we create them so the
// order is deterministic.
queries[0].ID = "a" + queries[0].ID[1:]
queries[1].ID = "b" + queries[1].ID[1:]
// Now create the queries.
for i, query := range queries {
if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err)
}
}
// Read it back and verify.
expected := structs.PreparedQueries{
&structs.PreparedQuery{
ID: queries[0].ID,
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
},
},
&structs.PreparedQuery{
ID: queries[1].ID,
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
}
idx, actual, err = s.PreparedQueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
}
func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
testRegisterService(t, s, 3, "foo", "mongodb")
// Create some queries.
queries := structs.PreparedQueries{
&structs.PreparedQuery{
ID: testUUID(),
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
},
&structs.PreparedQuery{
ID: testUUID(),
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
},
}
// Force the sort order of the UUIDs before we create them so the
// order is deterministic.
queries[0].ID = "a" + queries[0].ID[1:]
queries[1].ID = "b" + queries[1].ID[1:]
// Now create the queries.
for i, query := range queries {
if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err)
}
}
// Snapshot the queries.
snap := s.Snapshot()
defer snap.Close()
// Alter the real state store.
if err := s.PreparedQueryDelete(6, queries[0].ID); err != nil {
t.Fatalf("err: %s", err)
}
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
expected := structs.PreparedQueries{
&structs.PreparedQuery{
ID: queries[0].ID,
Name: "alice",
Service: structs.ServiceQuery{
Service: "redis",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
},
},
&structs.PreparedQuery{
ID: queries[1].ID,
Name: "bob",
Service: structs.ServiceQuery{
Service: "mongodb",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
}
iter, err := snap.PreparedQueries()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.PreparedQueries
for query := iter.Next(); query != nil; query = iter.Next() {
dump = append(dump, query.(*structs.PreparedQuery))
}
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
}
// Restore the values into a new state store.
func() {
s := testStateStore(t)
restore := s.Restore()
for _, query := range dump {
if err := restore.PreparedQuery(query); err != nil {
t.Fatalf("err: %s", err)
}
}
restore.Commit()
// Read the restored queries back out and verify that they
// match.
idx, actual, err := s.PreparedQueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
}()
}
func TestStateStore_PreparedQuery_Watches(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
query := &structs.PreparedQuery{
ID: testUUID(),
Service: structs.ServiceQuery{
Service: "redis",
},
}
// Call functions that update the queries table and make sure a watch
// fires each time.
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.PreparedQueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err)
}
})
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
restore := s.Restore()
if err := restore.PreparedQuery(query); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
}

View File

@ -30,6 +30,7 @@ func stateStoreSchema() *memdb.DBSchema {
sessionChecksTableSchema,
aclsTableSchema,
coordinatesTableSchema,
preparedQueriesTableSchema,
}
// Add the tables to the root schema
@ -365,3 +366,38 @@ func coordinatesTableSchema() *memdb.TableSchema {
},
}
}
// preparedQueriesTableSchema returns a new table schema used for storing
// prepared queries.
func preparedQueriesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "prepared-queries",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.UUIDFieldIndex{
Field: "ID",
},
},
"name": &memdb.IndexSchema{
Name: "name",
AllowMissing: true,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Name",
Lowercase: true,
},
},
"session": &memdb.IndexSchema{
Name: "session",
AllowMissing: true,
Unique: false,
Indexer: &memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
}
}

View File

@ -24,9 +24,13 @@ var (
// is attempted with an empty session ID.
ErrMissingSessionID = errors.New("Missing session ID")
// ErrMissingACLID is returned when a session set is called on
// a session with an empty ID.
// ErrMissingACLID is returned when an ACL set is called on
// an ACL with an empty ID.
ErrMissingACLID = errors.New("Missing ACL ID")
// ErrMissingQueryID is returned when a Query set is called on
// a Query with an empty ID.
ErrMissingQueryID = errors.New("Missing Query ID")
)
// StateStore is where we store all of Consul's state, including
@ -409,6 +413,8 @@ func (s *StateStore) getWatchTables(method string) []string {
return []string{"acls"}
case "Coordinates":
return []string{"coordinates"}
case "PreparedQueryGet", "PreparedQueryLookup", "PreparedQueryList":
return []string{"prepared-queries"}
}
panic(fmt.Sprintf("Unknown method %s", method))
@ -2120,15 +2126,37 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
if err != nil {
return fmt.Errorf("failed session checks lookup: %s", err)
}
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
{
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
}
}
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
// Delete any prepared queries.
queries, err := tx.Get("prepared-queries", "session", sessionID)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
{
var objs []interface{}
for query := queries.Next(); query != nil; query = queries.Next() {
objs = append(objs, query)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
q := obj.(*structs.PreparedQuery)
if err := s.preparedQueryDeleteTxn(tx, idx, watches, q.ID); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
}
}

View File

@ -4445,6 +4445,64 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
}
}
func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
s := testStateStore(t)
// Set up our test environment.
testRegisterNode(t, s, 1, "foo")
testRegisterService(t, s, 2, "foo", "redis")
session := &structs.Session{
ID: testUUID(),
Node: "foo",
}
if err := s.SessionCreate(3, session); err != nil {
t.Fatalf("err: %v", err)
}
query := &structs.PreparedQuery{
ID: testUUID(),
Session: session.ID,
Service: structs.ServiceQuery{
Service: "redis",
},
}
if err := s.PreparedQuerySet(4, query); err != nil {
t.Fatalf("err: %s", err)
}
// Invalidate the session and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.SessionDestroy(5, session.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
})
// Make sure the session is gone.
idx, s2, err := s.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the query is gone and the index is updated.
idx, q2, err := s.PreparedQueryGet(query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if q2 != nil {
t.Fatalf("bad: %v", q2)
}
}
func TestStateStore_ACLSet_ACLGet(t *testing.T) {
s := testStateStore(t)

View File

@ -0,0 +1,198 @@
package structs
// QueryDatacenterOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
// Datacenters is a fixed list of datacenters to try after NearestN. We
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
}
// QueryDNSOptions controls settings when query results are served over DNS.
type QueryDNSOptions struct {
// TTL is the time to live for the served DNS results.
TTL string
}
// ServiceQuery is used to query for a set of healthy nodes offering a specific
// service.
type ServiceQuery struct {
// Service is the service to query.
Service string
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
OnlyPassing bool
// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "!" then
// it is disallowed.
Tags []string
}
// PreparedQuery defines a complete prepared query, and is the structure we
// maintain in the state store.
type PreparedQuery struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string
// Name is an optional friendly name for the query supplied by the
// user. NOTE - if this feature is used then it will reduce the security
// of any read ACL associated with this query/service since this name
// can be used to locate nodes with supplying any ACL.
Name string
// Session is an optional session to tie this query's lifetime to. If
// this is omitted then the query will not expire.
Session string
// Token is the ACL token used when the query was created, and it is
// used when a query is subsequently executed. This token, or a token
// with management privileges, must be used to change the query later.
Token string
// Service defines a service query (leaving things open for other types
// later).
Service ServiceQuery
// DNS has options that control how the results of this query are
// served over DNS.
DNS QueryDNSOptions
RaftIndex
}
type PreparedQueries []*PreparedQuery
type IndexedPreparedQueries struct {
Queries PreparedQueries
QueryMeta
}
type PreparedQueryOp string
const (
PreparedQueryCreate PreparedQueryOp = "create"
PreparedQueryUpdate PreparedQueryOp = "update"
PreparedQueryDelete PreparedQueryOp = "delete"
)
// QueryRequest is used to create or change prepared queries.
type PreparedQueryRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Op is the operation to apply.
Op PreparedQueryOp
// Query is the query itself.
Query *PreparedQuery
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (q *PreparedQueryRequest) RequestDatacenter() string {
return q.Datacenter
}
// PreparedQuerySpecificRequest is used to get information about a prepared
// query.
type PreparedQuerySpecificRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// QueryID is the ID of a query.
QueryID string
// QueryOptions (unfortunately named here) controls the consistency
// settings for the query lookup itself, as well as the service lookups.
QueryOptions
}
// RequestDatacenter returns the datacenter for a given request.
func (q *PreparedQuerySpecificRequest) RequestDatacenter() string {
return q.Datacenter
}
// PreparedQueryExecuteRequest is used to execute a prepared query.
type PreparedQueryExecuteRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// QueryIDOrName is the ID of a query _or_ the name of one, either can
// be provided.
QueryIDOrName string
// Limit will trim the resulting list down to the given limit.
Limit int
// Source is used to sort the results relative to a given node using
// network coordinates.
Source QuerySource
// QueryOptions (unfortunately named here) controls the consistency
// settings for the query lookup itself, as well as the service lookups.
QueryOptions
}
// RequestDatacenter returns the datacenter for a given request.
func (q *PreparedQueryExecuteRequest) RequestDatacenter() string {
return q.Datacenter
}
// PreparedQueryExecuteRemoteRequest is used when running a local query in a
// remote datacenter.
type PreparedQueryExecuteRemoteRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Query is a copy of the query to execute. We have to ship the entire
// query over since it won't be present in the remote state store.
Query PreparedQuery
// Limit will trim the resulting list down to the given limit.
Limit int
// QueryOptions (unfortunately named here) controls the consistency
// settings for the the service lookups.
QueryOptions
}
// RequestDatacenter returns the datacenter for a given request.
func (q *PreparedQueryExecuteRemoteRequest) RequestDatacenter() string {
return q.Datacenter
}
// PreparedQueryExecuteResponse has the results of executing a query.
type PreparedQueryExecuteResponse struct {
// Service is the service that was queried.
Service string
// Nodes has the nodes that were output by the query.
Nodes CheckServiceNodes
// DNS has the options for serving these results over DNS.
DNS QueryDNSOptions
// Datacenter is the datacenter that these results came from.
Datacenter string
// Failovers is a count of how many times we had to query a remote
// datacenter.
Failovers int
// QueryMeta has freshness information about the query.
QueryMeta
}

View File

@ -3,6 +3,7 @@ package structs
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"time"
@ -34,6 +35,7 @@ const (
ACLRequestType
TombstoneRequestType
CoordinateBatchUpdateType
PreparedQueryRequestType
)
const (
@ -403,6 +405,35 @@ type CheckServiceNode struct {
}
type CheckServiceNodes []CheckServiceNode
// Shuffle does an in-place random shuffle using the Fisher-Yates algorithm.
func (nodes CheckServiceNodes) Shuffle() {
for i := len(nodes) - 1; i > 0; i-- {
j := rand.Int31() % int32(i+1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
}
// Filter removes nodes that are failing health checks (and any non-passing
// check if that option is selected). Note that this returns the filtered
// results AND modifies the receiver for performance.
func (nodes CheckServiceNodes) Filter(onlyPassing bool) CheckServiceNodes {
n := len(nodes)
OUTER:
for i := 0; i < n; i++ {
node := nodes[i]
for _, check := range node.Checks {
if check.Status == HealthCritical ||
(onlyPassing && check.Status != HealthPassing) {
nodes[i], nodes[n-1] = nodes[n-1], CheckServiceNode{}
n--
i--
continue OUTER
}
}
}
return nodes[:n]
}
// NodeInfo is used to dump all associated information about
// a node. This is currently used for the UI only, as it is
// rather expensive to generate.

View File

@ -1,7 +1,9 @@
package structs
import (
"fmt"
"reflect"
"strings"
"testing"
)
@ -209,6 +211,108 @@ func TestStructs_HealthCheck_IsSame(t *testing.T) {
check(&other.ServiceName)
}
func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) {
// Make a huge list of nodes.
var nodes CheckServiceNodes
for i := 0; i < 100; i++ {
nodes = append(nodes, CheckServiceNode{
Node: &Node{
Node: fmt.Sprintf("node%d", i),
Address: fmt.Sprintf("127.0.0.%d", i+1),
},
})
}
// Keep track of how many unique shuffles we get.
uniques := make(map[string]struct{})
for i := 0; i < 100; i++ {
nodes.Shuffle()
var names []string
for _, node := range nodes {
names = append(names, node.Node.Node)
}
key := strings.Join(names, "|")
uniques[key] = struct{}{}
}
// We have to allow for the fact that there won't always be a unique
// shuffle each pass, so we just look for smell here without the test
// being flaky.
if len(uniques) < 50 {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}
}
func TestStructs_CheckServiceNodes_Filter(t *testing.T) {
nodes := CheckServiceNodes{
CheckServiceNode{
Node: &Node{
Node: "node1",
Address: "127.0.0.1",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthWarning,
},
},
},
CheckServiceNode{
Node: &Node{
Node: "node2",
Address: "127.0.0.2",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthPassing,
},
},
},
CheckServiceNode{
Node: &Node{
Node: "node3",
Address: "127.0.0.3",
},
Checks: HealthChecks{
&HealthCheck{
Status: HealthCritical,
},
},
},
}
// Test the case where warnings are allowed.
{
twiddle := make(CheckServiceNodes, len(nodes))
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(false)
expected := CheckServiceNodes{
nodes[0],
nodes[1],
}
if !reflect.DeepEqual(filtered, expected) {
t.Fatalf("bad: %v", filtered)
}
}
// Limit to only passing checks.
{
twiddle := make(CheckServiceNodes, len(nodes))
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(true)
expected := CheckServiceNodes{
nodes[1],
}
if !reflect.DeepEqual(filtered, expected) {
t.Fatalf("bad: %v", filtered)
}
}
}
func TestStructs_DirEntry_Clone(t *testing.T) {
e := &DirEntry{
LockIndex: 5,

View File

@ -1,5 +1,5 @@
source "https://rubygems.org"
ruby "2.2.2"
ruby "2.2.3"
gem "middleman-hashicorp", github: "hashicorp/middleman-hashicorp"

View File

@ -174,6 +174,26 @@ rabbitmq.node1.dc1.consul. 0 IN A 10.1.11.20
Again, note that the SRV record returns the port of the service as well as its IP.
### Prepared Query Lookups
The format of a prepared query lookup is:
<query or name>.query[.datacenter].<domain>
The `datacenter` is optional, and if not provided, the datacenter of this Consul
agent is assumed.
The `query or name` is the ID or given name of an existing
[Prepared Query](/docs/agent/http/query.html). These behave like standard service
queries but provide a much richer set of features, such as filtering by multiple
tags and automatically failing over to look for services in remote datacenters if
no healthy nodes are available in the local datacenter.
To allow for simple load balancing, the set of nodes returned is randomized each time.
Both A and SRV records are supported. SRV records provide the port that a service is
registered on, enabling clients to avoid relying on well-known ports. SRV records are
only served if the client specifically requests them.
### UDP Based DNS Queries
When the DNS query is performed using UDP, Consul will truncate the results

View File

@ -21,6 +21,7 @@ Each endpoint manages a different aspect of Consul:
* [event](http/event.html) - User Events
* [health](http/health.html) - Health checks
* [kv](http/kv.html) - Key/Value store
* [query](http/query.html) - Prepared Queries
* [session](http/session.html) - Sessions
* [status](http/status.html) - Consul system status

View File

@ -0,0 +1,319 @@
---
layout: "docs"
page_title: "Prepared Queries (HTTP)"
sidebar_current: "docs-agent-http-query"
description: >
The Query endpoints are used to manage and execute prepared queries.
---
# Prepared Query HTTP Endpoint
The Prepared Query endpoints are used to create, update, destroy, and execute
prepared queries.
Prepared queries allow you to register a complex service query and then execute
it later via its ID or name to get a set of healthy nodes that provide a given
service. This is particularly useful in combination with Consul's
[DNS Interface](/docs/agent/dns.html) as it allows for much richer queries than
would be possible given the limited interface DNS provides.
The following endpoints are supported:
* [`/v1/query`](#general): Creates a new prepared query or lists
all prepared queries
* [`/v1/query/<query>`](#specific): Updates, fetches, or deletes
a prepared query
* [`/v1/query/<query or name>/execute`](#execute): Executes a
prepared query by its ID or optional name
Not all endpoints support blocking queries and all consistency modes,
see details in the sections below.
The query endpoints support the use of ACL tokens. Prepared queries have some
special handling of ACL tokens that are highlighted in the sections below.
### <a name="general"></a> /v1/query
The general query endpoint supports the `POST` and `GET` methods.
#### POST Method
When using the `POST` method, Consul will create a new prepared query and return
its ID if it is created successfully.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter.
The create operation expects a JSON request body that defines the prepared query,
like this example:
```javascript
{
"Name": "my-query",
"Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"Service": {
"Service": "redis",
"Failover": {
"NearestN": 3,
"Datacenters": ["dc1", "dc2"]
},
"OnlyPassing": false,
"Tags": ["master", "!experimental"]
},
"DNS": {
"TTL": "10s"
}
}
```
Only the `Service` field inside the `Service` structure is mandatory, all other
fields will take their default values if they are not included.
`Name` is an optional friendly name that can be used to execute a query instead
of using its ID.
`Session` provides a way to automatically remove a prepared query when the
given session is invalidated. This is optional, and if not given the prepared
query must be manually removed when no longer needed.
The set of fields inside the `Service` structure define the query's behavior.
`Service` is the name of the service to query. This is required.
`Failover` contains two fields, both of which are optional, and determine what
happens if no healthy nodes are available in the local datacenter when the query
is executed. It allows the use of nodes in other datacenters with very little
configuration.
If `NearestN` is set to a value greater than zero, then the query
will be forwarded to up to `NearestN` other datacenters based on their estimated
network round trip time using [Network Coordinates](/docs/internals/coordinates.html)
from the WAN gossip pool. The median round trip time from the server handling the
query to the servers in the remote datacenter is used to determine the priority.
The default value is zero. All Consul servers must be running version 0.6.0 or
above in order for this feature to work correctly. If any servers are not running
the required version of Consul they will be considered last since they won't have
any available network coordinate information.
`Datacenters` contains a fixed list of remote datacenters to forward the query
to if there are no healthy nodes in the local datacenter. Datacenters are queried
in the order given in the list. If this option is combined with `NearestN`, then
the `NearestN` queries will be performed first, followed by the list given by
`Datacenters`. A given datacenter will only be queried one time during a failover,
even if it is selected by both `NearestN` and is listed in `Datacenters`. The
default value is an empty list.
`OnlyPassing` controls the behavior of the query's health check filtering. If
this is set to false, the results will include nodes with checks in the passing
as well as the warning states. If this is set to true, only nodes with checks
in the passing state will be returned. The default value is false.
`Tags` provides a list of service tags to filter the query results. For a service
to pass the tag filter it must have *all* of the required tags, and *none* of the
excluded tags (prefixed with `!`). The default value is an empty list, which does
no tag filtering.
`TTL` in the `DNS` structure is a duration string that can use "s" as a
suffix for seconds. It controls how the TTL is set when query results are served
over DNS. If this isn't specified, then the Consul agent configuration for the given
service will be used (see [DNS Caching](/docs/guides/dns-cache.html)). If this is
specified, it will take precedence over any Consul agent-specific configuration.
If no TTL is specified here or at the Consul agent level, then the TTL will
default to 0.
The return code is 200 on success and the ID of the created query is returned in
a JSON body:
```javascript
{
"ID": "8f246b77-f3e1-ff88-5b48-8ec93abf3e05"
}
```
If ACLs are enabled, then the provided token will be used to check access to
the service being queried, and it will be saved along with the query for use
when the query is executed. This is key to allowing prepared queries to work
via the DNS interface, and it's important to note that prepared query IDs and
names become a read-only proxy for the token used to create the query.
The query IDs that Consul generates are done in the same manner as ACL tokens,
so provide equal strength, but names may be more guessable and should be used
carefully with ACLs. Also, the token used to create the prepared query (or a
management token) is required to read the query back, so the ability to execute
a prepared query is not enough to get access to the actual token.
#### GET Method
When using the GET method, Consul will provide a listing of all prepared queries.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter. This endpoint supports blocking
queries and all consistency modes.
Since this listing includes sensitive ACL tokens, this is a privileged endpoint
and always requires a management token to be supplied if ACLs are enabled.
This returns a JSON list of prepared queries, which looks like:
```javascript
[
{
"ID": "8f246b77-f3e1-ff88-5b48-8ec93abf3e05",
"Name": "my-query",
"Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"Token": "",
"Service": {
"Service": "redis",
"Failover": {
"NearestN": 3,
"Datacenters": ["dc1", "dc2"]
},
"OnlyPassing": false,
"Tags": ["master", "!experimental"]
},
"DNS": {
"TTL": "10s"
},
"RaftIndex": {
"CreateIndex": 23,
"ModifyIndex": 42
}
}
]
```
### <a name="specific"></a> /v1/query/\<query\>
The query-specific endpoint supports the `GET`, `PUT`, and `DELETE` methods. The
\<query\> argument is the ID of an existing prepared query.
#### PUT Method
The `PUT` method allows an existing prepared query to be updated.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter.
If ACLs are enabled, then the same token used to create the query (or a
management token) must be supplied.
The body is the same as is used to create a prepared query, as described above.
If the API call succeeds, a 200 status code is returned.
#### GET Method
The `GET` method allows an existing prepared query to be fetched.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter. This endpoint supports blocking
queries and all consistency modes.
The returned response is the same as the list of prepared queries above,
only with a single item present. If the query does not exist then a 404
status code will be returned.
If ACLs are enabled, then the same token used to create the query (or a
management token) must be supplied.
#### DELETE Method
The `DELETE` method is used to delete a prepared query.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter.
If ACLs are enabled, then the same token used to create the query (or a
management token) must be supplied.
No body is required as part of this request.
If the API call succeeds, a 200 status code is returned.
### <a name="execute"></a> /v1/query/\<query or name\>/execute
The query execute endpoint supports only the `GET` method and is used to
execute a prepared query. The \<query or name\> argument is the ID or name
of an existing prepared query.
By default, the datacenter of the agent is queried; however, the dc can be
provided using the "?dc=" query parameter. This endpoint does not support
blocking queries, but it does support all consistency modes.
Adding the optional "?near=" parameter with a node name will sort the resulting
list in ascending order based on the estimated round trip time from that node.
Passing "?near=_agent" will use the agent's node for the sort. If this is not
present, then the nodes will be shuffled randomly and will be in a different
order each time the query is executed.
An optional "?limit=" parameter can be used to limit the size of the list to
the given number of nodes. This is applied after any sorting or shuffling.
The ACL token supplied when the prepared query was created will be used to
execute the request, so no ACL token needs to be supplied (it will be ignored).
No body is required as part of this request.
If the query does not exist then a 404 status code will be returned. Otherwise,
a JSON body will be returned like this:
```javascript
{
"Service": "redis",
"Nodes": [
{
"Node": {
"Node": "foobar",
"Address": "10.1.10.12"
},
"Service": {
"ID": "redis",
"Service": "redis",
"Tags": null,
"Port": 8000
},
"Checks": [
{
"Node": "foobar",
"CheckID": "service:redis",
"Name": "Service 'redis' check",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "redis",
"ServiceName": "redis"
},
{
"Node": "foobar",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "",
"ServiceName": ""
}
],
"DNS": {
"TTL": "10s"
},
"Datacenter": "dc3",
"Failovers": 2
}
}
```
The `Nodes` section contains the list of healthy nodes providing the given
service, as specified by the constraints of the prepared query.
`Service` has the service name that the query was selecting. This is useful
for context in case an empty list of nodes is returned.
`DNS` has information used when serving the results over DNS. This is just a
copy of the structure given when the prepared query was created.
`Datacenter` has the datacenter that ultimately provided the list of nodes
and `Failvovers` has the number of remote datacenters that were queried
while executing the query. This provides some insight into where the data
came from. This will be zero during non-failover operations where there
were healthy nodes found in the local datacenter.

View File

@ -93,3 +93,10 @@ a wildcard TTL and a specific TTL for a service might look like this:
This sets all lookups to "web.service.consul" to use a 30 second TTL
while lookups to "db.service.consul" or "api.service.consul" will use the
5 second TTL from the wildcard.
[Prepared Queries](/docs/agent/http/query.html) provide an additional
level of control over TTL. They allow for the TTL to be defined along with
the query, and they can be changed on the fly by updating the query definition.
If a TTL is not configured for a prepared query, then it will fall back to the
service-specific configuration defined in the Consul agent as described above,
and ultimately to 0 if no TTL is configured for the service in the Consul agent.

View File

@ -145,3 +145,8 @@ the goal of Consul to protect against misbehaving clients.
The primitives provided by sessions and the locking mechanisms of the KV
store can be used to build client-side leader election algorithms.
These are covered in more detail in the [Leader Election guide](/docs/guides/leader-election.html).
## Prepared Query Integration
Prepared queries may be attached to a session in order to automatically delete
the prepared query when the session is invalidated.

View File

@ -178,6 +178,10 @@
<a href="/docs/agent/http/coordinate.html">Network Coordinates</a>
</li>
<li<%= sidebar_current("docs-agent-http-query") %>>
<a href="/docs/agent/http/query.html">Prepared Queries</a>
</li>
<li<%= sidebar_current("docs-agent-http-session") %>>
<a href="/docs/agent/http/session.html">Sessions</a>
</li>