Adds discovery_max_stale (#4004)

Adds a new option to allow service discovery endpoints to return stale results if configured at the agent level.
This commit is contained in:
Preetha 2018-03-30 10:14:44 -05:00 committed by GitHub
parent 1f5e4acf8f
commit 8fbe3dfceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 237 additions and 5 deletions

View File

@ -100,9 +100,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
s.agent.TranslateAddresses(args.Datacenter, out.Nodes)
// Use empty list instead of nil
@ -127,11 +135,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty map instead of nil
if out.Services == nil {
@ -172,11 +187,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)
// Use empty list instead of nil
@ -216,11 +238,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedNodeServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
if out.NodeServices != nil && out.NodeServices.Node != nil {
s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node)
}

View File

@ -655,6 +655,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
EnableDebug: b.boolVal(c.EnableDebug),
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),

View File

@ -174,6 +174,7 @@ type Config struct {
DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"`
DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"`
DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"`
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`

View File

@ -473,6 +473,14 @@ type RuntimeConfig struct {
// flag: -datacenter string
Datacenter string
// Defines the maximum stale value for discovery path. Defauls to "0s".
// Discovery paths are /v1/heath/ paths
//
// If not set to 0, it will try to perform stale read and perform only a
// consistent read whenever the value is too old.
// hcl: discovery_max_stale = "duration"
DiscoveryMaxStale time.Duration
// Node name is the name we use to advertise. Defaults to hostname.
//
// NodeName is exposed via /v1/agent/self from here and

View File

@ -2304,6 +2304,7 @@ func TestFullConfig(t *testing.T) {
"disable_remote_exec": true,
"disable_update_check": true,
"discard_check_output": true,
"discovery_max_stale": "5s",
"domain": "7W1xXSqd",
"dns_config": {
"allow_stale": true,
@ -2740,6 +2741,7 @@ func TestFullConfig(t *testing.T) {
disable_remote_exec = true
disable_update_check = true
discard_check_output = true
discovery_max_stale = "5s"
domain = "7W1xXSqd"
dns_config {
allow_stale = true
@ -3067,6 +3069,7 @@ func TestFullConfig(t *testing.T) {
"ae_interval": "10003s",
"check_deregister_interval_min": "27870s",
"check_reap_interval": "10662s",
"discovery_max_stale": "5s",
"segment_limit": 24705,
"segment_name_limit": 27046,
"sync_coordinate_interval_min": "27983s",
@ -3121,6 +3124,7 @@ func TestFullConfig(t *testing.T) {
ae_interval = "10003s"
check_deregister_interval_min = "27870s"
check_reap_interval = "10662s"
discovery_max_stale = "5s"
segment_limit = 24705
segment_name_limit = 27046
sync_coordinate_interval_min = "27983s"
@ -3327,6 +3331,7 @@ func TestFullConfig(t *testing.T) {
DisableRemoteExec: true,
DisableUpdateCheck: true,
DiscardCheckOutput: true,
DiscoveryMaxStale: 5 * time.Second,
EnableACLReplication: true,
EnableAgentTLSForChecks: true,
EnableDebug: true,
@ -4008,6 +4013,7 @@ func TestSanitize(t *testing.T) {
"DisableRemoteExec": false,
"DisableUpdateCheck": false,
"DiscardCheckOutput": false,
"DiscoveryMaxStale": "0s",
"EnableACLReplication": false,
"EnableAgentTLSForChecks": false,
"EnableDebug": false,

View File

@ -30,9 +30,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty list instead of nil
if out.HealthChecks == nil {
@ -66,9 +73,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty list instead of nil
if out.HealthChecks == nil {
@ -104,9 +118,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty list instead of nil
if out.HealthChecks == nil {
@ -149,9 +170,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
// Make the RPC request
var out structs.IndexedCheckServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Filter to only passing if specified
if _, ok := params[api.HealthPassing]; ok {

View File

@ -366,6 +366,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) {
resp.Header().Set("X-Consul-KnownLeader", s)
}
func setConsistency(resp http.ResponseWriter, consistency string) {
if consistency != "" {
resp.Header().Set("X-Consul-Effective-Consistency", consistency)
}
}
// setLastContact is used to set the last contact header
func setLastContact(resp http.ResponseWriter, last time.Duration) {
if last < 0 {
@ -380,6 +386,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
setIndex(resp, m.Index)
setLastContact(resp, m.LastContact)
setKnownLeader(resp, m.KnownLeader)
setConsistency(resp, m.ConsistencyLevel)
}
// setHeaders is used to set canonical response header fields
@ -416,13 +423,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti
// parseConsistency is used to parse the ?stale and ?consistent query params.
// Returns true on error
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
query := req.URL.Query()
defaults := true
if _, ok := query["stale"]; ok {
b.AllowStale = true
defaults = false
}
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
defaults = false
}
if _, ok := query["leader"]; ok {
defaults = false
}
if maxStale := query.Get("max_stale"); maxStale != "" {
dur, err := time.ParseDuration(maxStale)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
return true
}
b.MaxStaleDuration = dur
if dur.Nanoseconds() > 0 {
b.AllowStale = true
defaults = false
}
}
// No specific Consistency has been specified by caller
if defaults {
path := req.URL.Path
if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, "/v1/health") {
if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 {
b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale
b.AllowStale = true
}
}
}
if b.AllowStale && b.RequireConsistent {
resp.WriteHeader(http.StatusBadRequest)
@ -490,7 +526,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string {
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
s.parseDC(req, dc)
s.parseToken(req, &b.Token)
if parseConsistency(resp, req, b) {
if s.parseConsistency(resp, req, b) {
return true
}
return parseWait(resp, req, b)

View File

@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) {
var b structs.QueryOptions
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil)
if d := parseConsistency(resp, req, &b); d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) {
b = structs.QueryOptions{}
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil)
if d := parseConsistency(resp, req, &b); d {
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) {
}
}
// ensureConsistency check if consistency modes are correctly applied
// if maxStale < 0 => stale, without MaxStaleDuration
// if maxStale == 0 => no stale
// if maxStale > 0 => stale + check duration
func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) {
t.Helper()
req, _ := http.NewRequest("GET", path, nil)
var b structs.QueryOptions
resp := httptest.NewRecorder()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
allowStale := maxStale.Nanoseconds() != 0
if b.AllowStale != allowStale {
t.Fatalf("Bad Allow Stale")
}
if maxStale > 0 && b.MaxStaleDuration != maxStale {
t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale)
}
if b.RequireConsistent != requireConsistent {
t.Fatal("Bad Consistent")
}
}
func TestParseConsistencyAndMaxStale(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Default => Consistent
a.config.DiscoveryMaxStale = time.Duration(0)
ensureConsistency(t, a, "/v1/catalog/nodes", 0, false)
// Stale, without MaxStale
ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false)
// Override explicitly
ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false)
ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false)
// stale by defaul on discovery
a.config.DiscoveryMaxStale = time.Duration(7 * time.Second)
ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false)
// Not in KV
ensureConsistency(t, a, "/v1/kv/my/path", 0, false)
// DiscoveryConsistencyLevel should apply
ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false)
// Query path should be taken into account
ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true)
// Since stale is added, no MaxStale should be applied
ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false)
ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false)
}
func TestParseConsistency_Invalid(t *testing.T) {
t.Parallel()
resp := httptest.NewRecorder()
var b structs.QueryOptions
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil)
if d := parseConsistency(resp, req, &b); !d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); !d {
t.Fatalf("expected done")
}

View File

@ -43,9 +43,17 @@ func (s *HTTPServer) preparedQueryList(resp http.ResponseWriter, req *http.Reque
}
var reply structs.IndexedPreparedQueries
defer setMeta(resp, &reply.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("PreparedQuery.List", &args, &reply); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty list instead of nil.
if reply.Queries == nil {
@ -100,6 +108,8 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
}
var reply structs.PreparedQueryExecuteResponse
defer setMeta(resp, &reply.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("PreparedQuery.Execute", &args, &reply); err != nil {
// We have to check the string since the RPC sheds
// the specific error type.
@ -110,6 +120,12 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
}
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Note that we translate using the DC that the results came from, since
// a query can fail over to a different DC than where the execute request
@ -145,6 +161,8 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r
}
var reply structs.PreparedQueryExplainResponse
defer setMeta(resp, &reply.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("PreparedQuery.Explain", &args, &reply); err != nil {
// We have to check the string since the RPC sheds
// the specific error type.
@ -155,6 +173,12 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r
}
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
return reply, nil
}
@ -168,6 +192,8 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req *
}
var reply structs.IndexedPreparedQueries
defer setMeta(resp, &reply.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("PreparedQuery.Get", &args, &reply); err != nil {
// We have to check the string since the RPC sheds
// the specific error type.
@ -178,6 +204,12 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req *
}
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
return reply.Queries, nil
}

View File

@ -110,6 +110,11 @@ type QueryOptions struct {
// If set, the leader must verify leadership prior to
// servicing the request. Prevents a stale read.
RequireConsistent bool
// If set and AllowStale is true, will try first a stale
// read, and then will perform a consistent read if stale
// read is older than value
MaxStaleDuration time.Duration
}
// IsRead is always true for QueryOption.
@ -117,6 +122,17 @@ func (q QueryOptions) IsRead() bool {
return true
}
// ConsistencyLevel display the consistency required by a request
func (q QueryOptions) ConsistencyLevel() string {
if q.RequireConsistent {
return "consistent"
} else if q.AllowStale {
return "stale"
} else {
return "leader"
}
}
func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}
@ -157,6 +173,11 @@ type QueryMeta struct {
// Used to indicate if there is a known leader node
KnownLeader bool
// Consistencylevel returns the consistency used to serve the query
// Having `discovery_max_stale` on the agent can affect whether
// the request was served by a leader.
ConsistencyLevel string
}
// RegisterRequest is used for the Catalog.Register endpoint

View File

@ -917,6 +917,17 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
leader, so this lets Consul continue serving requests in long outage scenarios where no leader can
be elected.
* <a name="discovery_max_stale"></a><a href="discovery_max_stale">`discovery_max_stale`</a> - Enables
stale requests for all service discovery HTTP endpoints. This is equivalent to the
[`max_stale`](#max_stale) configuration for DNS requests. If this value is zero (default), all service
discovery HTTP endpoints are forwarded to the leader. If this value is greater than zero, any Consul server
can handle the service discovery request. If a Consul server is behind the leader by more than `discovery_max_stale`,
the query will be re-evaluated on the leader to get more up-to-date results. Consul agents also add a new
`X-Consul-Effective-Consistency` response header which indicates if the agent did a stale read. `discover-max-stale`
was introduced in Consul 1.0.7 as a way for Consul operators to force stale requests from clients at the agent level,
and defaults to zero which matches default consistency behavior in earlier Consul versions.
* <a name="node_ttl"></a><a href="#node_ttl">`node_ttl`</a> - By default, this is "0s", so all
node lookups are served with a 0 TTL value. DNS caching for node lookups can be enabled by
setting this value. This should be specified with the "s" suffix for second or "m" for minute.