From d7c81a3b1dfe3f1bc69a3753a62a71ecc5404949 Mon Sep 17 00:00:00 2001
From: Eric Haberkorn
Date: Wed, 22 Mar 2023 09:24:13 -0400
Subject: [PATCH] fix bug where pqs that failover to a cluster peer dont un-fail over (#16729)

---
 .changelog/16729.txt                         |  3 ++
 agent/consul/prepared_query_endpoint.go      |  6 +--
 agent/consul/prepared_query_endpoint_test.go | 41 ++++++++++++++++----
 3 files changed, 39 insertions(+), 11 deletions(-)
 create mode 100644 .changelog/16729.txt

diff --git a/.changelog/16729.txt b/.changelog/16729.txt
new file mode 100644
index 000000000..36c6e1aea
--- /dev/null
+++ b/.changelog/16729.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+peering: Fix issue resulting in prepared query failover to cluster peers never un-failing over.
+```
diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go
index ffa4b5e50..54ef82baf 100644
--- a/agent/consul/prepared_query_endpoint.go
+++ b/agent/consul/prepared_query_endpoint.go
@@ -468,7 +468,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
 	// by the query setup.
 	if len(reply.Nodes) == 0 {
 		wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
-		if err := queryFailover(wrapper, query, args, reply); err != nil {
+		if err := queryFailover(wrapper, *query, args, reply); err != nil {
 			return err
 		}
 	}
@@ -707,7 +707,7 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
 
 // queryFailover runs an algorithm to determine which DCs to try and then calls
 // them to try to locate alternative services.
-func queryFailover(q queryServer, query *structs.PreparedQuery,
+func queryFailover(q queryServer, query structs.PreparedQuery,
 	args *structs.PreparedQueryExecuteRequest,
 	reply *structs.PreparedQueryExecuteResponse) error {
 
@@ -789,7 +789,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
 		// the remote query as well.
 		remote := &structs.PreparedQueryExecuteRemoteRequest{
 			Datacenter:   dc,
-			Query:        *query,
+			Query:        query,
 			Limit:        args.Limit,
 			QueryOptions: args.QueryOptions,
 			Connect:      args.Connect,
diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go
index d883c8bf4..ca1ad0c66 100644
--- a/agent/consul/prepared_query_endpoint_test.go
+++ b/agent/consul/prepared_query_endpoint_test.go
@@ -2092,16 +2092,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
 	require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
 
 	// Update the health of a node to mark it critical.
-	setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
+	setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) {
 		t.Helper()
 		req := structs.RegisterRequest{
 			Datacenter: dc,
-			Node:       node,
+			Node:       fmt.Sprintf("node%d", i),
 			Address:    "127.0.0.1",
 			Service: &structs.NodeService{
 				Service: "foo",
 				Port:    8000,
-				Tags:    []string{"dc1", "tag1"},
+				Tags:    []string{dc, fmt.Sprintf("tag%d", i)},
 			},
 			Check: &structs.HealthCheck{
 				Name:   "failing",
@@ -2113,7 +2113,7 @@
 		var reply struct{}
 		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
 	}
-	setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
+	setHealth(t, codec1, "dc1", 1, api.HealthCritical)
 
 	// The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) { @@ -2133,7 +2133,7 @@ func TestPreparedQuery_Execute(t *testing.T) { }) // Upgrade it to a warning and re-query, should be 10 nodes again. - setHealth(t, codec1, "dc1", "node1", api.HealthWarning) + setHealth(t, codec1, "dc1", 1, api.HealthWarning) t.Run("warning nodes are included", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", @@ -2303,7 +2303,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Now fail everything in dc1 and we should get an empty list back. for i := 0; i < 10; i++ { - setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical) + setHealth(t, codec1, "dc1", i+1, api.HealthCritical) } t.Run("everything is failing so should get empty list", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ @@ -2474,7 +2474,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Set all checks in dc2 as critical for i := 0; i < 10; i++ { - setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical) + setHealth(t, codec2, "dc2", i+1, api.HealthCritical) } // Now we should see 9 nodes from dc3 (we have the tag filter still) @@ -2493,6 +2493,31 @@ func TestPreparedQuery_Execute(t *testing.T) { } expectFailoverPeerNodes(t, &query, &reply, 9) }) + + // Set all checks in dc1 as passing + for i := 0; i < 10; i++ { + setHealth(t, codec1, "dc1", i+1, api.HealthPassing) + } + + // Nothing is healthy so nothing is returned + t.Run("un-failing over", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + QueryOptions: structs.QueryOptions{Token: execToken}, + } + + var reply structs.PreparedQueryExecuteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + + for _, node := range reply.Nodes { + assert.NotEqual(t, "node3", node.Node.Node) + } + + expectNodes(t, &query, &reply, 9) + }) + }) } func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) { @@ -2982,7 +3007,7 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote func TestPreparedQuery_queryFailover(t *testing.T) { t.Parallel() - query := &structs.PreparedQuery{ + query := structs.PreparedQuery{ Name: "test", Service: structs.ServiceQuery{ Failover: structs.QueryFailoverOptions{