Add Cluster Peering Failover Support to Prepared Queries (#13835)

Add peering failover support to prepared queries
This commit is contained in:
Eric Haberkorn 2022-07-22 09:14:43 -04:00 committed by GitHub
parent cbafabde16
commit e044343105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 369 additions and 101 deletions

View File

@ -22,7 +22,7 @@ var (
}, },
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "${name.full}", Service: "${name.full}",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
Datacenters: []string{ Datacenters: []string{
"${name.full}", "${name.full}",
"${name.prefix}", "${name.prefix}",
@ -69,7 +69,7 @@ var (
}, },
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "${name.full}", Service: "${name.full}",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
Datacenters: []string{ Datacenters: []string{
"dc1", "dc1",
"dc2", "dc2",

View File

@ -20,7 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
service := &structs.ServiceQuery{ service := &structs.ServiceQuery{
Service: "the-service", Service: "the-service",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
Datacenters: []string{"dc1", "dc2"}, Datacenters: []string{"dc1", "dc2"},
}, },
Near: "_agent", Near: "_agent",

View File

@ -187,11 +187,16 @@ func parseService(svc *structs.ServiceQuery) error {
return fmt.Errorf("Must provide a Service name to query") return fmt.Errorf("Must provide a Service name to query")
} }
failover := svc.Failover
// NearestN can be 0 which means "don't fail over by RTT". // NearestN can be 0 which means "don't fail over by RTT".
if svc.Failover.NearestN < 0 { if failover.NearestN < 0 {
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN) return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
} }
if (failover.NearestN != 0 || len(failover.Datacenters) != 0) && len(failover.Targets) != 0 {
return fmt.Errorf("Targets cannot be populated with NearestN or Datacenters")
}
// Make sure the metadata filters are valid // Make sure the metadata filters are valid
if err := structs.ValidateNodeMetadata(svc.NodeMeta, true); err != nil { if err := structs.ValidateNodeMetadata(svc.NodeMeta, true); err != nil {
return err return err
@ -462,7 +467,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// and bail out. Otherwise, we fail over and try remote DCs, as allowed // and bail out. Otherwise, we fail over and try remote DCs, as allowed
// by the query setup. // by the query setup.
if len(reply.Nodes) == 0 { if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{p.srv} wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
if err := queryFailover(wrapper, query, args, reply); err != nil { if err := queryFailover(wrapper, query, args, reply); err != nil {
return err return err
} }
@ -565,8 +570,13 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
reply.Nodes = nodes reply.Nodes = nodes
reply.DNS = query.DNS reply.DNS = query.DNS
// Stamp the result for this datacenter. // Stamp the result with this datacenter or peer.
reply.Datacenter = p.srv.config.Datacenter if peerName := query.Service.PeerName; peerName != "" {
reply.PeerName = peerName
reply.Datacenter = ""
} else {
reply.Datacenter = p.srv.config.Datacenter
}
return nil return nil
} }
@ -651,12 +661,24 @@ func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNode
type queryServer interface { type queryServer interface {
GetLogger() hclog.Logger GetLogger() hclog.Logger
GetOtherDatacentersByDistance() ([]string, error) GetOtherDatacentersByDistance() ([]string, error)
ForwardDC(method, dc string, args interface{}, reply interface{}) error GetLocalDC() string
ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
} }
// queryServerWrapper applies the queryServer interface to a Server. // queryServerWrapper applies the queryServer interface to a Server.
type queryServerWrapper struct { type queryServerWrapper struct {
srv *Server srv *Server
executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
}
// GetLocalDC reports the datacenter name taken from the wrapped server's
// configuration.
func (q *queryServerWrapper) GetLocalDC() string {
	cfg := q.srv.config
	return cfg.Datacenter
}
// ExecuteRemote calls ExecuteRemote on PreparedQuery.
func (q *queryServerWrapper) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
return q.executeRemote(args, reply)
} }
// GetLogger returns the server's logger. // GetLogger returns the server's logger.
@ -683,11 +705,6 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
return result, nil return result, nil
} }
// ForwardDC calls into the server's RPC forwarder.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
return q.srv.forwardDC(method, dc, args, reply)
}
// queryFailover runs an algorithm to determine which DCs to try and then calls // queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services. // them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery, func queryFailover(q queryServer, query *structs.PreparedQuery,
@ -709,7 +726,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// Build a candidate list of DCs to try, starting with the nearest N // Build a candidate list of DCs to try, starting with the nearest N
// from RTTs. // from RTTs.
var dcs []string var targets []structs.QueryFailoverTarget
index := make(map[string]struct{}) index := make(map[string]struct{})
if query.Service.Failover.NearestN > 0 { if query.Service.Failover.NearestN > 0 {
for i, dc := range nearest { for i, dc := range nearest {
@ -717,30 +734,36 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
break break
} }
dcs = append(dcs, dc) targets = append(targets, structs.QueryFailoverTarget{Datacenter: dc})
index[dc] = struct{}{} index[dc] = struct{}{}
} }
} }
// Then add any DCs explicitly listed that weren't selected above. // Then add any DCs explicitly listed that weren't selected above.
for _, dc := range query.Service.Failover.Datacenters { for _, target := range query.Service.Failover.AsTargets() {
// This will prevent a lot of other log spammage if we do not // This will prevent a lot of other log spammage if we do not
// attempt to talk to datacenters we don't know about. // attempt to talk to datacenters we don't know about.
if _, ok := known[dc]; !ok { if dc := target.Datacenter; dc != "" {
q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc) if _, ok := known[dc]; !ok {
continue q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc)
continue
}
// This will make sure we don't re-try something that fails
// from the NearestN list.
if _, ok := index[dc]; !ok {
targets = append(targets, target)
}
} }
// This will make sure we don't re-try something that fails if target.PeerName != "" {
// from the NearestN list. targets = append(targets, target)
if _, ok := index[dc]; !ok {
dcs = append(dcs, dc)
} }
} }
// Now try the selected DCs in priority order. // Now try the selected DCs in priority order.
failovers := 0 failovers := 0
for _, dc := range dcs { for _, target := range targets {
// This keeps track of how many iterations we actually run. // This keeps track of how many iterations we actually run.
failovers++ failovers++
@ -752,7 +775,15 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// through this slice across successive RPC calls. // through this slice across successive RPC calls.
reply.Nodes = nil reply.Nodes = nil
// Note that we pass along the limit since it can be applied // Reset PeerName because it may have been set by a previous failover
// target.
query.Service.PeerName = target.PeerName
dc := target.Datacenter
if target.PeerName != "" {
dc = q.GetLocalDC()
}
// Note that we pass along the limit since it may be applied
// remotely to save bandwidth. We also pass along the consistency // remotely to save bandwidth. We also pass along the consistency
// mode information and token we were given, so that applies to // mode information and token we were given, so that applies to
// the remote query as well. // the remote query as well.
@ -763,9 +794,11 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
QueryOptions: args.QueryOptions, QueryOptions: args.QueryOptions,
Connect: args.Connect, Connect: args.Connect,
} }
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
if err = q.ExecuteRemote(remote, reply); err != nil {
q.GetLogger().Warn("Failed querying for service in datacenter", q.GetLogger().Warn("Failed querying for service in datacenter",
"service", query.Service.Service, "service", query.Service.Service,
"peerName", query.Service.PeerName,
"datacenter", dc, "datacenter", dc,
"error", err, "error", err,
) )

View File

@ -2,6 +2,9 @@ package consul
import ( import (
"bytes" "bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt" "fmt"
"os" "os"
"reflect" "reflect"
@ -14,6 +17,7 @@ import (
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul-net-rpc/net/rpc"
@ -23,6 +27,7 @@ import (
"github.com/hashicorp/consul/agent/structs/aclfilter" "github.com/hashicorp/consul/agent/structs/aclfilter"
tokenStore "github.com/hashicorp/consul/agent/token" tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -82,8 +87,25 @@ func TestPreparedQuery_Apply(t *testing.T) {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
// Fix that and make sure it propagates an error from the Raft apply. // Fix that and ensure Targets and NearestN cannot be set at the same time.
query.Query.Service.Failover.NearestN = 1
query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}}
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") {
t.Fatalf("bad: %v", err)
}
// Fix that and ensure Targets and Datacenters cannot be set at the same time.
query.Query.Service.Failover.NearestN = 0 query.Query.Service.Failover.NearestN = 0
query.Query.Service.Failover.Datacenters = []string{"dc2"}
query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}}
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") {
t.Fatalf("bad: %v", err)
}
// Fix that and make sure it propagates an error from the Raft apply.
query.Query.Service.Failover.Targets = nil
query.Query.Session = "nope" query.Query.Session = "nope"
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply) err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "invalid session") { if err == nil || !strings.Contains(err.Error(), "invalid session") {
@ -1442,6 +1464,17 @@ func TestPreparedQuery_Execute(t *testing.T) {
s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc3"
c.PrimaryDatacenter = "dc3"
c.NodeName = "acceptingServer.dc3"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
waitForLeaderEstablishment(t, s3)
codec3 := rpcClient(t, s3)
defer codec3.Close()
// Try to WAN join. // Try to WAN join.
joinWAN(t, s2, s1) joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -1456,6 +1489,70 @@ func TestPreparedQuery_Execute(t *testing.T) {
// check for RPC forwarding // check for RPC forwarding
testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root")) testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root")) testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root"))
testrpc.WaitForLeader(t, s3.RPC, "dc3")
acceptingPeerName := "my-peer-accepting-server"
dialingPeerName := "my-peer-dialing-server"
// Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service
{
// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: dialingPeerName,
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
p := &pbpeering.Peering{
ID: "cc56f0b8-3885-4e78-8d7b-614a0c45712d",
Name: acceptingPeerName,
PeerID: token.PeerID,
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p.ShouldDial())
require.NoError(t, s1.fsm.State().PeeringWrite(1000, p))
// Wait for the stream to be connected.
retry.Run(t, func(r *retry.R) {
status, found := s1.peerStreamServer.StreamStatus(p.ID)
require.True(r, found)
require.True(r, status.Connected)
})
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc3",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{{PeerName: dialingPeerName}},
},
},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(codec3, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}
execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root") execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root")
rules := ` rules := `
@ -1485,9 +1582,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Set up some nodes in each DC that host the service. // Set up some nodes in each DC that host the service.
{ {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for _, dc := range []string{"dc1", "dc2"} { for _, d := range []struct {
codec rpc.ClientCodec
dc string
}{
{codec1, "dc1"},
{codec2, "dc2"},
{codec3, "dc3"},
} {
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: dc, Datacenter: d.dc,
Node: fmt.Sprintf("node%d", i+1), Node: fmt.Sprintf("node%d", i+1),
Address: fmt.Sprintf("127.0.0.%d", i+1), Address: fmt.Sprintf("127.0.0.%d", i+1),
NodeMeta: map[string]string{ NodeMeta: map[string]string{
@ -1497,7 +1601,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
Service: &structs.NodeService{ Service: &structs.NodeService{
Service: "foo", Service: "foo",
Port: 8000, Port: 8000,
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)}, Tags: []string{d.dc, fmt.Sprintf("tag%d", i+1)},
Meta: map[string]string{ Meta: map[string]string{
"svc-group": fmt.Sprintf("%d", i%2), "svc-group": fmt.Sprintf("%d", i%2),
"foo": "true", "foo": "true",
@ -1510,15 +1614,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
req.Service.Meta["unique"] = "true" req.Service.Meta["unique"] = "true"
} }
var codec rpc.ClientCodec
if dc == "dc1" {
codec = codec1
} else {
codec = codec2
}
var reply struct{} var reply struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { if err := msgpackrpc.CallWithCodec(d.codec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
@ -1576,6 +1673,17 @@ func TestPreparedQuery_Execute(t *testing.T) {
assert.True(t, reply.QueryMeta.KnownLeader) assert.True(t, reply.QueryMeta.KnownLeader)
} }
// expectFailoverPeerNodes asserts that reply describes a failover answered by
// the accepting cluster peer: it holds n nodes, has an empty Datacenter, a
// PeerName equal to acceptingPeerName, and exactly two failovers. It also
// checks that the service name and DNS options echo the query and that a
// leader was known.
expectFailoverPeerNodes := func(t *testing.T, query *structs.PreparedQueryRequest, reply *structs.PreparedQueryExecuteResponse, n int) {
t.Helper()
assert.Len(t, reply.Nodes, n)
assert.Equal(t, "", reply.Datacenter)
assert.Equal(t, acceptingPeerName, reply.PeerName)
assert.Equal(t, 2, reply.Failovers)
assert.Equal(t, query.Query.Service.Service, reply.Service)
assert.Equal(t, query.Query.DNS, reply.DNS)
assert.True(t, reply.QueryMeta.KnownLeader)
}
t.Run("run the registered query", func(t *testing.T) { t.Run("run the registered query", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{ req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -1962,10 +2070,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
// Update the health of a node to mark it critical. // Update the health of a node to mark it critical.
setHealth := func(t *testing.T, node string, health string) { setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
t.Helper() t.Helper()
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: "dc1", Datacenter: dc,
Node: node, Node: node,
Address: "127.0.0.1", Address: "127.0.0.1",
Service: &structs.NodeService{ Service: &structs.NodeService{
@ -1981,9 +2089,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
var reply struct{} var reply struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "Catalog.Register", &req, &reply)) require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
} }
setHealth(t, "node1", api.HealthCritical) setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
// The failing node should be filtered. // The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) { t.Run("failing node filtered", func(t *testing.T) {
@ -2003,7 +2111,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
}) })
// Upgrade it to a warning and re-query, should be 10 nodes again. // Upgrade it to a warning and re-query, should be 10 nodes again.
setHealth(t, "node1", api.HealthWarning) setHealth(t, codec1, "dc1", "node1", api.HealthWarning)
t.Run("warning nodes are included", func(t *testing.T) { t.Run("warning nodes are included", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{ req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -2173,7 +2281,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Now fail everything in dc1 and we should get an empty list back. // Now fail everything in dc1 and we should get an empty list back.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
setHealth(t, fmt.Sprintf("node%d", i+1), api.HealthCritical) setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical)
} }
t.Run("everything is failing so should get empty list", func(t *testing.T) { t.Run("everything is failing so should get empty list", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{ req := structs.PreparedQueryExecuteRequest{
@ -2308,6 +2416,61 @@ func TestPreparedQuery_Execute(t *testing.T) {
assert.NotEqual(t, "node3", node.Node.Node) assert.NotEqual(t, "node3", node.Node.Node)
} }
}) })
// Modify the query to have it fail over to dc2 and then to the accepting cluster peer.
query.Query.Service.Failover = structs.QueryFailoverOptions{
Targets: []structs.QueryFailoverTarget{
{Datacenter: "dc2"},
{PeerName: acceptingPeerName},
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
// Ensure the foo service has fully replicated.
retry.Run(t, func(r *retry.R) {
_, nodes, err := s1.fsm.State().CheckServiceNodes(nil, "foo", nil, acceptingPeerName)
require.NoError(r, err)
require.Len(r, nodes, 10)
})
// Now we should see 9 nodes from dc2
t.Run("failing over to cluster peers", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}
expectFailoverNodes(t, &query, &reply, 9)
})
// Set all checks in dc2 as critical
for i := 0; i < 10; i++ {
setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical)
}
// Now we should see 9 nodes from dc3 (we have the tag filter still)
t.Run("failing over to cluster peers", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}
expectFailoverPeerNodes(t, &query, &reply, 9)
})
} }
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) { func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
@ -2724,7 +2887,9 @@ func TestPreparedQuery_Wrapper(t *testing.T) {
joinWAN(t, s2, s1) joinWAN(t, s2, s1)
// Try all the operations on a real server via the wrapper. // Try all the operations on a real server via the wrapper.
wrapper := &queryServerWrapper{s1} wrapper := &queryServerWrapper{srv: s1, executeRemote: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
return nil
}}
wrapper.GetLogger().Debug("Test") wrapper.GetLogger().Debug("Test")
ret, err := wrapper.GetOtherDatacentersByDistance() ret, err := wrapper.GetOtherDatacentersByDistance()
@ -2746,7 +2911,7 @@ type mockQueryServer struct {
Datacenters []string Datacenters []string
DatacentersError error DatacentersError error
QueryLog []string QueryLog []string
QueryFn func(dc string, args interface{}, reply interface{}) error QueryFn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
Logger hclog.Logger Logger hclog.Logger
LogBuffer *bytes.Buffer LogBuffer *bytes.Buffer
} }
@ -2768,17 +2933,27 @@ func (m *mockQueryServer) GetLogger() hclog.Logger {
return m.Logger return m.Logger
} }
// GetLocalDC always reports "dc1" as the mock's local datacenter.
func (m *mockQueryServer) GetLocalDC() string {
	const localDC = "dc1"
	return localDC
}
func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) { func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
return m.Datacenters, m.DatacentersError return m.Datacenters, m.DatacentersError
} }
func (m *mockQueryServer) ForwardDC(method, dc string, args interface{}, reply interface{}) error { func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, method)) peerName := args.Query.Service.PeerName
if ret, ok := reply.(*structs.PreparedQueryExecuteResponse); ok { dc := args.Datacenter
ret.Datacenter = dc if peerName != "" {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("peer:%s", peerName))
} else {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, "PreparedQuery.ExecuteRemote"))
} }
reply.PeerName = peerName
reply.Datacenter = dc
if m.QueryFn != nil { if m.QueryFn != nil {
return m.QueryFn(dc, args, reply) return m.QueryFn(args, reply)
} }
return nil return nil
} }
@ -2788,7 +2963,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
query := &structs.PreparedQuery{ query := &structs.PreparedQuery{
Name: "test", Name: "test",
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
NearestN: 0, NearestN: 0,
Datacenters: []string{""}, Datacenters: []string{""},
}, },
@ -2862,10 +3037,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc1" {
if dc == "dc1" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -2890,10 +3064,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc3" {
if dc == "dc3" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -2926,7 +3099,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
} }
if len(reply.Nodes) != 0 || if len(reply.Nodes) != 0 ||
reply.Datacenter != "xxx" || reply.Failovers != 4 { reply.Datacenter != "xxx" || reply.Failovers != 4 {
t.Fatalf("bad: %v", reply) t.Fatalf("bad: %+v", reply)
} }
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries) t.Fatalf("bad: %s", queries)
@ -2940,10 +3113,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc4" {
if dc == "dc4" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -2969,10 +3141,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc4" {
if dc == "dc4" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -2998,10 +3169,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc4" {
if dc == "dc4" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -3029,12 +3199,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "dc1" {
if dc == "dc1" {
return fmt.Errorf("XXX") return fmt.Errorf("XXX")
} else if dc == "dc4" { } else if req.Datacenter == "dc4" {
ret.Nodes = nodes() reply.Nodes = nodes()
} }
return nil return nil
}, },
@ -3063,10 +3232,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Datacenter == "xxx" {
if dc == "xxx" { reply.Nodes = nodes()
ret.Nodes = nodes()
} }
return nil return nil
}, },
@ -3092,17 +3260,15 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{ {
mock := &mockQueryServer{ mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, args interface{}, reply interface{}) error { QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
inp := args.(*structs.PreparedQueryExecuteRemoteRequest) if req.Datacenter == "xxx" {
ret := reply.(*structs.PreparedQueryExecuteResponse) if req.Limit != 5 {
if dc == "xxx" { t.Fatalf("bad: %d", req.Limit)
if inp.Limit != 5 {
t.Fatalf("bad: %d", inp.Limit)
} }
if inp.RequireConsistent != true { if req.RequireConsistent != true {
t.Fatalf("bad: %v", inp.RequireConsistent) t.Fatalf("bad: %v", req.RequireConsistent)
} }
ret.Nodes = nodes() reply.Nodes = nodes()
} }
return nil return nil
}, },
@ -3124,4 +3290,32 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
t.Fatalf("bad: %s", queries) t.Fatalf("bad: %s", queries)
} }
} }
// Failover returns data from the first cluster peer with data.
query.Service.Failover.Datacenters = nil
query.Service.Failover.Targets = []structs.QueryFailoverTarget{
{PeerName: "cluster-01"},
{Datacenter: "dc44"},
{PeerName: "cluster-02"},
}
{
mock := &mockQueryServer{
Datacenters: []string{"dc44"},
QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Query.Service.PeerName == "cluster-02" {
reply.Nodes = nodes()
}
return nil
},
}
var reply structs.PreparedQueryExecuteResponse
if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, "cluster-02", reply.PeerName)
require.Equal(t, 3, reply.Failovers)
require.Equal(t, nodes(), reply.Nodes)
require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog())
}
} }

View File

@ -6075,7 +6075,7 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) {
Name: "my-query", Name: "my-query",
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "db", Service: "db",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
Datacenters: []string{"dc2"}, Datacenters: []string{"dc2"},
}, },
}, },

View File

@ -92,7 +92,7 @@ func TestPreparedQuery_Create(t *testing.T) {
Session: "my-session", Session: "my-session",
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "my-service", Service: "my-service",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
NearestN: 4, NearestN: 4,
Datacenters: []string{"dc1", "dc2"}, Datacenters: []string{"dc1", "dc2"},
}, },
@ -883,7 +883,7 @@ func TestPreparedQuery_Update(t *testing.T) {
Session: "my-session", Session: "my-session",
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "my-service", Service: "my-service",
Failover: structs.QueryDatacenterOptions{ Failover: structs.QueryFailoverOptions{
NearestN: 4, NearestN: 4,
Datacenters: []string{"dc1", "dc2"}, Datacenters: []string{"dc1", "dc2"},
}, },

View File

@ -10,9 +10,9 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
// QueryDatacenterOptions sets options about how we fail over if there are no // QueryFailoverOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter. // healthy nodes in the local datacenter.
type QueryDatacenterOptions struct { type QueryFailoverOptions struct {
// NearestN is set to the number of remote datacenters to try, based on // NearestN is set to the number of remote datacenters to try, based on
// network coordinates. // network coordinates.
NearestN int NearestN int
@ -21,6 +21,32 @@ type QueryDatacenterOptions struct {
// never try a datacenter multiple times, so those are subtracted from // never try a datacenter multiple times, so those are subtracted from
// this list before proceeding. // this list before proceeding.
Datacenters []string Datacenters []string
// Targets is a fixed list of datacenters and peers to try. This field is
// mutually exclusive with NearestN and Datacenters: it cannot be set when
// either of them is set.
Targets []QueryFailoverTarget
}
// AsTargets returns the failover destinations as a list of targets.
//
// When Datacenters is non-empty, each datacenter name is converted into a
// QueryFailoverTarget with only Datacenter set, in the same order, and
// Targets is not consulted. Otherwise Targets is returned as is (the slice
// is not copied, so it may be nil).
func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget {
	dcs := f.Datacenters
	if len(dcs) == 0 {
		return f.Targets
	}

	// The final length is known up front, so allocate the backing array once
	// instead of letting append grow it repeatedly.
	targets := make([]QueryFailoverTarget, 0, len(dcs))
	for _, dc := range dcs {
		targets = append(targets, QueryFailoverTarget{Datacenter: dc})
	}
	return targets
}
// QueryFailoverTarget describes one destination to try during failover: a
// cluster peer (PeerName) or a datacenter (Datacenter).
// NOTE(review): nothing in this type prevents both fields being set on the
// same target — confirm how that case is validated or interpreted.
type QueryFailoverTarget struct {
// PeerName specifies a peer to try during failover.
PeerName string
// Datacenter specifies a datacenter to try during failover.
Datacenter string
} }
// QueryDNSOptions controls settings when query results are served over DNS. // QueryDNSOptions controls settings when query results are served over DNS.
@ -37,7 +63,7 @@ type ServiceQuery struct {
// Failover controls what we do if there are no healthy nodes in the // Failover controls what we do if there are no healthy nodes in the
// local datacenter. // local datacenter.
Failover QueryDatacenterOptions Failover QueryFailoverOptions
// If OnlyPassing is true then we will only include nodes with passing // If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be // health checks (critical AND warning checks will cause a node to be
@ -323,6 +349,9 @@ type PreparedQueryExecuteResponse struct {
// Datacenter is the datacenter that these results came from. // Datacenter is the datacenter that these results came from.
Datacenter string Datacenter string
// PeerName specifies the cluster peer that these results came from.
PeerName string
// Failovers is a count of how many times we had to query a remote // Failovers is a count of how many times we had to query a remote
// datacenter. // datacenter.
Failovers int Failovers int

View File

@ -1,8 +1,8 @@
package api package api
// QueryDatacenterOptions sets options about how we fail over if there are no // QueryFailoverOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter. // healthy nodes in the local datacenter.
type QueryDatacenterOptions struct { type QueryFailoverOptions struct {
// NearestN is set to the number of remote datacenters to try, based on // NearestN is set to the number of remote datacenters to try, based on
// network coordinates. // network coordinates.
NearestN int NearestN int
@ -11,6 +11,18 @@ type QueryDatacenterOptions struct {
// never try a datacenter multiple times, so those are subtracted from // never try a datacenter multiple times, so those are subtracted from
// this list before proceeding. // this list before proceeding.
Datacenters []string Datacenters []string
// Targets is a fixed list of datacenters and peers to try. This field is
// mutually exclusive with NearestN and Datacenters: it cannot be set when
// either of them is set.
Targets []QueryFailoverTarget
}
// QueryFailoverTarget describes one destination to try during failover: a
// cluster peer (PeerName) or a datacenter (Datacenter).
// NOTE(review): nothing in this type prevents both fields being set on the
// same target — confirm how the server handles that case.
type QueryFailoverTarget struct {
// PeerName specifies a peer to try during failover.
PeerName string
// Datacenter specifies a datacenter to try during failover.
Datacenter string
} }
// QueryDNSOptions controls settings when query results are served over DNS. // QueryDNSOptions controls settings when query results are served over DNS.
@ -35,7 +47,7 @@ type ServiceQuery struct {
// Failover controls what we do if there are no healthy nodes in the // Failover controls what we do if there are no healthy nodes in the
// local datacenter. // local datacenter.
Failover QueryDatacenterOptions Failover QueryFailoverOptions
// IgnoreCheckIDs is an optional list of health check IDs to ignore when // IgnoreCheckIDs is an optional list of health check IDs to ignore when
// considering which nodes are healthy. It is useful as an emergency measure // considering which nodes are healthy. It is useful as an emergency measure