diff --git a/.changelog/_4821.txt b/.changelog/_4821.txt
new file mode 100644
index 000000000..88758540a
--- /dev/null
+++ b/.changelog/_4821.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+connect: **(Enterprise Only)** Add support for specifying "Partition" and "Namespace" in prepared query failover rules.
+```
\ No newline at end of file
diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go
index 54ef82baf..2843082cc 100644
--- a/agent/consul/prepared_query_endpoint.go
+++ b/agent/consul/prepared_query_endpoint.go
@@ -718,13 +718,13 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
return err
}
- // This will help us filter unknown DCs supplied by the user.
+ // This will help us filter unknown targets supplied by the user.
known := make(map[string]struct{})
for _, dc := range nearest {
known[dc] = struct{}{}
}
- // Build a candidate list of DCs to try, starting with the nearest N
+ // Build a candidate list of failover targets to try, starting with the nearest N targets
// from RTTs.
var targets []structs.QueryFailoverTarget
index := make(map[string]struct{})
@@ -739,9 +739,9 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
}
}
- // Then add any DCs explicitly listed that weren't selected above.
+ // Then add any targets explicitly listed that weren't selected above.
for _, target := range query.Service.Failover.AsTargets() {
- // This will prevent a log of other log spammage if we do not
+ // This will prevent a lot of other log spam if we do not
// attempt to talk to datacenters we don't know about.
if dc := target.Datacenter; dc != "" {
if _, ok := known[dc]; !ok {
@@ -753,15 +753,16 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
// from the NearestN list.
if _, ok := index[dc]; !ok {
targets = append(targets, target)
+ continue
}
}
- if target.Peer != "" {
+ if target.Peer != "" || target.PartitionOrEmpty() != "" || target.NamespaceOrEmpty() != "" {
targets = append(targets, target)
}
}
- // Now try the selected DCs in priority order.
+ // Now try the selected targets in priority order.
failovers := 0
for _, target := range targets {
// This keeps track of how many iterations we actually run.
@@ -775,9 +776,10 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
// through this slice across successive RPC calls.
reply.Nodes = nil
- // Reset PeerName because it may have been set by a previous failover
+ // Reset Peer, because it may have been set by a previous failover
// target.
query.Service.Peer = target.Peer
+ query.Service.EnterpriseMeta = target.EnterpriseMeta
dc := target.Datacenter
if target.Peer != "" {
dc = q.GetLocalDC()
@@ -800,6 +802,7 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
"service", query.Service.Service,
"peerName", query.Service.Peer,
"datacenter", dc,
+ "enterpriseMeta", query.Service.EnterpriseMeta,
"error", err,
)
continue
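
The candidate loop above feeds off `query.Service.Failover.AsTargets()`, which is unchanged by this diff. For readers following along, here is a rough sketch of what that helper is assumed to do: flatten the legacy `Datacenters` list into targets, otherwise hand back the explicit `Targets` list. The authoritative definition lives in agent/structs/prepared_query.go and may differ in detail.

```go
// Sketch only, reconstructed as an assumption; not part of this change.
// Legacy configs set Datacenters, newer configs set Targets, and validation
// elsewhere is expected to keep the two mutually exclusive.
func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget {
	if dcs := f.Datacenters; len(dcs) > 0 {
		targets := make([]QueryFailoverTarget, 0, len(dcs))
		for _, dc := range dcs {
			targets = append(targets, QueryFailoverTarget{Datacenter: dc})
		}
		return targets
	}
	return f.Targets
}
```
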
diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go
index ca1ad0c66..1da472dbc 100644
--- a/agent/consul/prepared_query_endpoint_test.go
+++ b/agent/consul/prepared_query_endpoint_test.go
@@ -1437,152 +1437,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
-
t.Parallel()
- dir1, s1 := testServerWithConfig(t, func(c *Config) {
- c.PrimaryDatacenter = "dc1"
- c.ACLsEnabled = true
- c.ACLInitialManagementToken = "root"
- c.ACLResolverSettings.ACLDefaultPolicy = "deny"
- })
- defer os.RemoveAll(dir1)
- defer s1.Shutdown()
- waitForLeaderEstablishment(t, s1)
- codec1 := rpcClient(t, s1)
- defer codec1.Close()
- dir2, s2 := testServerWithConfig(t, func(c *Config) {
- c.Datacenter = "dc2"
- c.PrimaryDatacenter = "dc1"
- c.ACLsEnabled = true
- c.ACLResolverSettings.ACLDefaultPolicy = "deny"
- })
- defer os.RemoveAll(dir2)
- defer s2.Shutdown()
- waitForLeaderEstablishment(t, s2)
- codec2 := rpcClient(t, s2)
- defer codec2.Close()
-
- s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
-
- ca := connect.TestCA(t, nil)
- dir3, s3 := testServerWithConfig(t, func(c *Config) {
- c.Datacenter = "dc3"
- c.PrimaryDatacenter = "dc3"
- c.NodeName = "acceptingServer.dc3"
- c.GRPCTLSPort = freeport.GetOne(t)
- c.CAConfig = &structs.CAConfiguration{
- ClusterID: connect.TestClusterID,
- Provider: structs.ConsulCAProvider,
- Config: map[string]interface{}{
- "PrivateKey": ca.SigningKey,
- "RootCert": ca.RootCert,
- },
- }
- })
- defer os.RemoveAll(dir3)
- defer s3.Shutdown()
- waitForLeaderEstablishment(t, s3)
- codec3 := rpcClient(t, s3)
- defer codec3.Close()
-
- // Try to WAN join.
- joinWAN(t, s2, s1)
- retry.Run(t, func(r *retry.R) {
- if got, want := len(s1.WANMembers()), 2; got != want {
- r.Fatalf("got %d WAN members want %d", got, want)
- }
- if got, want := len(s2.WANMembers()), 2; got != want {
- r.Fatalf("got %d WAN members want %d", got, want)
- }
- })
-
- // check for RPC forwarding
- testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
- testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root"))
- testrpc.WaitForLeader(t, s3.RPC, "dc3")
-
- acceptingPeerName := "my-peer-accepting-server"
- dialingPeerName := "my-peer-dialing-server"
-
- // Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service
- {
- // Create a peering by generating a token.
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- t.Cleanup(cancel)
-
- options := structs.QueryOptions{Token: "root"}
- ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
- require.NoError(t, err)
-
- conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
- grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
- //nolint:staticcheck
- grpc.WithInsecure(),
- grpc.WithBlock())
- require.NoError(t, err)
- defer conn.Close()
-
- peeringClient := pbpeering.NewPeeringServiceClient(conn)
- req := pbpeering.GenerateTokenRequest{
- PeerName: dialingPeerName,
- }
- resp, err := peeringClient.GenerateToken(ctx, &req)
- require.NoError(t, err)
-
- conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(),
- grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
- //nolint:staticcheck
- grpc.WithInsecure(),
- grpc.WithBlock())
- require.NoError(t, err)
- defer conn.Close()
-
- peeringClient = pbpeering.NewPeeringServiceClient(conn)
- establishReq := pbpeering.EstablishRequest{
- PeerName: acceptingPeerName,
- PeeringToken: resp.PeeringToken,
- }
- establishResp, err := peeringClient.Establish(ctx, &establishReq)
- require.NoError(t, err)
- require.NotNil(t, establishResp)
-
- readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName})
- require.NoError(t, err)
- require.NotNil(t, readResp)
-
- // Wait for the stream to be connected.
- retry.Run(t, func(r *retry.R) {
- status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
- require.True(r, found)
- require.True(r, status.Connected)
- })
-
- exportedServices := structs.ConfigEntryRequest{
- Op: structs.ConfigEntryUpsert,
- Datacenter: "dc3",
- Entry: &structs.ExportedServicesConfigEntry{
- Name: "default",
- Services: []structs.ExportedService{
- {
- Name: "foo",
- Consumers: []structs.ServiceConsumer{{Peer: dialingPeerName}},
- },
- },
- },
- }
- var configOutput bool
- require.NoError(t, msgpackrpc.CallWithCodec(codec3, "ConfigEntry.Apply", &exportedServices, &configOutput))
- require.True(t, configOutput)
- }
-
- execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root")
- rules := `
- service_prefix "" { policy = "read" }
- node_prefix "" { policy = "read" }
- `
- execToken := createTokenWithPolicyName(t, codec1, "with-read", rules, "root")
- denyToken := createTokenWithPolicyName(t, codec1, "with-deny", `service_prefix "foo" { policy = "deny" }`, "root")
+ es := createExecuteServers(t)
newSessionDC1 := func(t *testing.T) string {
t.Helper()
@@ -1590,12 +1447,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
- Node: s1.config.NodeName,
+ Node: es.server.server.config.NodeName,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var session string
- if err := msgpackrpc.CallWithCodec(codec1, "Session.Apply", &req, &session); err != nil {
+ if err := msgpackrpc.CallWithCodec(es.server.codec, "Session.Apply", &req, &session); err != nil {
t.Fatalf("err: %v", err)
}
return session
@@ -1608,9 +1465,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
codec rpc.ClientCodec
dc string
}{
- {codec1, "dc1"},
- {codec2, "dc2"},
- {codec3, "dc3"},
+ {es.server.codec, "dc1"},
+ {es.wanServer.codec, "dc2"},
+ {es.peeringServer.codec, "dc3"},
} {
req := structs.RegisterRequest{
Datacenter: d.dc,
@@ -1659,7 +1516,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
- if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+ if err := msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}
@@ -1671,7 +1528,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
var reply structs.PreparedQueryExecuteResponse
- err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)
+ err := msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply)
assert.EqualError(t, err, structs.ErrQueryNotFound.Error())
assert.Len(t, reply.Nodes, 0)
})
@@ -1699,7 +1556,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Helper()
assert.Len(t, reply.Nodes, n)
assert.Equal(t, "", reply.Datacenter)
- assert.Equal(t, acceptingPeerName, reply.PeerName)
+ assert.Equal(t, es.peeringServer.acceptingPeerName, reply.PeerName)
assert.Equal(t, 2, reply.Failovers)
assert.Equal(t, query.Query.Service.Service, reply.Service)
assert.Equal(t, query.Query.DNS, reply.DNS)
@@ -1710,11 +1567,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 10)
})
@@ -1723,11 +1580,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Limit: 3,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 3)
})
@@ -1771,16 +1628,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID))
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: nodeMetaQuery.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, tc.numNodes)
for _, node := range reply.Nodes {
@@ -1834,16 +1691,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID))
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: svcMetaQuery.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, tc.numNodes)
for _, node := range reply.Nodes {
assert.True(t, structs.SatisfiesMetaFilters(node.Service.Meta, tc.filters), "meta: %v", node.Service.Meta)
@@ -1861,8 +1718,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out struct{}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "Coordinate.Update", &req, &out))
- time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "Coordinate.Update", &req, &out))
+ time.Sleep(3 * es.server.server.config.CoordinateUpdatePeriod)
}
// Try an RTT sort. We don't have any other coordinates in there but
@@ -1877,11 +1734,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Node: "node3",
},
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 10)
assert.Equal(t, "node3", reply.Nodes[0].Node.Node)
@@ -1895,11 +1752,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 10)
@@ -1924,7 +1781,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// so node3 should always show up first.
query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "node3"
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Now run the query and make sure the sort looks right.
for i := 0; i < 10; i++ {
@@ -1936,11 +1793,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
assert.Equal(t, "node3", reply.Nodes[0].Node.Node)
})
@@ -1963,13 +1820,13 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
shuffled := false
for i := 0; i < 10; i++ {
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
if node := reply.Nodes[0].Node.Node; node != "node3" {
@@ -1991,12 +1848,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
for i := 0; i < 10; i++ {
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
assert.Equal(t, "node1", reply.Nodes[0].Node.Node)
}
@@ -2004,7 +1861,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Bake the magic "_agent" flag into the query.
query.Query.Service.Near = "_agent"
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Check that we sort the local agent first when the magic flag is set.
t.Run("local agent is first using _agent on node3", func(t *testing.T) {
@@ -2015,12 +1872,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
for i := 0; i < 10; i++ {
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
assert.Equal(t, "node3", reply.Nodes[0].Node.Node)
}
@@ -2037,7 +1894,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
// Expect the set to be shuffled since we have no coordinates
@@ -2045,7 +1902,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
shuffled := false
for i := 0; i < 10; i++ {
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
@@ -2070,13 +1927,13 @@ func TestPreparedQuery_Execute(t *testing.T) {
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
shuffled := false
for i := 0; i < 10; i++ {
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
assert.Len(t, reply.Nodes, 10)
if reply.Nodes[0].Node.Node != "node3" {
shuffled = true
@@ -2089,7 +1946,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Un-bake the near parameter.
query.Query.Service.Near = ""
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Update the health of a node to mark it critical.
setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) {
@@ -2113,18 +1970,18 @@ func TestPreparedQuery_Execute(t *testing.T) {
var reply struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
}
- setHealth(t, codec1, "dc1", 1, api.HealthCritical)
+ setHealth(t, es.server.codec, "dc1", 1, api.HealthCritical)
// The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 9)
for _, node := range reply.Nodes {
@@ -2133,34 +1990,34 @@ func TestPreparedQuery_Execute(t *testing.T) {
})
// Upgrade it to a warning and re-query, should be 10 nodes again.
- setHealth(t, codec1, "dc1", 1, api.HealthWarning)
+ setHealth(t, es.server.codec, "dc1", 1, api.HealthWarning)
t.Run("warning nodes are included", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 10)
})
// Make the query more picky so it excludes warning nodes.
query.Query.Service.OnlyPassing = true
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// The node in the warning state should be filtered.
t.Run("warning nodes are omitted with onlypassing=true", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 9)
for _, node := range reply.Nodes {
@@ -2171,31 +2028,31 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Make the query ignore all our health checks (which have "failing" ID
// implicitly from their name).
query.Query.Service.IgnoreCheckIDs = []types.CheckID{"failing"}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// We should end up with 10 nodes again
t.Run("all nodes including when ignoring failing checks", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 10)
})
// Undo that so all the following tests aren't broken!
query.Query.Service.IgnoreCheckIDs = nil
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Make the query more picky by adding a tag filter. This just proves we
// call into the tag filter, it is tested more thoroughly in a separate
// test.
query.Query.Service.Tags = []string{"!tag3"}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// The node in the warning state should be filtered as well as the node
// with the filtered tag.
@@ -2203,11 +2060,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 8)
for _, node := range reply.Nodes {
@@ -2221,29 +2078,29 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: denyToken},
+ QueryOptions: structs.QueryOptions{Token: es.denyToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 0)
})
// Bake the exec token into the query.
- query.Query.Token = execToken
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ query.Query.Token = es.execToken
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Now even querying with the deny token should work.
t.Run("query with deny token still works", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: denyToken},
+ QueryOptions: structs.QueryOptions{Token: es.denyToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 8)
for _, node := range reply.Nodes {
@@ -2254,18 +2111,18 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Un-bake the token.
query.Query.Token = ""
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Make sure the query gets denied again with the deny token.
t.Run("denied with deny token when no query token", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: denyToken},
+ QueryOptions: structs.QueryOptions{Token: es.denyToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 0)
})
@@ -2274,11 +2131,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execNoNodesToken},
+ QueryOptions: structs.QueryOptions{Token: es.execNoNodesToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 0)
require.True(t, reply.QueryMeta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be true")
@@ -2288,11 +2145,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 8)
for _, node := range reply.Nodes {
@@ -2303,35 +2160,35 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Now fail everything in dc1 and we should get an empty list back.
for i := 0; i < 10; i++ {
- setHealth(t, codec1, "dc1", i+1, api.HealthCritical)
+ setHealth(t, es.server.codec, "dc1", i+1, api.HealthCritical)
}
t.Run("everything is failing so should get empty list", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectNodes(t, &query, &reply, 0)
})
// Modify the query to have it fail over to a bogus DC and then dc2.
query.Query.Service.Failover.Datacenters = []string{"bogus", "dc2"}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Now we should see 9 nodes from dc2 (we have the tag filter still).
t.Run("see 9 nodes from dc2 using tag filter", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 9)
for _, node := range reply.Nodes {
@@ -2346,13 +2203,13 @@ func TestPreparedQuery_Execute(t *testing.T) {
QueryIDOrName: query.Query.ID,
Limit: 3,
QueryOptions: structs.QueryOptions{
- Token: execToken,
+ Token: es.execToken,
RequireConsistent: true,
},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 3)
for _, node := range reply.Nodes {
@@ -2367,11 +2224,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 9)
var names []string
@@ -2395,11 +2252,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: denyToken},
+ QueryOptions: structs.QueryOptions{Token: es.denyToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 0)
})
@@ -2408,30 +2265,30 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execNoNodesToken},
+ QueryOptions: structs.QueryOptions{Token: es.execNoNodesToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 0)
require.True(t, reply.QueryMeta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be true")
})
// Bake the exec token into the query.
- query.Query.Token = execToken
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ query.Query.Token = es.execToken
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Now even querying with the deny token should work.
t.Run("query from dc2 with exec token using deny token works", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: denyToken},
+ QueryOptions: structs.QueryOptions{Token: es.denyToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
expectFailoverNodes(t, &query, &reply, 9)
for _, node := range reply.Nodes {
@@ -2443,14 +2300,14 @@ func TestPreparedQuery_Execute(t *testing.T) {
query.Query.Service.Failover = structs.QueryFailoverOptions{
Targets: []structs.QueryFailoverTarget{
{Datacenter: "dc2"},
- {Peer: acceptingPeerName},
+ {Peer: es.peeringServer.acceptingPeerName},
},
}
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Apply", &query, &query.Query.ID))
// Ensure the foo service has fully replicated.
retry.Run(t, func(r *retry.R) {
- _, nodes, err := s1.fsm.State().CheckServiceNodes(nil, "foo", nil, acceptingPeerName)
+ _, nodes, err := es.server.server.fsm.State().CheckServiceNodes(nil, "foo", nil, es.peeringServer.acceptingPeerName)
require.NoError(r, err)
require.Len(r, nodes, 10)
})
@@ -2460,11 +2317,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
@@ -2474,7 +2331,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Set all checks in dc2 as critical
for i := 0; i < 10; i++ {
- setHealth(t, codec2, "dc2", i+1, api.HealthCritical)
+ setHealth(t, es.wanServer.codec, "dc2", i+1, api.HealthCritical)
}
// Now we should see 9 nodes from dc3 (we have the tag filter still)
@@ -2482,11 +2339,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
@@ -2496,7 +2353,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Set all checks in dc1 as passing
for i := 0; i < 10; i++ {
- setHealth(t, codec1, "dc1", i+1, api.HealthPassing)
+ setHealth(t, es.server.codec, "dc1", i+1, api.HealthPassing)
}
// Nothing is healthy so nothing is returned
@@ -2505,11 +2362,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
- QueryOptions: structs.QueryOptions{Token: execToken},
+ QueryOptions: structs.QueryOptions{Token: es.execToken},
}
var reply structs.PreparedQueryExecuteResponse
- require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
+ require.NoError(t, msgpackrpc.CallWithCodec(es.server.codec, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
@@ -3032,7 +2889,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
}
// Datacenters are available but the query doesn't use them.
- {
+ t.Run("Query no datacenters used", func(t *testing.T) {
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
}
@@ -3044,10 +2901,10 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
t.Fatalf("bad: %v", reply)
}
- }
+ })
// Make it fail to get datacenters.
- {
+ t.Run("Fail to get datacenters", func(t *testing.T) {
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
DatacentersError: fmt.Errorf("XXX"),
@@ -3061,11 +2918,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
t.Fatalf("bad: %v", reply)
}
- }
+ })
// The query wants to use other datacenters but none are available.
- query.Service.Failover.NearestN = 3
- {
+ t.Run("no datacenters available", func(t *testing.T) {
+ query.Service.Failover.NearestN = 3
mock := &mockQueryServer{
Datacenters: []string{},
}
@@ -3077,11 +2934,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
t.Fatalf("bad: %v", reply)
}
- }
+ })
// Try the first three nearest datacenters, first one has the data.
- query.Service.Failover.NearestN = 3
- {
+ t.Run("first datacenter has data", func(t *testing.T) {
+ query.Service.Failover.NearestN = 3
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3104,11 +2961,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Try the first three nearest datacenters, last one has the data.
- query.Service.Failover.NearestN = 3
- {
+ t.Run("last datacenter has data", func(t *testing.T) {
+ query.Service.Failover.NearestN = 3
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3131,11 +2988,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Try the first four nearest datacenters, nobody has the data.
- query.Service.Failover.NearestN = 4
- {
+ t.Run("no datacenters with data", func(t *testing.T) {
+ query.Service.Failover.NearestN = 4
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
}
@@ -3151,13 +3008,13 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Try the first two nearest datacenters, plus a user-specified one that
// has the data.
- query.Service.Failover.NearestN = 2
- query.Service.Failover.Datacenters = []string{"dc4"}
- {
+ t.Run("user specified datacenter with data", func(t *testing.T) {
+ query.Service.Failover.NearestN = 2
+ query.Service.Failover.Datacenters = []string{"dc4"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3180,12 +3037,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Add in a hard-coded value that overlaps with the nearest list.
- query.Service.Failover.NearestN = 2
- query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
- {
+ t.Run("overlap with nearest list", func(t *testing.T) {
+ query.Service.Failover.NearestN = 2
+ query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3208,12 +3065,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Now add a bogus user-defined one to the mix.
- query.Service.Failover.NearestN = 2
- query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"}
- {
+ t.Run("bogus user-defined", func(t *testing.T) {
+ query.Service.Failover.NearestN = 2
+ query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3237,13 +3094,13 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
t.Fatalf("bad: %s", queries)
}
require.Contains(t, mock.LogBuffer.String(), "Skipping unknown datacenter")
- }
+ })
// Same setup as before but dc1 is going to return an error and should
// get skipped over, still yielding data from dc4 which comes later.
- query.Service.Failover.NearestN = 2
- query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
- {
+ t.Run("dc1 error", func(t *testing.T) {
+ query.Service.Failover.NearestN = 2
+ query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3271,12 +3128,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if !strings.Contains(mock.LogBuffer.String(), "Failed querying") {
t.Fatalf("bad: %s", mock.LogBuffer.String())
}
- }
+ })
// Just use a hard-coded list and now xxx has the data.
- query.Service.Failover.NearestN = 0
- query.Service.Failover.Datacenters = []string{"dc3", "xxx"}
- {
+ t.Run("hard coded list", func(t *testing.T) {
+ query.Service.Failover.NearestN = 0
+ query.Service.Failover.Datacenters = []string{"dc3", "xxx"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3299,12 +3156,12 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Make sure the limit and query options are plumbed through.
- query.Service.Failover.NearestN = 0
- query.Service.Failover.Datacenters = []string{"xxx"}
- {
+ t.Run("limit and query options used", func(t *testing.T) {
+ query.Service.Failover.NearestN = 0
+ query.Service.Failover.Datacenters = []string{"xxx"}
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -3336,33 +3193,331 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
if queries := mock.JoinQueryLog(); queries != "xxx:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
}
- }
+ })
// Failover returns data from the first cluster peer with data.
- query.Service.Failover.Datacenters = nil
- query.Service.Failover.Targets = []structs.QueryFailoverTarget{
- {Peer: "cluster-01"},
- {Datacenter: "dc44"},
- {Peer: "cluster-02"},
- }
- {
- mock := &mockQueryServer{
- Datacenters: []string{"dc44"},
- QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
+ t.Run("failover first peer with data", func(t *testing.T) {
+ query.Service.Failover.Datacenters = nil
+ query.Service.Failover.Targets = []structs.QueryFailoverTarget{
+ {Peer: "cluster-01"},
+ {Datacenter: "dc44"},
+ {Peer: "cluster-02"},
+ }
+ {
+ mock := &mockQueryServer{
+ Datacenters: []string{"dc44"},
+ QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
+ if args.Query.Service.Peer == "cluster-02" {
+ reply.Nodes = nodes()
+ }
+ return nil
+ },
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+ if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ require.Equal(t, "cluster-02", reply.PeerName)
+ require.Equal(t, 3, reply.Failovers)
+ require.Equal(t, nodes(), reply.Nodes)
+ require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog())
+ }
+ })
+
+ tests := []struct {
+ name string
+ targets []structs.QueryFailoverTarget
+ datacenters []string
+ queryfn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
+ expectedPeer string
+ expectedDatacenter string
+ expectedReplies int
+ expectedQuery string
+ }{
+ {
+ name: "failover first peer with data",
+ targets: []structs.QueryFailoverTarget{
+ {Peer: "cluster-01"},
+ {Datacenter: "dc44"},
+ {Peer: "cluster-02"},
+ },
+ queryfn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Query.Service.Peer == "cluster-02" {
reply.Nodes = nodes()
}
return nil
},
- }
+ datacenters: []string{"dc44"},
+ expectedPeer: "cluster-02",
+ expectedDatacenter: "",
+ expectedReplies: 3,
+ expectedQuery: "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02",
+ },
+ {
+ name: "failover datacenter with data",
+ targets: []structs.QueryFailoverTarget{
+ {Peer: "cluster-01"},
+ {Datacenter: "dc44"},
+ {Peer: "cluster-02"},
+ },
+ queryfn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
+ if args.Datacenter == "dc44" {
+ reply.Nodes = nodes()
+ }
+ return nil
+ },
+ datacenters: []string{"dc44"},
+ expectedPeer: "",
+ expectedDatacenter: "dc44",
+ expectedReplies: 2,
+ expectedQuery: "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote",
+ },
+ }
- var reply structs.PreparedQueryExecuteResponse
- if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil {
- t.Fatalf("err: %v", err)
- }
- require.Equal(t, "cluster-02", reply.PeerName)
- require.Equal(t, 3, reply.Failovers)
- require.Equal(t, nodes(), reply.Nodes)
- require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog())
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ query.Service.Failover.Datacenters = nil
+ query.Service.Failover.Targets = tt.targets
+
+ mock := &mockQueryServer{
+ Datacenters: tt.datacenters,
+ QueryFn: tt.queryfn,
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+ if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ require.Equal(t, tt.expectedPeer, reply.PeerName)
+ require.Equal(t, tt.expectedReplies, reply.Failovers)
+ require.Equal(t, nodes(), reply.Nodes)
+ require.Equal(t, tt.expectedQuery, mock.JoinQueryLog())
+ })
+ }
+}
+
+type serverTestMetadata struct {
+ server *Server
+ codec rpc.ClientCodec
+ datacenter string
+ acceptingPeerName string
+ dialingPeerName string
+}
+
+type executeServers struct {
+ server *serverTestMetadata
+ peeringServer *serverTestMetadata
+ wanServer *serverTestMetadata
+ execToken string
+ denyToken string
+ execNoNodesToken string
+}
+
+func createExecuteServers(t *testing.T) *executeServers {
+ es := newExecuteServers(t)
+ es.initWanFed(t)
+ es.exportPeeringServices(t)
+ es.initTokens(t)
+
+ return es
+}
+
+func newExecuteServers(t *testing.T) *executeServers {
+
+ // Set up the dc1 server.
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.PrimaryDatacenter = "dc1"
+ c.ACLsEnabled = true
+ c.ACLInitialManagementToken = "root"
+ c.ACLResolverSettings.ACLDefaultPolicy = "deny"
+ })
+ t.Cleanup(func() {
+ os.RemoveAll(dir1)
+ })
+ t.Cleanup(func() {
+ s1.Shutdown()
+ })
+ waitForLeaderEstablishment(t, s1)
+ codec1 := rpcClient(t, s1)
+ t.Cleanup(func() {
+ codec1.Close()
+ })
+
+ ca := connect.TestCA(t, nil)
+ dir3, s3 := testServerWithConfig(t, func(c *Config) {
+ c.Datacenter = "dc3"
+ c.PrimaryDatacenter = "dc3"
+ c.NodeName = "acceptingServer.dc3"
+ c.GRPCTLSPort = freeport.GetOne(t)
+ c.CAConfig = &structs.CAConfiguration{
+ ClusterID: connect.TestClusterID,
+ Provider: structs.ConsulCAProvider,
+ Config: map[string]interface{}{
+ "PrivateKey": ca.SigningKey,
+ "RootCert": ca.RootCert,
+ },
+ }
+ })
+ t.Cleanup(func() {
+ os.RemoveAll(dir3)
+ })
+ t.Cleanup(func() {
+ s3.Shutdown()
+ })
+ waitForLeaderEstablishment(t, s3)
+ codec3 := rpcClient(t, s3)
+ t.Cleanup(func() {
+ codec3.Close()
+ })
+
+ // check for RPC forwarding
+ testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
+ testrpc.WaitForLeader(t, s3.RPC, "dc3")
+
+ acceptingPeerName := "my-peer-accepting-server"
+ dialingPeerName := "my-peer-dialing-server"
+
+ // Set up peering between dc1 (dialing) and dc3 (accepting). The foo service is exported separately via exportPeeringServices.
+ {
+ // Create a peering by generating a token.
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ t.Cleanup(cancel)
+
+ options := structs.QueryOptions{Token: "root"}
+ ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
+ require.NoError(t, err)
+
+ conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
+ grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
+ //nolint:staticcheck
+ grpc.WithInsecure(),
+ grpc.WithBlock())
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ conn.Close()
+ })
+
+ peeringClient := pbpeering.NewPeeringServiceClient(conn)
+ req := pbpeering.GenerateTokenRequest{
+ PeerName: dialingPeerName,
+ }
+ resp, err := peeringClient.GenerateToken(ctx, &req)
+ require.NoError(t, err)
+
+ conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(),
+ grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
+ //nolint:staticcheck
+ grpc.WithInsecure(),
+ grpc.WithBlock())
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ conn.Close()
+ })
+
+ peeringClient = pbpeering.NewPeeringServiceClient(conn)
+ establishReq := pbpeering.EstablishRequest{
+ PeerName: acceptingPeerName,
+ PeeringToken: resp.PeeringToken,
+ }
+ establishResp, err := peeringClient.Establish(ctx, &establishReq)
+ require.NoError(t, err)
+ require.NotNil(t, establishResp)
+
+ readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName})
+ require.NoError(t, err)
+ require.NotNil(t, readResp)
+
+ // Wait for the stream to be connected.
+ retry.Run(t, func(r *retry.R) {
+ status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
+ require.True(r, found)
+ require.True(r, status.Connected)
+ })
+ }
+
+ es := executeServers{
+ server: &serverTestMetadata{
+ server: s1,
+ codec: codec1,
+ datacenter: "dc1",
+ },
+ peeringServer: &serverTestMetadata{
+ server: s3,
+ codec: codec3,
+ datacenter: "dc3",
+ dialingPeerName: dialingPeerName,
+ acceptingPeerName: acceptingPeerName,
+ },
+ }
+
+ return &es
+}
+
+func (es *executeServers) initTokens(t *testing.T) {
+ es.execNoNodesToken = createTokenWithPolicyName(t, es.server.codec, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root")
+ rules := `
+ service_prefix "" { policy = "read" }
+ node_prefix "" { policy = "read" }
+ `
+ es.execToken = createTokenWithPolicyName(t, es.server.codec, "with-read", rules, "root")
+ es.denyToken = createTokenWithPolicyName(t, es.server.codec, "with-deny", `service_prefix "foo" { policy = "deny" }`, "root")
+}
+
+func (es *executeServers) exportPeeringServices(t *testing.T) {
+ exportedServices := structs.ConfigEntryRequest{
+ Op: structs.ConfigEntryUpsert,
+ Datacenter: "dc3",
+ Entry: &structs.ExportedServicesConfigEntry{
+ Name: "default",
+ Services: []structs.ExportedService{
+ {
+ Name: "foo",
+ Consumers: []structs.ServiceConsumer{{Peer: es.peeringServer.dialingPeerName}},
+ },
+ },
+ },
+ }
+ var configOutput bool
+ require.NoError(t, msgpackrpc.CallWithCodec(es.peeringServer.codec, "ConfigEntry.Apply", &exportedServices, &configOutput))
+ require.True(t, configOutput)
+}
+
+func (es *executeServers) initWanFed(t *testing.T) {
+ dir2, s2 := testServerWithConfig(t, func(c *Config) {
+ c.Datacenter = "dc2"
+ c.PrimaryDatacenter = "dc1"
+ c.ACLsEnabled = true
+ c.ACLResolverSettings.ACLDefaultPolicy = "deny"
+ })
+ t.Cleanup(func() {
+ os.RemoveAll(dir2)
+ })
+ t.Cleanup(func() {
+ s2.Shutdown()
+ })
+ waitForLeaderEstablishment(t, s2)
+ codec2 := rpcClient(t, s2)
+ t.Cleanup(func() {
+ codec2.Close()
+ })
+
+ s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
+
+ // Try to WAN join.
+ joinWAN(t, s2, es.server.server)
+ retry.Run(t, func(r *retry.R) {
+ if got, want := len(es.server.server.WANMembers()), 2; got != want {
+ r.Fatalf("got %d WAN members want %d", got, want)
+ }
+ if got, want := len(s2.WANMembers()), 2; got != want {
+ r.Fatalf("got %d WAN members want %d", got, want)
+ }
+ })
+ testrpc.WaitForLeader(t, es.server.server.RPC, "dc2", testrpc.WithToken("root"))
+ es.wanServer = &serverTestMetadata{
+ server: s2,
+ codec: codec2,
+ datacenter: "dc2",
}
}
diff --git a/agent/prepared_query_endpoint.go b/agent/prepared_query_endpoint.go
index 2aafb7d96..9b8225f33 100644
--- a/agent/prepared_query_endpoint.go
+++ b/agent/prepared_query_endpoint.go
@@ -16,7 +16,7 @@ type preparedQueryCreateResponse struct {
}
// preparedQueryCreate makes a new prepared query.
-func (s *HTTPHandlers) preparedQueryCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+func (s *HTTPHandlers) preparedQueryCreate(_ http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
}
diff --git a/agent/structs/prepared_query.go b/agent/structs/prepared_query.go
index b46145113..59501d21e 100644
--- a/agent/structs/prepared_query.go
+++ b/agent/structs/prepared_query.go
@@ -47,6 +47,8 @@ type QueryFailoverTarget struct {
// Datacenter specifies a datacenter to try during failover.
Datacenter string
+
+ acl.EnterpriseMeta
}
// QueryDNSOptions controls settings when query results are served over DNS.
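
Embedding `acl.EnterpriseMeta` here is what lets the `PartitionOrEmpty()` / `NamespaceOrEmpty()` calls in `queryFailover` compile in both builds. As an assumption about their shape (the real stubs live in the acl package), the OSS accessors are expected to look roughly like the sketch below, which is why the new failover branch only has an effect in Consul Enterprise.

```go
// OSS sketch, assumed shape: EnterpriseMeta carries no data outside Enterprise,
// so partition/namespace lookups always return the empty string.
type EnterpriseMeta struct{}

func (m *EnterpriseMeta) PartitionOrEmpty() string { return "" }
func (m *EnterpriseMeta) NamespaceOrEmpty() string { return "" }
```
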
diff --git a/api/prepared_query.go b/api/prepared_query.go
index f47a58373..3dfd571c7 100644
--- a/api/prepared_query.go
+++ b/api/prepared_query.go
@@ -26,6 +26,14 @@ type QueryFailoverTarget struct {
// Datacenter specifies a datacenter to try during failover.
Datacenter string
+
+ // Partition specifies a partition to try during failover.
+ // Note: Partitions are available only in Consul Enterprise.
+ Partition string
+
+ // Namespace specifies a namespace to try during failover.
+ // Note: Namespaces are available only in Consul Enterprise.
+ Namespace string
}
// QueryDNSOptions controls settings when query results are served over DNS.
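
A minimal sketch of how the new api fields could be used from the Go client, assuming the existing `PreparedQueryDefinition`, `ServiceQuery`, and `QueryFailoverOptions` types; the service, partition, and namespace names are made up for illustration, and the Partition/Namespace targets only take effect against a Consul Enterprise server.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	def := &api.PreparedQueryDefinition{
		Name: "payments",
		Service: api.ServiceQuery{
			Service: "payments",
			Failover: api.QueryFailoverOptions{
				Targets: []api.QueryFailoverTarget{
					// Enterprise only: try another partition/namespace in the
					// local datacenter before leaving it.
					{Partition: "finance", Namespace: "billing"},
					// Then fall back to a WAN federated datacenter.
					{Datacenter: "dc2"},
				},
			},
		},
	}

	id, _, err := client.PreparedQuery().Create(def, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("created prepared query:", id)
}
```
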
diff --git a/website/content/api-docs/query.mdx b/website/content/api-docs/query.mdx
index 09aa867c4..81c6ed0e4 100644
--- a/website/content/api-docs/query.mdx
+++ b/website/content/api-docs/query.mdx
@@ -10,7 +10,7 @@ The `/query` endpoints create, update, destroy, and execute prepared queries.
Prepared queries allow you to register a complex service query and then execute
it later by specifying the query ID or name. Consul returns a set of healthy nodes that provide a given
-service. Refer to
+service. Refer to
[Enable Dynamic DNS Queries](/consul/docs/services/discovery/dns-dynamic-lookups) for additional information.
Check the [Geo Failover tutorial](/consul/tutorials/developer-discovery/automate-geo-failover) for details and
@@ -212,6 +212,12 @@ The table below shows this endpoint's support for
- `Datacenter` `(string: "")` - Specifies a WAN federated datacenter to forward the
query to.
+ - `Partition` `(string: "")` - Specifies a partition to forward the
+ query to. Partitions are a Consul Enterprise feature.
+
+ - `Namespace` `(string: "")` - Specifies a namespace to forward the
+ query to. Namespaces are a Consul Enterprise feature.
+
- `IgnoreCheckIDs` `(array: nil)` - Specifies a list of check IDs that
should be ignored when filtering unhealthy instances. This is mostly useful
in an emergency or as a temporary measure when a health check is found to be