diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index ac330f3b1..0088741e1 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -100,9 +100,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() + s.agent.TranslateAddresses(args.Datacenter, out.Nodes) // Use empty list instead of nil @@ -127,11 +135,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request var out structs.IndexedServices defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty map instead of nil if out.Services == nil { @@ -172,11 +187,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedServiceNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes) // Use empty list instead of nil @@ -216,11 +238,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedNodeServices defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() if out.NodeServices != nil && out.NodeServices.Node != nil { s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node) } diff --git a/agent/config/builder.go b/agent/config/builder.go index 22e451bf5..e9cb19394 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -655,6 +655,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { DisableRemoteExec: b.boolVal(c.DisableRemoteExec), DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck), DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), + DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), EnableDebug: b.boolVal(c.EnableDebug), EnableScriptChecks: b.boolVal(c.EnableScriptChecks), diff --git a/agent/config/config.go b/agent/config/config.go index 6b23449ed..2a14abc6b 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -174,6 +174,7 @@ type Config struct { DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"` DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"` DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"` + DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"` EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"` EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"` EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 30cc1c656..f43fac24f 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -473,6 +473,14 @@ type RuntimeConfig struct { // flag: -datacenter string Datacenter string + // Defines the maximum stale value for discovery path. Defauls to "0s". + // Discovery paths are /v1/heath/ paths + // + // If not set to 0, it will try to perform stale read and perform only a + // consistent read whenever the value is too old. + // hcl: discovery_max_stale = "duration" + DiscoveryMaxStale time.Duration + // Node name is the name we use to advertise. Defaults to hostname. // // NodeName is exposed via /v1/agent/self from here and diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index d7ff1d2bd..da62e0339 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2304,6 +2304,7 @@ func TestFullConfig(t *testing.T) { "disable_remote_exec": true, "disable_update_check": true, "discard_check_output": true, + "discovery_max_stale": "5s", "domain": "7W1xXSqd", "dns_config": { "allow_stale": true, @@ -2740,6 +2741,7 @@ func TestFullConfig(t *testing.T) { disable_remote_exec = true disable_update_check = true discard_check_output = true + discovery_max_stale = "5s" domain = "7W1xXSqd" dns_config { allow_stale = true @@ -3067,6 +3069,7 @@ func TestFullConfig(t *testing.T) { "ae_interval": "10003s", "check_deregister_interval_min": "27870s", "check_reap_interval": "10662s", + "discovery_max_stale": "5s", "segment_limit": 24705, "segment_name_limit": 27046, "sync_coordinate_interval_min": "27983s", @@ -3121,6 +3124,7 @@ func TestFullConfig(t *testing.T) { ae_interval = "10003s" check_deregister_interval_min = "27870s" check_reap_interval = "10662s" + discovery_max_stale = "5s" segment_limit = 24705 segment_name_limit = 27046 sync_coordinate_interval_min = "27983s" @@ -3327,6 +3331,7 @@ func TestFullConfig(t *testing.T) { DisableRemoteExec: true, DisableUpdateCheck: true, DiscardCheckOutput: true, + DiscoveryMaxStale: 5 * time.Second, EnableACLReplication: true, EnableAgentTLSForChecks: true, EnableDebug: true, @@ -4008,6 +4013,7 @@ func TestSanitize(t *testing.T) { "DisableRemoteExec": false, "DisableUpdateCheck": false, "DiscardCheckOutput": false, + "DiscoveryMaxStale": "0s", "EnableACLReplication": false, "EnableAgentTLSForChecks": false, "EnableDebug": false, diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index c04cc423b..9c0aac2b6 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -30,9 +30,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -66,9 +73,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -104,9 +118,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -149,9 +170,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ // Make the RPC request var out structs.IndexedCheckServiceNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Filter to only passing if specified if _, ok := params[api.HealthPassing]; ok { diff --git a/agent/http.go b/agent/http.go index 823606f2f..ac254795e 100644 --- a/agent/http.go +++ b/agent/http.go @@ -366,6 +366,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) { resp.Header().Set("X-Consul-KnownLeader", s) } +func setConsistency(resp http.ResponseWriter, consistency string) { + if consistency != "" { + resp.Header().Set("X-Consul-Effective-Consistency", consistency) + } +} + // setLastContact is used to set the last contact header func setLastContact(resp http.ResponseWriter, last time.Duration) { if last < 0 { @@ -380,6 +386,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { setIndex(resp, m.Index) setLastContact(resp, m.LastContact) setKnownLeader(resp, m.KnownLeader) + setConsistency(resp, m.ConsistencyLevel) } // setHeaders is used to set canonical response header fields @@ -416,13 +423,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti // parseConsistency is used to parse the ?stale and ?consistent query params. // Returns true on error -func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { +func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { query := req.URL.Query() + defaults := true if _, ok := query["stale"]; ok { b.AllowStale = true + defaults = false } if _, ok := query["consistent"]; ok { b.RequireConsistent = true + defaults = false + } + if _, ok := query["leader"]; ok { + defaults = false + } + if maxStale := query.Get("max_stale"); maxStale != "" { + dur, err := time.ParseDuration(maxStale) + if err != nil { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale) + return true + } + b.MaxStaleDuration = dur + if dur.Nanoseconds() > 0 { + b.AllowStale = true + defaults = false + } + } + // No specific Consistency has been specified by caller + if defaults { + path := req.URL.Path + if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, "/v1/health") { + if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 { + b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale + b.AllowStale = true + } + } } if b.AllowStale && b.RequireConsistent { resp.WriteHeader(http.StatusBadRequest) @@ -490,7 +526,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string { func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { s.parseDC(req, dc) s.parseToken(req, &b.Token) - if parseConsistency(resp, req, b) { + if s.parseConsistency(resp, req, b) { return true } return parseWait(resp, req, b) diff --git a/agent/http_test.go b/agent/http_test.go index 23c90014e..42e0eec55 100644 --- a/agent/http_test.go +++ b/agent/http_test.go @@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) { var b structs.QueryOptions req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil) - if d := parseConsistency(resp, req, &b); d { + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + if d := a.srv.parseConsistency(resp, req, &b); d { t.Fatalf("unexpected done") } @@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) { b = structs.QueryOptions{} req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil) - if d := parseConsistency(resp, req, &b); d { + if d := a.srv.parseConsistency(resp, req, &b); d { t.Fatalf("unexpected done") } @@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) { } } +// ensureConsistency check if consistency modes are correctly applied +// if maxStale < 0 => stale, without MaxStaleDuration +// if maxStale == 0 => no stale +// if maxStale > 0 => stale + check duration +func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) { + t.Helper() + req, _ := http.NewRequest("GET", path, nil) + var b structs.QueryOptions + resp := httptest.NewRecorder() + if d := a.srv.parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + allowStale := maxStale.Nanoseconds() != 0 + if b.AllowStale != allowStale { + t.Fatalf("Bad Allow Stale") + } + if maxStale > 0 && b.MaxStaleDuration != maxStale { + t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale) + } + if b.RequireConsistent != requireConsistent { + t.Fatal("Bad Consistent") + } +} + +func TestParseConsistencyAndMaxStale(t *testing.T) { + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Default => Consistent + a.config.DiscoveryMaxStale = time.Duration(0) + ensureConsistency(t, a, "/v1/catalog/nodes", 0, false) + // Stale, without MaxStale + ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false) + // Override explicitly + ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false) + ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false) + + // stale by defaul on discovery + a.config.DiscoveryMaxStale = time.Duration(7 * time.Second) + ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false) + // Not in KV + ensureConsistency(t, a, "/v1/kv/my/path", 0, false) + + // DiscoveryConsistencyLevel should apply + ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false) + ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false) + ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false) + + // Query path should be taken into account + ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true) + // Since stale is added, no MaxStale should be applied + ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false) + ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false) +} + func TestParseConsistency_Invalid(t *testing.T) { t.Parallel() resp := httptest.NewRecorder() var b structs.QueryOptions req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil) - if d := parseConsistency(resp, req, &b); !d { + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + if d := a.srv.parseConsistency(resp, req, &b); !d { t.Fatalf("expected done") } diff --git a/agent/prepared_query_endpoint.go b/agent/prepared_query_endpoint.go index 532cf70f4..38c9b4f39 100644 --- a/agent/prepared_query_endpoint.go +++ b/agent/prepared_query_endpoint.go @@ -43,9 +43,17 @@ func (s *HTTPServer) preparedQueryList(resp http.ResponseWriter, req *http.Reque } var reply structs.IndexedPreparedQueries + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.List", &args, &reply); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil. if reply.Queries == nil { @@ -100,6 +108,8 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r } var reply structs.PreparedQueryExecuteResponse + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Execute", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. @@ -110,6 +120,12 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Note that we translate using the DC that the results came from, since // a query can fail over to a different DC than where the execute request @@ -145,6 +161,8 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r } var reply structs.PreparedQueryExplainResponse + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Explain", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. @@ -155,6 +173,12 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() return reply, nil } @@ -168,6 +192,8 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req * } var reply structs.IndexedPreparedQueries + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Get", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. @@ -178,6 +204,12 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req * } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() return reply.Queries, nil } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 92f942d0d..9661e5ac1 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -110,6 +110,11 @@ type QueryOptions struct { // If set, the leader must verify leadership prior to // servicing the request. Prevents a stale read. RequireConsistent bool + + // If set and AllowStale is true, will try first a stale + // read, and then will perform a consistent read if stale + // read is older than value + MaxStaleDuration time.Duration } // IsRead is always true for QueryOption. @@ -117,6 +122,17 @@ func (q QueryOptions) IsRead() bool { return true } +// ConsistencyLevel display the consistency required by a request +func (q QueryOptions) ConsistencyLevel() string { + if q.RequireConsistent { + return "consistent" + } else if q.AllowStale { + return "stale" + } else { + return "leader" + } +} + func (q QueryOptions) AllowStaleRead() bool { return q.AllowStale } @@ -157,6 +173,11 @@ type QueryMeta struct { // Used to indicate if there is a known leader node KnownLeader bool + + // Consistencylevel returns the consistency used to serve the query + // Having `discovery_max_stale` on the agent can affect whether + // the request was served by a leader. + ConsistencyLevel string } // RegisterRequest is used for the Catalog.Register endpoint diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index aa33ea2b6..0dcf866e2 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -917,6 +917,17 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass leader, so this lets Consul continue serving requests in long outage scenarios where no leader can be elected. + * `discovery_max_stale` - Enables + stale requests for all service discovery HTTP endpoints. This is equivalent to the + [`max_stale`](#max_stale) configuration for DNS requests. If this value is zero (default), all service + discovery HTTP endpoints are forwarded to the leader. If this value is greater than zero, any Consul server + can handle the service discovery request. If a Consul server is behind the leader by more than `discovery_max_stale`, + the query will be re-evaluated on the leader to get more up-to-date results. Consul agents also add a new + `X-Consul-Effective-Consistency` response header which indicates if the agent did a stale read. `discover-max-stale` + was introduced in Consul 1.0.7 as a way for Consul operators to force stale requests from clients at the agent level, + and defaults to zero which matches default consistency behavior in earlier Consul versions. + + * `node_ttl` - By default, this is "0s", so all node lookups are served with a 0 TTL value. DNS caching for node lookups can be enabled by setting this value. This should be specified with the "s" suffix for second or "m" for minute.