From baa1fd3cd6dfa89f66c661051fa9592a4592d9f6 Mon Sep 17 00:00:00 2001 From: Michael Wilkerson <62034708+wilkermichael@users.noreply.github.com> Date: Mon, 27 Mar 2023 15:40:49 -0700 Subject: [PATCH] changes to support new PQ enterprise fields (#16793) --- .changelog/_4821.txt | 3 + agent/consul/prepared_query_endpoint.go | 17 +- agent/consul/prepared_query_endpoint_test.go | 755 +++++++++++-------- agent/prepared_query_endpoint.go | 2 +- agent/structs/prepared_query.go | 2 + api/prepared_query.go | 8 + website/content/api-docs/query.mdx | 8 +- 7 files changed, 486 insertions(+), 309 deletions(-) create mode 100644 .changelog/_4821.txt diff --git a/.changelog/_4821.txt b/.changelog/_4821.txt new file mode 100644 index 000000000..88758540a --- /dev/null +++ b/.changelog/_4821.txt @@ -0,0 +1,3 @@ +```release-note:improvement +connect: **(Enterprise Only)** Add support for specifying "Partition" and "Namespace" in Prepared Queries failover rules. +``` \ No newline at end of file diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index 54ef82baf..2843082cc 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -718,13 +718,13 @@ func queryFailover(q queryServer, query structs.PreparedQuery, return err } - // This will help us filter unknown DCs supplied by the user. + // This will help us filter unknown targets supplied by the user. known := make(map[string]struct{}) for _, dc := range nearest { known[dc] = struct{}{} } - // Build a candidate list of DCs to try, starting with the nearest N + // Build a candidate list of failover targets to try, starting with the nearest N target // from RTTs. var targets []structs.QueryFailoverTarget index := make(map[string]struct{}) @@ -739,9 +739,9 @@ func queryFailover(q queryServer, query structs.PreparedQuery, } } - // Then add any DCs explicitly listed that weren't selected above. + // Then add any targets explicitly listed that weren't selected above. for _, target := range query.Service.Failover.AsTargets() { - // This will prevent a log of other log spammage if we do not + // This will prevent a log of other log spam if we do not // attempt to talk to datacenters we don't know about. if dc := target.Datacenter; dc != "" { if _, ok := known[dc]; !ok { @@ -753,15 +753,16 @@ func queryFailover(q queryServer, query structs.PreparedQuery, // from the NearestN list. if _, ok := index[dc]; !ok { targets = append(targets, target) + continue } } - if target.Peer != "" { + if target.Peer != "" || target.PartitionOrEmpty() != "" || target.NamespaceOrEmpty() != "" { targets = append(targets, target) } } - // Now try the selected DCs in priority order. + // Now try the selected targets in priority order. failovers := 0 for _, target := range targets { // This keeps track of how many iterations we actually run. @@ -775,9 +776,10 @@ func queryFailover(q queryServer, query structs.PreparedQuery, // through this slice across successive RPC calls. reply.Nodes = nil - // Reset PeerName because it may have been set by a previous failover + // Reset Peer, because it may have been set by a previous failover // target. query.Service.Peer = target.Peer + query.Service.EnterpriseMeta = target.EnterpriseMeta dc := target.Datacenter if target.Peer != "" { dc = q.GetLocalDC() @@ -800,6 +802,7 @@ func queryFailover(q queryServer, query structs.PreparedQuery, "service", query.Service.Service, "peerName", query.Service.Peer, "datacenter", dc, + "enterpriseMeta", query.Service.EnterpriseMeta, "error", err, ) continue diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go index ca1ad0c66..1da472dbc 100644 --- a/agent/consul/prepared_query_endpoint_test.go +++ b/agent/consul/prepared_query_endpoint_test.go @@ -1437,152 +1437,9 @@ func TestPreparedQuery_Execute(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - waitForLeaderEstablishment(t, s1) - codec1 := rpcClient(t, s1) - defer codec1.Close() - dir2, s2 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc2" - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - waitForLeaderEstablishment(t, s2) - codec2 := rpcClient(t, s2) - defer codec2.Close() - - s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) - - ca := connect.TestCA(t, nil) - dir3, s3 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc3" - c.PrimaryDatacenter = "dc3" - c.NodeName = "acceptingServer.dc3" - c.GRPCTLSPort = freeport.GetOne(t) - c.CAConfig = &structs.CAConfiguration{ - ClusterID: connect.TestClusterID, - Provider: structs.ConsulCAProvider, - Config: map[string]interface{}{ - "PrivateKey": ca.SigningKey, - "RootCert": ca.RootCert, - }, - } - }) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - waitForLeaderEstablishment(t, s3) - codec3 := rpcClient(t, s3) - defer codec3.Close() - - // Try to WAN join. - joinWAN(t, s2, s1) - retry.Run(t, func(r *retry.R) { - if got, want := len(s1.WANMembers()), 2; got != want { - r.Fatalf("got %d WAN members want %d", got, want) - } - if got, want := len(s2.WANMembers()), 2; got != want { - r.Fatalf("got %d WAN members want %d", got, want) - } - }) - - // check for RPC forwarding - testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root")) - testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root")) - testrpc.WaitForLeader(t, s3.RPC, "dc3") - - acceptingPeerName := "my-peer-accepting-server" - dialingPeerName := "my-peer-dialing-server" - - // Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service - { - // Create a peering by generating a token. - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - t.Cleanup(cancel) - - options := structs.QueryOptions{Token: "root"} - ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options) - require.NoError(t, err) - - conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())), - //nolint:staticcheck - grpc.WithInsecure(), - grpc.WithBlock()) - require.NoError(t, err) - defer conn.Close() - - peeringClient := pbpeering.NewPeeringServiceClient(conn) - req := pbpeering.GenerateTokenRequest{ - PeerName: dialingPeerName, - } - resp, err := peeringClient.GenerateToken(ctx, &req) - require.NoError(t, err) - - conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), - //nolint:staticcheck - grpc.WithInsecure(), - grpc.WithBlock()) - require.NoError(t, err) - defer conn.Close() - - peeringClient = pbpeering.NewPeeringServiceClient(conn) - establishReq := pbpeering.EstablishRequest{ - PeerName: acceptingPeerName, - PeeringToken: resp.PeeringToken, - } - establishResp, err := peeringClient.Establish(ctx, &establishReq) - require.NoError(t, err) - require.NotNil(t, establishResp) - - readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName}) - require.NoError(t, err) - require.NotNil(t, readResp) - - // Wait for the stream to be connected. - retry.Run(t, func(r *retry.R) { - status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID()) - require.True(r, found) - require.True(r, status.Connected) - }) - - exportedServices := structs.ConfigEntryRequest{ - Op: structs.ConfigEntryUpsert, - Datacenter: "dc3", - Entry: &structs.ExportedServicesConfigEntry{ - Name: "default", - Services: []structs.ExportedService{ - { - Name: "foo", - Consumers: []structs.ServiceConsumer{{Peer: dialingPeerName}}, - }, - }, - }, - } - var configOutput bool - require.NoError(t, msgpackrpc.CallWithCodec(codec3, "ConfigEntry.Apply", &exportedServices, &configOutput)) - require.True(t, configOutput) - } - - execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root") - rules := ` - service_prefix "" { policy = "read" } - node_prefix "" { policy = "read" } - ` - execToken := createTokenWithPolicyName(t, codec1, "with-read", rules, "root") - denyToken := createTokenWithPolicyName(t, codec1, "with-deny", `service_prefix "foo" { policy = "deny" }`, "root") + es := createExecuteServers(t) newSessionDC1 := func(t *testing.T) string { t.Helper() @@ -1590,12 +1447,12 @@ func TestPreparedQuery_Execute(t *testing.T) { Datacenter: "dc1", Op: structs.SessionCreate, Session: structs.Session{ - Node: s1.config.NodeName, + Node: es.server.server.config.NodeName, }, WriteRequest: structs.WriteRequest{Token: "root"}, } var session string - if err := msgpackrpc.CallWithCodec(codec1, "Session.Apply", &req, &session); err != nil { + if err := msgpackrpc.CallWithCodec(es.server.codec, "Session.Apply", &req, &session); err != nil { t.Fatalf("err: %v", err) } return session @@ -1608,9 +1465,9 @@ func TestPreparedQuery_Execute(t *testing.T) { codec rpc.ClientCodec dc string }{ - {codec1, "dc1"}, - {codec2, "dc2"}, - {codec3, "dc3"}, + {es.server.codec, "dc1"}, + {es.wanServer.codec, "dc2"}, + {es.peeringServer.codec, "dc3"}, } { req := structs.RegisterRequest{ Datacenter: d.dc, @@ -1659,7 +1516,7 @@ func TestPreparedQuery_Execute(t *testing.T) { }, WriteRequest: structs.WriteRequest{Token: "root"}, } - if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil { + if err := msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil { t.Fatalf("err: %v", err) } @@ -1671,7 +1528,7 @@ func TestPreparedQuery_Execute(t *testing.T) { } var reply structs.PreparedQueryExecuteResponse - err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply) + err := msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply) assert.EqualError(t, err, structs.ErrQueryNotFound.Error()) assert.Len(t, reply.Nodes, 0) }) @@ -1699,7 +1556,7 @@ func TestPreparedQuery_Execute(t *testing.T) { t.Helper() assert.Len(t, reply.Nodes, n) assert.Equal(t, "", reply.Datacenter) - assert.Equal(t, acceptingPeerName, reply.PeerName) + assert.Equal(t, es.peeringServer.acceptingPeerName, reply.PeerName) assert.Equal(t, 2, reply.Failovers) assert.Equal(t, query.Query.Service.Service, reply.Service) assert.Equal(t, query.Query.DNS, reply.DNS) @@ -1710,11 +1567,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 10) }) @@ -1723,11 +1580,11 @@ func TestPreparedQuery_Execute(t *testing.T) { Datacenter: "dc1", QueryIDOrName: query.Query.ID, Limit: 3, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 3) }) @@ -1771,16 +1628,16 @@ func TestPreparedQuery_Execute(t *testing.T) { }, WriteRequest: structs.WriteRequest{Token: "root"}, } - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID)) req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: nodeMetaQuery.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, tc.numNodes) for _, node := range reply.Nodes { @@ -1834,16 +1691,16 @@ func TestPreparedQuery_Execute(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID)) req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: svcMetaQuery.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, tc.numNodes) for _, node := range reply.Nodes { assert.True(t, structs.SatisfiesMetaFilters(node.Service.Meta, tc.filters), "meta: %v", node.Service.Meta) @@ -1861,8 +1718,8 @@ func TestPreparedQuery_Execute(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out struct{} - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "Coordinate.Update", &req, &out)) - time.Sleep(3 * s1.config.CoordinateUpdatePeriod) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "Coordinate.Update", &req, &out)) + time.Sleep(3 * es.server.server.config.CoordinateUpdatePeriod) } // Try an RTT sort. We don't have any other coordinates in there but @@ -1877,11 +1734,11 @@ func TestPreparedQuery_Execute(t *testing.T) { Datacenter: "dc1", Node: "node3", }, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 10) assert.Equal(t, "node3", reply.Nodes[0].Node.Node) @@ -1895,11 +1752,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 10) @@ -1924,7 +1781,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // so node3 should always show up first. query.Op = structs.PreparedQueryUpdate query.Query.Service.Near = "node3" - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Now run the query and make sure the sort looks right. for i := 0; i < 10; i++ { @@ -1936,11 +1793,11 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) assert.Equal(t, "node3", reply.Nodes[0].Node.Node) }) @@ -1963,13 +1820,13 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } shuffled := false for i := 0; i < 10; i++ { var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) if node := reply.Nodes[0].Node.Node; node != "node3" { @@ -1991,12 +1848,12 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } for i := 0; i < 10; i++ { var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) assert.Equal(t, "node1", reply.Nodes[0].Node.Node) } @@ -2004,7 +1861,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Bake the magic "_agent" flag into the query. query.Query.Service.Near = "_agent" - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Check that we sort the local agent first when the magic flag is set. t.Run("local agent is first using _agent on node3", func(t *testing.T) { @@ -2015,12 +1872,12 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } for i := 0; i < 10; i++ { var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) assert.Equal(t, "node3", reply.Nodes[0].Node.Node) } @@ -2037,7 +1894,7 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } // Expect the set to be shuffled since we have no coordinates @@ -2045,7 +1902,7 @@ func TestPreparedQuery_Execute(t *testing.T) { shuffled := false for i := 0; i < 10; i++ { var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) if node := reply.Nodes[0].Node.Node; node != "node3" { shuffled = true @@ -2070,13 +1927,13 @@ func TestPreparedQuery_Execute(t *testing.T) { }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } shuffled := false for i := 0; i < 10; i++ { var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) assert.Len(t, reply.Nodes, 10) if reply.Nodes[0].Node.Node != "node3" { shuffled = true @@ -2089,7 +1946,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Un-bake the near parameter. query.Query.Service.Near = "" - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Update the health of a node to mark it critical. setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) { @@ -2113,18 +1970,18 @@ func TestPreparedQuery_Execute(t *testing.T) { var reply struct{} require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply)) } - setHealth(t, codec1, "dc1", 1, api.HealthCritical) + setHealth(t, es.server.codec, "dc1", 1, api.HealthCritical) // The failing node should be filtered. t.Run("failing node filtered", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 9) for _, node := range reply.Nodes { @@ -2133,34 +1990,34 @@ func TestPreparedQuery_Execute(t *testing.T) { }) // Upgrade it to a warning and re-query, should be 10 nodes again. - setHealth(t, codec1, "dc1", 1, api.HealthWarning) + setHealth(t, es.server.codec, "dc1", 1, api.HealthWarning) t.Run("warning nodes are included", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 10) }) // Make the query more picky so it excludes warning nodes. query.Query.Service.OnlyPassing = true - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // The node in the warning state should be filtered. t.Run("warning nodes are omitted with onlypassing=true", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 9) for _, node := range reply.Nodes { @@ -2171,31 +2028,31 @@ func TestPreparedQuery_Execute(t *testing.T) { // Make the query ignore all our health checks (which have "failing" ID // implicitly from their name). query.Query.Service.IgnoreCheckIDs = []types.CheckID{"failing"} - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // We should end up with 10 nodes again t.Run("all nodes including when ignoring failing checks", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 10) }) // Undo that so all the following tests aren't broken! query.Query.Service.IgnoreCheckIDs = nil - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Make the query more picky by adding a tag filter. This just proves we // call into the tag filter, it is tested more thoroughly in a separate // test. query.Query.Service.Tags = []string{"!tag3"} - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // The node in the warning state should be filtered as well as the node // with the filtered tag. @@ -2203,11 +2060,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 8) for _, node := range reply.Nodes { @@ -2221,29 +2078,29 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: denyToken}, + QueryOptions: structs.QueryOptions{Token: es.denyToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 0) }) // Bake the exec token into the query. - query.Query.Token = execToken - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + query.Query.Token = es.execToken + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Now even querying with the deny token should work. t.Run("query with deny token still works", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: denyToken}, + QueryOptions: structs.QueryOptions{Token: es.denyToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 8) for _, node := range reply.Nodes { @@ -2254,18 +2111,18 @@ func TestPreparedQuery_Execute(t *testing.T) { // Un-bake the token. query.Query.Token = "" - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Make sure the query gets denied again with the deny token. t.Run("denied with deny token when no query token", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: denyToken}, + QueryOptions: structs.QueryOptions{Token: es.denyToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 0) }) @@ -2274,11 +2131,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execNoNodesToken}, + QueryOptions: structs.QueryOptions{Token: es.execNoNodesToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 0) require.True(t, reply.QueryMeta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be true") @@ -2288,11 +2145,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 8) for _, node := range reply.Nodes { @@ -2303,35 +2160,35 @@ func TestPreparedQuery_Execute(t *testing.T) { // Now fail everything in dc1 and we should get an empty list back. for i := 0; i < 10; i++ { - setHealth(t, codec1, "dc1", i+1, api.HealthCritical) + setHealth(t, es.server.codec, "dc1", i+1, api.HealthCritical) } t.Run("everything is failing so should get empty list", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectNodes(t, &query, &reply, 0) }) // Modify the query to have it fail over to a bogus DC and then dc2. query.Query.Service.Failover.Datacenters = []string{"bogus", "dc2"} - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Now we should see 9 nodes from dc2 (we have the tag filter still). t.Run("see 9 nodes from dc2 using tag filter", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 9) for _, node := range reply.Nodes { @@ -2346,13 +2203,13 @@ func TestPreparedQuery_Execute(t *testing.T) { QueryIDOrName: query.Query.ID, Limit: 3, QueryOptions: structs.QueryOptions{ - Token: execToken, + Token: es.execToken, RequireConsistent: true, }, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 3) for _, node := range reply.Nodes { @@ -2367,11 +2224,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 9) var names []string @@ -2395,11 +2252,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: denyToken}, + QueryOptions: structs.QueryOptions{Token: es.denyToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 0) }) @@ -2408,30 +2265,30 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execNoNodesToken}, + QueryOptions: structs.QueryOptions{Token: es.execNoNodesToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 0) require.True(t, reply.QueryMeta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be true") }) // Bake the exec token into the query. - query.Query.Token = execToken - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + query.Query.Token = es.execToken + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Now even querying with the deny token should work. t.Run("query from dc2 with exec token using deny token works", func(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: denyToken}, + QueryOptions: structs.QueryOptions{Token: es.denyToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) expectFailoverNodes(t, &query, &reply, 9) for _, node := range reply.Nodes { @@ -2443,14 +2300,14 @@ func TestPreparedQuery_Execute(t *testing.T) { query.Query.Service.Failover = structs.QueryFailoverOptions{ Targets: []structs.QueryFailoverTarget{ {Datacenter: "dc2"}, - {Peer: acceptingPeerName}, + {Peer: es.peeringServer.acceptingPeerName}, }, } - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID)) // Ensure the foo service has fully replicated. retry.Run(t, func(r *retry.R) { - _, nodes, err := s1.fsm.State().CheckServiceNodes(nil, "foo", nil, acceptingPeerName) + _, nodes, err := es.server.server.fsm.State().CheckServiceNodes(nil, "foo", nil, es.peeringServer.acceptingPeerName) require.NoError(r, err) require.Len(r, nodes, 10) }) @@ -2460,11 +2317,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) for _, node := range reply.Nodes { assert.NotEqual(t, "node3", node.Node.Node) @@ -2474,7 +2331,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Set all checks in dc2 as critical for i := 0; i < 10; i++ { - setHealth(t, codec2, "dc2", i+1, api.HealthCritical) + setHealth(t, es.wanServer.codec, "dc2", i+1, api.HealthCritical) } // Now we should see 9 nodes from dc3 (we have the tag filter still) @@ -2482,11 +2339,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) for _, node := range reply.Nodes { assert.NotEqual(t, "node3", node.Node.Node) @@ -2496,7 +2353,7 @@ func TestPreparedQuery_Execute(t *testing.T) { // Set all checks in dc1 as passing for i := 0; i < 10; i++ { - setHealth(t, codec1, "dc1", i+1, api.HealthPassing) + setHealth(t, es.server.codec, "dc1", i+1, api.HealthPassing) } // Nothing is healthy so nothing is returned @@ -2505,11 +2362,11 @@ func TestPreparedQuery_Execute(t *testing.T) { req := structs.PreparedQueryExecuteRequest{ Datacenter: "dc1", QueryIDOrName: query.Query.ID, - QueryOptions: structs.QueryOptions{Token: execToken}, + QueryOptions: structs.QueryOptions{Token: es.execToken}, } var reply structs.PreparedQueryExecuteResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)) + require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)) for _, node := range reply.Nodes { assert.NotEqual(t, "node3", node.Node.Node) @@ -3032,7 +2889,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) { } // Datacenters are available but the query doesn't use them. - { + t.Run("Query no datacenters used", func(t *testing.T) { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, } @@ -3044,10 +2901,10 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { t.Fatalf("bad: %v", reply) } - } + }) // Make it fail to get datacenters. - { + t.Run("Fail to get datacenters", func(t *testing.T) { mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, DatacentersError: fmt.Errorf("XXX"), @@ -3061,11 +2918,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { t.Fatalf("bad: %v", reply) } - } + }) // The query wants to use other datacenters but none are available. - query.Service.Failover.NearestN = 3 - { + t.Run("no datacenters available", func(t *testing.T) { + query.Service.Failover.NearestN = 3 mock := &mockQueryServer{ Datacenters: []string{}, } @@ -3077,11 +2934,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { t.Fatalf("bad: %v", reply) } - } + }) // Try the first three nearest datacenters, first one has the data. - query.Service.Failover.NearestN = 3 - { + t.Run("first datacenter has data", func(t *testing.T) { + query.Service.Failover.NearestN = 3 mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3104,11 +2961,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Try the first three nearest datacenters, last one has the data. - query.Service.Failover.NearestN = 3 - { + t.Run("last datacenter has data", func(t *testing.T) { + query.Service.Failover.NearestN = 3 mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3131,11 +2988,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Try the first four nearest datacenters, nobody has the data. - query.Service.Failover.NearestN = 4 - { + t.Run("no datacenters with data", func(t *testing.T) { + query.Service.Failover.NearestN = 4 mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, } @@ -3151,13 +3008,13 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Try the first two nearest datacenters, plus a user-specified one that // has the data. - query.Service.Failover.NearestN = 2 - query.Service.Failover.Datacenters = []string{"dc4"} - { + t.Run("user specified datacenter with data", func(t *testing.T) { + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3180,12 +3037,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Add in a hard-coded value that overlaps with the nearest list. - query.Service.Failover.NearestN = 2 - query.Service.Failover.Datacenters = []string{"dc4", "dc1"} - { + t.Run("overlap with nearest list", func(t *testing.T) { + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4", "dc1"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3208,12 +3065,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Now add a bogus user-defined one to the mix. - query.Service.Failover.NearestN = 2 - query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"} - { + t.Run("bogus user-defined", func(t *testing.T) { + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3237,13 +3094,13 @@ func TestPreparedQuery_queryFailover(t *testing.T) { t.Fatalf("bad: %s", queries) } require.Contains(t, mock.LogBuffer.String(), "Skipping unknown datacenter") - } + }) // Same setup as before but dc1 is going to return an error and should // get skipped over, still yielding data from dc4 which comes later. - query.Service.Failover.NearestN = 2 - query.Service.Failover.Datacenters = []string{"dc4", "dc1"} - { + t.Run("dc1 error", func(t *testing.T) { + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4", "dc1"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3271,12 +3128,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if !strings.Contains(mock.LogBuffer.String(), "Failed querying") { t.Fatalf("bad: %s", mock.LogBuffer.String()) } - } + }) // Just use a hard-coded list and now xxx has the data. - query.Service.Failover.NearestN = 0 - query.Service.Failover.Datacenters = []string{"dc3", "xxx"} - { + t.Run("hard coded list", func(t *testing.T) { + query.Service.Failover.NearestN = 0 + query.Service.Failover.Datacenters = []string{"dc3", "xxx"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3299,12 +3156,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Make sure the limit and query options are plumbed through. - query.Service.Failover.NearestN = 0 - query.Service.Failover.Datacenters = []string{"xxx"} - { + t.Run("limit and query options used", func(t *testing.T) { + query.Service.Failover.NearestN = 0 + query.Service.Failover.Datacenters = []string{"xxx"} mock := &mockQueryServer{ Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { @@ -3336,33 +3193,331 @@ func TestPreparedQuery_queryFailover(t *testing.T) { if queries := mock.JoinQueryLog(); queries != "xxx:PreparedQuery.ExecuteRemote" { t.Fatalf("bad: %s", queries) } - } + }) // Failover returns data from the first cluster peer with data. - query.Service.Failover.Datacenters = nil - query.Service.Failover.Targets = []structs.QueryFailoverTarget{ - {Peer: "cluster-01"}, - {Datacenter: "dc44"}, - {Peer: "cluster-02"}, - } - { - mock := &mockQueryServer{ - Datacenters: []string{"dc44"}, - QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + t.Run("failover first peer with data", func(t *testing.T) { + query.Service.Failover.Datacenters = nil + query.Service.Failover.Targets = []structs.QueryFailoverTarget{ + {Peer: "cluster-01"}, + {Datacenter: "dc44"}, + {Peer: "cluster-02"}, + } + { + mock := &mockQueryServer{ + Datacenters: []string{"dc44"}, + QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if args.Query.Service.Peer == "cluster-02" { + reply.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(t, "cluster-02", reply.PeerName) + require.Equal(t, 3, reply.Failovers) + require.Equal(t, nodes(), reply.Nodes) + require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog()) + } + }) + + tests := []struct { + name string + targets []structs.QueryFailoverTarget + datacenters []string + queryfn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error + expectedPeer string + expectedDatacenter string + expectedReplies int + expectedQuery string + }{ + { + name: "failover first peer with data", + targets: []structs.QueryFailoverTarget{ + {Peer: "cluster-01"}, + {Datacenter: "dc44"}, + {Peer: "cluster-02"}, + }, + queryfn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { if args.Query.Service.Peer == "cluster-02" { reply.Nodes = nodes() } return nil }, - } + datacenters: []string{"dc44"}, + expectedPeer: "cluster-02", + expectedDatacenter: "", + expectedReplies: 3, + expectedQuery: "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", + }, + { + name: "failover datacenter with data", + targets: []structs.QueryFailoverTarget{ + {Peer: "cluster-01"}, + {Datacenter: "dc44"}, + {Peer: "cluster-02"}, + }, + queryfn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { + if args.Datacenter == "dc44" { + reply.Nodes = nodes() + } + return nil + }, + datacenters: []string{"dc44"}, + expectedPeer: "", + expectedDatacenter: "dc44", + expectedReplies: 2, + expectedQuery: "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote", + }, + } - var reply structs.PreparedQueryExecuteResponse - if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil { - t.Fatalf("err: %v", err) - } - require.Equal(t, "cluster-02", reply.PeerName) - require.Equal(t, 3, reply.Failovers) - require.Equal(t, nodes(), reply.Nodes) - require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog()) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query.Service.Failover.Datacenters = nil + query.Service.Failover.Targets = tt.targets + + mock := &mockQueryServer{ + Datacenters: tt.datacenters, + QueryFn: tt.queryfn, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(t, tt.expectedPeer, reply.PeerName) + require.Equal(t, tt.expectedReplies, reply.Failovers) + require.Equal(t, nodes(), reply.Nodes) + require.Equal(t, tt.expectedQuery, mock.JoinQueryLog()) + }) + } +} + +type serverTestMetadata struct { + server *Server + codec rpc.ClientCodec + datacenter string + acceptingPeerName string + dialingPeerName string +} + +type executeServers struct { + server *serverTestMetadata + peeringServer *serverTestMetadata + wanServer *serverTestMetadata + execToken string + denyToken string + execNoNodesToken string +} + +func createExecuteServers(t *testing.T) *executeServers { + es := newExecuteServers(t) + es.initWanFed(t) + es.exportPeeringServices(t) + es.initTokens(t) + + return es +} + +func newExecuteServers(t *testing.T) *executeServers { + + // Setup server + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + t.Cleanup(func() { + os.RemoveAll(dir1) + }) + t.Cleanup(func() { + s1.Shutdown() + }) + waitForLeaderEstablishment(t, s1) + codec1 := rpcClient(t, s1) + t.Cleanup(func() { + codec1.Close() + }) + + ca := connect.TestCA(t, nil) + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc3" + c.PrimaryDatacenter = "dc3" + c.NodeName = "acceptingServer.dc3" + c.GRPCTLSPort = freeport.GetOne(t) + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{ + "PrivateKey": ca.SigningKey, + "RootCert": ca.RootCert, + }, + } + }) + t.Cleanup(func() { + os.RemoveAll(dir3) + }) + t.Cleanup(func() { + s3.Shutdown() + }) + waitForLeaderEstablishment(t, s3) + codec3 := rpcClient(t, s3) + t.Cleanup(func() { + codec3.Close() + }) + + // check for RPC forwarding + testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root")) + testrpc.WaitForLeader(t, s3.RPC, "dc3") + + acceptingPeerName := "my-peer-accepting-server" + dialingPeerName := "my-peer-dialing-server" + + // Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service + { + // Create a peering by generating a token. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + options := structs.QueryOptions{Token: "root"} + ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + + conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())), + //nolint:staticcheck + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { + conn.Close() + }) + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + req := pbpeering.GenerateTokenRequest{ + PeerName: dialingPeerName, + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + //nolint:staticcheck + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { + conn.Close() + }) + + peeringClient = pbpeering.NewPeeringServiceClient(conn) + establishReq := pbpeering.EstablishRequest{ + PeerName: acceptingPeerName, + PeeringToken: resp.PeeringToken, + } + establishResp, err := peeringClient.Establish(ctx, &establishReq) + require.NoError(t, err) + require.NotNil(t, establishResp) + + readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName}) + require.NoError(t, err) + require.NotNil(t, readResp) + + // Wait for the stream to be connected. + retry.Run(t, func(r *retry.R) { + status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID()) + require.True(r, found) + require.True(r, status.Connected) + }) + } + + es := executeServers{ + server: &serverTestMetadata{ + server: s1, + codec: codec1, + datacenter: "dc1", + }, + peeringServer: &serverTestMetadata{ + server: s3, + codec: codec3, + datacenter: "dc3", + dialingPeerName: dialingPeerName, + acceptingPeerName: acceptingPeerName, + }, + } + + return &es +} + +func (es *executeServers) initTokens(t *testing.T) { + es.execNoNodesToken = createTokenWithPolicyName(t, es.server.codec, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root") + rules := ` + service_prefix "" { policy = "read" } + node_prefix "" { policy = "read" } + ` + es.execToken = createTokenWithPolicyName(t, es.server.codec, "with-read", rules, "root") + es.denyToken = createTokenWithPolicyName(t, es.server.codec, "with-deny", `service_prefix "foo" { policy = "deny" }`, "root") +} + +func (es *executeServers) exportPeeringServices(t *testing.T) { + exportedServices := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc3", + Entry: &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "foo", + Consumers: []structs.ServiceConsumer{{Peer: es.peeringServer.dialingPeerName}}, + }, + }, + }, + } + var configOutput bool + require.NoError(t, msgpackrpc.CallWithCodec(es.peeringServer.codec, "ConfigEntry.Apply", &exportedServices, &configOutput)) + require.True(t, configOutput) +} + +func (es *executeServers) initWanFed(t *testing.T) { + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + t.Cleanup(func() { + os.RemoveAll(dir2) + }) + t.Cleanup(func() { + s2.Shutdown() + }) + waitForLeaderEstablishment(t, s2) + codec2 := rpcClient(t, s2) + t.Cleanup(func() { + codec2.Close() + }) + + s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) + + // Try to WAN join. + joinWAN(t, s2, es.server.server) + retry.Run(t, func(r *retry.R) { + if got, want := len(es.server.server.WANMembers()), 2; got != want { + r.Fatalf("got %d WAN members want %d", got, want) + } + if got, want := len(s2.WANMembers()), 2; got != want { + r.Fatalf("got %d WAN members want %d", got, want) + } + }) + testrpc.WaitForLeader(t, es.server.server.RPC, "dc2", testrpc.WithToken("root")) + es.wanServer = &serverTestMetadata{ + server: s2, + codec: codec2, + datacenter: "dc2", } } diff --git a/agent/prepared_query_endpoint.go b/agent/prepared_query_endpoint.go index 2aafb7d96..9b8225f33 100644 --- a/agent/prepared_query_endpoint.go +++ b/agent/prepared_query_endpoint.go @@ -16,7 +16,7 @@ type preparedQueryCreateResponse struct { } // preparedQueryCreate makes a new prepared query. -func (s *HTTPHandlers) preparedQueryCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPHandlers) preparedQueryCreate(_ http.ResponseWriter, req *http.Request) (interface{}, error) { args := structs.PreparedQueryRequest{ Op: structs.PreparedQueryCreate, } diff --git a/agent/structs/prepared_query.go b/agent/structs/prepared_query.go index b46145113..59501d21e 100644 --- a/agent/structs/prepared_query.go +++ b/agent/structs/prepared_query.go @@ -47,6 +47,8 @@ type QueryFailoverTarget struct { // Datacenter specifies a datacenter to try during failover. Datacenter string + + acl.EnterpriseMeta } // QueryDNSOptions controls settings when query results are served over DNS. diff --git a/api/prepared_query.go b/api/prepared_query.go index f47a58373..3dfd571c7 100644 --- a/api/prepared_query.go +++ b/api/prepared_query.go @@ -26,6 +26,14 @@ type QueryFailoverTarget struct { // Datacenter specifies a datacenter to try during failover. Datacenter string + + // Partition specifies a partition to try during failover + // Note: Partition are available only in Consul Enterprise + Partition string + + // Namespace specifies a namespace to try during failover + // Note: Namespaces are available only in Consul Enterprise + Namespace string } // QueryDNSOptions controls settings when query results are served over DNS. diff --git a/website/content/api-docs/query.mdx b/website/content/api-docs/query.mdx index 09aa867c4..81c6ed0e4 100644 --- a/website/content/api-docs/query.mdx +++ b/website/content/api-docs/query.mdx @@ -10,7 +10,7 @@ The `/query` endpoints create, update, destroy, and execute prepared queries. Prepared queries allow you to register a complex service query and then execute it later by specifying the query ID or name. Consul returns a set of healthy nodes that provide a given -service. Refer to +service. Refer to [Enable Dynamic DNS Queries](/consul/docs/services/discovery/dns-dynamic-lookups) for additional information. Check the [Geo Failover tutorial](/consul/tutorials/developer-discovery/automate-geo-failover) for details and @@ -212,6 +212,12 @@ The table below shows this endpoint's support for - `Datacenter` `(string: "")` - Specifies a WAN federated datacenter to forward the query to. + - `Partition` `(string: "")` - Specifies a Partition to forward the + query to. + + - `Namespace` `(string: "")` - Specifies a Namespace to forward the + query to. + - `IgnoreCheckIDs` `(array: nil)` - Specifies a list of check IDs that should be ignored when filtering unhealthy instances. This is mostly useful in an emergency or as a temporary measure when a health check is found to be