diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 70073bdcc..87aeae5b3 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -172,6 +172,13 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) // Use empty list instead of nil for id, s := range services { + weights := api.AgentWeights{Passing: 1, Warning: 1} + if s.Weights != nil { + if s.Weights.Passing > 0 { + weights.Passing = s.Weights.Passing + } + weights.Warning = s.Weights.Warning + } as := &api.AgentService{ Kind: api.ServiceKind(s.Kind), ID: s.ID, @@ -184,6 +191,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) CreateIndex: s.CreateIndex, ModifyIndex: s.ModifyIndex, ProxyDestination: s.ProxyDestination, + Weights: weights, } if as.Tags == nil { as.Tags = []string{} @@ -581,6 +589,13 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the node service. ns := args.NodeService() + if ns.Weights != nil { + if err := structs.ValidateWeights(ns.Weights); err != nil { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprint(resp, fmt.Errorf("Invalid Weights: %v", err)) + return nil, nil + } + } if err := structs.ValidateMetadata(ns.Meta, false); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err)) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index abaaacb35..7006c3e5f 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -1329,6 +1329,10 @@ func TestAgent_RegisterService(t *testing.T) { TTL: 30 * time.Second, }, }, + Weights: &structs.Weights{ + Passing: 100, + Warning: 3, + }, } req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=abc123", jsonReader(args)) @@ -1347,6 +1351,12 @@ func TestAgent_RegisterService(t *testing.T) { if val := a.State.Service("test").Meta["hello"]; val != "world" { t.Fatalf("Missing meta: %v", a.State.Service("test").Meta) } + if val := a.State.Service("test").Weights.Passing; val != 100 { + t.Fatalf("Expected 100 for Weights.Passing, got: %v", val) + } + if val := a.State.Service("test").Weights.Warning; val != 3 { + t.Fatalf("Expected 3 for Weights.Warning, got: %v", val) + } // Ensure we have a check mapping checks := a.State.Checks() @@ -1370,7 +1380,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") - json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}}` + json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}, "weights":{"passing": 16}}` req, _ := http.NewRequest("PUT", "/v1/agent/service/register", strings.NewReader(json)) obj, err := a.srv.AgentRegisterService(nil, req) @@ -1386,6 +1396,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { Meta: map[string]string{"some": "meta"}, Port: 8000, EnableTagOverride: true, + Weights: &structs.Weights{Passing: 16, Warning: 0}, } if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) { diff --git a/agent/agent_test.go b/agent/agent_test.go index ab7108b02..4881b1d02 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -641,6 +641,10 @@ func verifyIndexChurn(t *testing.T, tags []string) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + weights := &structs.Weights{ + Passing: 1, + Warning: 1, + } // Ensure we have a leader before we start adding the services testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -649,6 +653,7 @@ func verifyIndexChurn(t *testing.T, tags []string) { Service: "redis", Port: 8000, Tags: tags, + Weights: weights, } if err := a.AddService(svc, nil, true, ""); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/config/builder.go b/agent/config/builder.go index b24b74954..17314c1e7 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1086,6 +1086,19 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition { } else { meta = v.Meta } + serviceWeights := &structs.Weights{Passing: 1, Warning: 1} + if v.Weights != nil { + if v.Weights.Passing != nil { + serviceWeights.Passing = *v.Weights.Passing + } + if v.Weights.Warning != nil { + serviceWeights.Warning = *v.Weights.Warning + } + } + + if err := structs.ValidateWeights(serviceWeights); err != nil { + b.err = multierror.Append(fmt.Errorf("Invalid weight definition for service %s: %s", b.stringVal(v.Name), err)) + } return &structs.ServiceDefinition{ Kind: b.serviceKindVal(v.Kind), ID: b.stringVal(v.ID), @@ -1096,6 +1109,7 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition { Port: b.intVal(v.Port), Token: b.stringVal(v.Token), EnableTagOverride: b.boolVal(v.EnableTagOverride), + Weights: serviceWeights, Checks: checks, ProxyDestination: b.stringVal(v.ProxyDestination), Connect: b.serviceConnectVal(v.Connect), diff --git a/agent/config/config.go b/agent/config/config.go index 72bad9db4..1c546f72b 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -322,6 +322,12 @@ type Autopilot struct { UpgradeVersionTag *string `json:"upgrade_version_tag,omitempty" hcl:"upgrade_version_tag" mapstructure:"upgrade_version_tag"` } +// ServiceWeights defines the registration of weights used in DNS for a Service +type ServiceWeights struct { + Passing *int `json:"passing,omitempty" hcl:"passing" mapstructure:"passing"` + Warning *int `json:"warning,omitempty" hcl:"warning" mapstructure:"warning"` +} + type ServiceDefinition struct { Kind *string `json:"kind,omitempty" hcl:"kind" mapstructure:"kind"` ID *string `json:"id,omitempty" hcl:"id" mapstructure:"id"` @@ -333,6 +339,7 @@ type ServiceDefinition struct { Check *CheckDefinition `json:"check,omitempty" hcl:"check" mapstructure:"check"` Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"` Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"` + Weights *ServiceWeights `json:"weights,omitempty" hcl:"weights" mapstructure:"weights"` EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"` ProxyDestination *string `json:"proxy_destination,omitempty" hcl:"proxy_destination" mapstructure:"proxy_destination"` Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"` diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 0e5c916ae..6b87fe89a 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2005,16 +2005,22 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }, json: []string{ `{ "service": { "name": "a", "port": 80 } }`, - `{ "service": { "name": "b", "port": 90, "meta": {"my": "value"} } }`, + `{ "service": { "name": "b", "port": 90, "meta": {"my": "value"}, "weights": {"passing": 13} } }`, }, hcl: []string{ `service = { name = "a" port = 80 }`, - `service = { name = "b" port = 90 meta={my="value"}}`, + `service = { name = "b" port = 90 meta={my="value"}, weights={passing=13}}`, }, patch: func(rt *RuntimeConfig) { rt.Services = []*structs.ServiceDefinition{ - &structs.ServiceDefinition{Name: "a", Port: 80}, - &structs.ServiceDefinition{Name: "b", Port: 90, Meta: map[string]string{"my": "value"}}, + &structs.ServiceDefinition{Name: "a", Port: 80, Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }}, + &structs.ServiceDefinition{Name: "b", Port: 90, Meta: map[string]string{"my": "value"}, Weights: &structs.Weights{ + Passing: 13, + Warning: 1, + }}, } rt.DataDir = dataDir }, @@ -2108,6 +2114,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { ScriptArgs: []string{"a", "b"}, }, }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, } rt.DataDir = dataDir @@ -2167,6 +2177,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }, }, }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, } }, @@ -2211,6 +2225,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }, }, }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, } }, @@ -2266,10 +2284,18 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }, }, }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, &structs.ServiceDefinition{ Name: "service-A2", Port: 81, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, } }, @@ -2747,6 +2773,10 @@ func TestFullConfig(t *testing.T) { "address": "cOlSOhbp", "token": "msy7iWER", "port": 24237, + "weights": { + "passing": 100, + "warning": 1 + }, "enable_tag_override": true, "check": { "id": "RMi85Dv8", @@ -2855,6 +2885,10 @@ func TestFullConfig(t *testing.T) { "address": "R6H6g8h0", "token": "ZgY8gjMI", "port": 38292, + "weights": { + "passing": 1979, + "warning": 6 + }, "enable_tag_override": true, "checks": [ { @@ -3238,6 +3272,10 @@ func TestFullConfig(t *testing.T) { address = "cOlSOhbp" token = "msy7iWER" port = 24237 + weights = { + passing = 100, + warning = 1 + } enable_tag_override = true check = { id = "RMi85Dv8" @@ -3346,6 +3384,10 @@ func TestFullConfig(t *testing.T) { address = "R6H6g8h0" token = "ZgY8gjMI" port = 38292 + weights = { + passing = 1979, + warning = 6 + } enable_tag_override = true checks = [ { @@ -3798,12 +3840,16 @@ func TestFullConfig(t *testing.T) { ServerPort: 3757, Services: []*structs.ServiceDefinition{ { - ID: "wI1dzxS4", - Name: "7IszXMQ1", - Tags: []string{"0Zwg8l6v", "zebELdN5"}, - Address: "9RhqPSPB", - Token: "myjKJkWH", - Port: 72219, + ID: "wI1dzxS4", + Name: "7IszXMQ1", + Tags: []string{"0Zwg8l6v", "zebELdN5"}, + Address: "9RhqPSPB", + Token: "myjKJkWH", + Port: 72219, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, EnableTagOverride: true, Checks: []*structs.CheckType{ &structs.CheckType{ @@ -3830,12 +3876,16 @@ func TestFullConfig(t *testing.T) { }, }, { - ID: "MRHVMZuD", - Name: "6L6BVfgH", - Tags: []string{"7Ale4y6o", "PMBW08hy"}, - Address: "R6H6g8h0", - Token: "ZgY8gjMI", - Port: 38292, + ID: "MRHVMZuD", + Name: "6L6BVfgH", + Tags: []string{"7Ale4y6o", "PMBW08hy"}, + Address: "R6H6g8h0", + Token: "ZgY8gjMI", + Port: 38292, + Weights: &structs.Weights{ + Passing: 1979, + Warning: 6, + }, EnableTagOverride: true, Checks: structs.CheckTypes{ &structs.CheckType{ @@ -3897,15 +3947,23 @@ func TestFullConfig(t *testing.T) { Port: 31471, Kind: "connect-proxy", ProxyDestination: "6L6BVfgH", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, }, { - ID: "dLOXpSCI", - Name: "o1ynPkp0", - Tags: []string{"nkwshvM5", "NTDWn3ek"}, - Address: "cOlSOhbp", - Token: "msy7iWER", - Meta: map[string]string{"mymeta": "data"}, - Port: 24237, + ID: "dLOXpSCI", + Name: "o1ynPkp0", + Tags: []string{"nkwshvM5", "NTDWn3ek"}, + Address: "cOlSOhbp", + Token: "msy7iWER", + Meta: map[string]string{"mymeta": "data"}, + Port: 24237, + Weights: &structs.Weights{ + Passing: 100, + Warning: 1, + }, EnableTagOverride: true, Connect: &structs.ServiceConnect{ Native: true, @@ -4324,6 +4382,10 @@ func TestSanitize(t *testing.T) { Check: structs.CheckType{ Name: "blurb", }, + Weights: &structs.Weights{ + Passing: 67, + Warning: 3, + }, }, }, Checks: []*structs.CheckDefinition{ @@ -4552,7 +4614,11 @@ func TestSanitize(t *testing.T) { "Port": 0, "ProxyDestination": "", "Tags": [], - "Token": "hidden" + "Token": "hidden", + "Weights": { + "Passing": 67, + "Warning": 3 + } }], "SessionTTLMin": "0s", "SkipLeaveOnInt": false, diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 7b2ec445c..bb42eb562 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -189,6 +189,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { Address: "1.1.1.1", Port: 8080, Tags: []string{"master"}, + Weights: &structs.Weights{Passing: 1, Warning: 1}, } if err := s.EnsureRegistration(2, req); err != nil { t.Fatalf("err: %s", err) @@ -203,6 +204,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { Address: "1.1.1.1", Port: 8080, Tags: []string{"master"}, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{CreateIndex: 2, ModifyIndex: 2}, }, } @@ -393,6 +395,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { Service: "redis", Address: "1.1.1.1", Port: 8080, + Weights: &structs.Weights{Passing: 1, Warning: 1}, } restore = s.Restore() if err := restore.Registration(2, req); err != nil { @@ -1299,6 +1302,7 @@ func TestStateStore_EnsureService(t *testing.T) { Tags: []string{"prod"}, Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 0}, } // Creating a service without a node returns an error. @@ -1430,6 +1434,10 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) { Address: "1.1.1.1", Port: 1111, ProxyDestination: "foo", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } // Service successfully registers into the state store. @@ -2065,6 +2073,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) { Tags: []string{"prod"}, Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 0}, }, &structs.NodeService{ ID: "service2", @@ -2072,6 +2081,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) { Tags: []string{"dev"}, Address: "1.1.1.2", Port: 1112, + Weights: &structs.Weights{Passing: 1, Warning: 1}, }, } for i, svc := range ns { @@ -3252,6 +3262,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { Service: "service1", Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 2, ModifyIndex: 2, @@ -3262,6 +3273,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { Service: "service2", Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 3, ModifyIndex: 3, @@ -3301,6 +3313,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { Service: "service1", Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 4, ModifyIndex: 4, @@ -3311,6 +3324,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { Service: "service2", Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 5, ModifyIndex: 5, diff --git a/agent/coordinate_endpoint_test.go b/agent/coordinate_endpoint_test.go index b07125651..06e8f70a0 100644 --- a/agent/coordinate_endpoint_test.go +++ b/agent/coordinate_endpoint_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/serf/coordinate" ) @@ -72,6 +73,7 @@ func TestCoordinate_Nodes(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Make sure an empty list is non-nil. req, _ := http.NewRequest("GET", "/v1/coordinate/nodes?dc=dc1", nil) @@ -182,6 +184,7 @@ func TestCoordinate_Node(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Make sure we get a 404 with no coordinates. req, _ := http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil) diff --git a/agent/dns.go b/agent/dns.go index af3a80115..e499cc5ab 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/miekg/dns" ) @@ -1205,6 +1206,51 @@ func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNode } } +func findWeight(node structs.CheckServiceNode) int { + // By default, when only_passing is false, warning and passing nodes are returned + // Those values will be used if using a client with support while server has no + // support for weights + weightPassing := 1 + weightWarning := 1 + if node.Service.Weights != nil { + weightPassing = node.Service.Weights.Passing + weightWarning = node.Service.Weights.Warning + } + serviceChecks := make(api.HealthChecks, 0) + for _, c := range node.Checks { + if c.ServiceName == node.Service.Service || c.ServiceName == "" { + healthCheck := &api.HealthCheck{ + Node: c.Node, + CheckID: string(c.CheckID), + Name: c.Name, + Status: c.Status, + Notes: c.Notes, + Output: c.Output, + ServiceID: c.ServiceID, + ServiceName: c.ServiceName, + ServiceTags: c.ServiceTags, + } + serviceChecks = append(serviceChecks, healthCheck) + } + } + status := serviceChecks.AggregatedStatus() + switch status { + case api.HealthWarning: + return weightWarning + case api.HealthPassing: + return weightPassing + case api.HealthMaint: + // Not used in theory + return 0 + case api.HealthCritical: + // Should not happen since already filtered + return 0 + default: + // When non-standard status, return 1 + return 1 + } +} + // serviceARecords is used to add the SRV records for a service lookup func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) { handled := make(map[string]struct{}) @@ -1219,6 +1265,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes } handled[tuple] = struct{}{} + weight := findWeight(node) // Add the SRV record srvRec := &dns.SRV{ Hdr: dns.RR_Header{ @@ -1228,7 +1275,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes Ttl: uint32(ttl / time.Second), }, Priority: 1, - Weight: 1, + Weight: uint16(weight), Port: uint16(node.Service.Port), Target: fmt.Sprintf("%s.node.%s.%s", node.Node.Node, dc, d.domain), } diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index fe5823764..23e92951f 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -218,6 +218,7 @@ func TestHealthServiceChecks(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1", nil) resp := httptest.NewRecorder() @@ -322,6 +323,7 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Create a service check args := &structs.RegisterRequest{ diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 3e77463a0..2e5562af3 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -47,6 +47,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Service: "mysql", Tags: []string{"master"}, Port: 5000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv1, "") args.Service = srv1 @@ -60,6 +64,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Service: "redis", Tags: []string{}, Port: 8000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } a.State.AddService(srv2, "") @@ -77,6 +85,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Service: "web", Tags: []string{}, Port: 80, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv3, "") @@ -86,6 +98,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Service: "lb", Tags: []string{}, Port: 443, + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } args.Service = srv4 if err := a.RPC("Catalog.Register", args, &out); err != nil { @@ -99,6 +115,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Tags: []string{}, Address: "127.0.0.10", Port: 8000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv5, "") @@ -116,6 +136,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Service: "cache", Tags: []string{}, Port: 11211, + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } a.State.SetServiceState(&local.ServiceState{ Service: srv6, @@ -257,6 +281,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { Service: "mysql-proxy", Port: 5000, ProxyDestination: "db", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv1, "") args.Service = srv1 @@ -269,6 +297,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { Port: 8000, Kind: structs.ServiceKindConnectProxy, ProxyDestination: "redis", + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } a.State.AddService(srv2, "") @@ -285,6 +317,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { Port: 80, Kind: structs.ServiceKindConnectProxy, ProxyDestination: "web", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv3, "") @@ -295,6 +331,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { Port: 443, Kind: structs.ServiceKindConnectProxy, ProxyDestination: "lb", + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } args.Service = srv4 assert.Nil(a.RPC("Catalog.Register", args, &out)) @@ -306,6 +346,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { Port: 11211, Kind: structs.ServiceKindConnectProxy, ProxyDestination: "cache-proxy", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.SetServiceState(&local.ServiceState{ Service: srv5, @@ -394,6 +438,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Tags: []string{"tag1"}, Port: 6100, EnableTagOverride: true, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv1, "") @@ -404,6 +452,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Tags: []string{"tag2"}, Port: 6200, EnableTagOverride: false, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv2, "") @@ -421,6 +473,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Tags: []string{"tag1_mod"}, Port: 7100, EnableTagOverride: true, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -432,6 +488,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Tags: []string{"tag2_mod"}, Port: 7200, EnableTagOverride: false, + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) @@ -465,6 +525,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { Tags: []string{"tag1_mod"}, Port: 6100, EnableTagOverride: true, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } if !verify.Values(t, "", got, want) { t.FailNow() @@ -651,6 +715,10 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Service: "mysql", Tags: []string{"master"}, Port: 5000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv1, token) @@ -660,6 +728,10 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { Service: "api", Tags: []string{"foo"}, Port: 5001, + Weights: &structs.Weights{ + Passing: 1, + Warning: 0, + }, } a.State.AddService(srv2, token) @@ -990,6 +1062,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Service: "mysql", Tags: []string{"master"}, Port: 5000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv1, "root") srv2 := &structs.NodeService{ @@ -997,6 +1073,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { Service: "api", Tags: []string{"foo"}, Port: 5001, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, } a.State.AddService(srv2, "root") diff --git a/agent/session_endpoint_test.go b/agent/session_endpoint_test.go index 273071592..c5c600fcd 100644 --- a/agent/session_endpoint_test.go +++ b/agent/session_endpoint_test.go @@ -496,7 +496,6 @@ func TestSessionGet(t *testing.T) { } func TestSessionList(t *testing.T) { - t.Parallel() t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() diff --git a/agent/structs/service_definition.go b/agent/structs/service_definition.go index d8e416af7..3b08adc68 100644 --- a/agent/structs/service_definition.go +++ b/agent/structs/service_definition.go @@ -23,6 +23,7 @@ type ServiceDefinition struct { Port int Check CheckType Checks CheckTypes + Weights *Weights Token string EnableTagOverride bool ProxyDestination string @@ -38,6 +39,7 @@ func (s *ServiceDefinition) NodeService() *NodeService { Address: s.Address, Meta: s.Meta, Port: s.Port, + Weights: s.Weights, EnableTagOverride: s.EnableTagOverride, ProxyDestination: s.ProxyDestination, } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e22e06e08..e684647ec 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -379,6 +379,23 @@ func ValidateMetadata(meta map[string]string, allowConsulPrefix bool) error { return nil } +// ValidateWeights checks the definition of DNS weight is valid +func ValidateWeights(weights *Weights) error { + if weights == nil { + return nil + } + if weights.Passing < 1 { + return fmt.Errorf("Passing must be greater than 0") + } + if weights.Warning < 0 { + return fmt.Errorf("Warning must be greater or equal than 0") + } + if weights.Passing > 65535 || weights.Warning > 65535 { + return fmt.Errorf("DNS Weight must be between 0 and 65535") + } + return nil +} + // validateMetaPair checks that the given key/value pair is in a valid format func validateMetaPair(key, value string, allowConsulPrefix bool) error { if key == "" { @@ -430,6 +447,7 @@ type ServiceNode struct { ServiceName string ServiceTags []string ServiceAddress string + ServiceWeights Weights ServiceMeta map[string]string ServicePort int ServiceEnableTagOverride bool @@ -461,6 +479,7 @@ func (s *ServiceNode) PartialClone() *ServiceNode { ServiceAddress: s.ServiceAddress, ServicePort: s.ServicePort, ServiceMeta: nsmeta, + ServiceWeights: s.ServiceWeights, ServiceEnableTagOverride: s.ServiceEnableTagOverride, ServiceProxyDestination: s.ServiceProxyDestination, ServiceConnect: s.ServiceConnect, @@ -481,6 +500,7 @@ func (s *ServiceNode) ToNodeService() *NodeService { Address: s.ServiceAddress, Port: s.ServicePort, Meta: s.ServiceMeta, + Weights: &s.ServiceWeights, EnableTagOverride: s.ServiceEnableTagOverride, ProxyDestination: s.ServiceProxyDestination, Connect: s.ServiceConnect, @@ -491,6 +511,12 @@ func (s *ServiceNode) ToNodeService() *NodeService { } } +// Weights represent the weight used by DNS for a given status +type Weights struct { + Passing int + Warning int +} + type ServiceNodes []*ServiceNode // ServiceKind is the kind of service being registered. @@ -522,6 +548,7 @@ type NodeService struct { Address string Meta map[string]string Port int + Weights *Weights EnableTagOverride bool // ProxyDestination is the name of the service that this service is @@ -590,6 +617,7 @@ func (s *NodeService) IsSame(other *NodeService) bool { !reflect.DeepEqual(s.Tags, other.Tags) || s.Address != other.Address || s.Port != other.Port || + !reflect.DeepEqual(s.Weights, other.Weights) || !reflect.DeepEqual(s.Meta, other.Meta) || s.EnableTagOverride != other.EnableTagOverride || s.Kind != other.Kind || @@ -603,6 +631,15 @@ func (s *NodeService) IsSame(other *NodeService) bool { // ToServiceNode converts the given node service to a service node. func (s *NodeService) ToServiceNode(node string) *ServiceNode { + theWeights := Weights{ + Passing: 1, + Warning: 1, + } + if s.Weights != nil { + if err := ValidateWeights(s.Weights); err == nil { + theWeights = *s.Weights + } + } return &ServiceNode{ // Skip ID, see ServiceNode definition. Node: node, @@ -615,6 +652,7 @@ func (s *NodeService) ToServiceNode(node string) *ServiceNode { ServiceAddress: s.Address, ServicePort: s.Port, ServiceMeta: s.Meta, + ServiceWeights: theWeights, ServiceEnableTagOverride: s.EnableTagOverride, ServiceProxyDestination: s.ProxyDestination, ServiceConnect: s.Connect, diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index 569933924..928989e2a 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -188,6 +188,12 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) { if !reflect.DeepEqual(sn, clone) { t.Fatalf("bad: %v VS %v", clone, sn) } + oldPassingWeight := clone.ServiceWeights.Passing + sn.ServiceWeights.Passing = 1000 + if reflect.DeepEqual(sn, clone) { + t.Fatalf("clone wasn't independent of the original for Meta") + } + sn.ServiceWeights.Passing = oldPassingWeight sn.ServiceMeta["new_meta"] = "new_value" if reflect.DeepEqual(sn, clone) { t.Fatalf("clone wasn't independent of the original for Meta") @@ -206,8 +212,9 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) { sn.Datacenter = "" sn.TaggedAddresses = nil sn.NodeMeta = nil + sn.ServiceWeights = Weights{Passing: 1, Warning: 1} if !reflect.DeepEqual(sn, sn2) { - t.Fatalf("bad: %v", sn2) + t.Fatalf("bad: %#v, but expected %#v", sn2, sn) } } diff --git a/api/agent.go b/api/agent.go index 8cb81fc84..41505e769 100644 --- a/api/agent.go +++ b/api/agent.go @@ -51,6 +51,12 @@ type AgentCheck struct { Definition HealthCheckDefinition } +// AgentWeights represent optional weights for a service +type AgentWeights struct { + Passing int + Warning int +} + // AgentService represents a service known to the agent type AgentService struct { Kind ServiceKind @@ -60,6 +66,7 @@ type AgentService struct { Meta map[string]string Port int Address string + Weights AgentWeights EnableTagOverride bool CreateIndex uint64 ModifyIndex uint64 @@ -119,6 +126,7 @@ type AgentServiceRegistration struct { Address string `json:",omitempty"` EnableTagOverride bool `json:",omitempty"` Meta map[string]string `json:",omitempty"` + Weights *AgentWeights `json:",omitempty"` Check *AgentServiceCheck Checks AgentServiceChecks ProxyDestination string `json:",omitempty"` diff --git a/api/catalog.go b/api/catalog.go index 1a6bbc3b3..6cb745c36 100644 --- a/api/catalog.go +++ b/api/catalog.go @@ -1,5 +1,10 @@ package api +type Weights struct { + Passing int + Warning int +} + type Node struct { ID string Node string @@ -24,6 +29,7 @@ type CatalogService struct { ServiceTags []string ServiceMeta map[string]string ServicePort int + ServiceWeights Weights ServiceEnableTagOverride bool CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index f9d64dcaa..e914d63ad 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" + "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil/retry" @@ -83,6 +85,7 @@ func TestRetryJoin(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") shutdownCh := make(chan struct{}) diff --git a/website/source/docs/agent/dns.html.md b/website/source/docs/agent/dns.html.md index 3d3f1c866..04891ed58 100644 --- a/website/source/docs/agent/dns.html.md +++ b/website/source/docs/agent/dns.html.md @@ -95,6 +95,21 @@ node's metadata key starts with `rfc1035-`. A service lookup is used to query for service providers. Service queries support two lookup methods: standard and strict [RFC 2782](https://tools.ietf.org/html/rfc2782). +By default, SRV weights are all set at 1, but changing weights is supported using the +`Weights` attribute of the [service definition](/docs/agent/services.html). + +Note that DNS is limited in size per request, even when performing DNS TCP +queries. + +For services having many instances (more than 500), it might not be possible to +retrieve the complete list of instances for the service. + +When DNS SRV response are sent, order is randomized, but weights are not +taken into account. In the case of truncation different clients using weighted SRV +responses will have partial and inconsistent views of instances weights so the +request distribution could be skewed from the intended weights. In that case, +it is recommended to use the HTTP API to retrieve the list of nodes. + ### Standard Lookup The format of a standard service lookup is: diff --git a/website/source/docs/agent/services.html.md b/website/source/docs/agent/services.html.md index 23e808724..79649bad0 100644 --- a/website/source/docs/agent/services.html.md +++ b/website/source/docs/agent/services.html.md @@ -52,6 +52,10 @@ example shows all possible fields, but note that only a few are required. "command": [], "config": {} } + }, + "weights": { + "passing": 5, + "warning": 1 } } } @@ -151,6 +155,19 @@ are used to configure the proxy and are specified in the [proxy docs](/docs/connect/proxies.html). If `native` is true, it is an error to also specifiy a managed proxy instance. +The `weights` field is an optional field to specify the weight of a service in +DNS SRV responses. If this field is not specified, its default value is: +`"weights": {"passing": 1, "warning": 1}`. +When a service is `critical`, it is excluded from DNS responses. +Services with warning checks are in included in responses by default, +but excluded if the optional param `only_passing = true` is present in +agent DNS configuration or `?passing` is used via the API. +When DNS SRV requests are made, the response will include the weights +specified given the state of the service. +This allows some instances to be given higher weight if they have more capacity, +and optionally allows reducing load on services with checks in `warning` status +by giving passing instances a higher weight. + ## Multiple Service Definitions Multiple services definitions can be provided at once using the plural