diff --git a/CHANGELOG.md b/CHANGELOG.md index bbe20fe48..4adc7a066 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ IMPROVEMENTS: * agent: List of supported TLS cipher suites updated to include newer options, [[GH-3962](https://github.com/hashicorp/consul/pull/3962)] * agent: WAN federation can now be disabled by setting the serf WAN port to -1. [[GH-3984](https://github.com/hashicorp/consul/issues/3984)] * agent: Added support for specifying metadata during service registration. [[GH-3881](https://github.com/hashicorp/consul/issues/3881)] +* agent: Added a new `discovery_max_stale` config option to enable stale requests for service discovery endpoints. [[GH-4004](https://github.com/hashicorp/consul/issues/4004)] * dns: Introduced a new config param to limit the number of A/AAAA records returned. [[GH-3940](https://github.com/hashicorp/consul/issues/3940)] * dns: Upgrade vendored DNS library to pick up bugfixes and improvements. [[GH-3978](https://github.com/hashicorp/consul/issues/3978)] * server: Updated yamux library to pick up a performance improvement. [[GH-3982](https://github.com/hashicorp/consul/issues/3982)] @@ -16,7 +17,8 @@ BUG FIXES: * agent: Fixed an issue where the coordinate update endpoint was not correctly parsing the ACL token. 
[[GH-3892](https://github.com/hashicorp/consul/issues/3892)] * agent: Fixed an issue where `consul monitor` couldn't be terminated until the first log line is delivered [[GH-3891](https://github.com/hashicorp/consul/issues/3891)] -* agent: Added warnings for when a node name isn't a valid DNS name and when the node name, a service name or service tags would exceed the allowed lengths for DNS names [[GH-3854](https://github.com/hashicorp/consul/issue/3854)] +* agent: Added warnings for when a node name isn't a valid DNS name and when the node name, a service name or service tags would exceed the allowed lengths for DNS names [[GH-3854](https://github.com/hashicorp/consul/issues/3854)] +* agent: Added truncation of TCP DNS responses to prevent errors for exceeding message size limits [[GH-3850](https://github.com/hashicorp/consul/issues/3850)] * server: Fixed an issue where the leader could miss clean up after a leadership transition. [[GH-3909](https://github.com/hashicorp/consul/issues/3909)] ## 1.0.6 (February 9, 2018) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 2e7d2f0ca..8904f436d 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -515,7 +515,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the node service. 
ns := args.NodeService() - if err := structs.ValidateMetadata(ns.ServiceMeta, false); err != nil { + if err := structs.ValidateMetadata(ns.Meta, false); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err)) return nil, nil diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index ac330f3b1..0088741e1 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -100,9 +100,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() + s.agent.TranslateAddresses(args.Datacenter, out.Nodes) // Use empty list instead of nil @@ -127,11 +135,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request var out structs.IndexedServices defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty map instead of nil if out.Services == nil { @@ -172,11 +187,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedServiceNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := 
s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes) // Use empty list instead of nil @@ -216,11 +238,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedNodeServices defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() if out.NodeServices != nil && out.NodeServices.Node != nil { s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node) } diff --git a/agent/config/builder.go b/agent/config/builder.go index 22e451bf5..e9cb19394 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -655,6 +655,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { DisableRemoteExec: b.boolVal(c.DisableRemoteExec), DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck), DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), + DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), EnableDebug: b.boolVal(c.EnableDebug), EnableScriptChecks: 
b.boolVal(c.EnableScriptChecks), diff --git a/agent/config/config.go b/agent/config/config.go index 6b23449ed..2a14abc6b 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -174,6 +174,7 @@ type Config struct { DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"` DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"` DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"` + DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"` EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"` EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"` EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 30cc1c656..f43fac24f 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -473,6 +473,14 @@ type RuntimeConfig struct { // flag: -datacenter string Datacenter string + // Defines the maximum stale value for discovery paths. Defaults to "0s". + // Discovery paths are /v1/catalog/ and /v1/health/ paths + // + // If not set to 0, it will try to perform stale read and perform only a + // consistent read whenever the value is too old. + // hcl: discovery_max_stale = "duration" + DiscoveryMaxStale time.Duration + // Node name is the name we use to advertise. Defaults to hostname. 
// // NodeName is exposed via /v1/agent/self from here and diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 4c4358c58..da62e0339 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2304,6 +2304,7 @@ func TestFullConfig(t *testing.T) { "disable_remote_exec": true, "disable_update_check": true, "discard_check_output": true, + "discovery_max_stale": "5s", "domain": "7W1xXSqd", "dns_config": { "allow_stale": true, @@ -2740,6 +2741,7 @@ func TestFullConfig(t *testing.T) { disable_remote_exec = true disable_update_check = true discard_check_output = true + discovery_max_stale = "5s" domain = "7W1xXSqd" dns_config { allow_stale = true @@ -3067,6 +3069,7 @@ func TestFullConfig(t *testing.T) { "ae_interval": "10003s", "check_deregister_interval_min": "27870s", "check_reap_interval": "10662s", + "discovery_max_stale": "5s", "segment_limit": 24705, "segment_name_limit": 27046, "sync_coordinate_interval_min": "27983s", @@ -3121,6 +3124,7 @@ func TestFullConfig(t *testing.T) { ae_interval = "10003s" check_deregister_interval_min = "27870s" check_reap_interval = "10662s" + discovery_max_stale = "5s" segment_limit = 24705 segment_name_limit = 27046 sync_coordinate_interval_min = "27983s" @@ -3327,6 +3331,7 @@ func TestFullConfig(t *testing.T) { DisableRemoteExec: true, DisableUpdateCheck: true, DiscardCheckOutput: true, + DiscoveryMaxStale: 5 * time.Second, EnableACLReplication: true, EnableAgentTLSForChecks: true, EnableDebug: true, @@ -4008,6 +4013,7 @@ func TestSanitize(t *testing.T) { "DisableRemoteExec": false, "DisableUpdateCheck": false, "DiscardCheckOutput": false, + "DiscoveryMaxStale": "0s", "EnableACLReplication": false, "EnableAgentTLSForChecks": false, "EnableDebug": false, @@ -4096,9 +4102,9 @@ func TestSanitize(t *testing.T) { "Checks": [], "EnableTagOverride": false, "ID": "", + "Meta": {}, "Name": "foo", "Port": 0, - "ServiceMeta": {}, "Tags": [], "Token": "hidden" } diff --git 
a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 91489f57a..36fa42a54 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -614,7 +614,7 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st return fmt.Errorf("failed service lookup: %s", err) } - if err = structs.ValidateMetadata(svc.ServiceMeta, false); err != nil { + if err = structs.ValidateMetadata(svc.Meta, false); err != nil { return fmt.Errorf("Invalid Service Meta for node %s and serviceID %s: %v", node, svc.ID, err) } // Create the service node entry and populate the indexes. Note that diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 9ffed8fba..3fbb8124f 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -69,17 +69,17 @@ func TestStateStore_EnsureRegistration(t *testing.T) { } verifyNode() - // Add in a invalid service definition with too long Key value for ServiceMeta + // Add in a invalid service definition with too long Key value for Meta req.Service = &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - ServiceMeta: map[string]string{strings.Repeat("a", 129): "somevalue"}, - Tags: []string{"master"}, + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + Meta: map[string]string{strings.Repeat("a", 129): "somevalue"}, + Tags: []string{"master"}, } if err := s.EnsureRegistration(9, req); err == nil { - t.Fatalf("Service should not have been registered since ServiceMeta is invalid") + t.Fatalf("Service should not have been registered since Meta is invalid") } // Add in a service definition. 
diff --git a/agent/dns.go b/agent/dns.go index 5cd175ab5..d50383449 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -717,6 +717,56 @@ func syncExtra(index map[string]dns.RR, resp *dns.Msg) { resp.Extra = extra } +// trimTCPResponse limits the size of TCP responses to 64k, as that is the limit +// of DNS responses +func (d *DNSServer) trimTCPResponse(req, resp *dns.Msg) (trimmed bool) { + hasExtra := len(resp.Extra) > 0 + // There is some overhead, 65535 does not work + maxSize := 65533 // 64k - 2 bytes + // In order to compute the size properly, we have to disable compression first + compressed := resp.Compress + resp.Compress = false + + // We avoid some function calls and allocations by only handling the + // extra data when necessary. + var index map[string]dns.RR + originalSize := resp.Len() + originalNumRecords := len(resp.Answer) + + // Beyond 2500 records, performance gets bad + // Limit the number of records at once, anyway, it won't fit in 64k + // For SRV Records, the max is around 500 records, for A, less than 2k + truncateAt := 2048 + if req.Question[0].Qtype == dns.TypeSRV { + truncateAt = 640 + } + if len(resp.Answer) > truncateAt { + resp.Answer = resp.Answer[:truncateAt] + } + if hasExtra { + index = make(map[string]dns.RR, len(resp.Extra)) + indexRRs(resp.Extra, index) + } + truncated := false + + // This enforces the 64k limit, the maximum size for DNS messages + for len(resp.Answer) > 0 && resp.Len() > maxSize { + truncated = true + resp.Answer = resp.Answer[:len(resp.Answer)-1] + if hasExtra { + syncExtra(index, resp) + } + } + if truncated { + d.logger.Printf("[DEBUG] dns: TCP answer to %v too large truncated recs:=%d/%d, size:=%d/%d", + req.Question, + len(resp.Answer), originalNumRecords, resp.Len(), originalSize) + } + // Restore compression if any + resp.Compress = compressed + return truncated +} + // trimUDPResponse makes sure a UDP response is not longer than allowed by RFC // 1035. 
Enforce an arbitrary limit that can be further ratcheted down by // config, and then make sure the response doesn't exceed 512 bytes. Any extra @@ -769,6 +819,20 @@ func trimUDPResponse(req, resp *dns.Msg, udpAnswerLimit int) (trimmed bool) { return len(resp.Answer) < numAnswers } +// trimDNSResponse will trim the response for UDP and TCP +func (d *DNSServer) trimDNSResponse(network string, req, resp *dns.Msg) (trimmed bool) { + if network != "tcp" { + trimmed = trimUDPResponse(req, resp, d.config.UDPAnswerLimit) + } else { + trimmed = d.trimTCPResponse(req, resp) + } + // Flag that there are more records to return than fit in the response + if trimmed && d.config.EnableTruncate { + resp.Truncated = true + } + return trimmed +} + // lookupServiceNodes returns nodes with a given service. func (d *DNSServer) lookupServiceNodes(datacenter, service, tag string) (structs.IndexedCheckServiceNodes, error) { args := structs.ServiceSpecificRequest{ @@ -844,15 +908,7 @@ func (d *DNSServer) serviceLookup(network, datacenter, service, tag string, req, d.serviceNodeRecords(datacenter, out.Nodes, req, resp, ttl) } - // If the network is not TCP, restrict the number of responses - if network != "tcp" { - wasTrimmed := trimUDPResponse(req, resp, d.config.UDPAnswerLimit) - - // Flag that there are more records to return in the UDP response - if wasTrimmed && d.config.EnableTruncate { - resp.Truncated = true - } - } + d.trimDNSResponse(network, req, resp) // If the answer is empty and the response isn't truncated, return not found if len(resp.Answer) == 0 && !resp.Truncated { @@ -954,15 +1010,7 @@ RPC: d.serviceNodeRecords(out.Datacenter, out.Nodes, req, resp, ttl) } - // If the network is not TCP, restrict the number of responses. 
- if network != "tcp" { - wasTrimmed := trimUDPResponse(req, resp, d.config.UDPAnswerLimit) - - // Flag that there are more records to return in the UDP response - if wasTrimmed && d.config.EnableTruncate { - resp.Truncated = true - } - } + d.trimDNSResponse(network, req, resp) // If the answer is empty and the response isn't truncated, return not found if len(resp.Answer) == 0 && !resp.Truncated { diff --git a/agent/dns_test.go b/agent/dns_test.go index 5d100126d..2ac630625 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -2740,6 +2740,97 @@ func TestDNS_ServiceLookup_Randomize(t *testing.T) { } } +func TestDNS_TCP_and_UDP_Truncate(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), ` + dns_config { + enable_truncate = true + } + `) + defer a.Shutdown() + + services := []string{"normal", "truncated"} + for index, service := range services { + numServices := (index * 5000) + 2 + for i := 1; i < numServices; i++ { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: fmt.Sprintf("%s-%d.acme.com", service, i), + Address: fmt.Sprintf("127.%d.%d.%d", index, (i / 255), i%255), + Service: &structs.NodeService{ + Service: service, + Port: 8000, + }, + } + + var out struct{} + if err := a.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Register an equivalent prepared query. + var id string + { + args := &structs.PreparedQueryRequest{ + Datacenter: "dc1", + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + Name: service, + Service: structs.ServiceQuery{ + Service: service, + }, + }, + } + if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Look up the service directly and via prepared query. Ensure the + // response is truncated each time. 
+ questions := []string{ + fmt.Sprintf("%s.service.consul.", service), + id + ".query.consul.", + } + protocols := []string{ + "tcp", + "udp", + } + for _, qType := range []uint16{dns.TypeANY, dns.TypeA, dns.TypeSRV} { + for _, question := range questions { + for _, protocol := range protocols { + for _, compress := range []bool{true, false} { + t.Run(fmt.Sprintf("lookup %s %s (qType:=%d) compressed=%v", question, protocol, qType, compress), func(t *testing.T) { + m := new(dns.Msg) + m.SetQuestion(question, dns.TypeANY) + if protocol == "udp" { + m.SetEdns0(8192, true) + } + c := new(dns.Client) + c.Net = protocol + m.Compress = compress + in, out, err := c.Exchange(m, a.DNSAddr()) + if err != nil && err != dns.ErrTruncated { + t.Fatalf("err: %v", err) + } + + // Check for the truncate bit + shouldBeTruncated := numServices > 4095 + + if shouldBeTruncated != in.Truncated || len(in.Answer) > 2000 || len(in.Answer) < 1 || in.Len() > 65535 { + info := fmt.Sprintf("service %s question:=%s (%s) (%d total records) sz:= %d in %v", + service, question, protocol, numServices, len(in.Answer), out) + t.Fatalf("Should have truncated:=%v for %s", shouldBeTruncated, info) + } + }) + } + } + } + } + } +} + func TestDNS_ServiceLookup_Truncate(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), ` diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index c04cc423b..9c0aac2b6 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -30,9 +30,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = 
args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -66,9 +73,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -104,9 +118,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil if out.HealthChecks == nil { @@ -149,9 +170,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ // Make the RPC request var out structs.IndexedCheckServiceNodes defer setMeta(resp, &out.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Filter to only passing if specified if _, ok := params[api.HealthPassing]; ok { diff --git a/agent/http.go b/agent/http.go index 
823606f2f..ac254795e 100644 --- a/agent/http.go +++ b/agent/http.go @@ -366,6 +366,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) { resp.Header().Set("X-Consul-KnownLeader", s) } +func setConsistency(resp http.ResponseWriter, consistency string) { + if consistency != "" { + resp.Header().Set("X-Consul-Effective-Consistency", consistency) + } +} + // setLastContact is used to set the last contact header func setLastContact(resp http.ResponseWriter, last time.Duration) { if last < 0 { @@ -380,6 +386,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { setIndex(resp, m.Index) setLastContact(resp, m.LastContact) setKnownLeader(resp, m.KnownLeader) + setConsistency(resp, m.ConsistencyLevel) } // setHeaders is used to set canonical response header fields @@ -416,13 +423,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti // parseConsistency is used to parse the ?stale and ?consistent query params. // Returns true on error -func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { +func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { query := req.URL.Query() + defaults := true if _, ok := query["stale"]; ok { b.AllowStale = true + defaults = false } if _, ok := query["consistent"]; ok { b.RequireConsistent = true + defaults = false + } + if _, ok := query["leader"]; ok { + defaults = false + } + if maxStale := query.Get("max_stale"); maxStale != "" { + dur, err := time.ParseDuration(maxStale) + if err != nil { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale) + return true + } + b.MaxStaleDuration = dur + if dur.Nanoseconds() > 0 { + b.AllowStale = true + defaults = false + } + } + // No specific Consistency has been specified by caller + if defaults { + path := req.URL.Path + if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, 
"/v1/health") { + if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 { + b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale + b.AllowStale = true + } + } } if b.AllowStale && b.RequireConsistent { resp.WriteHeader(http.StatusBadRequest) @@ -490,7 +526,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string { func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { s.parseDC(req, dc) s.parseToken(req, &b.Token) - if parseConsistency(resp, req, b) { + if s.parseConsistency(resp, req, b) { return true } return parseWait(resp, req, b) diff --git a/agent/http_test.go b/agent/http_test.go index 23c90014e..42e0eec55 100644 --- a/agent/http_test.go +++ b/agent/http_test.go @@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) { var b structs.QueryOptions req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil) - if d := parseConsistency(resp, req, &b); d { + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + if d := a.srv.parseConsistency(resp, req, &b); d { t.Fatalf("unexpected done") } @@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) { b = structs.QueryOptions{} req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil) - if d := parseConsistency(resp, req, &b); d { + if d := a.srv.parseConsistency(resp, req, &b); d { t.Fatalf("unexpected done") } @@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) { } } +// ensureConsistency check if consistency modes are correctly applied +// if maxStale < 0 => stale, without MaxStaleDuration +// if maxStale == 0 => no stale +// if maxStale > 0 => stale + check duration +func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) { + t.Helper() + req, _ := http.NewRequest("GET", path, nil) + var b structs.QueryOptions + resp := httptest.NewRecorder() + if d := a.srv.parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + allowStale 
:= maxStale.Nanoseconds() != 0 + if b.AllowStale != allowStale { + t.Fatalf("Bad Allow Stale") + } + if maxStale > 0 && b.MaxStaleDuration != maxStale { + t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale) + } + if b.RequireConsistent != requireConsistent { + t.Fatal("Bad Consistent") + } +} + +func TestParseConsistencyAndMaxStale(t *testing.T) { + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Default => neither stale nor consistent + a.config.DiscoveryMaxStale = time.Duration(0) + ensureConsistency(t, a, "/v1/catalog/nodes", 0, false) + // Stale, without MaxStale + ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false) + // Override explicitly + ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false) + ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false) + + // Stale by default on discovery paths + a.config.DiscoveryMaxStale = time.Duration(7 * time.Second) + ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false) + // Not in KV + ensureConsistency(t, a, "/v1/kv/my/path", 0, false) + + // DiscoveryMaxStale should apply + ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false) + ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false) + ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false) + + // Query parameters should be taken into account + ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true) + // Since stale is added, no MaxStale should be applied + ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false) + ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false) +} + func TestParseConsistency_Invalid(t *testing.T) { t.Parallel() resp := httptest.NewRecorder() var b structs.QueryOptions req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil) - if d := parseConsistency(resp, req, &b); !d { + a := NewTestAgent(t.Name(), 
"") + defer a.Shutdown() + if d := a.srv.parseConsistency(resp, req, &b); !d { t.Fatalf("expected done") } diff --git a/agent/prepared_query_endpoint.go b/agent/prepared_query_endpoint.go index 532cf70f4..38c9b4f39 100644 --- a/agent/prepared_query_endpoint.go +++ b/agent/prepared_query_endpoint.go @@ -43,9 +43,17 @@ func (s *HTTPServer) preparedQueryList(resp http.ResponseWriter, req *http.Reque } var reply structs.IndexedPreparedQueries + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.List", &args, &reply); err != nil { return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Use empty list instead of nil. if reply.Queries == nil { @@ -100,6 +108,8 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r } var reply structs.PreparedQueryExecuteResponse + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Execute", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. 
@@ -110,6 +120,12 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() // Note that we translate using the DC that the results came from, since // a query can fail over to a different DC than where the execute request @@ -145,6 +161,8 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r } var reply structs.PreparedQueryExplainResponse + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Explain", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. @@ -155,6 +173,12 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() return reply, nil } @@ -168,6 +192,8 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req * } var reply structs.IndexedPreparedQueries + defer setMeta(resp, &reply.QueryMeta) +RETRY_ONCE: if err := s.agent.RPC("PreparedQuery.Get", &args, &reply); err != nil { // We have to check the string since the RPC sheds // the specific error type. 
@@ -178,6 +204,12 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req * } return nil, err } + if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact { + args.AllowStale = false + args.MaxStaleDuration = 0 + goto RETRY_ONCE + } + reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() return reply.Queries, nil } diff --git a/agent/structs/service_definition.go b/agent/structs/service_definition.go index aa1cda535..4dc8ccfca 100644 --- a/agent/structs/service_definition.go +++ b/agent/structs/service_definition.go @@ -6,7 +6,7 @@ type ServiceDefinition struct { Name string Tags []string Address string - ServiceMeta map[string]string + Meta map[string]string Port int Check CheckType Checks CheckTypes @@ -20,7 +20,7 @@ func (s *ServiceDefinition) NodeService() *NodeService { Service: s.Name, Tags: s.Tags, Address: s.Address, - ServiceMeta: s.ServiceMeta, + Meta: s.Meta, Port: s.Port, EnableTagOverride: s.EnableTagOverride, } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 0056af487..9661e5ac1 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -110,6 +110,11 @@ type QueryOptions struct { // If set, the leader must verify leadership prior to // servicing the request. Prevents a stale read. RequireConsistent bool + + // If set and AllowStale is true, will try first a stale + // read, and then will perform a consistent read if stale + // read is older than value + MaxStaleDuration time.Duration } // IsRead is always true for QueryOption. 
@@ -117,6 +122,17 @@ func (q QueryOptions) IsRead() bool { return true } +// ConsistencyLevel displays the consistency required by a request +func (q QueryOptions) ConsistencyLevel() string { + if q.RequireConsistent { + return "consistent" + } else if q.AllowStale { + return "stale" + } else { + return "leader" + } +} + func (q QueryOptions) AllowStaleRead() bool { return q.AllowStale } @@ -157,6 +173,11 @@ type QueryMeta struct { // Used to indicate if there is a known leader node KnownLeader bool + + // ConsistencyLevel returns the consistency used to serve the query + // Having `discovery_max_stale` on the agent can affect whether + // the request was served by a leader. + ConsistencyLevel string } // RegisterRequest is used for the Catalog.Register endpoint @@ -412,7 +433,7 @@ func (s *ServiceNode) ToNodeService() *NodeService { Tags: s.ServiceTags, Address: s.ServiceAddress, Port: s.ServicePort, - ServiceMeta: s.ServiceMeta, + Meta: s.ServiceMeta, EnableTagOverride: s.ServiceEnableTagOverride, RaftIndex: RaftIndex{ CreateIndex: s.CreateIndex, @@ -429,7 +450,7 @@ type NodeService struct { Service string Tags []string Address string - ServiceMeta map[string]string + Meta map[string]string Port int EnableTagOverride bool @@ -446,7 +467,7 @@ func (s *NodeService) IsSame(other *NodeService) bool { !reflect.DeepEqual(s.Tags, other.Tags) || s.Address != other.Address || s.Port != other.Port || - !reflect.DeepEqual(s.ServiceMeta, other.ServiceMeta) || + !reflect.DeepEqual(s.Meta, other.Meta) || s.EnableTagOverride != other.EnableTagOverride { return false } @@ -466,7 +487,7 @@ func (s *NodeService) ToServiceNode(node string) *ServiceNode { ServiceTags: s.Tags, ServiceAddress: s.Address, ServicePort: s.Port, - ServiceMeta: s.ServiceMeta, + ServiceMeta: s.Meta, ServiceEnableTagOverride: s.EnableTagOverride, RaftIndex: RaftIndex{ CreateIndex: s.CreateIndex, diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index af7cc2b04..e1cb8ed8a 100644 ---
a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -187,7 +187,7 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) { } sn.ServiceMeta["new_meta"] = "new_value" if reflect.DeepEqual(sn, clone) { - t.Fatalf("clone wasn't independent of the original for ServiceMeta") + t.Fatalf("clone wasn't independent of the original for Meta") } } @@ -214,7 +214,7 @@ func TestStructs_NodeService_IsSame(t *testing.T) { Service: "theservice", Tags: []string{"foo", "bar"}, Address: "127.0.0.1", - ServiceMeta: map[string]string{ + Meta: map[string]string{ "meta1": "value1", "meta2": "value2", }, @@ -232,7 +232,7 @@ func TestStructs_NodeService_IsSame(t *testing.T) { Address: "127.0.0.1", Port: 1234, EnableTagOverride: true, - ServiceMeta: map[string]string{ + Meta: map[string]string{ // We don't care about order "meta2": "value2", "meta1": "value1", @@ -268,7 +268,7 @@ func TestStructs_NodeService_IsSame(t *testing.T) { check(func() { other.Tags = []string{"foo"} }, func() { other.Tags = []string{"foo", "bar"} }) check(func() { other.Address = "XXX" }, func() { other.Address = "127.0.0.1" }) check(func() { other.Port = 9999 }, func() { other.Port = 1234 }) - check(func() { other.ServiceMeta["meta2"] = "wrongValue" }, func() { other.ServiceMeta["meta2"] = "value2" }) + check(func() { other.Meta["meta2"] = "wrongValue" }, func() { other.Meta["meta2"] = "value2" }) check(func() { other.EnableTagOverride = false }, func() { other.EnableTagOverride = true }) } diff --git a/api/agent.go b/api/agent.go index bd414d6c6..b42baed41 100644 --- a/api/agent.go +++ b/api/agent.go @@ -66,7 +66,7 @@ type AgentServiceRegistration struct { Port int `json:",omitempty"` Address string `json:",omitempty"` EnableTagOverride bool `json:",omitempty"` - ServiceMeta map[string]string `json:",omitempty"` + Meta map[string]string `json:",omitempty"` Check *AgentServiceCheck Checks AgentServiceChecks } diff --git a/website/source/api/agent/service.html.md 
b/website/source/api/agent/service.html.md index 033a62011..c7b100111 100644 --- a/website/source/api/agent/service.html.md +++ b/website/source/api/agent/service.html.md @@ -53,7 +53,7 @@ $ curl \ "Service": "redis", "Tags": [], "Address": "", - "ServiceMeta": { + "Meta": { "redis_version": "4.0" }, "Port": 8000 @@ -99,7 +99,7 @@ The table below shows this endpoint's support for provided, the agent's address is used as the address for the service during DNS queries. -- `ServiceMeta` `(map: nil)` - Specifies arbitrary KV metadata +- `Meta` `(map: nil)` - Specifies arbitrary KV metadata linked to the service instance. - `Port` `(int: 0)` - Specifies the port of the service. @@ -153,7 +153,7 @@ The table below shows this endpoint's support for ], "Address": "127.0.0.1", "Port": 8000, - "ServiceMeta": { + "Meta": { "redis_version": "4.0" }, "EnableTagOverride": false, diff --git a/website/source/api/catalog.html.md b/website/source/api/catalog.html.md index b63837e02..4bbb612c6 100644 --- a/website/source/api/catalog.html.md +++ b/website/source/api/catalog.html.md @@ -105,7 +105,7 @@ and vice versa. A catalog entry can have either, neither, or both. 
"v1" ], "Address": "127.0.0.1", - "ServiceMeta": { + "Meta": { "redis_version": "4.0" }, "Port": 8000 @@ -537,7 +537,7 @@ $ curl \ "ID": "consul", "Service": "consul", "Tags": null, - "ServiceMeta": {}, + "Meta": {}, "Port": 8300 }, "redis": { @@ -546,7 +546,7 @@ $ curl \ "Tags": [ "v1" ], - "ServiceMeta": { + "Meta": { "redis_version": "4.0" }, "Port": 8000 diff --git a/website/source/api/health.html.md b/website/source/api/health.html.md index ef22d8648..a404bda2e 100644 --- a/website/source/api/health.html.md +++ b/website/source/api/health.html.md @@ -216,7 +216,7 @@ $ curl \ "Service": "redis", "Tags": ["primary"], "Address": "10.1.10.12", - "ServiceMeta": { + "Meta": { "redis_version": "4.0" }, "Port": 8000 diff --git a/website/source/api/query.html.md b/website/source/api/query.html.md index e12c0d754..586244fcf 100644 --- a/website/source/api/query.html.md +++ b/website/source/api/query.html.md @@ -241,7 +241,6 @@ The table below shows this endpoint's support for "Near": "node1", "OnlyPassing": false, "Tags": ["primary", "!experimental"], - "ServiceMeta": {"redis_version": "4.0"}, "NodeMeta": {"instance_type": "m3.large"} }, "DNS": { @@ -314,7 +313,6 @@ $ curl \ }, "OnlyPassing": false, "Tags": ["primary", "!experimental"], - "ServiceMeta": {"redis_version": "4.0"}, "NodeMeta": {"instance_type": "m3.large"} }, "DNS": { @@ -512,7 +510,7 @@ $ curl \ "ID": "redis", "Service": "redis", "Tags": null, - "ServiceMeta": {"redis_version": "4.0"}, + "Meta": {"redis_version": "4.0"}, "Port": 8000 }, "Checks": [ @@ -619,7 +617,7 @@ $ curl \ }, "OnlyPassing": true, "Tags": ["primary"], - "ServiceMeta": { "mysql_version": "5.7.20" }, + "Meta": { "mysql_version": "5.7.20" }, "NodeMeta": {"instance_type": "m3.large"} } } diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index aa33ea2b6..0dcf866e2 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -917,6 +917,17 @@ Consul 
will not enable TLS for the HTTP API unless the `https` port has been ass leader, so this lets Consul continue serving requests in long outage scenarios where no leader can be elected. + * `discovery_max_stale` - Enables + stale requests for all service discovery HTTP endpoints. This is equivalent to the + [`max_stale`](#max_stale) configuration for DNS requests. If this value is zero (default), all service + discovery HTTP endpoints are forwarded to the leader. If this value is greater than zero, any Consul server + can handle the service discovery request. If a Consul server is behind the leader by more than `discovery_max_stale`, + the query will be re-evaluated on the leader to get more up-to-date results. Consul agents also add a new + `X-Consul-Effective-Consistency` response header which indicates if the agent did a stale read. `discovery_max_stale` + was introduced in Consul 1.0.7 as a way for Consul operators to force stale requests from clients at the agent level, + and defaults to zero, which matches the default consistency behavior in earlier Consul versions. + + * `node_ttl` - By default, this is "0s", so all node lookups are served with a 0 TTL value. DNS caching for node lookups can be enabled by setting this value. This should be specified with the "s" suffix for second or "m" for minute.