connect: expose an API endpoint to compile the discovery chain (#6248)

In addition to exposing compilation over the API cleaned up the structures that would be exchanged to be cleaner and easier to support and understand.

Also removed ability to configure the envoy OverprovisioningFactor.
This commit is contained in:
R.B. Boyer 2019-08-02 15:34:54 -05:00 committed by GitHub
parent 510b1271bc
commit 0165e93517
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1613 additions and 1140 deletions

View File

@ -38,7 +38,7 @@ func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Reques
// Fetch
var reply structs.DiscoveryChainResponse
if err := c.RPC.RPC("ConfigEntry.ReadDiscoveryChain", reqReal, &reply); err != nil {
if err := c.RPC.RPC("DiscoveryChain.Get", reqReal, &reply); err != nil {
return result, err
}

View File

@ -16,13 +16,12 @@ func TestCompiledDiscoveryChain(t *testing.T) {
typ := &CompiledDiscoveryChain{RPC: rpc}
// just do the default chain
entries := structs.NewDiscoveryChainConfigEntries()
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "dc1", nil)
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.DiscoveryChainResponse
rpc.On("RPC", "ConfigEntry.ReadDiscoveryChain", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DiscoveryChainRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
@ -30,7 +29,6 @@ func TestCompiledDiscoveryChain(t *testing.T) {
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.DiscoveryChainResponse)
reply.ConfigEntries = entries
reply.Chain = chain
reply.QueryMeta.Index = 48
resp = reply

View File

@ -5255,6 +5255,22 @@ func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter strin
return &out, nil
}
func upsertTestTokenWithPolicyRules(codec rpc.ClientCodec, masterToken string, datacenter string, rules string) (*structs.ACLToken, error) {
policy, err := upsertTestPolicyWithRules(codec, masterToken, datacenter, rules)
if err != nil {
return nil, err
}
token, err := upsertTestToken(codec, masterToken, datacenter, func(token *structs.ACLToken) {
token.Policies = []structs.ACLTokenPolicyLink{{ID: policy.ID}}
})
if err != nil {
return nil, err
}
return token, nil
}
func retrieveTestTokenAccessorForSecret(codec rpc.ClientCodec, masterToken string, datacenter string, id string) (string, error) {
arg := structs.ACLTokenGetRequest{
TokenID: "root",
@ -5312,6 +5328,10 @@ func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
// upsertTestPolicy creates a policy for testing purposes
func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string) (*structs.ACLPolicy, error) {
return upsertTestPolicyWithRules(codec, masterToken, datacenter, "")
}
func upsertTestPolicyWithRules(codec rpc.ClientCodec, masterToken string, datacenter string, rules string) (*structs.ACLPolicy, error) {
// Make sure test policies can't collide
policyUnq, err := uuid.GenerateUUID()
if err != nil {
@ -5321,7 +5341,8 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
arg := structs.ACLPolicySetRequest{
Datacenter: datacenter,
Policy: structs.ACLPolicy{
Name: fmt.Sprintf("test-policy-%s", policyUnq),
Name: fmt.Sprintf("test-policy-%s", policyUnq),
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: masterToken},
}

View File

@ -6,7 +6,6 @@ import (
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
@ -313,64 +312,3 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return nil
})
}
func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
if done, err := c.srv.forward("ConfigEntry.ReadDiscoveryChain", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "read_discovery_chain"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}
if args.Name == "" {
return fmt.Errorf("Must provide service name")
}
evalDC := args.EvaluateInDatacenter
if evalDC == "" {
evalDC = c.srv.config.Datacenter
}
evalNS := args.EvaluateInNamespace
if evalNS == "" {
// TODO(namespaces) pull from something else?
evalNS = "default"
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
if err != nil {
return err
}
// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name,
CurrentNamespace: evalNS,
CurrentDatacenter: evalDC,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries,
})
if err != nil {
return err
}
reply.Index = index
reply.ConfigEntries = entries
reply.Chain = chain
return nil
})
}

View File

@ -0,0 +1,77 @@
package consul
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
type DiscoveryChain struct {
srv *Server
}
func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
if done, err := c.srv.forward("DiscoveryChain.Get", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}
if args.Name == "" {
return fmt.Errorf("Must provide service name")
}
evalDC := args.EvaluateInDatacenter
if evalDC == "" {
evalDC = c.srv.config.Datacenter
}
evalNS := args.EvaluateInNamespace
if evalNS == "" {
// TODO(namespaces) pull from something else?
evalNS = "default"
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
if err != nil {
return err
}
// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name,
CurrentNamespace: evalNS,
CurrentDatacenter: evalDC,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries,
})
if err != nil {
return err
}
reply.Index = index
reply.Chain = chain
return nil
})
}

View File

@ -0,0 +1,196 @@
package consul
import (
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
)
func TestDiscoveryChainEndpoint_Get(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc1")
denyToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", "")
require.NoError(t, err)
allowToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `service "web" { policy = "read" }`)
require.NoError(t, err)
getChain := func(args *structs.DiscoveryChainRequest) (*structs.DiscoveryChainResponse, error) {
resp := structs.DiscoveryChainResponse{}
err := msgpackrpc.CallWithCodec(codec, "DiscoveryChain.Get", &args, &resp)
if err != nil {
return nil, err
}
// clear fields that we don't care about
resp.QueryMeta = structs.QueryMeta{}
return &resp, nil
}
// ==== compiling the default chain (no config entries)
{ // no token
_, err := getChain(&structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
})
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
}
{ // wrong token
_, err := getChain(&structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: denyToken.SecretID},
})
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
}
expectDefaultResponse_DC1_Default := &structs.DiscoveryChainResponse{
Chain: &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc1": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.dc1": structs.NewDiscoveryTarget("web", "", "default", "dc1"),
},
},
}
// various ways with good token
for _, tc := range []struct {
evalDC string
evalNS string
expect *structs.DiscoveryChainResponse
}{
{
evalDC: "dc1",
evalNS: "default",
expect: expectDefaultResponse_DC1_Default,
},
{
evalDC: "",
evalNS: "default",
expect: expectDefaultResponse_DC1_Default,
},
{
evalDC: "dc1",
evalNS: "",
expect: expectDefaultResponse_DC1_Default,
},
{
evalDC: "",
evalNS: "",
expect: expectDefaultResponse_DC1_Default,
},
} {
tc := tc
name := fmt.Sprintf("dc=%q ns=%q", tc.evalDC, tc.evalNS)
require.True(t, t.Run(name, func(t *testing.T) {
resp, err := getChain(&structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: tc.evalDC,
EvaluateInNamespace: tc.evalNS,
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: allowToken.SecretID},
})
require.NoError(t, err)
require.Equal(t, tc.expect, resp)
}))
}
{ // Now create one config entry.
out := false
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply",
&structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
ConnectTimeout: 33 * time.Second,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}, &out))
require.True(t, out)
}
// ==== compiling a chain with config entries
{ // good token
resp, err := getChain(&structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: allowToken.SecretID},
})
require.NoError(t, err)
expect := &structs.DiscoveryChainResponse{
Chain: &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc1": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &structs.DiscoveryResolver{
ConnectTimeout: 33 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.dc1": structs.NewDiscoveryTarget("web", "", "default", "dc1"),
},
},
}
require.Equal(t, expect, resp)
}
}

View File

