GH-3798: More PR Updates
Update docs a little Update/add tests. Make sure all the various ways of determining the source IP work Update X-Forwarded-For header parsing. This can be a comma separated list with the first element being the original IP so we now handle csv data there. Got rid of error return from sourceAddrFromRequest
This commit is contained in:
parent
aa9151738a
commit
d604642792
|
@ -918,7 +918,7 @@ func (d *DNSServer) serviceLookup(network, datacenter, service, tag string, req,
|
||||||
}
|
}
|
||||||
|
|
||||||
func ednsSubnetForRequest(req *dns.Msg) (*dns.EDNS0_SUBNET) {
|
func ednsSubnetForRequest(req *dns.Msg) (*dns.EDNS0_SUBNET) {
|
||||||
// Its probably not obvious but IsEdns0 returns the EDNS RR if present or nil otherwise
|
// IsEdns0 returns the EDNS RR if present or nil otherwise
|
||||||
edns := req.IsEdns0()
|
edns := req.IsEdns0()
|
||||||
|
|
||||||
if edns == nil {
|
if edns == nil {
|
||||||
|
|
|
@ -1837,6 +1837,132 @@ func TestDNS_ServiceLookup_TagPeriod(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDNS_PreparedQueryNearIPEDNS(t *testing.T) {
|
||||||
|
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
|
||||||
|
serviceNodes := []struct{
|
||||||
|
name string
|
||||||
|
address string
|
||||||
|
coord *coordinate.Coordinate
|
||||||
|
}{
|
||||||
|
{"foo1", "198.18.0.1", lib.GenerateCoordinate(1 * time.Millisecond),},
|
||||||
|
{"foo2", "198.18.0.2", lib.GenerateCoordinate(10 * time.Millisecond),},
|
||||||
|
{"foo3", "198.18.0.3", lib.GenerateCoordinate(30 * time.Millisecond),},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
added := 0
|
||||||
|
|
||||||
|
// Register nodes with a service
|
||||||
|
for _, cfg := range serviceNodes {
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: cfg.name,
|
||||||
|
Address: cfg.address,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "db",
|
||||||
|
Port: 12345,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Register", args, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Send coordinate updates
|
||||||
|
coordArgs := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: cfg.name,
|
||||||
|
Coord: cfg.coord,
|
||||||
|
}
|
||||||
|
err = a.RPC("Coordinate.Update", &coordArgs, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
added += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Added %d service nodes\n", added)
|
||||||
|
|
||||||
|
// Register a node without a service
|
||||||
|
{
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Address: "198.18.0.9",
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Register", args, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Send coordinate updates for a few nodes.
|
||||||
|
coordArgs := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Coord: ipCoord,
|
||||||
|
}
|
||||||
|
err = a.RPC("Coordinate.Update", &coordArgs, &out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register a prepared query Near = _ip
|
||||||
|
{
|
||||||
|
args := &structs.PreparedQueryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.PreparedQueryCreate,
|
||||||
|
Query: &structs.PreparedQuery{
|
||||||
|
Name: "some.query.we.like",
|
||||||
|
Service: structs.ServiceQuery{
|
||||||
|
Service: "db",
|
||||||
|
Near: "_ip",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var id string
|
||||||
|
err := a.RPC("PreparedQuery.Apply", args, &id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
m := new(dns.Msg)
|
||||||
|
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
|
||||||
|
m.SetEdns0(4096, false)
|
||||||
|
o := new(dns.OPT)
|
||||||
|
o.Hdr.Name = "."
|
||||||
|
o.Hdr.Rrtype = dns.TypeOPT
|
||||||
|
e := new(dns.EDNS0_SUBNET)
|
||||||
|
e.Code = dns.EDNS0SUBNET
|
||||||
|
e.Family = 1
|
||||||
|
e.SourceNetmask = 32
|
||||||
|
e.SourceScope = 0
|
||||||
|
e.Address = net.ParseIP("198.18.0.9").To4()
|
||||||
|
o.Option = append(o.Option, e)
|
||||||
|
m.Extra = append(m.Extra, o)
|
||||||
|
|
||||||
|
c := new(dns.Client)
|
||||||
|
in, _, err := c.Exchange(m, a.DNSAddr())
|
||||||
|
if err != nil {
|
||||||
|
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(serviceNodes) != len(in.Answer) {
|
||||||
|
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, rr := range in.Answer {
|
||||||
|
if aRec, ok := rr.(*dns.A); ok {
|
||||||
|
if actual := aRec.A.String(); serviceNodes[i].address != actual {
|
||||||
|
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
r.Fatalf("DNS Answer contained a non-A RR")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
||||||
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
|
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
|
||||||
serviceNodes := []struct{
|
serviceNodes := []struct{
|
||||||
|
@ -1890,7 +2016,7 @@ func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
||||||
args := &structs.RegisterRequest{
|
args := &structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "bar",
|
Node: "bar",
|
||||||
Address: "127.0.0.1",
|
Address: "198.18.0.9",
|
||||||
}
|
}
|
||||||
|
|
||||||
var out struct{}
|
var out struct{}
|
||||||
|
@ -1925,42 +2051,6 @@ func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
||||||
err := a.RPC("PreparedQuery.Apply", args, &id)
|
err := a.RPC("PreparedQuery.Apply", args, &id)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
retry.Run(t, func(r *retry.R) {
|
|
||||||
m := new(dns.Msg)
|
|
||||||
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
|
|
||||||
m.SetEdns0(4096, false)
|
|
||||||
o := new(dns.OPT)
|
|
||||||
o.Hdr.Name = "."
|
|
||||||
o.Hdr.Rrtype = dns.TypeOPT
|
|
||||||
e := new(dns.EDNS0_SUBNET)
|
|
||||||
e.Code = dns.EDNS0SUBNET
|
|
||||||
e.Family = 1
|
|
||||||
e.SourceNetmask = 32
|
|
||||||
e.SourceScope = 0
|
|
||||||
e.Address = net.ParseIP("127.0.0.1").To4()
|
|
||||||
o.Option = append(o.Option, e)
|
|
||||||
m.Extra = append(m.Extra, o)
|
|
||||||
|
|
||||||
c := new(dns.Client)
|
|
||||||
in, _, err := c.Exchange(m, a.DNSAddr())
|
|
||||||
if err != nil {
|
|
||||||
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(serviceNodes) != len(in.Answer) {
|
|
||||||
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, rr := range in.Answer {
|
|
||||||
if aRec, ok := rr.(*dns.A); ok {
|
|
||||||
if actual := aRec.A.String(); serviceNodes[i].address != actual {
|
|
||||||
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
r.Fatalf("DNS Answer contained a non-A RR")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
m := new(dns.Msg)
|
m := new(dns.Msg)
|
||||||
|
|
|
@ -498,23 +498,26 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
|
||||||
*token = s.agent.tokens.UserToken()
|
*token = s.agent.tokens.UserToken()
|
||||||
}
|
}
|
||||||
|
|
||||||
func sourceAddrFromRequest(req *http.Request) (string, error) {
|
func sourceAddrFromRequest(req *http.Request) string {
|
||||||
forwardHost := req.Header.Get("X-Forwarded-For")
|
xff := req.Header.Get("X-Forwarded-For")
|
||||||
forwardIp := net.ParseIP(forwardHost)
|
forwardHosts := strings.Split(xff, ",")
|
||||||
|
if len(forwardHosts) > 0 {
|
||||||
|
forwardIp := net.ParseIP(strings.TrimSpace(forwardHosts[0]))
|
||||||
if forwardIp != nil {
|
if forwardIp != nil {
|
||||||
return forwardIp.String(), nil
|
return forwardIp.String()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
host, _, err := net.SplitHostPort(req.RemoteAddr)
|
host, _, err := net.SplitHostPort(req.RemoteAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
ip := net.ParseIP(host)
|
ip := net.ParseIP(host)
|
||||||
if ip != nil {
|
if ip != nil {
|
||||||
return ip.String(), nil
|
return ip.String()
|
||||||
} else {
|
} else {
|
||||||
return "", fmt.Errorf("Could not get remote IP from HTTP Request")
|
return ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,7 +527,7 @@ func sourceAddrFromRequest(req *http.Request) (string, error) {
|
||||||
// DC in the request, if given, or else the agent's DC.
|
// DC in the request, if given, or else the agent's DC.
|
||||||
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
||||||
s.parseDC(req, &source.Datacenter)
|
s.parseDC(req, &source.Datacenter)
|
||||||
source.Ip, _ = sourceAddrFromRequest(req)
|
source.Ip = sourceAddrFromRequest(req)
|
||||||
if node := req.URL.Query().Get("near"); node != "" {
|
if node := req.URL.Query().Get("near"); node != "" {
|
||||||
if node == "_agent" {
|
if node == "_agent" {
|
||||||
source.Node = s.agent.config.NodeName
|
source.Node = s.agent.config.NodeName
|
||||||
|
|
|
@ -380,9 +380,66 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
if r.Failovers != 99 {
|
if r.Failovers != 99 {
|
||||||
t.Fatalf("bad: %v", r)
|
t.Fatalf("bad: %v", r)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
m := MockPreparedQuery{
|
||||||
|
executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||||
|
expected := &structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: "my-id",
|
||||||
|
Limit: 5,
|
||||||
|
Source: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "_ip",
|
||||||
|
Ip: "198.18.0.1",
|
||||||
|
},
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: a.Config.Datacenter,
|
||||||
|
Node: a.Config.NodeName,
|
||||||
|
},
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Token: "my-token",
|
||||||
|
RequireConsistent: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(args, expected) {
|
||||||
|
t.Fatalf("bad: %v", args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just set something so we can tell this is returned.
|
||||||
|
reply.Failovers = 99
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := a.registerEndpoint("PreparedQuery", &m); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := bytes.NewBuffer(nil)
|
||||||
|
req, _ := http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
|
||||||
|
req.Header.Add("X-Forwarded-For", "198.18.0.1")
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.PreparedQuerySpecific(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if resp.Code != 200 {
|
||||||
|
t.Fatalf("bad code: %d", resp.Code)
|
||||||
|
}
|
||||||
|
r, ok := obj.(structs.PreparedQueryExecuteResponse)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected: %T", obj)
|
||||||
|
}
|
||||||
|
if r.Failovers != 99 {
|
||||||
|
t.Fatalf("bad: %v", r)
|
||||||
|
}
|
||||||
|
|
||||||
req, _ = http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
|
req, _ = http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
|
||||||
req.RemoteAddr = "127.0.0.1:12345"
|
req.Header.Add("X-Forwarded-For", "198.18.0.1, 198.19.0.1")
|
||||||
resp = httptest.NewRecorder()
|
resp = httptest.NewRecorder()
|
||||||
obj, err = a.srv.PreparedQuerySpecific(resp, req)
|
obj, err = a.srv.PreparedQuerySpecific(resp, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -176,14 +176,15 @@ The table below shows this endpoint's support for
|
||||||
nearest instance to the specified node will be returned first, and subsequent
|
nearest instance to the specified node will be returned first, and subsequent
|
||||||
nodes in the response will be sorted in ascending order of estimated
|
nodes in the response will be sorted in ascending order of estimated
|
||||||
round-trip times. If the node given does not exist, the nodes in the response
|
round-trip times. If the node given does not exist, the nodes in the response
|
||||||
will be shuffled. Using `_agent` is supported, and will automatically return
|
will be shuffled. If unspecified, the response will be shuffled by default.
|
||||||
results nearest the agent servicing the request. Using `_ip` is supported and
|
|
||||||
will automatically return results nearest to the node associated with the
|
- `_agent` - Returns results nearest the agent servicing the request.
|
||||||
source IP where the query is executed from. For HTTP the source IP is the
|
- `_ip` - Returns results nearest to the node associated with the source IP
|
||||||
remote peer's IP address or the value of the X-Forwarded-For header with the
|
where the query was executed from. For HTTP the source IP is the remote
|
||||||
header taking precedence. For DNS the source IP is the value of the EDNS
|
peer's IP address or the value of the X-Forwarded-For header with the
|
||||||
client IP or the remote peer's IP address. If unspecified, the response
|
header taking precedence. For DNS the source IP is the remote peer's IP
|
||||||
will be shuffled by default.
|
address or the value of the ENDS client IP with the EDNS client IP
|
||||||
|
taking precedence.
|
||||||
|
|
||||||
- `Service` `(Service: <required>)` - Specifies the structure to define the query's behavior.
|
- `Service` `(Service: <required>)` - Specifies the structure to define the query's behavior.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue