fix bug where pqs that failover to a cluster peer dont un-fail over (#16729)
parent 91528b9d62
commit d7c81a3b1d
@@ -0,0 +1,3 @@
+```release-note:bug
+peering: Fix issue resulting in prepared query failover to cluster peers never un-failing over.
+```
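For context on the release note: the prepared query that `Execute` hands to `queryFailover` used to be shared by pointer, so whatever the failover path wrote on the query while targeting a cluster peer stayed on that shared struct, and later executions kept going to the peer even after the local datacenter was healthy again. The change below passes a copy instead, so failover-time writes stay local to one execution. The sketch that follows is a deliberately simplified, hypothetical model of that pointer-vs-value distinction (the `query` type and `failoverTargets` field are made up for illustration, not Consul's actual structures).

```go
// Toy illustration of why sharing one query struct by pointer makes failover
// "sticky": targets chosen during failover survive into the next execution.
package main

import "fmt"

type query struct {
	name            string
	failoverTargets []string // targets chosen when the local DC has no healthy nodes
}

// failoverByPointer mutates the caller's query, so the chosen targets persist.
func failoverByPointer(q *query) {
	q.failoverTargets = append(q.failoverTargets, "peer-dc2")
}

// failoverByValue works on a copy; the caller's query is left untouched,
// mirroring the switch to passing structs.PreparedQuery by value below.
func failoverByValue(q query) {
	q.failoverTargets = append(q.failoverTargets, "peer-dc2")
}

func main() {
	q := query{name: "test"}
	failoverByPointer(&q)
	fmt.Println(q.failoverTargets) // [peer-dc2]: sticks to the peer on later runs

	q2 := query{name: "test"}
	failoverByValue(q2)
	fmt.Println(q2.failoverTargets) // []: the next execution starts clean
}
```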
@@ -468,7 +468,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
 	// by the query setup.
 	if len(reply.Nodes) == 0 {
 		wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
-		if err := queryFailover(wrapper, query, args, reply); err != nil {
+		if err := queryFailover(wrapper, *query, args, reply); err != nil {
 			return err
 		}
 	}
@@ -707,7 +707,7 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {

 // queryFailover runs an algorithm to determine which DCs to try and then calls
 // them to try to locate alternative services.
-func queryFailover(q queryServer, query *structs.PreparedQuery,
+func queryFailover(q queryServer, query structs.PreparedQuery,
 	args *structs.PreparedQueryExecuteRequest,
 	reply *structs.PreparedQueryExecuteResponse) error {

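This hunk also shows the seam that keeps the logic testable: `queryFailover` depends on a small `queryServer` interface, `Execute` wires it to the real server via `queryServerWrapper`, and `TestPreparedQuery_queryFailover` (further down in this diff) drives it with a `mockQueryServer`. The sketch below is a stripped-down, hypothetical version of that arrangement: one interface method and a toy selection function, not the real interface, which carries more methods.

```go
// Simplified illustration of an interface seam plus a test mock, loosely
// modeled on queryServer / mockQueryServer from the diff.
package main

import "fmt"

type queryServer interface {
	GetOtherDatacentersByDistance() ([]string, error)
}

// mockQueryServer returns a fixed DC ordering instead of real RTT data.
type mockQueryServer struct {
	datacenters []string
}

func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
	return m.datacenters, nil
}

// pickFailoverTargets is a hypothetical stand-in for the failover walk over
// candidate datacenters, limited to the first few by distance.
func pickFailoverTargets(q queryServer, limit int) ([]string, error) {
	dcs, err := q.GetOtherDatacentersByDistance()
	if err != nil {
		return nil, err
	}
	if len(dcs) > limit {
		dcs = dcs[:limit]
	}
	return dcs, nil
}

func main() {
	mock := &mockQueryServer{datacenters: []string{"dc2", "dc3"}}
	targets, _ := pickFailoverTargets(mock, 1)
	fmt.Println(targets) // [dc2]
}
```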
@@ -789,7 +789,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
 	// the remote query as well.
 	remote := &structs.PreparedQueryExecuteRemoteRequest{
 		Datacenter:   dc,
-		Query:        *query,
+		Query:        query,
 		Limit:        args.Limit,
 		QueryOptions: args.QueryOptions,
 		Connect:      args.Connect,
@@ -2092,16 +2092,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
 	require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))

 	// Update the health of a node to mark it critical.
-	setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
+	setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) {
 		t.Helper()
 		req := structs.RegisterRequest{
 			Datacenter: dc,
-			Node:       node,
+			Node:       fmt.Sprintf("node%d", i),
 			Address:    "127.0.0.1",
 			Service: &structs.NodeService{
 				Service: "foo",
 				Port:    8000,
-				Tags:    []string{"dc1", "tag1"},
+				Tags:    []string{dc, fmt.Sprintf("tag%d", i)},
 			},
 			Check: &structs.HealthCheck{
 				Name: "failing",
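The refactor above changes the `setHealth` helper to take a node index rather than a preformatted node name: the datacenter and index now drive both the node name and the tag, so call sites in loops can pass `i+1` directly instead of building strings with `fmt.Sprintf` at every call. A minimal, self-contained sketch of the same pattern follows; the `registration` type is a stand-in for the `structs.RegisterRequest` used in the real test, and the names here are hypothetical.

```go
// Sketch of an index-based test helper in the style of the refactored setHealth.
package demo

import (
	"fmt"
	"testing"
)

type registration struct {
	Datacenter string
	Node       string
	Tags       []string
	Status     string
}

// setHealth derives the node name and tag from the datacenter and index,
// recording the registration it would otherwise send to the catalog.
func setHealth(t *testing.T, registered *[]registration, dc string, i int, health string) {
	t.Helper()
	*registered = append(*registered, registration{
		Datacenter: dc,
		Node:       fmt.Sprintf("node%d", i),
		Tags:       []string{dc, fmt.Sprintf("tag%d", i)},
		Status:     health,
	})
}

func TestSetHealthPattern(t *testing.T) {
	var regs []registration
	// Mark every node in dc1 critical, mirroring the loops in the diff.
	for i := 0; i < 10; i++ {
		setHealth(t, &regs, "dc1", i+1, "critical")
	}
	if len(regs) != 10 || regs[0].Node != "node1" {
		t.Fatalf("unexpected registrations: %+v", regs)
	}
}
```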
@@ -2113,7 +2113,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
 		var reply struct{}
 		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
 	}
-	setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
+	setHealth(t, codec1, "dc1", 1, api.HealthCritical)

 	// The failing node should be filtered.
 	t.Run("failing node filtered", func(t *testing.T) {
@@ -2133,7 +2133,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
 	})

 	// Upgrade it to a warning and re-query, should be 10 nodes again.
-	setHealth(t, codec1, "dc1", "node1", api.HealthWarning)
+	setHealth(t, codec1, "dc1", 1, api.HealthWarning)
 	t.Run("warning nodes are included", func(t *testing.T) {
 		req := structs.PreparedQueryExecuteRequest{
 			Datacenter: "dc1",
@@ -2303,7 +2303,7 @@ func TestPreparedQuery_Execute(t *testing.T) {

 	// Now fail everything in dc1 and we should get an empty list back.
 	for i := 0; i < 10; i++ {
-		setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical)
+		setHealth(t, codec1, "dc1", i+1, api.HealthCritical)
 	}
 	t.Run("everything is failing so should get empty list", func(t *testing.T) {
 		req := structs.PreparedQueryExecuteRequest{
@@ -2474,7 +2474,7 @@ func TestPreparedQuery_Execute(t *testing.T) {

 	// Set all checks in dc2 as critical
 	for i := 0; i < 10; i++ {
-		setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical)
+		setHealth(t, codec2, "dc2", i+1, api.HealthCritical)
 	}

 	// Now we should see 9 nodes from dc3 (we have the tag filter still)
@@ -2493,6 +2493,31 @@ func TestPreparedQuery_Execute(t *testing.T) {
 		}
 		expectFailoverPeerNodes(t, &query, &reply, 9)
 	})
+
+	// Set all checks in dc1 as passing
+	for i := 0; i < 10; i++ {
+		setHealth(t, codec1, "dc1", i+1, api.HealthPassing)
+	}
+
+	// Nothing is healthy so nothing is returned
+	t.Run("un-failing over", func(t *testing.T) {
+		retry.Run(t, func(r *retry.R) {
+			req := structs.PreparedQueryExecuteRequest{
+				Datacenter:    "dc1",
+				QueryIDOrName: query.Query.ID,
+				QueryOptions:  structs.QueryOptions{Token: execToken},
+			}
+
+			var reply structs.PreparedQueryExecuteResponse
+			require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+
+			for _, node := range reply.Nodes {
+				assert.NotEqual(t, "node3", node.Node.Node)
+			}
+
+			expectNodes(t, &query, &reply, 9)
+		})
+	})
 }

 func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
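The new "un-failing over" subtest marks every dc1 check passing again and re-executes the query inside `retry.Run`, since the health updates and the switch back to local nodes are not instantaneous; it then expects 9 nodes with node3 excluded, consistent with the tag filter noted earlier in the test. Below is a hedged, minimal sketch of that retry pattern, assuming the `github.com/hashicorp/consul/sdk/testutil/retry` import path; the condition is a placeholder rather than a real query execution.

```go
// Sketch of retrying an eventually-consistent assertion with the sdk retry helper.
package demo

import (
	"testing"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

func TestEventuallyUnfailsOver(t *testing.T) {
	// Placeholder for re-running the prepared query and counting local nodes.
	healthyNodes := func() int { return 9 }

	retry.Run(t, func(r *retry.R) {
		if n := healthyNodes(); n != 9 {
			// Fatalf on *retry.R fails this attempt and schedules another try.
			r.Fatalf("want 9 local nodes after un-failing over, got %d", n)
		}
	})
}
```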
@@ -2982,7 +3007,7 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote

 func TestPreparedQuery_queryFailover(t *testing.T) {
 	t.Parallel()
-	query := &structs.PreparedQuery{
+	query := structs.PreparedQuery{
 		Name: "test",
 		Service: structs.ServiceQuery{
 			Failover: structs.QueryFailoverOptions{