From e044343105d1f206524d43b7a215182ce0927826 Mon Sep 17 00:00:00 2001 From: Eric Haberkorn Date: Fri, 22 Jul 2022 09:14:43 -0400 Subject: [PATCH] Add Cluster Peering Failover Support to Prepared Queries (#13835) Add peering failover support to prepared queries --- agent/consul/prepared_query/template_test.go | 4 +- agent/consul/prepared_query/walk_test.go | 2 +- agent/consul/prepared_query_endpoint.go | 81 +++-- agent/consul/prepared_query_endpoint_test.go | 324 +++++++++++++++---- agent/dns_test.go | 2 +- agent/prepared_query_endpoint_test.go | 4 +- agent/structs/prepared_query.go | 35 +- api/prepared_query.go | 18 +- 8 files changed, 369 insertions(+), 101 deletions(-) diff --git a/agent/consul/prepared_query/template_test.go b/agent/consul/prepared_query/template_test.go index 05cbc17da..3fbf2d5af 100644 --- a/agent/consul/prepared_query/template_test.go +++ b/agent/consul/prepared_query/template_test.go @@ -22,7 +22,7 @@ var ( }, Service: structs.ServiceQuery{ Service: "${name.full}", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ Datacenters: []string{ "${name.full}", "${name.prefix}", @@ -69,7 +69,7 @@ var ( }, Service: structs.ServiceQuery{ Service: "${name.full}", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ Datacenters: []string{ "dc1", "dc2", diff --git a/agent/consul/prepared_query/walk_test.go b/agent/consul/prepared_query/walk_test.go index e45aa3a1e..ad71e0fed 100644 --- a/agent/consul/prepared_query/walk_test.go +++ b/agent/consul/prepared_query/walk_test.go @@ -20,7 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) { service := &structs.ServiceQuery{ Service: "the-service", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ Datacenters: []string{"dc1", "dc2"}, }, Near: "_agent", diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index fc0642b6f..7215161f3 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -187,11 +187,16 @@ func parseService(svc *structs.ServiceQuery) error { return fmt.Errorf("Must provide a Service name to query") } + failover := svc.Failover // NearestN can be 0 which means "don't fail over by RTT". - if svc.Failover.NearestN < 0 { + if failover.NearestN < 0 { return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN) } + if (failover.NearestN != 0 || len(failover.Datacenters) != 0) && len(failover.Targets) != 0 { + return fmt.Errorf("Targets cannot be populated with NearestN or Datacenters") + } + // Make sure the metadata filters are valid if err := structs.ValidateNodeMetadata(svc.NodeMeta, true); err != nil { return err @@ -462,7 +467,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // and bail out. Otherwise, we fail over and try remote DCs, as allowed // by the query setup. if len(reply.Nodes) == 0 { - wrapper := &queryServerWrapper{p.srv} + wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote} if err := queryFailover(wrapper, query, args, reply); err != nil { return err } @@ -565,8 +570,13 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery, reply.Nodes = nodes reply.DNS = query.DNS - // Stamp the result for this datacenter. - reply.Datacenter = p.srv.config.Datacenter + // Stamp the result with its this datacenter or peer. 
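+ // Results that came from a cluster peer are attributed via PeerName and leave
+ // Datacenter empty; otherwise the local datacenter is stamped as before.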
+ if peerName := query.Service.PeerName; peerName != "" { + reply.PeerName = peerName + reply.Datacenter = "" + } else { + reply.Datacenter = p.srv.config.Datacenter + } return nil } @@ -651,12 +661,24 @@ func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNode type queryServer interface { GetLogger() hclog.Logger GetOtherDatacentersByDistance() ([]string, error) - ForwardDC(method, dc string, args interface{}, reply interface{}) error + GetLocalDC() string + ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error } // queryServerWrapper applies the queryServer interface to a Server. type queryServerWrapper struct { - srv *Server + srv *Server + executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error +} + +// GetLocalDC returns the name of the local datacenter. +func (q *queryServerWrapper) GetLocalDC() string { + return q.srv.config.Datacenter +} + +// ExecuteRemote calls ExecuteRemote on PreparedQuery. +func (q *queryServerWrapper) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + return q.executeRemote(args, reply) } // GetLogger returns the server's logger. @@ -683,11 +705,6 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) { return result, nil } -// ForwardDC calls into the server's RPC forwarder. -func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error { - return q.srv.forwardDC(method, dc, args, reply) -} - // queryFailover runs an algorithm to determine which DCs to try and then calls // them to try to locate alternative services. func queryFailover(q queryServer, query *structs.PreparedQuery, @@ -709,7 +726,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery, // Build a candidate list of DCs to try, starting with the nearest N // from RTTs. - var dcs []string + var targets []structs.QueryFailoverTarget index := make(map[string]struct{}) if query.Service.Failover.NearestN > 0 { for i, dc := range nearest { @@ -717,30 +734,36 @@ func queryFailover(q queryServer, query *structs.PreparedQuery, break } - dcs = append(dcs, dc) + targets = append(targets, structs.QueryFailoverTarget{Datacenter: dc}) index[dc] = struct{}{} } } // Then add any DCs explicitly listed that weren't selected above. - for _, dc := range query.Service.Failover.Datacenters { + for _, target := range query.Service.Failover.AsTargets() { // This will prevent a log of other log spammage if we do not // attempt to talk to datacenters we don't know about. - if _, ok := known[dc]; !ok { - q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc) - continue + if dc := target.Datacenter; dc != "" { + if _, ok := known[dc]; !ok { + q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc) + continue + } + + // This will make sure we don't re-try something that fails + // from the NearestN list. + if _, ok := index[dc]; !ok { + targets = append(targets, target) + } } - // This will make sure we don't re-try something that fails - // from the NearestN list. - if _, ok := index[dc]; !ok { - dcs = append(dcs, dc) + if target.PeerName != "" { + targets = append(targets, target) } } // Now try the selected DCs in priority order. failovers := 0 - for _, dc := range dcs { + for _, target := range targets { // This keeps track of how many iterations we actually run. 
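+ // Every attempted target counts toward reply.Failovers, whether the target is
+ // a remote datacenter or a cluster peer.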
failovers++ @@ -752,7 +775,15 @@ func queryFailover(q queryServer, query *structs.PreparedQuery, // through this slice across successive RPC calls. reply.Nodes = nil - // Note that we pass along the limit since it can be applied + // Reset PeerName because it may have been set by a previous failover + // target. + query.Service.PeerName = target.PeerName + dc := target.Datacenter + if target.PeerName != "" { + dc = q.GetLocalDC() + } + + // Note that we pass along the limit since may be applied // remotely to save bandwidth. We also pass along the consistency // mode information and token we were given, so that applies to // the remote query as well. @@ -763,9 +794,11 @@ func queryFailover(q queryServer, query *structs.PreparedQuery, QueryOptions: args.QueryOptions, Connect: args.Connect, } - if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil { + + if err = q.ExecuteRemote(remote, reply); err != nil { q.GetLogger().Warn("Failed querying for service in datacenter", "service", query.Service.Service, + "peerName", query.Service.PeerName, "datacenter", dc, "error", err, ) diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go index 30de90fb2..4965a2a0d 100644 --- a/agent/consul/prepared_query_endpoint_test.go +++ b/agent/consul/prepared_query_endpoint_test.go @@ -2,6 +2,9 @@ package consul import ( "bytes" + "context" + "encoding/base64" + "encoding/json" "fmt" "os" "reflect" @@ -14,6 +17,7 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/consul-net-rpc/net/rpc" @@ -23,6 +27,7 @@ import ( "github.com/hashicorp/consul/agent/structs/aclfilter" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/types" @@ -82,8 +87,25 @@ func TestPreparedQuery_Apply(t *testing.T) { t.Fatalf("bad: %v", err) } - // Fix that and make sure it propagates an error from the Raft apply. + // Fix that and ensure Targets and NearestN cannot be set at the same time. + query.Query.Service.Failover.NearestN = 1 + query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}} + err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply) + if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") { + t.Fatalf("bad: %v", err) + } + + // Fix that and ensure Targets and Datacenters cannot be set at the same time. query.Query.Service.Failover.NearestN = 0 + query.Query.Service.Failover.Datacenters = []string{"dc2"} + query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}} + err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply) + if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") { + t.Fatalf("bad: %v", err) + } + + // Fix that and make sure it propagates an error from the Raft apply. 
+ query.Query.Service.Failover.Targets = nil query.Query.Session = "nope" err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply) if err == nil || !strings.Contains(err.Error(), "invalid session") { @@ -1442,6 +1464,17 @@ func TestPreparedQuery_Execute(t *testing.T) { s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc3" + c.PrimaryDatacenter = "dc3" + c.NodeName = "acceptingServer.dc3" + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + waitForLeaderEstablishment(t, s3) + codec3 := rpcClient(t, s3) + defer codec3.Close() + // Try to WAN join. joinWAN(t, s2, s1) retry.Run(t, func(r *retry.R) { @@ -1456,6 +1489,70 @@ func TestPreparedQuery_Execute(t *testing.T) { // check for RPC forwarding testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root")) testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root")) + testrpc.WaitForLeader(t, s3.RPC, "dc3") + + acceptingPeerName := "my-peer-accepting-server" + dialingPeerName := "my-peer-dialing-server" + + // Set up peering between dc1 (dailing) and dc3 (accepting) and export the foo service + { + // Create a peering by generating a token. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + req := pbpeering.GenerateTokenRequest{ + PeerName: dialingPeerName, + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) + require.NoError(t, err) + var token structs.PeeringToken + require.NoError(t, json.Unmarshal(tokenJSON, &token)) + + p := &pbpeering.Peering{ + ID: "cc56f0b8-3885-4e78-8d7b-614a0c45712d", + Name: acceptingPeerName, + PeerID: token.PeerID, + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p.ShouldDial()) + require.NoError(t, s1.fsm.State().PeeringWrite(1000, p)) + + // Wait for the stream to be connected. + retry.Run(t, func(r *retry.R) { + status, found := s1.peerStreamServer.StreamStatus(p.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + exportedServices := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc3", + Entry: &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "foo", + Consumers: []structs.ServiceConsumer{{PeerName: dialingPeerName}}, + }, + }, + }, + } + var configOutput bool + require.NoError(t, msgpackrpc.CallWithCodec(codec3, "ConfigEntry.Apply", &exportedServices, &configOutput)) + require.True(t, configOutput) + } execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root") rules := ` @@ -1485,9 +1582,16 @@ func TestPreparedQuery_Execute(t *testing.T) { // Set up some nodes in each DC that host the service. 
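+ // The same ten "foo" instances are registered in dc1, dc2, and dc3 so that
+ // both datacenter failover (dc2) and peer failover (dc3, the accepting peer)
+ // have nodes to return.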
{ for i := 0; i < 10; i++ { - for _, dc := range []string{"dc1", "dc2"} { + for _, d := range []struct { + codec rpc.ClientCodec + dc string + }{ + {codec1, "dc1"}, + {codec2, "dc2"}, + {codec3, "dc3"}, + } { req := structs.RegisterRequest{ - Datacenter: dc, + Datacenter: d.dc, Node: fmt.Sprintf("node%d", i+1), Address: fmt.Sprintf("127.0.0.%d", i+1), NodeMeta: map[string]string{ @@ -1497,7 +1601,7 @@ func TestPreparedQuery_Execute(t *testing.T) { Service: &structs.NodeService{ Service: "foo", Port: 8000, - Tags: []string{dc, fmt.Sprintf("tag%d", i+1)}, + Tags: []string{d.dc, fmt.Sprintf("tag%d", i+1)}, Meta: map[string]string{ "svc-group": fmt.Sprintf("%d", i%2), "foo": "true", @@ -1510,15 +1614,8 @@ func TestPreparedQuery_Execute(t *testing.T) { req.Service.Meta["unique"] = "true" } - var codec rpc.ClientCodec - if dc == "dc1" { - codec = codec1 - } else { - codec = codec2 - } - var reply struct{} - if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(d.codec, "Catalog.Register", &req, &reply); err != nil { t.Fatalf("err: %v", err) } } @@ -1576,6 +1673,17 @@ func TestPreparedQuery_Execute(t *testing.T) { assert.True(t, reply.QueryMeta.KnownLeader) } + expectFailoverPeerNodes := func(t *testing.T, query *structs.PreparedQueryRequest, reply *structs.PreparedQueryExecuteResponse, n int) { + t.Helper() + assert.Len(t, reply.Nodes, n) + assert.Equal(t, "", reply.Datacenter) + assert.Equal(t, acceptingPeerName, reply.PeerName) + assert.Equal(t, 2, reply.Failovers) + assert.Equal(t, query.Query.Service.Service, reply.Service) + assert.Equal(t, query.Query.DNS, reply.DNS) + assert.True(t, reply.QueryMeta.KnownLeader) + } + t.Run("run the registered query", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", @@ -1962,10 +2070,10 @@ func TestPreparedQuery_Execute(t *testing.T) { require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) // Update the health of a node to mark it critical. - setHealth := func(t *testing.T, node string, health string) { + setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) { t.Helper() req := structs.RegisterRequest{ - Datacenter: "dc1", + Datacenter: dc, Node: node, Address: "127.0.0.1", Service: &structs.NodeService{ @@ -1981,9 +2089,9 @@ func TestPreparedQuery_Execute(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var reply struct{} - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "Catalog.Register", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply)) } - setHealth(t, "node1", api.HealthCritical) + setHealth(t, codec1, "dc1", "node1", api.HealthCritical) // The failing node should be filtered. t.Run("failing node filtered", func(t *testing.T) { @@ -2003,7 +2111,7 @@ func TestPreparedQuery_Execute(t *testing.T) { }) // Upgrade it to a warning and re-query, should be 10 nodes again. - setHealth(t, "node1", api.HealthWarning) + setHealth(t, codec1, "dc1", "node1", api.HealthWarning) t.Run("warning nodes are included", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", @@ -2173,7 +2281,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Now fail everything in dc1 and we should get an empty list back. 
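+ // setHealth now takes a codec and datacenter so the same helper can later be
+ // used to fail the dc2 nodes when exercising peer failover.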
for i := 0; i < 10; i++ { - setHealth(t, fmt.Sprintf("node%d", i+1), api.HealthCritical) + setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical) } t.Run("everything is failing so should get empty list", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ @@ -2308,6 +2416,61 @@ func TestPreparedQuery_Execute(t *testing.T) { assert.NotEqual(t, "node3", node.Node.Node) } }) + + // Modify the query to have it fail over to a bogus DC and then dc2. + query.Query.Service.Failover = structs.QueryFailoverOptions{ + Targets: []structs.QueryFailoverTarget{ + {Datacenter: "dc2"}, + {PeerName: acceptingPeerName}, + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + + // Ensure the foo service has fully replicated. + retry.Run(t, func(r *retry.R) { + _, nodes, err := s1.fsm.State().CheckServiceNodes(nil, "foo", nil, acceptingPeerName) + require.NoError(r, err) + require.Len(r, nodes, 10) + }) + + // Now we should see 9 nodes from dc2 + t.Run("failing over to cluster peers", func(t *testing.T) { + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + QueryOptions: structs.QueryOptions{Token: execToken}, + } + + var reply structs.PreparedQueryExecuteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + + for _, node := range reply.Nodes { + assert.NotEqual(t, "node3", node.Node.Node) + } + expectFailoverNodes(t, &query, &reply, 9) + }) + + // Set all checks in dc2 as critical + for i := 0; i < 10; i++ { + setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical) + } + + // Now we should see 9 nodes from dc3 (we have the tag filter still) + t.Run("failing over to cluster peers", func(t *testing.T) { + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + QueryOptions: structs.QueryOptions{Token: execToken}, + } + + var reply structs.PreparedQueryExecuteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + + for _, node := range reply.Nodes { + assert.NotEqual(t, "node3", node.Node.Node) + } + expectFailoverPeerNodes(t, &query, &reply, 9) + }) } func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) { @@ -2724,7 +2887,9 @@ func TestPreparedQuery_Wrapper(t *testing.T) { joinWAN(t, s2, s1) // Try all the operations on a real server via the wrapper. 
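+ // queryServerWrapper is now constructed with an executeRemote callback; a
+ // no-op stub is enough here since only the local helper methods are exercised.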
- wrapper := &queryServerWrapper{s1} + wrapper := &queryServerWrapper{srv: s1, executeRemote: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + return nil + }} wrapper.GetLogger().Debug("Test") ret, err := wrapper.GetOtherDatacentersByDistance() @@ -2746,7 +2911,7 @@ type mockQueryServer struct { Datacenters []string DatacentersError error QueryLog []string - QueryFn func(dc string, args interface{}, reply interface{}) error + QueryFn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error Logger hclog.Logger LogBuffer *bytes.Buffer } @@ -2768,17 +2933,27 @@ func (m *mockQueryServer) GetLogger() hclog.Logger { return m.Logger } +func (m *mockQueryServer) GetLocalDC() string { + return "dc1" +} + func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) { return m.Datacenters, m.DatacentersError } -func (m *mockQueryServer) ForwardDC(method, dc string, args interface{}, reply interface{}) error { - m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, method)) - if ret, ok := reply.(*structs.PreparedQueryExecuteResponse); ok { - ret.Datacenter = dc +func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + peerName := args.Query.Service.PeerName + dc := args.Datacenter + if peerName != "" { + m.QueryLog = append(m.QueryLog, fmt.Sprintf("peer:%s", peerName)) + } else { + m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, "PreparedQuery.ExecuteRemote")) } + reply.PeerName = peerName + reply.Datacenter = dc + if m.QueryFn != nil { - return m.QueryFn(dc, args, reply) + return m.QueryFn(args, reply) } return nil } @@ -2788,7 +2963,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) { query := &structs.PreparedQuery{ Name: "test", Service: structs.ServiceQuery{ - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ NearestN: 0, Datacenters: []string{""}, }, @@ -2862,10 +3037,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc1" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc1" { + reply.Nodes = nodes() } return nil }, @@ -2890,10 +3064,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc3" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc3" { + reply.Nodes = nodes() } return nil }, @@ -2926,7 +3099,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) { } if len(reply.Nodes) != 0 || reply.Datacenter != "xxx" || reply.Failovers != 4 { - t.Fatalf("bad: %v", reply) + t.Fatalf("bad: %+v", reply) } if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) @@ -2940,10 +3113,9 @@ func TestPreparedQuery_queryFailover(t 
*testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc4" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc4" { + reply.Nodes = nodes() } return nil }, @@ -2969,10 +3141,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc4" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc4" { + reply.Nodes = nodes() } return nil }, @@ -2998,10 +3169,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc4" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc4" { + reply.Nodes = nodes() } return nil }, @@ -3029,12 +3199,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "dc1" { + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "dc1" { return fmt.Errorf("XXX") - } else if dc == "dc4" { - ret.Nodes = nodes() + } else if req.Datacenter == "dc4" { + reply.Nodes = nodes() } return nil }, @@ -3063,10 +3232,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, _ interface{}, reply interface{}) error { - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "xxx" { - ret.Nodes = nodes() + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "xxx" { + reply.Nodes = nodes() } return nil }, @@ -3092,17 +3260,15 @@ func TestPreparedQuery_queryFailover(t *testing.T) { { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, - QueryFn: func(dc string, args interface{}, reply interface{}) error { - inp := args.(*structs.PreparedQueryExecuteRemoteRequest) - ret := reply.(*structs.PreparedQueryExecuteResponse) - if dc == "xxx" { - if inp.Limit != 5 { - t.Fatalf("bad: %d", inp.Limit) + QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if req.Datacenter == "xxx" { + if req.Limit != 5 { + t.Fatalf("bad: %d", req.Limit) } - if inp.RequireConsistent != true { - t.Fatalf("bad: %v", inp.RequireConsistent) + if req.RequireConsistent != true { + t.Fatalf("bad: %v", req.RequireConsistent) } - ret.Nodes = nodes() + reply.Nodes = nodes() } return nil }, @@ -3124,4 +3290,32 @@ func TestPreparedQuery_queryFailover(t *testing.T) { t.Fatalf("bad: %s", 
queries) } } + + // Failover returns data from the first cluster peer with data. + query.Service.Failover.Datacenters = nil + query.Service.Failover.Targets = []structs.QueryFailoverTarget{ + {PeerName: "cluster-01"}, + {Datacenter: "dc44"}, + {PeerName: "cluster-02"}, + } + { + mock := &mockQueryServer{ + Datacenters: []string{"dc44"}, + QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if args.Query.Service.PeerName == "cluster-02" { + reply.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(t, "cluster-02", reply.PeerName) + require.Equal(t, 3, reply.Failovers) + require.Equal(t, nodes(), reply.Nodes) + require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog()) + } } diff --git a/agent/dns_test.go b/agent/dns_test.go index f0d82d2e7..51f2b6d54 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -6075,7 +6075,7 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) { Name: "my-query", Service: structs.ServiceQuery{ Service: "db", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ Datacenters: []string{"dc2"}, }, }, diff --git a/agent/prepared_query_endpoint_test.go b/agent/prepared_query_endpoint_test.go index 34b8975fd..9cf805b88 100644 --- a/agent/prepared_query_endpoint_test.go +++ b/agent/prepared_query_endpoint_test.go @@ -92,7 +92,7 @@ func TestPreparedQuery_Create(t *testing.T) { Session: "my-session", Service: structs.ServiceQuery{ Service: "my-service", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ NearestN: 4, Datacenters: []string{"dc1", "dc2"}, }, @@ -883,7 +883,7 @@ func TestPreparedQuery_Update(t *testing.T) { Session: "my-session", Service: structs.ServiceQuery{ Service: "my-service", - Failover: structs.QueryDatacenterOptions{ + Failover: structs.QueryFailoverOptions{ NearestN: 4, Datacenters: []string{"dc1", "dc2"}, }, diff --git a/agent/structs/prepared_query.go b/agent/structs/prepared_query.go index 440053f0b..cd8ec574b 100644 --- a/agent/structs/prepared_query.go +++ b/agent/structs/prepared_query.go @@ -10,9 +10,9 @@ import ( "github.com/hashicorp/consul/types" ) -// QueryDatacenterOptions sets options about how we fail over if there are no +// QueryFailoverOptions sets options about how we fail over if there are no // healthy nodes in the local datacenter. -type QueryDatacenterOptions struct { +type QueryFailoverOptions struct { // NearestN is set to the number of remote datacenters to try, based on // network coordinates. NearestN int @@ -21,6 +21,32 @@ type QueryDatacenterOptions struct { // never try a datacenter multiple times, so those are subtracted from // this list before proceeding. Datacenters []string + + // Targets is a fixed list of datacenters and peers to try. This field cannot + // be populated with NearestN or Datacenters. + Targets []QueryFailoverTarget +} + +// AsTargets either returns Targets as is or Datacenters converted into +// Targets. 
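+// The legacy Datacenters list takes priority when it is non-empty; for example
+// (hypothetical values) Datacenters: []string{"dc2", "dc3"} is treated the same
+// as Targets: []QueryFailoverTarget{{Datacenter: "dc2"}, {Datacenter: "dc3"}}.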
+func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget { + if dcs := f.Datacenters; len(dcs) > 0 { + var targets []QueryFailoverTarget + for _, dc := range dcs { + targets = append(targets, QueryFailoverTarget{Datacenter: dc}) + } + return targets + } + + return f.Targets +} + +type QueryFailoverTarget struct { + // PeerName specifies a peer to try during failover. + PeerName string + + // Datacenter specifies a datacenter to try during failover. + Datacenter string } // QueryDNSOptions controls settings when query results are served over DNS. @@ -37,7 +63,7 @@ type ServiceQuery struct { // Failover controls what we do if there are no healthy nodes in the // local datacenter. - Failover QueryDatacenterOptions + Failover QueryFailoverOptions // If OnlyPassing is true then we will only include nodes with passing // health checks (critical AND warning checks will cause a node to be @@ -323,6 +349,9 @@ type PreparedQueryExecuteResponse struct { // Datacenter is the datacenter that these results came from. Datacenter string + // PeerName specifies the cluster peer that these results came from. + PeerName string + // Failovers is a count of how many times we had to query a remote // datacenter. Failovers int diff --git a/api/prepared_query.go b/api/prepared_query.go index b3dd7be6f..60cd437cb 100644 --- a/api/prepared_query.go +++ b/api/prepared_query.go @@ -1,8 +1,8 @@ package api -// QueryDatacenterOptions sets options about how we fail over if there are no +// QueryFailoverOptions sets options about how we fail over if there are no // healthy nodes in the local datacenter. -type QueryDatacenterOptions struct { +type QueryFailoverOptions struct { // NearestN is set to the number of remote datacenters to try, based on // network coordinates. NearestN int @@ -11,6 +11,18 @@ type QueryDatacenterOptions struct { // never try a datacenter multiple times, so those are subtracted from // this list before proceeding. Datacenters []string + + // Targets is a fixed list of datacenters and peers to try. This field cannot + // be populated with NearestN or Datacenters. + Targets []QueryFailoverTarget +} + +type QueryFailoverTarget struct { + // PeerName specifies a peer to try during failover. + PeerName string + + // Datacenter specifies a datacenter to try during failover. + Datacenter string } // QueryDNSOptions controls settings when query results are served over DNS. @@ -35,7 +47,7 @@ type ServiceQuery struct { // Failover controls what we do if there are no healthy nodes in the // local datacenter. - Failover QueryDatacenterOptions + Failover QueryFailoverOptions // IgnoreCheckIDs is an optional list of health check IDs to ignore when // considering which nodes are healthy. It is useful as an emergency measure