Add sameness group field to prepared queries (#17089)
* added method for converting SamenessGroupConfigEntry
  - added new method `ToQueryFailoverTargets` for converting a SamenessGroupConfigEntry's members to a list of QueryFailoverTargets
  - renamed `ToFailoverTargets` to `ToServiceResolverFailoverTargets` to distinguish it from `ToQueryFailoverTargets`
* added SamenessGroup to PreparedQuery
  - exposed Service.Partition to the API when defining a prepared query
  - added a method for determining whether a QueryFailoverOptions is empty; this will be useful for validation
  - added unit tests
* added a method for retrieving a SamenessGroup to the state store
* added logic for using a PQ with a SamenessGroup
  - added a branching path for SamenessGroup handling in execute; it is handled separately from the normal PQ case
  - added a new interface so that `GetSamenessGroupFailoverTargets` can be properly tested
  - separated the execute logic into a `targetSelector` function so that it can be used for both failover and sameness-group PQs
  - split OSS-only methods into new PQ OSS files
  - added validation that `samenessGroup` is an enterprise-only feature
* added documentation for PQ SamenessGroup
This commit is contained in:
parent dc4d8b0cf6
commit 40dd8ce65b
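Before the diffs, a minimal sketch of how the new field is meant to be used from the Go API client once this change lands. The agent address defaults and the "redis"/"prod-group" names are illustrative assumptions, not part of this commit:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to a local agent using the default configuration.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Define a prepared query whose initial target and failover order are
	// both driven by a sameness group instead of a Failover block. The
	// service and group names here are hypothetical.
	def := &api.PreparedQueryDefinition{
		Name: "redis-query",
		Service: api.ServiceQuery{
			Service:       "redis",
			SamenessGroup: "prod-group", // mutually exclusive with Failover
		},
	}

	id, _, err := client.PreparedQuery().Create(def, nil)
	if err != nil {
		// OSS builds reject this with "sameness-groups are an
		// enterprise-only feature".
		log.Fatal(err)
	}
	fmt.Println("created prepared query:", id)
}
```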
@@ -1196,7 +1196,7 @@ func (c *compiler) makeSamenessGroupFailover(target *structs.DiscoveryTarget, op
 	}
 
 	var failoverTargets []*structs.DiscoveryTarget
-	for _, t := range samenessGroup.ToFailoverTargets() {
+	for _, t := range samenessGroup.ToServiceResolverFailoverTargets() {
 		// Rewrite the target as per the failover policy.
 		targetOpts := structs.MergeDiscoveryTargetOpts(opts, t.ToDiscoveryTargetOpts())
 		failoverTarget := c.rewriteTarget(target, targetOpts)

@@ -46,6 +46,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
 		".Tags[1]:tag2",
 		".Tags[2]:tag3",
 		".Peer:",
+		".SamenessGroup:",
 	}
 	expected = append(expected, entMetaWalkFields...)
 	sort.Strings(expected)

@@ -141,7 +141,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
 }
 
 // parseQuery makes sure the entries of a query are valid for a create or
-// update operation. Some of the fields are not checked or are partially
+// update operation. Some fields are not checked or are partially
 // checked, as noted in the comments below. This also updates all the parsed
 // fields of the query.
 func parseQuery(query *structs.PreparedQuery) error {

@@ -205,6 +205,10 @@ func parseService(svc *structs.ServiceQuery) error {
 		return err
 	}
 
+	if err := parseSameness(svc); err != nil {
+		return err
+	}
+
 	// We skip a few fields:
 	// - There's no validation for Datacenters; we skip any unknown entries
 	// at execution time.

@@ -371,108 +375,117 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
 		return structs.ErrQueryNotFound
 	}
 
-	// Execute the query for the local DC.
-	if err := p.execute(query, reply, args.Connect); err != nil {
-		return err
-	}
-
-	// If they supplied a token with the query, use that, otherwise use the
-	// token passed in with the request.
-	token := args.QueryOptions.Token
-	if query.Token != "" {
-		token = query.Token
-	}
-	if err := p.srv.filterACL(token, reply); err != nil {
-		return err
-	}
-
-	// TODO (slackpad) We could add a special case here that will avoid the
-	// fail over if we filtered everything due to ACLs. This seems like it
-	// might not be worth the code complexity and behavior differences,
-	// though, since this is essentially a misconfiguration.
-
-	// We have to do this ourselves since we are not doing a blocking RPC.
-	p.srv.setQueryMeta(&reply.QueryMeta, token)
-
-	// Shuffle the results in case coordinates are not available if they
-	// requested an RTT sort.
-	reply.Nodes.Shuffle()
-
-	// Build the query source. This can be provided by the client, or by
-	// the prepared query. Client-specified takes priority.
-	qs := args.Source
-	if qs.Datacenter == "" {
-		qs.Datacenter = args.Agent.Datacenter
-	}
-	if query.Service.Near != "" && qs.Node == "" {
-		qs.Node = query.Service.Near
-	}
-
-	// Respect the magic "_agent" flag.
-	if qs.Node == "_agent" {
-		qs.Node = args.Agent.Node
-	} else if qs.Node == "_ip" {
-		if args.Source.Ip != "" {
-			_, nodes, err := state.Nodes(nil, structs.NodeEnterpriseMetaInDefaultPartition(), structs.TODOPeerKeyword)
-			if err != nil {
-				return err
-			}
-
-			for _, node := range nodes {
-				if args.Source.Ip == node.Address {
-					qs.Node = node.Node
-					break
-				}
-			}
-		} else {
-			p.logger.Warn("Prepared Query using near=_ip requires " +
-				"the source IP to be set but none was provided. No distance " +
-				"sorting will be done.")
-		}
-
-		// Either a source IP was given but we couldnt find the associated node
-		// or no source ip was given. In both cases we should wipe the Node value
-		if qs.Node == "_ip" {
-			qs.Node = ""
-		}
-	}
-
-	// Perform the distance sort
-	err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
-	if err != nil {
-		return err
-	}
-
-	// If we applied a distance sort, make sure that the node queried for is in
-	// position 0, provided the results are from the same datacenter.
-	if qs.Node != "" && reply.Datacenter == qs.Datacenter {
-		for i, node := range reply.Nodes {
-			if strings.EqualFold(node.Node.Node, qs.Node) {
-				reply.Nodes[0], reply.Nodes[i] = reply.Nodes[i], reply.Nodes[0]
-				break
-			}
-
-			// Put a cap on the depth of the search. The local agent should
-			// never be further in than this if distance sorting was applied.
-			if i == 9 {
-				break
-			}
-		}
-	}
-
-	// Apply the limit if given.
-	if args.Limit > 0 && len(reply.Nodes) > args.Limit {
-		reply.Nodes = reply.Nodes[:args.Limit]
-	}
-
-	// In the happy path where we found some healthy nodes we go with that
-	// and bail out. Otherwise, we fail over and try remote DCs, as allowed
-	// by the query setup.
-	if len(reply.Nodes) == 0 {
-		wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
-		if err := queryFailover(wrapper, *query, args, reply); err != nil {
-			return err
-		}
-	}
+	// If we have a sameness group, it controls the initial query and
+	// subsequent failover if required (Enterprise Only)
+	if query.Service.SamenessGroup != "" {
+		wrapper := newQueryServerWrapper(p.srv, p.ExecuteRemote)
+		if err := querySameness(wrapper, *query, args, reply); err != nil {
+			return err
+		}
+	} else {
+		// Execute the query for the local DC.
+		if err := p.execute(query, reply, args.Connect); err != nil {
+			return err
+		}
+
+		// If they supplied a token with the query, use that, otherwise use the
+		// token passed in with the request.
+		token := args.QueryOptions.Token
+		if query.Token != "" {
+			token = query.Token
+		}
+		if err := p.srv.filterACL(token, reply); err != nil {
+			return err
+		}
+
+		// TODO (slackpad) We could add a special case here that will avoid the
+		// fail over if we filtered everything due to ACLs. This seems like it
+		// might not be worth the code complexity and behavior differences,
+		// though, since this is essentially a misconfiguration.
+
+		// We have to do this ourselves since we are not doing a blocking RPC.
+		p.srv.setQueryMeta(&reply.QueryMeta, token)
+
+		// Shuffle the results in case coordinates are not available if they
+		// requested an RTT sort.
+		reply.Nodes.Shuffle()
+
+		// Build the query source. This can be provided by the client, or by
+		// the prepared query. Client-specified takes priority.
+		qs := args.Source
+		if qs.Datacenter == "" {
+			qs.Datacenter = args.Agent.Datacenter
+		}
+		if query.Service.Near != "" && qs.Node == "" {
+			qs.Node = query.Service.Near
+		}
+
+		// Respect the magic "_agent" flag.
+		if qs.Node == "_agent" {
+			qs.Node = args.Agent.Node
+		} else if qs.Node == "_ip" {
+			if args.Source.Ip != "" {
+				_, nodes, err := state.Nodes(nil, structs.NodeEnterpriseMetaInDefaultPartition(), structs.TODOPeerKeyword)
+				if err != nil {
+					return err
+				}
+
+				for _, node := range nodes {
+					if args.Source.Ip == node.Address {
+						qs.Node = node.Node
+						break
+					}
+				}
+			} else {
+				p.logger.Warn("Prepared Query using near=_ip requires " +
+					"the source IP to be set but none was provided. No distance " +
+					"sorting will be done.")
+			}
+
+			// Either a source IP was given, but we couldn't find the associated node
+			// or no source ip was given. In both cases we should wipe the Node value
+			if qs.Node == "_ip" {
+				qs.Node = ""
+			}
+		}
+
+		// Perform the distance sort
+		err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
+		if err != nil {
+			return err
+		}
+
+		// If we applied a distance sort, make sure that the node queried for is in
+		// position 0, provided the results are from the same datacenter.
+		if qs.Node != "" && reply.Datacenter == qs.Datacenter {
+			for i, node := range reply.Nodes {
+				if strings.EqualFold(node.Node.Node, qs.Node) {
+					reply.Nodes[0], reply.Nodes[i] = reply.Nodes[i], reply.Nodes[0]
+					break
+				}
+
+				// Put a cap on the depth of the search. The local agent should
+				// never be further in than this if distance sorting was applied.
+				if i == 9 {
+					break
+				}
+			}
+		}
+
+		// Apply the limit if given.
+		if args.Limit > 0 && len(reply.Nodes) > args.Limit {
+			reply.Nodes = reply.Nodes[:args.Limit]
+		}
+
+		// In the happy path where we found some healthy nodes we go with that
+		// and bail out. Otherwise, we fail over and try remote DCs, as allowed
+		// by the query setup.
+		if len(reply.Nodes) == 0 {
+			wrapper := newQueryServerWrapper(p.srv, p.ExecuteRemote)
+			if err := queryFailover(wrapper, *query, args, reply); err != nil {
+				return err
+			}
+		}
+	}

@@ -660,20 +673,38 @@ func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNode
 	return filtered
 }
 
+type stateLookuper interface {
+	samenessGroupLookup(name string, entMeta acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error)
+}
+
+type stateLookup struct {
+	srv *Server
+}
+
 // queryServer is a wrapper that makes it easier to test the failover logic.
 type queryServer interface {
 	GetLogger() hclog.Logger
 	GetOtherDatacentersByDistance() ([]string, error)
 	GetLocalDC() string
 	ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
+	GetSamenessGroupFailoverTargets(name string, entMeta acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error)
 }
 
 // queryServerWrapper applies the queryServer interface to a Server.
 type queryServerWrapper struct {
 	srv           *Server
+	sl            stateLookuper
 	executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
 }
 
+func newQueryServerWrapper(srv *Server, executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error) *queryServerWrapper {
+	return &queryServerWrapper{
+		srv:           srv,
+		executeRemote: executeRemote,
+		sl:            stateLookup{srv},
+	}
+}
+
 // GetLocalDC returns the name of the local datacenter.
 func (q *queryServerWrapper) GetLocalDC() string {
 	return q.srv.config.Datacenter

@@ -771,43 +802,8 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
 		// This keeps track of how many iterations we actually run.
 		failovers++
 
-		// Be super paranoid and set the nodes slice to nil since it's
-		// the same slice we used before. We know there's nothing in
-		// there, but the underlying msgpack library has a policy of
-		// updating the slice when it's non-nil, and that feels dirty.
-		// Let's just set it to nil so there's no way to communicate
-		// through this slice across successive RPC calls.
-		reply.Nodes = nil
-
-		// Reset Peer, because it may have been set by a previous failover
-		// target.
-		query.Service.Peer = target.Peer
-		query.Service.EnterpriseMeta = target.EnterpriseMeta
-		dc := target.Datacenter
-		if target.Peer != "" {
-			dc = q.GetLocalDC()
-		}
-
-		// Note that we pass along the limit since may be applied
-		// remotely to save bandwidth. We also pass along the consistency
-		// mode information and token we were given, so that applies to
-		// the remote query as well.
-		remote := &structs.PreparedQueryExecuteRemoteRequest{
-			Datacenter:   dc,
-			Query:        query,
-			Limit:        args.Limit,
-			QueryOptions: args.QueryOptions,
-			Connect:      args.Connect,
-		}
-
-		if err = q.ExecuteRemote(remote, reply); err != nil {
-			q.GetLogger().Warn("Failed querying for service in datacenter",
-				"service", query.Service.Service,
-				"peerName", query.Service.Peer,
-				"datacenter", dc,
-				"enterpriseMeta", query.Service.EnterpriseMeta,
-				"error", err,
-			)
+		err = targetSelector(q, query, args, target, reply)
+		if err != nil {
 			continue
 		}
 
@@ -823,3 +819,52 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
 
 	return nil
 }
+
+func targetSelector(q queryServer,
+	query structs.PreparedQuery,
+	args *structs.PreparedQueryExecuteRequest,
+	target structs.QueryFailoverTarget,
+	reply *structs.PreparedQueryExecuteResponse) error {
+	// Be super paranoid and set the nodes slice to nil since it's
+	// the same slice we used before. We know there's nothing in
+	// there, but the underlying msgpack library has a policy of
+	// updating the slice when it's non-nil, and that feels dirty.
+	// Let's just set it to nil so there's no way to communicate
+	// through this slice across successive RPC calls.
+	reply.Nodes = nil
+
+	// Reset Peer, because it may have been set by a previous failover
+	// target.
+	query.Service.Peer = target.Peer
+	query.Service.EnterpriseMeta = target.EnterpriseMeta
+	dc := target.Datacenter
+	if target.Peer != "" {
+		dc = q.GetLocalDC()
+	}
+
+	// Note that we pass along the limit since may be applied
+	// remotely to save bandwidth. We also pass along the consistency
+	// mode information and token we were given, so that applies to
+	// the remote query as well.
+	remote := &structs.PreparedQueryExecuteRemoteRequest{
+		Datacenter:   dc,
+		Query:        query,
+		Limit:        args.Limit,
+		QueryOptions: args.QueryOptions,
+		Connect:      args.Connect,
+	}
+
+	var err error
+	if err = q.ExecuteRemote(remote, reply); err != nil {
+		q.GetLogger().Warn("Failed querying for service in datacenter",
+			"service", query.Service.Service,
+			"peerName", query.Service.Peer,
+			"datacenter", dc,
+			"enterpriseMeta", query.Service.EnterpriseMeta,
+			"error", err,
+		)
+		return err
+	}
+
+	return nil
+}

@@ -0,0 +1,38 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+//go:build !consulent
+// +build !consulent
+
+package consul
+
+import (
+	"fmt"
+
+	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/agent/structs"
+)
+
+func parseSameness(svc *structs.ServiceQuery) error {
+	if svc.SamenessGroup != "" {
+		return fmt.Errorf("sameness-groups are an enterprise-only feature")
+	}
+	return nil
+}
+
+func (sl stateLookup) samenessGroupLookup(_ string, _ acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error) {
+	return 0, nil, nil
+}
+
+// GetSamenessGroupFailoverTargets supports sameness groups, an enterprise-only feature. It satisfies the queryServer interface.
+func (q *queryServerWrapper) GetSamenessGroupFailoverTargets(_ string, _ acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error) {
+	return []structs.QueryFailoverTarget{}, nil
+}
+
+func querySameness(_ queryServer,
+	_ structs.PreparedQuery,
+	_ *structs.PreparedQueryExecuteRequest,
+	_ *structs.PreparedQueryExecuteResponse) error {
+
+	return nil
+}

@@ -0,0 +1,52 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+//go:build !consulent
+// +build !consulent
+
+package consul
+
+import (
+	"os"
+	"testing"
+
+	msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/testrpc"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPreparedQuery_OSS_Apply(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+
+	t.Parallel()
+	dir1, s1 := testServerWithConfig(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	defer codec.Close()
+
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+
+	// Set up a bare bones query.
+	query := structs.PreparedQueryRequest{
+		Datacenter: "dc1",
+		Op:         structs.PreparedQueryCreate,
+		Query: &structs.PreparedQuery{
+			Name: "test",
+			Service: structs.ServiceQuery{
+				Service: "redis",
+			},
+		},
+	}
+	var reply string
+
+	// Ensure that setting a sameness group is rejected as enterprise-only.
+	query.Query.Service.SamenessGroup = "sg"
+	err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "enterprise")
+}

@@ -6,6 +6,7 @@ package consul
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
 	"os"
 	"reflect"

@@ -37,6 +38,8 @@ import (
 	"github.com/hashicorp/consul/types"
 )
 
+const localTestDC = "dc1"
+
 func TestPreparedQuery_Apply(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")

@@ -2814,13 +2817,17 @@ func TestPreparedQuery_Wrapper(t *testing.T) {
 	})
 }
 
+var _ queryServer = (*mockQueryServer)(nil)
+
 type mockQueryServer struct {
+	queryServerWrapper
 	Datacenters      []string
 	DatacentersError error
 	QueryLog         []string
 	QueryFn          func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
 	Logger           hclog.Logger
 	LogBuffer        *bytes.Buffer
+	SamenessGroup    map[string]*structs.SamenessGroupConfigEntry
 }
 
 func (m *mockQueryServer) JoinQueryLog() string {

@@ -2841,7 +2848,7 @@ func (m *mockQueryServer) GetLogger() hclog.Logger {
 }
 
 func (m *mockQueryServer) GetLocalDC() string {
-	return "dc1"
+	return localTestDC
 }
 
 func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {

@@ -2850,14 +2857,21 @@ func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
 
 func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
 	peerName := args.Query.Service.Peer
+	partitionName := args.Query.Service.PartitionOrEmpty()
+	namespaceName := args.Query.Service.NamespaceOrEmpty()
 	dc := args.Datacenter
 	if peerName != "" {
 		m.QueryLog = append(m.QueryLog, fmt.Sprintf("peer:%s", peerName))
+	} else if partitionName != "" {
+		m.QueryLog = append(m.QueryLog, fmt.Sprintf("partition:%s", partitionName))
+	} else if namespaceName != "" {
+		m.QueryLog = append(m.QueryLog, fmt.Sprintf("namespace:%s", namespaceName))
 	} else {
 		m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, "PreparedQuery.ExecuteRemote"))
 	}
 	reply.PeerName = peerName
 	reply.Datacenter = dc
+	reply.EnterpriseMeta = acl.NewEnterpriseMetaWithPartition(partitionName, namespaceName)
 
 	if m.QueryFn != nil {
 		return m.QueryFn(args, reply)

@@ -2865,6 +2879,33 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote
 	return nil
 }
 
+type mockStateLookup struct {
+	SamenessGroup map[string]*structs.SamenessGroupConfigEntry
+}
+
+func (sl mockStateLookup) samenessGroupLookup(name string, entMeta acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error) {
+	lookup := name
+	if ap := entMeta.PartitionOrEmpty(); ap != "" {
+		lookup = fmt.Sprintf("%s-%s", lookup, ap)
+	} else if ns := entMeta.NamespaceOrEmpty(); ns != "" {
+		lookup = fmt.Sprintf("%s-%s", lookup, ns)
+	}
+
+	sg, ok := sl.SamenessGroup[lookup]
+	if !ok {
+		return 0, nil, errors.New("unable to find sameness group")
+	}
+
+	return 0, sg, nil
+}
+
+func (m *mockQueryServer) GetSamenessGroupFailoverTargets(name string, entMeta acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error) {
+	m.sl = mockStateLookup{
+		SamenessGroup: m.SamenessGroup,
+	}
+	return m.queryServerWrapper.GetSamenessGroupFailoverTargets(name, entMeta)
+}
+
 func TestPreparedQuery_queryFailover(t *testing.T) {
 	t.Parallel()
 	query := structs.PreparedQuery{

@@ -0,0 +1,19 @@
+package state
+
+import (
+	"github.com/hashicorp/consul/agent/configentry"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/go-memdb"
+)
+
+// GetSamenessGroup returns a SamenessGroupConfigEntry from the state
+// store using the provided parameters.
+func (s *Store) GetSamenessGroup(ws memdb.WatchSet,
+	name string,
+	overrides map[configentry.KindName]structs.ConfigEntry,
+	partition string) (uint64, *structs.SamenessGroupConfigEntry, error) {
+	tx := s.db.ReadTxn()
+	defer tx.Abort()
+
+	return getSamenessGroupConfigEntryTxn(tx, ws, name, overrides, partition)
+}

@@ -8,20 +8,27 @@ package structs
 
 import "fmt"
 
+// Validate assures that the sameness-groups are an enterprise only feature
 func (s *SamenessGroupConfigEntry) Validate() error {
 	return fmt.Errorf("sameness-groups are an enterprise-only feature")
 }
 
-// RelatedPeers returns all peers that are members of a sameness group config entry.
+// RelatedPeers is an OSS placeholder noop
 func (s *SamenessGroupConfigEntry) RelatedPeers() []string {
 	return nil
 }
 
-// AllMembers adds the local partition to Members when it is set.
+// AllMembers is an OSS placeholder noop
 func (s *SamenessGroupConfigEntry) AllMembers() []SamenessGroupMember {
 	return nil
 }
 
-func (s *SamenessGroupConfigEntry) ToFailoverTargets() []ServiceResolverFailoverTarget {
+// ToServiceResolverFailoverTargets is an OSS placeholder noop
+func (s *SamenessGroupConfigEntry) ToServiceResolverFailoverTargets() []ServiceResolverFailoverTarget {
 	return nil
 }
+
+// ToQueryFailoverTargets is an OSS placeholder noop
+func (s *SamenessGroupConfigEntry) ToQueryFailoverTargets(namespace string) []QueryFailoverTarget {
+	return nil
+}

@@ -44,6 +44,14 @@ func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget {
 	return f.Targets
 }
 
+// IsEmpty returns true if the QueryFailoverOptions are empty (not set), false otherwise
+func (f *QueryFailoverOptions) IsEmpty() bool {
+	if f == nil || (f.NearestN == 0 && len(f.Datacenters) == 0 && len(f.Targets) == 0) {
+		return true
+	}
+	return false
+}
+
 type QueryFailoverTarget struct {
 	// Peer specifies a peer to try during failover.
 	Peer string

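The commit message notes that `IsEmpty` "will be useful for validation"; a plausible use, given that `SamenessGroup` and `Failover` are documented as mutually exclusive, is a parse-time check along these lines. The function name and exact error text are assumptions, not code from this commit:

```go
package consul

import (
	"fmt"

	"github.com/hashicorp/consul/agent/structs"
)

// validateSamenessGroupExclusivity is a hypothetical sketch: reject a
// ServiceQuery that sets both SamenessGroup and any Failover option,
// since the two fields are documented as mutually exclusive.
func validateSamenessGroupExclusivity(svc *structs.ServiceQuery) error {
	if svc.SamenessGroup != "" && !svc.Failover.IsEmpty() {
		return fmt.Errorf("SamenessGroup cannot be used with Failover")
	}
	return nil
}
```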
@@ -66,6 +74,11 @@ type ServiceQuery struct {
 	// Service is the service to query.
 	Service string
 
+	// SamenessGroup specifies a sameness group to query. The first member of the Sameness Group will
+	// be targeted first on PQ execution and subsequent members will be targeted during failover scenarios.
+	// This field is mutually exclusive with Failover.
+	SamenessGroup string
+
 	// Failover controls what we do if there are no healthy nodes in the
 	// local datacenter.
 	Failover QueryFailoverOptions

@@ -5,6 +5,8 @@ package structs
 
 import (
 	"testing"
+
+	"github.com/stretchr/testify/assert"
 )
 
 func TestStructs_PreparedQuery_GetACLPrefix(t *testing.T) {

@@ -36,3 +38,54 @@ func TestPreparedQueryExecuteRequest_CacheInfoKey(t *testing.T) {
 	ignored := []string{"Agent", "QueryOptions"}
 	assertCacheInfoKeyIsComplete(t, &PreparedQueryExecuteRequest{}, ignored...)
 }
+
+func TestQueryFailoverOptions_IsEmpty(t *testing.T) {
+	tests := []struct {
+		name            string
+		query           QueryFailoverOptions
+		isExpectedEmpty bool
+	}{
+		{
+			name:            "expect empty",
+			query:           QueryFailoverOptions{},
+			isExpectedEmpty: true,
+		},
+		{
+			name: "expect not empty NearestN",
+			query: QueryFailoverOptions{
+				NearestN: 1,
+			},
+			isExpectedEmpty: false,
+		},
+		{
+			name: "expect not empty NearestN negative",
+			query: QueryFailoverOptions{
+				NearestN: -1,
+			},
+			isExpectedEmpty: false,
+		},
+		{
+			name: "expect not empty datacenters",
+			query: QueryFailoverOptions{
+				Datacenters: []string{"dc"},
+			},
+			isExpectedEmpty: false,
+		},
+		{
+			name: "expect not empty targets",
+			query: QueryFailoverOptions{
+				Targets: []QueryFailoverTarget{
+					{
+						Peer: "peer",
+					},
+				},
+			},
+			isExpectedEmpty: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.isExpectedEmpty, tt.query.IsEmpty())
+		})
+	}
+}

@@ -51,9 +51,17 @@ type ServiceQuery struct {
 	// Service is the service to query.
 	Service string
 
+	// SamenessGroup specifies a sameness group to query. The first member of the Sameness Group will
+	// be targeted first on PQ execution and subsequent members will be targeted during failover scenarios.
+	// This field is mutually exclusive with Failover.
+	SamenessGroup string `json:",omitempty"`
+
 	// Namespace of the service to query
 	Namespace string `json:",omitempty"`
 
+	// Partition of the service to query
+	Partition string `json:",omitempty"`
+
 	// Near allows baking in the name of a node to automatically distance-
 	// sort from. The magic "_agent" value is supported, which sorts near
 	// the agent which initiated the request by default.

@@ -61,7 +69,7 @@ type ServiceQuery struct {
 
 	// Failover controls what we do if there are no healthy nodes in the
 	// local datacenter.
-	Failover QueryFailoverOptions
+	Failover QueryFailoverOptions `json:",omitempty"`
 
 	// IgnoreCheckIDs is an optional list of health check IDs to ignore when
 	// considering which nodes are healthy. It is useful as an emergency measure

@@ -177,13 +177,22 @@ The table below shows this endpoint's support for
 - `Service` `(string: <required>)` - Specifies the name of the service to
   query.
 
-- `Namespace` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul namespace
-  to query. If not provided the query will use Consul default namespace for resolution.
+- `SamenessGroup` `(string: "")` <EnterpriseAlert inline /> - Specifies a Sameness group to forward the
+  query to. The `SamenessGroup` will forward to its members in the order defined, returning on the first
+  healthy query. `SamenessGroup` is mutually exclusive with `Failover` as `SamenessGroup` members will be used
+  in place of a defined list of failovers.
 
-- `Failover` contains two fields, both of which are optional, and determine
-  what happens if no healthy nodes are available in the local datacenter when
+- `Namespace` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul namespace
+  to query. If not provided the query will use Consul default namespace for resolution. When combined with
+  `SamenessGroup` this will specify the namespaces in which the `SamenessGroup` will resolve all members listed.
+
+- `Partition` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul partition
+  to query. If not provided the query will use Consul's default partition for resolution. When combined with
+  `SamenessGroup`, this will specify the partition where the `SamenessGroup` exists.
+
+- `Failover` Determines what happens if no healthy nodes are available in the local datacenter when
   the query is executed. It allows the use of nodes in other datacenters with
-  very little configuration.
+  very little configuration. This field is mutually exclusive with `SamenessGroup`.
 
 - `NearestN` `(int: 0)` - Specifies that the query will be forwarded to up
   to `NearestN` other datacenters based on their estimated network round
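For illustration, a create-query request body for this endpoint using the new field might look like the following; the query and group names are hypothetical, and in line with the docs above no `Failover` block is set:

```json
{
  "Name": "redis-query",
  "Service": {
    "Service": "redis",
    "SamenessGroup": "prod-group"
  }
}
```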