From c4a34602b6b908534032af17f7a2edb5f0f3944d Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Thu, 25 Jul 2019 14:26:22 -0400 Subject: [PATCH] Allow forwarding of some status RPCs (#6198) * Allow forwarding of some status RPCs * Update docs * add comments about not using the regular forward --- agent/consul/server_serf.go | 2 +- agent/consul/status_endpoint.go | 19 ++++++- agent/consul/status_endpoint_test.go | 54 ++++++++++++++++++++ agent/status_endpoint.go | 16 +++++- agent/status_endpoint_test.go | 75 ++++++++++++++++++++++++++++ website/source/api/status.html.md | 12 +++++ 6 files changed, 173 insertions(+), 5 deletions(-) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 94020816d..5e2934d61 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -287,7 +287,7 @@ func (s *Server) maybeBootstrap() { // Retry with exponential backoff to get peer status from this server for attempt := uint(0); attempt < maxPeerRetries; attempt++ { if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, - "Status.Peers", server.UseTLS, &struct{}{}, &peers); err != nil { + "Status.Peers", server.UseTLS, &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil { nextRetry := time.Duration((1 << attempt) * peerRetryBase) s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+ "%v...", server.Name, err, nextRetry.String()) diff --git a/agent/consul/status_endpoint.go b/agent/consul/status_endpoint.go index dac364444..c19b60318 100644 --- a/agent/consul/status_endpoint.go +++ b/agent/consul/status_endpoint.go @@ -5,6 +5,7 @@ import ( "strconv" "github.com/hashicorp/consul/agent/consul/autopilot" + "github.com/hashicorp/consul/agent/structs" ) // Status endpoint is used to check on server status @@ -18,7 +19,14 @@ func (s *Status) Ping(args struct{}, reply *struct{}) error { } // Leader is used to get the address of the leader -func (s *Status) Leader(args struct{}, reply *string) error { +func (s *Status) Leader(args *structs.DCSpecificRequest, reply *string) error { + // not using the regular forward function as it does a bunch of stuff we + // dont want like verifying consistency etc. We just want to enable DC + // forwarding + if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter { + return s.server.forwardDC("Status.Leader", args.Datacenter, args, reply) + } + leader := string(s.server.raft.Leader()) if leader != "" { *reply = leader @@ -29,7 +37,14 @@ func (s *Status) Leader(args struct{}, reply *string) error { } // Peers is used to get all the Raft peers -func (s *Status) Peers(args struct{}, reply *[]string) error { +func (s *Status) Peers(args *structs.DCSpecificRequest, reply *[]string) error { + // not using the regular forward function as it does a bunch of stuff we + // dont want like verifying consistency etc. We just want to enable DC + // forwarding + if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter { + return s.server.forwardDC("Status.Peers", args.Datacenter, args, reply) + } + future := s.server.raft.GetConfiguration() if err := future.Error(); err != nil { return err diff --git a/agent/consul/status_endpoint_test.go b/agent/consul/status_endpoint_test.go index 02d87b582..0403e6320 100644 --- a/agent/consul/status_endpoint_test.go +++ b/agent/consul/status_endpoint_test.go @@ -8,9 +8,11 @@ import ( "time" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/require" ) func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { @@ -69,6 +71,32 @@ func TestStatusLeader(t *testing.T) { } } +func TestStatusLeader_ForwardDC(t *testing.T) { + t.Parallel() + dir1, s1 := testServerDC(t, "primary") + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerDC(t, "secondary") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + joinWAN(t, s2, s1) + + testrpc.WaitForLeader(t, s1.RPC, "secondary") + testrpc.WaitForLeader(t, s2.RPC, "primary") + + args := structs.DCSpecificRequest{ + Datacenter: "secondary", + } + + var out string + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Leader", &args, &out)) + require.Equal(t, s2.config.RPCAdvertise.String(), out) +} + func TestStatusPeers(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) @@ -86,3 +114,29 @@ func TestStatusPeers(t *testing.T) { t.Fatalf("no peers: %v", peers) } } + +func TestStatusPeers_ForwardDC(t *testing.T) { + t.Parallel() + dir1, s1 := testServerDC(t, "primary") + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerDC(t, "secondary") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + joinWAN(t, s2, s1) + + testrpc.WaitForLeader(t, s1.RPC, "secondary") + testrpc.WaitForLeader(t, s2.RPC, "primary") + + args := structs.DCSpecificRequest{ + Datacenter: "secondary", + } + + var out []string + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Peers", &args, &out)) + require.Equal(t, []string{s2.config.RPCAdvertise.String()}, out) +} diff --git a/agent/status_endpoint.go b/agent/status_endpoint.go index 75275800f..4f7769e41 100644 --- a/agent/status_endpoint.go +++ b/agent/status_endpoint.go @@ -2,19 +2,31 @@ package agent import ( "net/http" + + "github.com/hashicorp/consul/agent/structs" ) func (s *HTTPServer) StatusLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + var out string - if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil { + if err := s.agent.RPC("Status.Leader", &args, &out); err != nil { return nil, err } return out, nil } func (s *HTTPServer) StatusPeers(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + var out []string - if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil { + if err := s.agent.RPC("Status.Peers", &args, &out); err != nil { return nil, err } return out, nil diff --git a/agent/status_endpoint_test.go b/agent/status_endpoint_test.go index 93a48798e..76e661da3 100644 --- a/agent/status_endpoint_test.go +++ b/agent/status_endpoint_test.go @@ -1,10 +1,13 @@ package agent import ( + "fmt" "net/http" "testing" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" ) func TestStatusLeader(t *testing.T) { @@ -24,6 +27,42 @@ func TestStatusLeader(t *testing.T) { } } +func TestStatusLeaderSecondary(t *testing.T) { + t.Parallel() + a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"") + defer a1.Shutdown() + a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"") + defer a2.Shutdown() + + testrpc.WaitForTestAgent(t, a1.RPC, "primary") + testrpc.WaitForTestAgent(t, a2.RPC, "secondary") + + a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) + a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort) + a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort) + _, err := a2.JoinWAN([]string{a1SerfAddr}) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + require.Len(r, a1.WANMembers(), 2) + require.Len(r, a2.WANMembers(), 2) + }) + + req, _ := http.NewRequest("GET", "/v1/status/leader?dc=secondary", nil) + obj, err := a1.srv.StatusLeader(nil, req) + require.NoError(t, err) + leader, ok := obj.(string) + require.True(t, ok) + require.Equal(t, a2Addr, leader) + + req, _ = http.NewRequest("GET", "/v1/status/leader?dc=primary", nil) + obj, err = a2.srv.StatusLeader(nil, req) + require.NoError(t, err) + leader, ok = obj.(string) + require.True(t, ok) + require.Equal(t, a1Addr, leader) +} + func TestStatusPeers(t *testing.T) { t.Parallel() a := NewTestAgent(t, t.Name(), "") @@ -40,3 +79,39 @@ func TestStatusPeers(t *testing.T) { t.Fatalf("bad peers: %v", peers) } } + +func TestStatusPeersSecondary(t *testing.T) { + t.Parallel() + a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"") + defer a1.Shutdown() + a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"") + defer a2.Shutdown() + + testrpc.WaitForTestAgent(t, a1.RPC, "primary") + testrpc.WaitForTestAgent(t, a2.RPC, "secondary") + + a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) + a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort) + a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort) + _, err := a2.JoinWAN([]string{a1SerfAddr}) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + require.Len(r, a1.WANMembers(), 2) + require.Len(r, a2.WANMembers(), 2) + }) + + req, _ := http.NewRequest("GET", "/v1/status/peers?dc=secondary", nil) + obj, err := a1.srv.StatusPeers(nil, req) + require.NoError(t, err) + peers, ok := obj.([]string) + require.True(t, ok) + require.Equal(t, []string{a2Addr}, peers) + + req, _ = http.NewRequest("GET", "/v1/status/peers?dc=primary", nil) + obj, err = a2.srv.StatusPeers(nil, req) + require.NoError(t, err) + peers, ok = obj.([]string) + require.True(t, ok) + require.Equal(t, []string{a1Addr}, peers) +} diff --git a/website/source/api/status.html.md b/website/source/api/status.html.md index 3b85817c7..1479fab5d 100644 --- a/website/source/api/status.html.md +++ b/website/source/api/status.html.md @@ -33,6 +33,12 @@ The table below shows this endpoint's support for | ---------------- | ----------------- | ------------- | ------------ | | `NO` | `none` | `none` | `none` | +### Parameters + +- `dc` `(string: "")` - Specifies the datacenter to query. This will default to + the datacenter of the agent being queried. This is specified as part of the + URL as a query parameter. + ### Sample Request ```text @@ -65,6 +71,12 @@ The table below shows this endpoint's support for | ---------------- | ----------------- | ------------- | ------------ | | `NO` | `none` | `none` | `none` | +### Parameters + +- `dc` `(string: "")` - Specifies the datacenter to query. This will default to + the datacenter of the agent being queried. This is specified as part of the + URL as a query parameter. + ### Sample Request ```text