@ -81,11 +81,11 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
resolvers: make(map[string]*structs.ServiceResolverConfigEntry),
splitterNodes: make(map[string]*structs.DiscoveryGraphNode),
resolveNodes: make(map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode),
resolveNodes: make(map[string]*structs.DiscoveryGraphNode),
nodes: make(map[string]*structs.DiscoveryGraphNode),
targets: make(map[structs.DiscoveryTarget]structs.DiscoveryTargetConfig),
nodes: make(map[string]*structs.DiscoveryGraphNode),
loadedTargets: make(map[string]*structs.DiscoveryTarget),
retainedTargets: make(map[string]struct{}),
}
if req.OverrideProtocol != "" {
@ -123,7 +123,7 @@ type compiler struct {
// cached nodes
splitterNodes map[string]*structs.DiscoveryGraphNode
resolveNodes map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode
resolveNodes map[string]*structs.DiscoveryGraphNode
// usesAdvancedRoutingFeatures is set to true if config entries for routing
// or splitting appear in the compiled chain
@ -155,7 +155,8 @@ type compiler struct {
nodes map[string]*structs.DiscoveryGraphNode
// This is an OUTPUT field.
targets map[structs.DiscoveryTarget]structs.DiscoveryTargetConfig
loadedTargets map[string]*structs.DiscoveryTarget
retainedTargets map[string]struct{}
}
type customizationMarkers struct {
@ -175,7 +176,7 @@ func (c *compiler) recordNode(node *structs.DiscoveryGraphNode) {
case structs.DiscoveryGraphNodeTypeRouter:
// no special storage
case structs.DiscoveryGraphNodeTypeSplitter:
c.splitterNodes[node.ServiceName()] = node
c.splitterNodes[node.Name] = node
case structs.DiscoveryGraphNodeTypeResolver:
c.resolveNodes[node.Resolver.Target] = node
default:
@ -249,6 +250,12 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
return nil, err
}
for targetID, _ := range c.loadedTargets {
if _, ok := c.retainedTargets[targetID]; !ok {
delete(c.loadedTargets, targetID)
}
}
if !enableAdvancedRoutingForProtocol(c.protocol) && c.usesAdvancedRoutingFeatures {
return nil, &structs.ConfigEntryGraphError{
Message: fmt.Sprintf(
@ -297,7 +304,7 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
Protocol: c.protocol,
StartNode: c.startNode,
Nodes: c.nodes,
Targets: c.targets,
Targets: c.loadedTargets,
}, nil
}
@ -575,19 +582,53 @@ func newDefaultServiceRoute(serviceName string) *structs.ServiceRoute {
}
}
func (c *compiler) newTarget(service, serviceSubset, namespace, datacenter string) structs.DiscoveryTarget {
func (c *compiler) newTarget(service, serviceSubset, namespace, datacenter string) *structs.DiscoveryTarget {
if service == "" {
panic("newTarget called with empty service which makes no sense")
}
return structs.DiscoveryTarget{
Service: service,
ServiceSubset: serviceSubset,
Namespace: defaultIfEmpty(namespace, c.currentNamespace),
Datacenter: defaultIfEmpty(datacenter, c.currentDatacenter),
t := structs.NewDiscoveryTarget(
service,
serviceSubset,
defaultIfEmpty(namespace, c.currentNamespace),
defaultIfEmpty(datacenter, c.currentDatacenter),
)
prev, ok := c.loadedTargets[t.ID]
if ok {
return prev
}
c.loadedTargets[t.ID] = t
return t
}
func (c *compiler) getSplitterOrResolverNode(target structs.DiscoveryTarget) (*structs.DiscoveryGraphNode, error) {
func (c *compiler) rewriteTarget(t *structs.DiscoveryTarget, service, serviceSubset, namespace, datacenter string) *structs.DiscoveryTarget {
var (
service2 = t.Service
serviceSubset2 = t.ServiceSubset
namespace2 = t.Namespace
datacenter2 = t.Datacenter
)
if service != "" && service != service2 {
service2 = service
// Reset the chosen subset if we reference a service other than our own.
serviceSubset2 = ""
}
if serviceSubset != "" {
serviceSubset2 = serviceSubset
}
if namespace != "" {
namespace2 = namespace
}
if datacenter != "" {
datacenter2 = datacenter
}
return c.newTarget(service2, serviceSubset2, namespace2, datacenter2)
}
func (c *compiler) getSplitterOrResolverNode(target *structs.DiscoveryTarget) (*structs.DiscoveryGraphNode, error) {
nextNode, err := c.getSplitterNode(target.Service)
if err != nil {
return nil, err
@ -660,17 +701,17 @@ func (c *compiler) getSplitterNode(name string) (*structs.DiscoveryGraphNode, er
// getResolverNode handles most of the code to handle redirection/rewriting
// capabilities from a resolver config entry. It recurses into itself to
// _generate_ targets used for failover out of convenience.
func (c *compiler) getResolverNode(target structs.DiscoveryTarget, recursedForFailover bool) (*structs.DiscoveryGraphNode, error) {
func (c *compiler) getResolverNode(target *structs.DiscoveryTarget, recursedForFailover bool) (*structs.DiscoveryGraphNode, error) {
var (
// State to help detect redirect cycles and print helpful error
// messages.
redirectHistory = make(map[structs.DiscoveryTarget]struct{})
redirectOrder []structs.DiscoveryTarget
redirectHistory = make(map[string]struct{})
redirectOrder []string
)
RESOLVE_AGAIN:
// Do we already have the node?
if prev, ok := c.resolveNodes[target]; ok {
if prev, ok := c.resolveNodes[target.ID]; ok {
return prev, nil
}
@ -686,23 +727,18 @@ RESOLVE_AGAIN:
c.resolvers[target.Service] = resolver
}
if _, ok := redirectHistory[target]; ok {
redirectOrder = append(redirectOrder, target)
pretty := make([]string, len(redirectOrder))
for i, target := range redirectOrder {
pretty[i] = target.String()
}
if _, ok := redirectHistory[target.ID]; ok {
redirectOrder = append(redirectOrder, target.ID)
return nil, &structs.ConfigEntryGraphError{
Message: fmt.Sprintf(
"detected circular resolver redirect: [%s]",
strings.Join(pretty, " -> "),
strings.Join(redirectOrder, " -> "),
),
}
}
redirectHistory[target] = struct{}{}
redirectOrder = append(redirectOrder, target)
redirectHistory[target.ID] = struct{}{}
redirectOrder = append(redirectOrder, target.ID)
// Handle redirects right up front.
//
@ -710,13 +746,14 @@ RESOLVE_AGAIN:
if resolver.Redirect != nil {
redirect := resolver.Redirect
redirectedTarget := target.CopyAndModify(
redirectedTarget := c.rewriteTarget(
target,
redirect.Service,
redirect.ServiceSubset,
redirect.Namespace,
redirect.Datacenter,
)
if redirectedTarget != target {
if redirectedTarget.ID != target.ID {
target = redirectedTarget
goto RESOLVE_AGAIN
}
@ -724,7 +761,13 @@ RESOLVE_AGAIN:
// Handle default subset.
if target.ServiceSubset == "" && resolver.DefaultSubset != "" {
target.ServiceSubset = resolver.DefaultSubset
target = c.rewriteTarget(
target,
"",
resolver.DefaultSubset,
"",
"",
)
goto RESOLVE_AGAIN
}
@ -753,37 +796,34 @@ RESOLVE_AGAIN:
// Build node.
node := &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: target.Identifier(),
Name: target.ID,
Resolver: &structs.DiscoveryResolver{
Definition: resolver,
Default: resolver.IsDefault(),
Target: target,
Target: target.ID,
ConnectTimeout: connectTimeout,
},
}
targetConfig := structs.DiscoveryTargetConfig{
Subset: resolver.Subsets[target.ServiceSubset],
}
target.Subset = resolver.Subsets[target.ServiceSubset]
// Default mesh gateway settings
if serviceDefault := c.entries.GetService(target.Service); serviceDefault != nil {
targetConfig.MeshGateway = serviceDefault.MeshGateway
target.MeshGateway = serviceDefault.MeshGateway
}
if c.entries.GlobalProxy != nil && targetConfig.MeshGateway.Mode == structs.MeshGatewayModeDefault {
targetConfig.MeshGateway.Mode = c.entries.GlobalProxy.MeshGateway.Mode
if c.entries.GlobalProxy != nil && target.MeshGateway.Mode == structs.MeshGatewayModeDefault {
target.MeshGateway.Mode = c.entries.GlobalProxy.MeshGateway.Mode
}
if c.overrideMeshGateway.Mode != structs.MeshGatewayModeDefault {
if targetConfig.MeshGateway.Mode != c.overrideMeshGateway.Mode {
targetConfig.MeshGateway.Mode = c.overrideMeshGateway.Mode
if target.MeshGateway.Mode != c.overrideMeshGateway.Mode {
target.MeshGateway.Mode = c.overrideMeshGateway.Mode
c.customizedBy.MeshGateway = true
}
}
// Retain this target even if we may not retain the group resolver.
c.targets[target] = targetConfig
// Retain this target in the final results.
c.retainedTargets[target.ID] = struct{}{}
if recursedForFailover {
// If we recursed here from ourselves in a failover context, just emit
@ -808,42 +848,43 @@ RESOLVE_AGAIN:
if ok {
// Determine which failover definitions apply.
var failoverTargets []structs.DiscoveryTarget
var failoverTargets []*structs.DiscoveryTarget
if len(failover.Datacenters) > 0 {
for _, dc := range failover.Datacenters {
// Rewrite the target as per the failover policy.
failoverTarget := target.CopyAndModify(
failoverTarget := c.rewriteTarget(
target,
failover.Service,
failover.ServiceSubset,
failover.Namespace,
dc,
)
if failoverTarget != target { // don't failover to yourself
if failoverTarget.ID != target.ID { // don't failover to yourself
failoverTargets = append(failoverTargets, failoverTarget)
}
}
} else {
// Rewrite the target as per the failover policy.
failoverTarget := target.CopyAndModify(
failoverTarget := c.rewriteTarget(
target,
failover.Service,
failover.ServiceSubset,
failover.Namespace,
"",
)
if failoverTarget != target { // don't failover to yourself
if failoverTarget.ID != target.ID { // don't failover to yourself
failoverTargets = append(failoverTargets, failoverTarget)
}
}
// If we filtered everything out then no point in having a failover.
if len(failoverTargets) > 0 {
df := &structs.DiscoveryFailover{
Definition: &failover,
}
df := &structs.DiscoveryFailover{}
node.Resolver.Failover = df
// Convert the targets into targets by cheating a bit and
// recursing into ourselves.
// Take care of doing any redirects or configuration loading
// related to targets by cheating a bit and recursing into
// ourselves.
for _, target := range failoverTargets {
failoverResolveNode, err := c.getResolverNode(target, true)
if err != nil {

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,7 @@ func init() {
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
registerEndpoint(func(s *Server) interface{} { return &Health{s} })
registerEndpoint(func(s *Server) interface{} { return &Intention{s} })
registerEndpoint(func(s *Server) interface{} { return &Internal{s} })

View File

@ -0,0 +1,127 @@
package agent
import (
"fmt"
"net/http"
"strings"
"time"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/mitchellh/mapstructure"
)
func (s *HTTPServer) DiscoveryChainRead(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.DiscoveryChainRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
args.Name = strings.TrimPrefix(req.URL.Path, "/v1/discovery-chain/")
if args.Name == "" {
return nil, BadRequestError{Reason: "Missing chain name"}
}
args.EvaluateInDatacenter = req.URL.Query().Get("compile-dc")
// TODO(namespaces): args.EvaluateInNamespace = req.URL.Query().Get("compile-namespace")
if req.Method == "POST" {
var raw map[string]interface{}
if err := decodeBody(req, &raw, nil); err != nil {
return nil, BadRequestError{Reason: fmt.Sprintf("Request decoding failed: %v", err)}
}
apiReq, err := decodeDiscoveryChainReadRequest(raw)
if err != nil {
return nil, BadRequestError{Reason: fmt.Sprintf("Request decoding failed: %v", err)}
}
args.OverrideProtocol = apiReq.OverrideProtocol
args.OverrideConnectTimeout = apiReq.OverrideConnectTimeout
if apiReq.OverrideMeshGateway.Mode != "" {
_, err := structs.ValidateMeshGatewayMode(string(apiReq.OverrideMeshGateway.Mode))
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Invalid OverrideMeshGateway.Mode parameter")
return nil, nil
}
args.OverrideMeshGateway = apiReq.OverrideMeshGateway
}
}
// Make the RPC request
var out structs.DiscoveryChainResponse
defer setMeta(resp, &out.QueryMeta)
if args.QueryOptions.UseCache {
raw, m, err := s.agent.cache.Get(cachetype.CompiledDiscoveryChainName, &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.DiscoveryChainResponse)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
out = *reply
} else {
RETRY_ONCE:
if err := s.agent.RPC("DiscoveryChain.Get", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
return discoveryChainReadResponse{Chain: out.Chain}, nil
}
// discoveryChainReadRequest is the API variation of structs.DiscoveryChainRequest
type discoveryChainReadRequest struct {
OverrideMeshGateway structs.MeshGatewayConfig
OverrideProtocol string
OverrideConnectTimeout time.Duration
}
// discoveryChainReadResponse is the API variation of structs.DiscoveryChainResponse
type discoveryChainReadResponse struct {
Chain *structs.CompiledDiscoveryChain
}
func decodeDiscoveryChainReadRequest(raw map[string]interface{}) (*discoveryChainReadRequest, error) {
// lib.TranslateKeys doesn't understand []map[string]interface{} so we have
// to do this part first.
raw = lib.PatchSliceOfMaps(raw, nil, nil)
lib.TranslateKeys(raw, map[string]string{
"override_mesh_gateway": "overridemeshgateway",
"override_protocol": "overrideprotocol",
"override_connect_timeout": "overrideconnecttimeout",
})
var apiReq discoveryChainReadRequest
decodeConf := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: &apiReq,
WeaklyTypedInput: true,
}
decoder, err := mapstructure.NewDecoder(decodeConf)
if err != nil {
return nil, err
}
if err := decoder.Decode(raw); err != nil {
return nil, err
}
return &apiReq, nil
}

View File

@ -0,0 +1,295 @@
package agent
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
func TestDiscoveryChainRead(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
for _, method := range []string{"GET", "POST"} {
require.True(t, t.Run(method+": error on no service name", func(t *testing.T) {
var (
req *http.Request
err error
)
if method == "GET" {
req, err = http.NewRequest("GET", "/v1/discovery-chain/", nil)
} else {
apiReq := &discoveryChainReadRequest{}
req, err = http.NewRequest("POST", "/v1/discovery-chain/", jsonReader(apiReq))
}
require.NoError(t, err)
resp := httptest.NewRecorder()
_, err = a.srv.DiscoveryChainRead(resp, req)
require.Error(t, err)
_, ok := err.(BadRequestError)
require.True(t, ok)
}))
require.True(t, t.Run(method+": read default chain", func(t *testing.T) {
var (
req *http.Request
err error
)
if method == "GET" {
req, err = http.NewRequest("GET", "/v1/discovery-chain/web", nil)
} else {
apiReq := &discoveryChainReadRequest{}
req, err = http.NewRequest("POST", "/v1/discovery-chain/web", jsonReader(apiReq))
}
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
require.NoError(t, err)
value := obj.(discoveryChainReadResponse)
expect := &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc1": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.dc1": structs.NewDiscoveryTarget("web", "", "default", "dc1"),
},
}
require.Equal(t, expect, value.Chain)
}))
require.True(t, t.Run(method+": read default chain; evaluate in dc2", func(t *testing.T) {
var (
req *http.Request
err error
)
if method == "GET" {
req, err = http.NewRequest("GET", "/v1/discovery-chain/web?compile-dc=dc2", nil)
} else {
apiReq := &discoveryChainReadRequest{}
req, err = http.NewRequest("POST", "/v1/discovery-chain/web?compile-dc=dc2", jsonReader(apiReq))
}
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
require.NoError(t, err)
value := obj.(discoveryChainReadResponse)
expect := &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc2",
Protocol: "tcp",
StartNode: "resolver:web.default.dc2",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc2": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc2",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc2",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.dc2": structs.NewDiscoveryTarget("web", "", "default", "dc2"),
},
}
require.Equal(t, expect, value.Chain)
}))
require.True(t, t.Run(method+": read default chain with cache", func(t *testing.T) {
var (
req *http.Request
err error
)
if method == "GET" {
req, err = http.NewRequest("GET", "/v1/discovery-chain/web?cached", nil)
} else {
apiReq := &discoveryChainReadRequest{}
req, err = http.NewRequest("POST", "/v1/discovery-chain/web?cached", jsonReader(apiReq))
}
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
require.NoError(t, err)
// The GET request primes the cache so the POST is a hit.
if method == "GET" {
// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
} else {
// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
}
value := obj.(discoveryChainReadResponse)
expect := &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc1": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.dc1": structs.NewDiscoveryTarget("web", "", "default", "dc1"),
},
}
require.Equal(t, expect, value.Chain)
}))
}
{ // Now create one config entry.
out := false
require.NoError(t, a.RPC("ConfigEntry.Apply", &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
ConnectTimeout: 33 * time.Second,
},
}, &out))
require.True(t, out)
}
// Ensure background refresh works
require.True(t, t.Run("GET: read modified chain", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
req, err := http.NewRequest("GET", "/v1/discovery-chain/web?cached", nil)
r.Check(err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
r.Check(err)
value := obj.(discoveryChainReadResponse)
chain := value.Chain
// light comparison
node := chain.Nodes["resolver:web.default.dc1"]
if node == nil {
r.Fatalf("missing node")
}
if node.Resolver.Default {
r.Fatalf("not refreshed yet")
}
// Should be a cache hit! The data should've updated in the cache
// in the background so this should've been fetched directly from
// the cache.
if resp.Header().Get("X-Cache") != "HIT" {
r.Fatalf("should be a cache hit")
}
})
}))
// TODO(namespaces): add a test
expectTarget := structs.NewDiscoveryTarget("web", "", "default", "dc1")
expectTarget.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
}
expectModifiedWithOverrides := &structs.CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "grpc",
CustomizationHash: "98809527",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:web.default.dc1": &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &structs.DiscoveryResolver{
ConnectTimeout: 22 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
expectTarget.ID: expectTarget,
},
}
require.True(t, t.Run("POST: read modified chain with overrides (camel case)", func(t *testing.T) {
body := ` {
"OverrideMeshGateway": {
"Mode": "local"
},
"OverrideProtocol": "grpc",
"OverrideConnectTimeout": "22s"
} `
req, err := http.NewRequest("POST", "/v1/discovery-chain/web", strings.NewReader(body))
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
require.NoError(t, err)
value := obj.(discoveryChainReadResponse)
require.Equal(t, expectModifiedWithOverrides, value.Chain)
}))
require.True(t, t.Run("POST: read modified chain with overrides (snake case)", func(t *testing.T) {
body := ` {
"override_mesh_gateway": {
"mode": "local"
},
"override_protocol": "grpc",
"override_connect_timeout": "22s"
} `
req, err := http.NewRequest("POST", "/v1/discovery-chain/web", strings.NewReader(body))
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.DiscoveryChainRead(resp, req)
require.NoError(t, err)
value := obj.(discoveryChainReadResponse)
require.Equal(t, expectModifiedWithOverrides, value.Chain)
}))
}

View File

@ -81,6 +81,7 @@ func init() {
registerEndpoint("/v1/coordinate/nodes", []string{"GET"}, (*HTTPServer).CoordinateNodes)
registerEndpoint("/v1/coordinate/node/", []string{"GET"}, (*HTTPServer).CoordinateNode)
registerEndpoint("/v1/coordinate/update", []string{"PUT"}, (*HTTPServer).CoordinateUpdate)
registerEndpoint("/v1/discovery-chain/", []string{"GET", "POST"}, (*HTTPServer).DiscoveryChainRead)
registerEndpoint("/v1/event/fire/", []string{"PUT"}, (*HTTPServer).EventFire)
registerEndpoint("/v1/event/list", []string{"GET"}, (*HTTPServer).EventList)
registerEndpoint("/v1/health/node/", []string{"GET"}, (*HTTPServer).HealthNodeChecks)
@ -88,7 +89,6 @@ func init() {
registerEndpoint("/v1/health/state/", []string{"GET"}, (*HTTPServer).HealthChecksInState)
registerEndpoint("/v1/health/service/", []string{"GET"}, (*HTTPServer).HealthServiceNodes)
registerEndpoint("/v1/health/connect/", []string{"GET"}, (*HTTPServer).HealthConnectServiceNodes)
registerEndpoint("/v1/internal/discovery-chain/", []string{"GET"}, (*HTTPServer).InternalDiscoveryChain)
registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPServer).UINodes)
registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPServer).UINodeInfo)
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPServer).UIServices)

View File

@ -1,40 +0,0 @@
package agent
import (
"fmt"
"net/http"
"strings"
"github.com/hashicorp/consul/agent/structs"
)
// InternalDiscoveryChain is helpful for debugging. Eventually we should expose
// this data officially somehow.
func (s *HTTPServer) InternalDiscoveryChain(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.DiscoveryChainRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
args.Name = strings.TrimPrefix(req.URL.Path, "/v1/internal/discovery-chain/")
if args.Name == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing chain name")
return nil, nil
}
// Make the RPC request
var out structs.DiscoveryChainResponse
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("ConfigEntry.ReadDiscoveryChain", &args, &out); err != nil {
return nil, err
}
if out.Chain == nil {
resp.WriteHeader(http.StatusNotFound)
return nil, nil
}
return out.Chain, nil
}

View File

@ -45,23 +45,6 @@ func TestManager_BasicLifecycle(t *testing.T) {
// Create a bunch of common data for the various test cases.
roots, leaf := TestCerts(t)
dbTarget := structs.DiscoveryTarget{
Service: "db",
Namespace: "default",
Datacenter: "dc1",
}
dbTarget_v1 := structs.DiscoveryTarget{
Service: "db",
ServiceSubset: "v1",
Namespace: "default",
Datacenter: "dc1",
}
dbTarget_v2 := structs.DiscoveryTarget{
Service: "db",
ServiceSubset: "v2",
Namespace: "default",
Datacenter: "dc1",
}
dbDefaultChain := func() *structs.CompiledDiscoveryChain {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1",
func(req *discoverychain.CompileRequest) {
@ -213,9 +196,9 @@ func TestManager_BasicLifecycle(t *testing.T) {
"db": dbDefaultChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
dbTarget: TestUpstreamNodes(t),
"db.default.dc1": TestUpstreamNodes(t),
},
},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
@ -252,10 +235,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
"db": dbSplitChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
dbTarget_v1: TestUpstreamNodes(t),
dbTarget_v2: TestUpstreamNodesAlternate(t),
"v1.db.default.dc1": TestUpstreamNodes(t),
"v2.db.default.dc1": TestUpstreamNodesAlternate(t),
},
},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},

View File

@ -10,8 +10,8 @@ import (
type configSnapshotConnectProxy struct {
Leaf *structs.IssuedCert
DiscoveryChain map[string]*structs.CompiledDiscoveryChain // this is keyed by the Upstream.Identifier(), not the chain name
WatchedUpstreams map[string]map[structs.DiscoveryTarget]context.CancelFunc
WatchedUpstreamEndpoints map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes
WatchedUpstreams map[string]map[string]context.CancelFunc
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
UpstreamEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
}

View File

@ -359,8 +359,8 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[structs.DiscoveryTarget]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) // TODO(rb): deprecated
case structs.ServiceKindMeshGateway:
snap.MeshGateway.WatchedServices = make(map[string]context.CancelFunc)
@ -503,22 +503,17 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
return fmt.Errorf("invalid type for service response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
encTarget, svc, ok := removeColonPrefix(correlationID)
targetID, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
target := structs.DiscoveryTarget{}
if err := target.UnmarshalText([]byte(encTarget)); err != nil {
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
}
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
m = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreamEndpoints[svc] = m
}
snap.ConnectProxy.WatchedUpstreamEndpoints[svc][target] = resp.Nodes
snap.ConnectProxy.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
@ -561,11 +556,10 @@ func (s *state) resetWatchesFromChain(
// Initialize relevant sub maps.
if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok {
snap.ConnectProxy.WatchedUpstreams[id] = make(map[structs.DiscoveryTarget]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
}
if _, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[id]; !ok {
// TODO(rb): does this belong here?
snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
}
// We could invalidate this selectively based on a hash of the relevant
@ -573,24 +567,22 @@ func (s *state) resetWatchesFromChain(
// upstream when the chain changes in any way.
//
// TODO(rb): content hash based add/remove
for target, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: stopping watch of target %s", id, chain.ServiceName, target)
delete(snap.ConnectProxy.WatchedUpstreams[id], target)
delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], target) // TODO(rb): safe?
for targetID, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: stopping watch of target %s", id, chain.ServiceName, targetID)
delete(snap.ConnectProxy.WatchedUpstreams[id], targetID)
delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], targetID)
cancelFn()
}
for target, targetConfig := range chain.Targets {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
encodedTarget := target.Identifier()
for _, target := range chain.Targets {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target.ID)
ctx, cancel := context.WithCancel(s.ctx)
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
meshGateway := structs.MeshGatewayModeDefault
if target.Datacenter != s.source.Datacenter {
meshGateway = targetConfig.MeshGateway.Mode
meshGateway = target.MeshGateway.Mode
}
// if the default mode
@ -600,10 +592,10 @@ func (s *state) resetWatchesFromChain(
err := s.watchConnectProxyService(
ctx,
"upstream-target:"+encodedTarget+":"+id,
"upstream-target:"+target.ID+":"+id,
target.Service,
target.Datacenter,
targetConfig.Subset.Filter,
target.Subset.Filter,
meshGateway,
)
if err != nil {
@ -611,7 +603,7 @@ func (s *state) resetWatchesFromChain(
return err
}
snap.ConnectProxy.WatchedUpstreams[id][target] = cancel
snap.ConnectProxy.WatchedUpstreams[id][target.ID] = cancel
}
return nil

View File

@ -481,17 +481,17 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stage1 := verificationStage{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:api,,,dc1:api": genVerifyServiceWatch("api", "", "dc1", true),
"upstream-target:api-failover-remote,,,dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
"upstream-target:api-failover-local,,,dc2:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
"upstream-target:api-failover-direct,,,dc2:api-failover-direct?dc=dc2": genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
"upstream-target:api.default.dc1:api": genVerifyServiceWatch("api", "", "dc1", true),
"upstream-target:api-failover-remote.default.dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
"upstream-target:api-failover-local.default.dc2:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
"upstream-target:api-failover-direct.default.dc2:api-failover-direct?dc=dc2": genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
},
}
if meshGatewayProxyConfigValue == structs.MeshGatewayModeDefault {
stage1.requiredWatches["upstream-target:api,,,dc2:api-dc2"] = genVerifyServiceWatch("api", "", "dc2", true)
stage1.requiredWatches["upstream-target:api.default.dc2:api-dc2"] = genVerifyServiceWatch("api", "", "dc2", true)
} else {
stage1.requiredWatches["upstream-target:api,,,dc2:api-dc2"] = genVerifyGatewayWatch("dc1")
stage1.requiredWatches["upstream-target:api.default.dc2:api-dc2"] = genVerifyGatewayWatch("dc1")
}
return testCase{

View File

@ -545,17 +545,6 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", compileSetup, entries...)
dbTarget := structs.DiscoveryTarget{
Service: "db",
Namespace: "default",
Datacenter: "dc1",
}
failTarget := structs.DiscoveryTarget{
Service: "fail",
Namespace: "default",
Datacenter: "dc1",
}
snap := &ConfigSnapshot{
Kind: structs.ServiceKindConnectProxy,
Service: "web-sidecar-proxy",
@ -578,9 +567,9 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{
"db": map[structs.DiscoveryTarget]structs.CheckServiceNodes{
dbTarget: TestUpstreamNodes(t),
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": map[string]structs.CheckServiceNodes{
"db.default.dc1": TestUpstreamNodes(t),
},
},
},
@ -591,24 +580,12 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
case "simple-with-overrides":
case "simple":
case "failover":
snap.ConnectProxy.WatchedUpstreamEndpoints["db"][failTarget] =
snap.ConnectProxy.WatchedUpstreamEndpoints["db"]["fail.default.dc1"] =
TestUpstreamNodesAlternate(t)
case "splitter-with-resolver-redirect-multidc":
dbTarget_v1_dc1 := structs.DiscoveryTarget{
Service: "db",
ServiceSubset: "v1",
Namespace: "default",
Datacenter: "dc1",
}
dbTarget_v2_dc2 := structs.DiscoveryTarget{
Service: "db",
ServiceSubset: "v2",
Namespace: "default",
Datacenter: "dc2",
}
snap.ConnectProxy.WatchedUpstreamEndpoints["db"] = map[structs.DiscoveryTarget]structs.CheckServiceNodes{
dbTarget_v1_dc1: TestUpstreamNodes(t),
dbTarget_v2_dc2: TestUpstreamNodesDC2(t),
snap.ConnectProxy.WatchedUpstreamEndpoints["db"] = map[string]structs.CheckServiceNodes{
"v1.db.default.dc1": TestUpstreamNodes(t),
"v2.db.default.dc2": TestUpstreamNodesDC2(t),
}
default:
t.Fatalf("unexpected variation: %q", variation)

View File

@ -336,11 +336,10 @@ func ConfigEntryDecodeRulesForKind(kind string) (skipWhenPatching []string, tran
}, nil
case ServiceResolver:
return nil, map[string]string{
"connect_timeout": "connecttimeout",
"default_subset": "defaultsubset",
"only_passing": "onlypassing",
"overprovisioning_factor": "overprovisioningfactor",
"service_subset": "servicesubset",
"connect_timeout": "connecttimeout",
"default_subset": "defaultsubset",
"only_passing": "onlypassing",
"service_subset": "servicesubset",
}, nil
default:
return nil, nil, fmt.Errorf("kind %q should be explicitly handled here", kind)

View File

@ -677,10 +677,6 @@ func (e *ServiceResolverConfigEntry) Validate() error {
}
}
if f.OverprovisioningFactor < 0 {
return fmt.Errorf("Bad Failover[%q].OverprovisioningFactor '%d', must be >= 0", subset, f.OverprovisioningFactor)
}
for _, dc := range f.Datacenters {
if dc == "" {
return fmt.Errorf("Bad Failover[%q].Datacenters: found empty datacenter", subset)
@ -783,11 +779,9 @@ type ServiceResolverRedirect struct {
// There are some restrictions on what is allowed in here:
//
// - Service, ServiceSubset, Namespace, NearestN, and Datacenters cannot all be
// - Service, ServiceSubset, Namespace, and Datacenters cannot all be
// empty at once.
//
// - Both 'NearestN' and 'Datacenters' may be specified at once.
//
type ServiceResolverFailover struct {
// Service is the service to resolve instead of the default as the failover
// group of instances (optional).
@ -809,27 +803,12 @@ type ServiceResolverFailover struct {
// This is a DESTINATION during failover.
Namespace string `json:",omitempty"`
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
//
// This is a DESTINATION during failover.
//
// TODO(rb): bring this back after normal DC failover works
// NearestN int `json:",omitempty"`
// Datacenters is a fixed list of datacenters to try after NearestN. We
// never try a datacenter multiple times, so those are subtracted from this
// list before proceeding.
// Datacenters is a fixed list of datacenters to try. We never try a
// datacenter multiple times, so those are subtracted from this list before
// proceeding.
//
// This is a DESTINATION during failover.
Datacenters []string `json:",omitempty"`
// OverprovisioningFactor is a pass through for envoy's
// overprovisioning_factor value.
//
// If omitted the overprovisioning factor value will be set so high as to
// imply binary failover (all or nothing).
OverprovisioningFactor int `json:",omitempty"`
}
type discoveryChainConfigEntry interface {
@ -1051,10 +1030,8 @@ func (r *DiscoveryChainRequest) CacheInfo() cache.RequestInfo {
return info
}
// TODO(rb): either fix the compiled results, or take the derived data and stash it here in a json/msgpack-friendly way?
type DiscoveryChainResponse struct {
ConfigEntries *DiscoveryChainConfigEntries `json:",omitempty"` // TODO(rb): remove these?
Chain *CompiledDiscoveryChain `json:",omitempty"`
Chain *CompiledDiscoveryChain
QueryMeta
}

View File

@ -456,20 +456,6 @@ func TestServiceResolverConfigEntry(t *testing.T) {
},
},
},
{
name: "failover with invalid overprovisioning factor",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
Failover: map[string]ServiceResolverFailover{
"*": ServiceResolverFailover{
Service: "backup",
OverprovisioningFactor: -1,
},
},
},
validateErr: `Bad Failover["*"].OverprovisioningFactor '-1', must be >= 0`,
},
{
name: "failover with empty datacenters in list",
entry: &ServiceResolverConfigEntry{

View File

@ -424,7 +424,6 @@ func TestDecodeConfigEntry(t *testing.T) {
service_subset = "sure"
namespace = "neighbor"
datacenters = ["dc5", "dc14"]
overprovisioning_factor = 150
},
"*" = {
datacenters = ["dc7"]
@ -450,7 +449,6 @@ func TestDecodeConfigEntry(t *testing.T) {
ServiceSubset = "sure"
Namespace = "neighbor"
Datacenters = ["dc5", "dc14"]
OverprovisioningFactor = 150
},
"*" = {
Datacenters = ["dc7"]
@ -472,11 +470,10 @@ func TestDecodeConfigEntry(t *testing.T) {
},
Failover: map[string]ServiceResolverFailover{
"v2": {
Service: "failcopy",
ServiceSubset: "sure",
Namespace: "neighbor",
Datacenters: []string{"dc5", "dc14"},
OverprovisioningFactor: 150,
Service: "failcopy",
ServiceSubset: "sure",
Namespace: "neighbor",
Datacenters: []string{"dc5", "dc14"},
},
"*": {
Datacenters: []string{"dc7"},

View File

@ -1,11 +1,7 @@
package structs
import (
"bytes"
"encoding"
"fmt"
"net/url"
"strings"
"time"
)
@ -24,10 +20,10 @@ type CompiledDiscoveryChain struct {
// If set, this value should be used to prefix/suffix any generated load
// balancer data plane objects to avoid sharing customized and
// non-customized versions.
CustomizationHash string
CustomizationHash string `json:",omitempty"`
// Protocol is the overall protocol shared by everything in the chain.
Protocol string
Protocol string `json:",omitempty"`
// StartNode is the first key into the Nodes map that should be followed
// when walking the discovery chain.
@ -40,8 +36,8 @@ type CompiledDiscoveryChain struct {
// guaranteed to be consistent within a single compilation.
Nodes map[string]*DiscoveryGraphNode `json:",omitempty"`
// Targets is a list of all targets and configuration related just to targets.
Targets map[DiscoveryTarget]DiscoveryTargetConfig `json:",omitempty"`
// Targets is a list of all targets used in this chain.
Targets map[string]*DiscoveryTarget `json:",omitempty"`
}
// IsDefault returns true if the compiled chain represents no routing, no
@ -59,10 +55,16 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
panic("not possible: missing node named '" + c.StartNode + "' in chain '" + c.ServiceName + "'")
}
// TODO(rb): include CustomizationHash here?
return node.Type == DiscoveryGraphNodeTypeResolver &&
node.Resolver.Default &&
node.Resolver.Target.Service == c.ServiceName
if node.Type != DiscoveryGraphNodeTypeResolver {
return false
}
if !node.Resolver.Default {
return false
}
target := c.Targets[node.Resolver.Target]
return target.Service == c.ServiceName
}
const (
@ -98,29 +100,16 @@ func (s *DiscoveryGraphNode) IsResolver() bool {
return s.Type == DiscoveryGraphNodeTypeResolver
}
func (s *DiscoveryGraphNode) ServiceName() string {
if s.Type == DiscoveryGraphNodeTypeResolver {
return s.Resolver.Target.Service
}
return s.Name
}
func (s *DiscoveryGraphNode) MapKey() string {
return fmt.Sprintf("%s:%s", s.Type, s.Name)
}
// compiled form of ServiceResolverConfigEntry
type DiscoveryResolver struct {
Definition *ServiceResolverConfigEntry `json:",omitempty"`
Default bool `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty"`
Target DiscoveryTarget `json:",omitempty"`
Failover *DiscoveryFailover `json:",omitempty"`
}
type DiscoveryTargetConfig struct {
MeshGateway MeshGatewayConfig `json:",omitempty"`
Subset ServiceResolverSubset `json:",omitempty"`
Default bool `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty"`
Target string `json:",omitempty"`
Failover *DiscoveryFailover `json:",omitempty"`
}
// compiled form of ServiceRoute
@ -137,136 +126,44 @@ type DiscoverySplit struct {
// compiled form of ServiceResolverFailover
type DiscoveryFailover struct {
Definition *ServiceResolverFailover `json:",omitempty"`
Targets []DiscoveryTarget `json:",omitempty"`
Targets []string `json:",omitempty"`
}
// DiscoveryTarget represents all of the inputs necessary to use a resolver
// config entry to execute a catalog query to generate a list of service
// instances during discovery.
//
// This is a value type so it can be used as a map key.
type DiscoveryTarget struct {
ID string `json:",omitempty"`
Service string `json:",omitempty"`
ServiceSubset string `json:",omitempty"`
Namespace string `json:",omitempty"`
Datacenter string `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Subset ServiceResolverSubset `json:",omitempty"`
}
func (t DiscoveryTarget) IsEmpty() bool {
return t.Service == "" && t.ServiceSubset == "" && t.Namespace == "" && t.Datacenter == ""
func NewDiscoveryTarget(service, serviceSubset, namespace, datacenter string) *DiscoveryTarget {
t := &DiscoveryTarget{
Service: service,
ServiceSubset: serviceSubset,
Namespace: namespace,
Datacenter: datacenter,
}
t.setID()
return t
}
// CopyAndModify will duplicate the target and selectively modify it given the
// requested inputs.
func (t DiscoveryTarget) CopyAndModify(
service,
serviceSubset,
namespace,
datacenter string,
) DiscoveryTarget {
t2 := t // copy
if service != "" && service != t2.Service {
t2.Service = service
// Reset the chosen subset if we reference a service other than our own.
t2.ServiceSubset = ""
}
if serviceSubset != "" && serviceSubset != t2.ServiceSubset {
t2.ServiceSubset = serviceSubset
}
if namespace != "" && namespace != t2.Namespace {
t2.Namespace = namespace
}
if datacenter != "" && datacenter != t2.Datacenter {
t2.Datacenter = datacenter
}
return t2
}
var _ encoding.TextMarshaler = DiscoveryTarget{}
var _ encoding.TextUnmarshaler = (*DiscoveryTarget)(nil)
// MarshalText implements encoding.TextMarshaler.
//
// This should also not include any colons for embedding that happens
// elsewhere.
//
// This should NOT return any errors.
func (t DiscoveryTarget) MarshalText() (text []byte, err error) {
return []byte(t.Identifier()), nil
}
func (t DiscoveryTarget) Identifier() string {
var buf bytes.Buffer
buf.WriteString(url.QueryEscape(t.Service))
buf.WriteRune(',')
buf.WriteString(url.QueryEscape(t.ServiceSubset)) // TODO(rb): move this first so the scoping flows from small->large?
buf.WriteRune(',')
if t.Namespace != "default" {
buf.WriteString(url.QueryEscape(t.Namespace))
}
buf.WriteRune(',')
buf.WriteString(url.QueryEscape(t.Datacenter))
return buf.String()
}
// UnmarshalText implements encoding.TextUnmarshaler.
func (t *DiscoveryTarget) UnmarshalText(text []byte) error {
parts := bytes.Split(text, []byte(","))
bad := false
if len(parts) != 4 {
return fmt.Errorf("invalid target: %q", string(text))
}
var err error
t.Service, err = url.QueryUnescape(string(parts[0]))
if err != nil {
bad = true
}
t.ServiceSubset, err = url.QueryUnescape(string(parts[1]))
if err != nil {
bad = true
}
t.Namespace, err = url.QueryUnescape(string(parts[2]))
if err != nil {
bad = true
}
t.Datacenter, err = url.QueryUnescape(string(parts[3]))
if err != nil {
bad = true
}
if bad {
return fmt.Errorf("invalid target: %q", string(text))
}
if t.Namespace == "" {
t.Namespace = "default"
}
return nil
}
func (t DiscoveryTarget) String() string {
var b strings.Builder
if t.ServiceSubset != "" {
b.WriteString(t.ServiceSubset)
func (t *DiscoveryTarget) setID() {
// NOTE: this format is similar to the SNI syntax for simplicity
if t.ServiceSubset == "" {
t.ID = fmt.Sprintf("%s.%s.%s", t.Service, t.Namespace, t.Datacenter)
} else {
b.WriteString("<default>")
t.ID = fmt.Sprintf("%s.%s.%s.%s", t.ServiceSubset, t.Service, t.Namespace, t.Datacenter)
}
b.WriteRune('.')
b.WriteString(t.Service)
b.WriteRune('.')
if t.Namespace != "" {
b.WriteString(t.Namespace)
} else {
b.WriteString("<default>")
}
b.WriteRune('.')
b.WriteString(t.Datacenter)
return b.String()
}
func (t *DiscoveryTarget) String() string {
return t.ID
}

View File

@ -1,120 +0,0 @@
package structs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestDiscoveryTarget_TextMarshal(t *testing.T) {
for _, tc := range []struct {
target DiscoveryTarget
enc string
alt DiscoveryTarget
}{
{
target: DiscoveryTarget{"", "", "", ""},
enc: ",,,",
alt: DiscoveryTarget{"", "", "default", ""},
},
{
target: DiscoveryTarget{"a:b", "", "", ""},
enc: "a%3Ab,,,",
alt: DiscoveryTarget{"a:b", "", "default", ""},
},
{
target: DiscoveryTarget{"", "a:b", "", ""},
enc: ",a%3Ab,,",
alt: DiscoveryTarget{"", "a:b", "default", ""},
},
{
target: DiscoveryTarget{"", "", "a:b", ""},
enc: ",,a%3Ab,",
alt: DiscoveryTarget{"", "", "a:b", ""},
},
{
target: DiscoveryTarget{"", "", "", "a:b"},
enc: ",,,a%3Ab",
alt: DiscoveryTarget{"", "", "default", "a:b"},
},
{
target: DiscoveryTarget{"one", "two", "three", "four"},
enc: "one,two,three,four",
},
} {
tc := tc
t.Run(tc.target.String(), func(t *testing.T) {
out, err := tc.target.MarshalText()
require.NoError(t, err)
require.Equal(t, tc.enc, string(out))
var dec DiscoveryTarget
require.NoError(t, dec.UnmarshalText(out))
if tc.alt.IsEmpty() {
require.Equal(t, tc.target, dec)
} else {
require.Equal(t, tc.alt, dec)
}
})
}
}
func TestDiscoveryTarget_CopyAndModify(t *testing.T) {
type fields = DiscoveryTarget // abbreviation
for _, tc := range []struct {
name string
in fields
mod fields // this is semantically wrong, but the shape of the struct is still what we want
expect fields
}{
{
name: "service with no subset and no mod",
in: fields{"foo", "", "default", "dc1"},
mod: fields{},
expect: fields{"foo", "", "default", "dc1"},
},
{
name: "service with subset and no mod",
in: fields{"foo", "v2", "default", "dc1"},
mod: fields{},
expect: fields{"foo", "v2", "default", "dc1"},
},
{
name: "service with no subset and service mod",
in: fields{"foo", "", "default", "dc1"},
mod: fields{"bar", "", "", ""},
expect: fields{"bar", "", "default", "dc1"},
},
{
name: "service with subset and service mod",
in: fields{"foo", "v2", "default", "dc1"},
mod: fields{"bar", "", "", ""},
expect: fields{"bar", "", "default", "dc1"},
},
{
name: "service with subset and noop service mod with dc mod",
in: fields{"foo", "v2", "default", "dc1"},
mod: fields{"foo", "", "", "dc9"},
expect: fields{"foo", "v2", "default", "dc9"},
},
{
name: "service with subset and namespace mod",
in: fields{"foo", "v2", "default", "dc1"},
mod: fields{"", "", "fancy", ""},
expect: fields{"foo", "v2", "fancy", "dc1"},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
out := tc.in.CopyAndModify(
tc.mod.Service,
tc.mod.ServiceSubset,
tc.mod.Namespace,
tc.mod.Datacenter,
)
require.Equal(t, tc.expect, out)
})
}
}

View File

@ -254,7 +254,9 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
target := node.Resolver.Target
targetID := node.Resolver.Target
target := chain.Targets[targetID]
sni := TargetSNI(target, cfgSnap)
clusterName := CustomizeClusterName(sni, chain)

View File

@ -56,7 +56,6 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
if ok {
la := makeLoadAssignment(
clusterName,
0,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
@ -79,23 +78,20 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
continue
}
failover := node.Resolver.Failover
target := node.Resolver.Target
targetID := node.Resolver.Target
endpoints, ok := chainEndpointMap[target]
target := chain.Targets[targetID]
endpoints, ok := chainEndpointMap[targetID]
if !ok {
continue // skip the cluster (should not happen)
}
targetConfig := chain.Targets[target]
var (
endpointGroups []loadAssignmentEndpointGroup
overprovisioningFactor int
)
var endpointGroups []loadAssignmentEndpointGroup
primaryGroup := loadAssignmentEndpointGroup{
Endpoints: endpoints,
OnlyPassing: targetConfig.Subset.OnlyPassing,
OnlyPassing: target.Subset.OnlyPassing,
}
if failover != nil && len(failover.Targets) > 0 {
@ -103,26 +99,17 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
endpointGroups = append(endpointGroups, primaryGroup)
if failover.Definition.OverprovisioningFactor > 0 {
overprovisioningFactor = failover.Definition.OverprovisioningFactor
}
if overprovisioningFactor <= 0 {
// We choose such a large value here that the failover math should
// in effect not happen until zero instances are healthy.
overprovisioningFactor = 100000
}
for _, failTarget := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTarget]
for _, failTargetID := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTargetID]
if !ok {
continue // skip the failover target (should not happen)
}
failTargetConfig := chain.Targets[failTarget]
failTarget := chain.Targets[failTargetID]
endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
Endpoints: failEndpoints,
OnlyPassing: failTargetConfig.Subset.OnlyPassing,
OnlyPassing: failTarget.Subset.OnlyPassing,
})
}
} else {
@ -134,7 +121,6 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment(
clusterName,
overprovisioningFactor,
endpointGroups,
cfgSnap.Datacenter,
)
@ -154,7 +140,6 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
clusterName := DatacenterSNI(dc, cfgSnap)
la := makeLoadAssignment(
clusterName,
0,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
@ -168,7 +153,6 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
clusterName := ServiceSNI(svc, "", "default", cfgSnap.Datacenter, cfgSnap)
la := makeLoadAssignment(
clusterName,
0,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
@ -200,7 +184,6 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]loadAssignmentEndpointGroup{
{
Endpoints: endpoints,
@ -231,19 +214,17 @@ type loadAssignmentEndpointGroup struct {
OnlyPassing bool
}
func makeLoadAssignment(
clusterName string,
overprovisioningFactor int,
endpointGroups []loadAssignmentEndpointGroup,
localDatacenter string,
) *envoy.ClusterLoadAssignment {
func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localDatacenter string) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
}
if overprovisioningFactor > 0 {
if len(endpointGroups) > 1 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
OverprovisioningFactor: makeUint32Value(overprovisioningFactor),
// We choose such a large value here that the failover math should
// in effect not happen until zero instances are healthy.
OverprovisioningFactor: makeUint32Value(100000),
}
}

View File

@ -96,11 +96,10 @@ func Test_makeLoadAssignment(t *testing.T) {
// TODO(rb): test onlypassing
tests := []struct {
name string
clusterName string
overprovisioningFactor int
endpoints []loadAssignmentEndpointGroup
want *envoy.ClusterLoadAssignment
name string
clusterName string
endpoints []loadAssignmentEndpointGroup
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
@ -210,7 +209,6 @@ func Test_makeLoadAssignment(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(
tt.clusterName,
tt.overprovisioningFactor,
tt.endpoints,
"dc1",
)
@ -255,24 +253,6 @@ func Test_endpointsFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailover,
setup: nil,
},
{
name: "connect-proxy-with-chain-and-sliding-failover",
create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailover,
setup: func(snap *proxycfg.ConfigSnapshot) {
chain := snap.ConnectProxy.DiscoveryChain["db"]
dbTarget := structs.DiscoveryTarget{
Service: "db",
Namespace: "default",
Datacenter: "dc1",
}
dbResolverNode := chain.Nodes["resolver:"+dbTarget.Identifier()]
failover := dbResolverNode.Resolver.Failover
failover.Definition.OverprovisioningFactor = 160
},
},
{
name: "splitter-with-resolver-redirect",
create: proxycfg.TestConfigSnapshotDiscoveryChain_SplitterWithResolverRedirectMultiDC,

View File

@ -307,7 +307,9 @@ func makeDefaultRouteMatch() envoyroute.RouteMatch {
}
}
func makeRouteActionForSingleCluster(target structs.DiscoveryTarget, chain *structs.CompiledDiscoveryChain, cfgSnap *proxycfg.ConfigSnapshot) *envoyroute.Route_Route {
func makeRouteActionForSingleCluster(targetID string, chain *structs.CompiledDiscoveryChain, cfgSnap *proxycfg.ConfigSnapshot) *envoyroute.Route_Route {
target := chain.Targets[targetID]
sni := TargetSNI(target, cfgSnap)
clusterName := CustomizeClusterName(sni, chain)
@ -328,7 +330,9 @@ func makeRouteActionForSplitter(splits []*structs.DiscoverySplit, chain *structs
if nextNode.Type != structs.DiscoveryGraphNodeTypeResolver {
return nil, fmt.Errorf("unexpected splitter destination node type: %s", nextNode.Type)
}
target := nextNode.Resolver.Target
targetID := nextNode.Resolver.Target
target := chain.Targets[targetID]
sni := TargetSNI(target, cfgSnap)
clusterName := CustomizeClusterName(sni, chain)

View File

@ -42,7 +42,7 @@ func QuerySNI(service string, datacenter string, cfgSnap *proxycfg.ConfigSnapsho
return fmt.Sprintf("%s.default.%s.query.%s", service, datacenter, cfgSnap.Roots.TrustDomain)
}
func TargetSNI(target structs.DiscoveryTarget, cfgSnap *proxycfg.ConfigSnapshot) string {
func TargetSNI(target *structs.DiscoveryTarget, cfgSnap *proxycfg.ConfigSnapshot) string {
return ServiceSNI(target.Service, target.ServiceSubset, target.Namespace, target.Datacenter, cfgSnap)
}

View File

@ -1,73 +0,0 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"clusterName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
},
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.20.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.20.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
],
"priority": 1
}
],
"policy": {
"overprovisioningFactor": 160
}
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -188,7 +188,7 @@ func DecodeConfigEntryFromJSON(data []byte) (ConfigEntry, error) {
return DecodeConfigEntry(raw)
}
// Config can be used to query the Config endpoints
// ConfigEntries can be used to query the Config endpoints
type ConfigEntries struct {
c *Client
}

View File

@ -118,12 +118,8 @@ type ServiceResolverRedirect struct {
}
type ServiceResolverFailover struct {
Service string `json:",omitempty"`
ServiceSubset string `json:",omitempty"`
Namespace string `json:",omitempty"`
Datacenters []string `json:",omitempty"`
OverprovisioningFactor int `json:",omitempty"`
// TODO(rb): bring this back after normal DC failover works
// NearestN int `json:",omitempty"`
Service string `json:",omitempty"`
ServiceSubset string `json:",omitempty"`
Namespace string `json:",omitempty"`
Datacenters []string `json:",omitempty"`
}

190
api/discovery_chain.go Normal file
View File

@ -0,0 +1,190 @@
package api
import (
"fmt"
"time"
)
// DiscoveryChain can be used to query the discovery-chain endpoints
type DiscoveryChain struct {
c *Client
}
// DiscoveryChain returns a handle to the discovery-chain endpoints
func (c *Client) DiscoveryChain() *DiscoveryChain {
return &DiscoveryChain{c}
}
func (d *DiscoveryChain) Get(name string, opts *DiscoveryChainOptions, q *QueryOptions) (*DiscoveryChainResponse, *QueryMeta, error) {
if name == "" {
return nil, nil, fmt.Errorf("Name parameter must not be empty")
}
method := "GET"
if opts != nil && opts.requiresPOST() {
method = "POST"
}
r := d.c.newRequest(method, fmt.Sprintf("/v1/discovery-chain/%s", name))
r.setQueryOptions(q)
if opts != nil {
if opts.EvaluateInDatacenter != "" {
r.params.Set("compile-dc", opts.EvaluateInDatacenter)
}
// TODO(namespaces): handle possible EvaluateInNamespace here
}
if method == "POST" {
r.obj = opts
}
rtt, resp, err := requireOK(d.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out DiscoveryChainResponse
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return &out, qm, nil
}
type DiscoveryChainOptions struct {
EvaluateInDatacenter string `json:"-"`
// OverrideMeshGateway allows for the mesh gateway setting to be overridden
// for any resolver in the compiled chain.
OverrideMeshGateway MeshGatewayConfig `json:",omitempty"`
// OverrideProtocol allows for the final protocol for the chain to be
// altered.
//
// - If the chain ordinarily would be TCP and an L7 protocol is passed here
// the chain will not include Routers or Splitters.
//
// - If the chain ordinarily would be L7 and TCP is passed here the chain
// will not include Routers or Splitters.
OverrideProtocol string `json:",omitempty"`
// OverrideConnectTimeout allows for the ConnectTimeout setting to be
// overridden for any resolver in the compiled chain.
OverrideConnectTimeout time.Duration `json:",omitempty"`
}
func (o *DiscoveryChainOptions) requiresPOST() bool {
if o == nil {
return false
}
return o.OverrideMeshGateway.Mode != "" ||
o.OverrideProtocol != "" ||
o.OverrideConnectTimeout != 0
}
type DiscoveryChainResponse struct {
Chain *CompiledDiscoveryChain
}
type CompiledDiscoveryChain struct {
ServiceName string
Namespace string
Datacenter string
// CustomizationHash is a unique hash of any data that affects the
// compilation of the discovery chain other than config entries or the
// name/namespace/datacenter evaluation criteria.
//
// If set, this value should be used to prefix/suffix any generated load
// balancer data plane objects to avoid sharing customized and
// non-customized versions.
CustomizationHash string
// Protocol is the overall protocol shared by everything in the chain.
Protocol string
// StartNode is the first key into the Nodes map that should be followed
// when walking the discovery chain.
StartNode string
// Nodes contains all nodes available for traversal in the chain keyed by a
// unique name. You can walk this by starting with StartNode.
//
// NOTE: The names should be treated as opaque values and are only
// guaranteed to be consistent within a single compilation.
Nodes map[string]*DiscoveryGraphNode
// Targets is a list of all targets used in this chain.
//
// NOTE: The names should be treated as opaque values and are only
// guaranteed to be consistent within a single compilation.
Targets map[string]*DiscoveryTarget
}
const (
DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter"
DiscoveryGraphNodeTypeResolver = "resolver"
)
// DiscoveryGraphNode is a single node in the compiled discovery chain.
type DiscoveryGraphNode struct {
Type string
Name string // this is NOT necessarily a service
// fields for Type==router
Routes []*DiscoveryRoute
// fields for Type==splitter
Splits []*DiscoverySplit
// fields for Type==resolver
Resolver *DiscoveryResolver
}
// compiled form of ServiceRoute
type DiscoveryRoute struct {
Definition *ServiceRoute
NextNode string
}
// compiled form of ServiceSplit
type DiscoverySplit struct {
Weight float32
NextNode string
}
// compiled form of ServiceResolverConfigEntry
type DiscoveryResolver struct {
Default bool
ConnectTimeout time.Duration
Target string
Failover *DiscoveryFailover
}
// compiled form of ServiceResolverFailover
type DiscoveryFailover struct {
Targets []string
}
// DiscoveryTarget represents all of the inputs necessary to use a resolver
// config entry to execute a catalog query to generate a list of service
// instances during discovery.
type DiscoveryTarget struct {
ID string
Service string
ServiceSubset string
Namespace string
Datacenter string
MeshGateway MeshGatewayConfig
Subset ServiceResolverSubset
}

180
api/discovery_chain_test.go Normal file
View File

@ -0,0 +1,180 @@
package api
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestAPI_DiscoveryChain_Get(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
config_entries := c.ConfigEntries()
discoverychain := c.DiscoveryChain()
require.True(t, t.Run("read default chain", func(t *testing.T) {
resp, _, err := discoverychain.Get("web", nil, nil)
require.NoError(t, err)
expect := &DiscoveryChainResponse{
Chain: &CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*DiscoveryGraphNode{
"resolver:web.default.dc1": &DiscoveryGraphNode{
Type: DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*DiscoveryTarget{
"web.default.dc1": &DiscoveryTarget{
ID: "web.default.dc1",
Service: "web",
Namespace: "default",
Datacenter: "dc1",
},
},
},
}
require.Equal(t, expect, resp)
}))
require.True(t, t.Run("read default chain; evaluate in dc2", func(t *testing.T) {
opts := &DiscoveryChainOptions{
EvaluateInDatacenter: "dc2",
}
resp, _, err := discoverychain.Get("web", opts, nil)
require.NoError(t, err)
expect := &DiscoveryChainResponse{
Chain: &CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc2",
Protocol: "tcp",
StartNode: "resolver:web.default.dc2",
Nodes: map[string]*DiscoveryGraphNode{
"resolver:web.default.dc2": &DiscoveryGraphNode{
Type: DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc2",
Resolver: &DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "web.default.dc2",
},
},
},
Targets: map[string]*DiscoveryTarget{
"web.default.dc2": &DiscoveryTarget{
ID: "web.default.dc2",
Service: "web",
Namespace: "default",
Datacenter: "dc2",
},
},
},
}
require.Equal(t, expect, resp)
}))
{ // Now create one config entry.
ok, _, err := config_entries.Set(&ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "web",
ConnectTimeout: 33 * time.Second,
}, nil)
require.NoError(t, err)
require.True(t, ok)
}
require.True(t, t.Run("read modified chain", func(t *testing.T) {
resp, _, err := discoverychain.Get("web", nil, nil)
require.NoError(t, err)
expect := &DiscoveryChainResponse{
Chain: &CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
StartNode: "resolver:web.default.dc1",
Nodes: map[string]*DiscoveryGraphNode{
"resolver:web.default.dc1": &DiscoveryGraphNode{
Type: DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc1",
Resolver: &DiscoveryResolver{
ConnectTimeout: 33 * time.Second,
Target: "web.default.dc1",
},
},
},
Targets: map[string]*DiscoveryTarget{
"web.default.dc1": &DiscoveryTarget{
ID: "web.default.dc1",
Service: "web",
Namespace: "default",
Datacenter: "dc1",
},
},
},
}
require.Equal(t, expect, resp)
}))
require.True(t, t.Run("read modified chain in dc2 with overrides", func(t *testing.T) {
opts := &DiscoveryChainOptions{
EvaluateInDatacenter: "dc2",
OverrideMeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeLocal,
},
OverrideProtocol: "grpc",
OverrideConnectTimeout: 22 * time.Second,
}
resp, _, err := discoverychain.Get("web", opts, nil)
require.NoError(t, err)
expect := &DiscoveryChainResponse{
Chain: &CompiledDiscoveryChain{
ServiceName: "web",
Namespace: "default",
Datacenter: "dc2",
Protocol: "grpc",
CustomizationHash: "98809527",
StartNode: "resolver:web.default.dc2",
Nodes: map[string]*DiscoveryGraphNode{
"resolver:web.default.dc2": &DiscoveryGraphNode{
Type: DiscoveryGraphNodeTypeResolver,
Name: "web.default.dc2",
Resolver: &DiscoveryResolver{
ConnectTimeout: 22 * time.Second,
Target: "web.default.dc2",
},
},
},
Targets: map[string]*DiscoveryTarget{
"web.default.dc2": &DiscoveryTarget{
ID: "web.default.dc2",
Service: "web",
Namespace: "default",
Datacenter: "dc2",
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeLocal,
},
},
},
},
}
require.Equal(t, expect, resp)
}))
}

View File

@ -518,7 +518,6 @@ func TestParseConfigEntry(t *testing.T) {
service_subset = "sure"
namespace = "neighbor"
datacenters = ["dc5", "dc14"]
overprovisioning_factor = 150
},
"*" = {
datacenters = ["dc7"]
@ -544,7 +543,6 @@ func TestParseConfigEntry(t *testing.T) {
ServiceSubset = "sure"
Namespace = "neighbor"
Datacenters = ["dc5", "dc14"]
OverprovisioningFactor = 150
},
"*" = {
Datacenters = ["dc7"]
@ -566,11 +564,10 @@ func TestParseConfigEntry(t *testing.T) {
},
Failover: map[string]api.ServiceResolverFailover{
"v2": {
Service: "failcopy",
ServiceSubset: "sure",
Namespace: "neighbor",
Datacenters: []string{"dc5", "dc14"},
OverprovisioningFactor: 150,
Service: "failcopy",
ServiceSubset: "sure",
Namespace: "neighbor",
Datacenters: []string{"dc5", "dc14"},
},
"*": {
Datacenters: []string{"dc7"},