activate most discovery chain features in xDS for envoy (#6024)

This commit is contained in:
R.B. Boyer 2019-07-01 22:10:51 -05:00 committed by GitHub
parent bcb3439c4c
commit bccbb2b4ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1179 additions and 144 deletions

View File

@ -3945,6 +3945,15 @@ func (a *Agent) registerCache() {
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
}
// defaultProxyCommand returns the default Connect managed proxy command.

View File

@ -0,0 +1,52 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const CompiledDiscoveryChainName = "compiled-discovery-chain"
// CompiledDiscoveryChain supports fetching the complete discovery chain for a
// service and caching its compilation.
type CompiledDiscoveryChain struct {
RPC RPC
}
func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a DiscoveryChainRequest.
reqReal, ok := req.(*structs.DiscoveryChainRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached compiled-discovery-chain to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.DiscoveryChainResponse
if err := c.RPC.RPC("ConfigEntry.ReadDiscoveryChain", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CompiledDiscoveryChain) SupportsBlocking() bool {
return true
}

View File

@ -6,6 +6,7 @@ import (
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
@ -312,3 +313,53 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return nil
})
}
func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
if done, err := c.srv.forward("ConfigEntry.ReadDiscoveryChain", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "read_discovery_chain"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}
if args.Name == "" {
return fmt.Errorf("Must provide service name")
}
const currentNamespace = "default"
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
if err != nil {
return err
}
// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name,
CurrentNamespace: currentNamespace,
CurrentDatacenter: c.srv.config.Datacenter,
InferDefaults: true,
Entries: entries,
})
if err != nil {
return err
}
reply.Index = index
reply.ConfigEntries = entries
reply.Chain = chain
return nil
})
}

View File

@ -519,6 +519,12 @@ RESOLVE_AGAIN:
}
groupResolver := groupResolverNode.GroupResolver
// Digest mesh gateway settings.
if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil {
groupResolver.MeshGateway = serviceDefault.MeshGateway
}
// TODO(rb): thread proxy-defaults version through here as well
// Retain this target even if we may not retain the group resolver.
c.targets[target] = struct{}{}

View File

@ -168,7 +168,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
return nil
}
// We are updating the proxy, close it's old state
// We are updating the proxy, close its old state
state.Close()
}

View File

@ -47,6 +47,42 @@ func TestManager_BasicLifecycle(t *testing.T) {
roots, leaf := TestCerts(t)
// Initialize a default group resolver for "db"
dbResolverEntry := &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
}
dbTarget := structs.DiscoveryTarget{
Service: "db",
Namespace: "default",
Datacenter: "dc1",
}
dbResolverNode := &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeGroupResolver,
Name: "db",
GroupResolver: &structs.DiscoveryGroupResolver{
Definition: dbResolverEntry,
Default: true,
Target: dbTarget,
},
}
dbChain := &structs.CompiledDiscoveryChain{
ServiceName: "db",
Namespace: "default",
Datacenter: "dc1",
Protocol: "tcp",
Node: dbResolverNode,
GroupResolverNodes: map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode{
dbTarget: dbResolverNode,
},
Resolvers: map[string]*structs.ServiceResolverConfigEntry{
"db": dbResolverEntry,
},
Targets: []structs.DiscoveryTarget{
dbTarget,
},
}
// Setup initial values
types.roots.value.Store(roots)
types.leaf.value.Store(leaf)
@ -55,6 +91,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
&structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodes(t),
})
types.compiledChain.value.Store(
&structs.DiscoveryChainResponse{
Chain: dbChain,
},
)
logger := log.New(os.Stderr, "", log.LstdFlags)
state := local.NewState(local.Config{}, logger, &token.Store{})
@ -116,9 +157,16 @@ func TestManager_BasicLifecycle(t *testing.T) {
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
Leaf: leaf,
UpstreamEndpoints: map[string]structs.CheckServiceNodes{
"db": TestUpstreamNodes(t),
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{
"db": {
dbTarget: TestUpstreamNodes(t),
},
},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
},
Datacenter: "dc1",
}

View File

@ -8,8 +8,11 @@ import (
)
type configSnapshotConnectProxy struct {
Leaf *structs.IssuedCert
UpstreamEndpoints map[string]structs.CheckServiceNodes
Leaf *structs.IssuedCert
DiscoveryChain map[string]*structs.CompiledDiscoveryChain // this is keyed by the Upstream.Identifier(), not the chain name
WatchedUpstreams map[string]map[structs.DiscoveryTarget]context.CancelFunc
WatchedUpstreamEndpoints map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes
UpstreamEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
}
type configSnapshotMeshGateway struct {
@ -46,6 +49,7 @@ type ConfigSnapshot struct {
func (s *ConfigSnapshot) Valid() bool {
switch s.Kind {
case structs.ServiceKindConnectProxy:
// TODO(rb): sanity check discovery chain things here?
return s.Roots != nil && s.ConnectProxy.Leaf != nil
case structs.ServiceKindMeshGateway:
// TODO (mesh-gateway) - what happens if all the connect services go away
@ -65,9 +69,11 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
snap := snapCopy.(*ConfigSnapshot)
// nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches
switch s.Kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.WatchedUpstreams = nil
case structs.ServiceKindMeshGateway:
// nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches
snap.MeshGateway.WatchedDatacenters = nil
snap.MeshGateway.WatchedServices = nil
}

View File

@ -59,7 +59,7 @@ type state struct {
// goroutine later without reasoning about races with the NodeService passed
// (especially for embedded fields like maps and slices).
//
// The returned state needs it's required dependencies to be set before Watch
// The returned state needs its required dependencies to be set before Watch
// can be called.
func newState(ns *structs.NodeService, token string) (*state, error) {
if ns.Kind != structs.ServiceKindConnectProxy && ns.Kind != structs.ServiceKindMeshGateway {
@ -141,17 +141,17 @@ func (s *state) initWatches() error {
}
}
func (s *state) watchConnectProxyService(correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error {
func (s *state) watchConnectProxyService(ctx context.Context, correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error {
switch meshGatewayMode {
case structs.MeshGatewayModeRemote:
return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true,
}, correlationId, s.ch)
case structs.MeshGatewayModeLocal:
return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway,
@ -159,7 +159,7 @@ func (s *state) watchConnectProxyService(correlationId string, service string, d
}, correlationId, s.ch)
default:
// This includes both the None and Default modes on purpose
return s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: s.token,
@ -218,6 +218,7 @@ func (s *state) initWatchesConnectProxy() error {
for _, u := range s.proxyCfg.Upstreams {
dc := s.source.Datacenter
if u.Datacenter != "" {
// TODO(rb): if we ASK for a specific datacenter, do we still use the chain?
dc = u.Datacenter
}
@ -232,15 +233,47 @@ func (s *state) initWatchesConnectProxy() error {
case structs.UpstreamDestTypeService:
fallthrough
case "": // Treat unset as the default Service type
meshGateway := structs.MeshGatewayModeNone
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
// Determine if this should use a discovery chain.
//
// TODO(rb): reduce this list of exceptions
var shouldUseDiscoveryChain bool
if dc != s.source.Datacenter {
meshGateway = u.MeshGateway.Mode
shouldUseDiscoveryChain = false
} else if u.DestinationNamespace != "" && u.DestinationNamespace != "default" {
shouldUseDiscoveryChain = false
} else {
shouldUseDiscoveryChain = true
}
if err := s.watchConnectProxyService("upstream:"+serviceIDPrefix+u.Identifier(), u.DestinationName, dc, "", meshGateway); err != nil {
return err
if shouldUseDiscoveryChain {
// Watch for discovery chain configuration updates
err = s.cache.Notify(s.ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: u.DestinationName,
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
return err
}
} else {
meshGateway := structs.MeshGatewayModeNone
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
if dc != s.source.Datacenter {
meshGateway = u.MeshGateway.Mode
}
if err := s.watchConnectProxyService(
s.ctx,
"upstream:"+serviceIDPrefix+u.Identifier(),
u.DestinationName,
dc,
"",
meshGateway,
); err != nil {
return err
}
}
default:
@ -307,7 +340,10 @@ func (s *state) run() {
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[structs.DiscoveryTarget]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) // TODO(rb): deprecated
case structs.ServiceKindMeshGateway:
snap.MeshGateway.WatchedServices = make(map[string]context.CancelFunc)
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
@ -400,44 +436,212 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
}
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
switch u.CorrelationID {
case rootsWatchID:
switch {
case u.CorrelationID == rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots)
if !ok {
return fmt.Errorf("invalid type for roots response: %T", u.Result)
}
snap.Roots = roots
case leafWatchID:
case u.CorrelationID == leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
}
snap.ConnectProxy.Leaf = leaf
case intentionsWatchID:
case u.CorrelationID == intentionsWatchID:
// Not in snapshot currently, no op
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
if err := s.resetWatchesFromChain(svc, resp.Chain, snap); err != nil {
return err
}
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
encTarget, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
target := structs.DiscoveryTarget{}
if err := target.UnmarshalText([]byte(encTarget)); err != nil {
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
}
// TODO(rb): do we have to do onlypassing filters here?
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreamEndpoints[svc] = m
}
snap.ConnectProxy.WatchedUpstreamEndpoints[svc][target] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix)
snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
}
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
default:
// Service discovery result, figure out which type
switch {
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix)
snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes
return errors.New("unknown correlation ID")
}
return nil
}
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
}
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
func removeColonPrefix(s string) (string, string, bool) {
idx := strings.Index(s, ":")
if idx == -1 {
return "", "", false
}
return s[0:idx], s[idx+1:], true
}
default:
return errors.New("unknown correlation ID")
func (s *state) resetWatchesFromChain(
id string,
chain *structs.CompiledDiscoveryChain,
snap *ConfigSnapshot,
) error {
if chain == nil {
return fmt.Errorf("not possible to arrive here with no discovery chain")
}
// Collect all sorts of catalog queries we'll have to run.
targets := make(map[structs.DiscoveryTarget]*structs.ServiceResolverConfigEntry)
addTarget := func(target structs.DiscoveryTarget) error {
resolver, ok := chain.Resolvers[target.Service]
if !ok {
return fmt.Errorf("missing resolver %q for target %s", target.Service, target)
}
targets[target] = resolver
return nil
}
// NOTE: We will NEVER see a missing chain, because we always request it with defaulting enabled.
meshGatewayModes := make(map[structs.DiscoveryTarget]structs.MeshGatewayMode)
for _, group := range chain.GroupResolverNodes {
groupResolver := group.GroupResolver
meshGatewayModes[groupResolver.Target] = groupResolver.MeshGateway.Mode
if err := addTarget(groupResolver.Target); err != nil {
return err
}
if groupResolver.Failover != nil {
for _, target := range groupResolver.Failover.Targets {
if err := addTarget(target); err != nil {
return err
}
}
}
}
// Initialize relevant sub maps.
if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok {
snap.ConnectProxy.WatchedUpstreams[id] = make(map[structs.DiscoveryTarget]context.CancelFunc)
}
if _, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[id]; !ok {
// TODO(rb): does this belong here?
snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
}
// We could invalidate this selectively based on a hash of the relevant
// resolver information, but for now just reset anything about this
// upstream when the chain changes in any way.
//
// TODO(rb): content hash based add/remove
for target, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: stopping watch of target %s", id, chain.ServiceName, target)
delete(snap.ConnectProxy.WatchedUpstreams[id], target)
delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], target) // TODO(rb): safe?
cancelFn()
}
for target, resolver := range targets {
if target.Service != resolver.Name {
panic(target.Service + " != " + resolver.Name) // TODO(rb): remove
}
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
// snap.WatchedUpstreams[name]
// delete(snap.WatchedUpstreams[name], target)
// delete(snap.WatchedUpstreamEndpoint[name], target)
// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly
// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
// TODO(rb): handle subset.onlypassing
var subset structs.ServiceResolverSubset
if target.ServiceSubset != "" {
var ok bool
subset, ok = resolver.Subsets[target.ServiceSubset]
if !ok {
// Not possible really.
return fmt.Errorf("target %s cannot be resolved; service %q does not have a subset named %q", target, target.Service, target.ServiceSubset)
}
}
encodedTarget, err := target.MarshalText()
if err != nil {
return fmt.Errorf("target %s cannot be converted into a cache key string: %v", target, err)
}
ctx, cancel := context.WithCancel(s.ctx)
meshGateway := structs.MeshGatewayModeNone
if target.Datacenter != s.source.Datacenter {
meshGateway = meshGatewayModes[target]
if meshGateway == structs.MeshGatewayModeDefault {
meshGateway = structs.MeshGatewayModeNone
}
} else {
meshGateway = structs.MeshGatewayModeNone
}
// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
err = s.watchConnectProxyService(
ctx,
"upstream-target:"+string(encodedTarget)+":"+id,
target.Service,
target.Datacenter,
subset.Filter,
meshGateway,
)
if err != nil {
cancel()
return err
}
snap.ConnectProxy.WatchedUpstreams[id][target] = cancel
}
return nil
}

View File

@ -17,11 +17,12 @@ import (
// TestCacheTypes encapsulates all the different cache types proxycfg.State will
// watch/request for controlling one during testing.
type TestCacheTypes struct {
roots *ControllableCacheType
leaf *ControllableCacheType
intentions *ControllableCacheType
health *ControllableCacheType
query *ControllableCacheType
roots *ControllableCacheType
leaf *ControllableCacheType
intentions *ControllableCacheType
health *ControllableCacheType
query *ControllableCacheType
compiledChain *ControllableCacheType
}
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
@ -29,11 +30,12 @@ type TestCacheTypes struct {
func NewTestCacheTypes(t testing.T) *TestCacheTypes {
t.Helper()
ct := &TestCacheTypes{
roots: NewControllableCacheType(t),
leaf: NewControllableCacheType(t),
intentions: NewControllableCacheType(t),
health: NewControllableCacheType(t),
query: NewControllableCacheType(t),
roots: NewControllableCacheType(t),
leaf: NewControllableCacheType(t),
intentions: NewControllableCacheType(t),
health: NewControllableCacheType(t),
query: NewControllableCacheType(t),
compiledChain: NewControllableCacheType(t),
}
ct.query.blocking = false
return ct
@ -66,6 +68,12 @@ func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
Refresh: false,
})
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
return c
}

View File

@ -4,9 +4,12 @@ import (
"fmt"
"math"
"sort"
"strconv"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/mitchellh/hashstructure"
)
// ServiceRouterConfigEntry defines L7 (e.g. http) routing rules for a named
@ -917,6 +920,52 @@ func (e *DiscoveryChainConfigEntries) IsChainEmpty() bool {
return len(e.Routers) == 0 && len(e.Splitters) == 0 && len(e.Resolvers) == 0
}
// DiscoveryChainRequest is used when requesting the discovery chain for a
// service.
type DiscoveryChainRequest struct {
Name string
Datacenter string
// Source QuerySource
QueryOptions
}
func (r *DiscoveryChainRequest) RequestDatacenter() string {
return r.Datacenter
}
func (r *DiscoveryChainRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}
v, err := hashstructure.Hash(struct {
Name string
}{
Name: r.Name,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
info.Key = strconv.FormatUint(v, 10)
}
return info
}
// TODO(rb): either fix the compiled results, or take the derived data and stash it here in a json/msgpack-friendly way?
type DiscoveryChainResponse struct {
ConfigEntries *DiscoveryChainConfigEntries `json:",omitempty"` // TODO(rb): remove these?
Chain *CompiledDiscoveryChain `json:",omitempty"`
QueryMeta
}
type ConfigEntryGraphError struct {
// one of Message or Err should be set
Message string

View File

@ -73,6 +73,7 @@ type DiscoveryGroupResolver struct {
Definition *ServiceResolverConfigEntry `json:",omitempty"`
Default bool `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Target DiscoveryTarget `json:",omitempty"`
Failover *DiscoveryFailover `json:",omitempty"`
}
@ -90,6 +91,7 @@ type DiscoverySplit struct {
}
// compiled form of ServiceResolverFailover
// TODO(rb): figure out how to get mesh gateways in here
type DiscoveryFailover struct {
Definition *ServiceResolverFailover `json:",omitempty"`
Targets []DiscoveryTarget `json:",omitempty"`
@ -212,7 +214,7 @@ func (t DiscoveryTarget) String() string {
if t.Namespace != "" {
b.WriteString(t.Namespace)
} else {
b.WriteString("default")
b.WriteString("<default>")
}
b.WriteRune('.')

View File

@ -11,6 +11,7 @@ import (
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
@ -38,19 +39,41 @@ func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token st
// clustersFromSnapshot returns the xDS API representation of the "clusters"
// (upstreams) in the snapshot.
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
// Include the "app" cluster for the public listener
// TODO(rb): this sizing is a low bound.
clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
var err error
clusters[0], err = s.makeAppCluster(cfgSnap)
// Include the "app" cluster for the public listener
appCluster, err := s.makeAppCluster(cfgSnap)
if err != nil {
return nil, err
}
for idx, upstream := range cfgSnap.Proxy.Upstreams {
clusters[idx+1], err = s.makeUpstreamCluster(upstream, cfgSnap)
if err != nil {
return nil, err
clusters = append(clusters, appCluster)
for _, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
}
if chain == nil || chain.IsDefault() {
// Either old-school upstream or prepared query.
upstreamCluster, err := s.makeUpstreamCluster(u, cfgSnap)
if err != nil {
return nil, err
}
clusters = append(clusters, upstreamCluster)
} else {
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, chain, cfgSnap)
if err != nil {
return nil, err
}
for _, cluster := range upstreamClusters {
clusters = append(clusters, cluster)
}
}
}
@ -197,6 +220,85 @@ func (s *Server) makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycf
return c, nil
}
func (s *Server) makeUpstreamClustersForDiscoveryChain(
upstreamID string,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
) ([]*envoy.Cluster, error) {
if chain == nil {
panic("chain must be provided")
}
// TODO(rb): make escape hatches work with chains
var out []*envoy.Cluster
for target, node := range chain.GroupResolverNodes {
groupResolver := node.GroupResolver
// TODO(rb): failover
// Failover *DiscoveryFailover `json:",omitempty"` // sad path
clusterName := makeClusterName(upstreamID, target, cfgSnap.Datacenter)
c := &envoy.Cluster{
Name: clusterName,
AltStatName: clusterName, // TODO(rb): change this?
ConnectTimeout: groupResolver.ConnectTimeout,
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
CommonLbConfig: &envoy.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoytype.Percent{
Value: 0, // disable panic threshold
},
},
// TODO(rb): adjust load assignment
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
// Having an empty config enables outlier detection with default config.
OutlierDetection: &envoycluster.OutlierDetection{},
}
if chain.Protocol == "http2" || chain.Protocol == "grpc" {
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
}
// Enable TLS upstream with the configured client certificate.
c.TlsContext = &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(cfgSnap),
}
out = append(out, c)
}
return out, nil
}
// makeClusterName returns a string representation that uniquely identifies the
// cluster in a canonical but human readable way.
func makeClusterName(upstreamID string, target structs.DiscoveryTarget, currentDatacenter string) string {
var name string
if target.ServiceSubset != "" {
name = target.Service + "/" + target.ServiceSubset
} else {
name = target.Service
}
if target.Namespace != "" && target.Namespace != "default" {
name = target.Namespace + "/" + name
}
if target.Datacenter != "" && target.Datacenter != currentDatacenter {
name += "?dc=" + target.Datacenter
}
if upstreamID == target.Service {
// In the common case don't stutter.
return name
}
return upstreamID + "//" + name
}
// makeClusterFromUserConfig returns the listener config decoded from an
// arbitrary proto3 json format string or an error if it's invalid.
//

View File

@ -33,11 +33,94 @@ func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
// TODO(rb): this sizing is a low bound.
resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.UpstreamEndpoints))
for id, endpoints := range cfgSnap.ConnectProxy.UpstreamEndpoints {
la := makeLoadAssignment(id, endpoints, cfgSnap.Datacenter)
resources = append(resources, la)
// TODO(rb): should naming from 1.5 -> 1.6 for clusters remain unchanged?
for _, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
}
if chain == nil {
// We ONLY want this branch for prepared queries.
endpoints, ok := cfgSnap.ConnectProxy.UpstreamEndpoints[id]
if ok {
la := makeLoadAssignment(
id,
0,
[]structs.CheckServiceNodes{endpoints},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
} else {
// Newfangled discovery chain plumbing.
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
continue // TODO(rb): whaaaa?
}
for target, node := range chain.GroupResolverNodes {
groupResolver := node.GroupResolver
failover := groupResolver.Failover
endpoints, ok := chainEndpointMap[target]
if !ok {
continue // TODO(rb): whaaaa?
}
var (
priorityEndpoints []structs.CheckServiceNodes
overprovisioningFactor int
)
if failover != nil && len(failover.Targets) > 0 {
priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1)
priorityEndpoints = append(priorityEndpoints, endpoints)
if failover.Definition.OverprovisioningFactor > 0 {
overprovisioningFactor = failover.Definition.OverprovisioningFactor
}
if overprovisioningFactor <= 0 {
// We choose such a large value here that the failover math should
// in effect not happen until zero instances are healthy.
overprovisioningFactor = 100000
}
for _, failTarget := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTarget]
if ok {
priorityEndpoints = append(priorityEndpoints, failEndpoints)
}
}
} else {
priorityEndpoints = []structs.CheckServiceNodes{
endpoints,
}
}
clusterName := makeClusterName(id, target, cfgSnap.Datacenter)
la := makeLoadAssignment(
clusterName,
overprovisioningFactor,
priorityEndpoints,
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
}
}
return resources, nil
}
@ -47,14 +130,28 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
// generate the endpoints for the gateways in the remote datacenters
for dc, endpoints := range cfgSnap.MeshGateway.GatewayGroups {
clusterName := DatacenterSNI(dc, cfgSnap)
la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter)
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
// generate the endpoints for the local service groups
for svc, endpoints := range cfgSnap.MeshGateway.ServiceGroups {
clusterName := ServiceSNI(svc, "default", cfgSnap.Datacenter, cfgSnap)
la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter)
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
@ -63,56 +160,78 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
return envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(host, port),
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(host, port),
},
},
}}
}
}
func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes, localDatacenter string) *envoy.ClusterLoadAssignment {
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
func makeLoadAssignment(
clusterName string,
overprovisioningFactor int,
priorityEndpoints []structs.CheckServiceNodes,
localDatacenter string,
) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)),
}
if overprovisioningFactor > 0 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
OverprovisioningFactor: makeUint32Value(overprovisioningFactor),
}
}
for priority, endpoints := range priorityEndpoints {
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}
for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, port),
},
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, port),
}},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
cla.Endpoints = append(cla.Endpoints, envoyendpoint.LocalityLbEndpoints{
Priority: uint32(priority),
LbEndpoints: es,
})
}
return &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: es,
}},
}
return cla
}

View File

@ -94,15 +94,18 @@ func Test_makeLoadAssignment(t *testing.T) {
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
tests := []struct {
name string
clusterName string
endpoints structs.CheckServiceNodes
want *envoy.ClusterLoadAssignment
name string
clusterName string
overprovisioningFactor int
endpoints []structs.CheckServiceNodes
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: structs.CheckServiceNodes{},
endpoints: []structs.CheckServiceNodes{
{},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
@ -113,7 +116,9 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: testCheckServiceNodes,
endpoints: []structs.CheckServiceNodes{
testCheckServiceNodes,
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
@ -141,7 +146,9 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: testWeightedCheckServiceNodes,
endpoints: []structs.CheckServiceNodes{
testWeightedCheckServiceNodes,
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
@ -169,7 +176,9 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: testWarningCheckServiceNodes,
endpoints: []structs.CheckServiceNodes{
testWarningCheckServiceNodes,
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
@ -197,7 +206,7 @@ func Test_makeLoadAssignment(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.endpoints, "dc1")
got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1")
require.Equal(t, tt.want, got)
})
}

View File

@ -52,10 +52,23 @@ func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
return nil, err
}
for i, u := range cfgSnap.Proxy.Upstreams {
resources[i+1], err = s.makeUpstreamListener(&u)
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
}
var upstreamListener proto.Message
if chain == nil || chain.IsDefault() {
upstreamListener, err = s.makeUpstreamListener(&u, cfgSnap)
} else {
upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(&u, chain, cfgSnap)
}
if err != nil {
return nil, err
}
resources[i+1] = upstreamListener
}
return resources, nil
}
@ -226,7 +239,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
}
l = makeListener(PublicListenerName, addr, cfgSnap.Port)
filter, err := makeListenerFilter(cfg.Protocol, "public_listener", LocalAppClusterName, "", true)
filter, err := makeListenerFilter(false, cfg.Protocol, "public_listener", LocalAppClusterName, "", true)
if err != nil {
return nil, err
}
@ -243,7 +256,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
return l, err
}
func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error) {
func (s *Server) makeUpstreamListener(u *structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (proto.Message, error) {
cfg, err := ParseUpstreamConfig(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
@ -260,11 +273,16 @@ func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error
addr = "127.0.0.1"
}
l := makeListener(u.Identifier(), addr, u.LocalBindPort)
filter, err := makeListenerFilter(cfg.Protocol, u.Identifier(), u.Identifier(), "upstream_", false)
upstreamID := u.Identifier()
clusterName := upstreamID
l := makeListener(upstreamID, addr, u.LocalBindPort)
filter, err := makeListenerFilter(false, cfg.Protocol, upstreamID, clusterName, "upstream_", false)
if err != nil {
return nil, err
}
l.FilterChains = []envoylistener.FilterChain{
{
Filters: []envoylistener.Filter{
@ -330,14 +348,44 @@ func (s *Server) makeGatewayListener(name, addr string, port int, cfgSnap *proxy
return l, nil
}
func makeListenerFilter(protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) {
func (s *Server) makeUpstreamListenerForDiscoveryChain(
u *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
) (proto.Message, error) {
// TODO(rb): make the listener escape hatch work again
addr := u.LocalBindAddress
if addr == "" {
addr = "127.0.0.1"
}
upstreamID := u.Identifier()
l := makeListener(upstreamID, addr, u.LocalBindPort)
filter, err := makeListenerFilter(true, chain.Protocol, upstreamID, "", "upstream_", false)
if err != nil {
return nil, err
}
l.FilterChains = []envoylistener.FilterChain{
{
Filters: []envoylistener.Filter{
filter,
},
},
}
return l, nil
}
func makeListenerFilter(useRDS bool, protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) {
switch protocol {
case "grpc":
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, true, true)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, true, true)
case "http2":
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, true)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, true)
case "http":
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, false)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, false)
case "tcp":
fallthrough
default:
@ -375,7 +423,11 @@ func makeStatPrefix(protocol, prefix, filterName string) string {
return fmt.Sprintf("%s%s_%s", prefix, strings.Replace(filterName, ":", "_", -1), protocol)
}
func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2 bool) (envoylistener.Filter, error) {
func makeHTTPFilter(
useRDS bool,
filterName, cluster, statPrefix string,
ingress, grpc, http2 bool,
) (envoylistener.Filter, error) {
op := envoyhttp.INGRESS
if !ingress {
op = envoyhttp.EGRESS
@ -387,7 +439,39 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2
cfg := &envoyhttp.HttpConnectionManager{
StatPrefix: makeStatPrefix(proto, statPrefix, filterName),
CodecType: envoyhttp.AUTO,
RouteSpecifier: &envoyhttp.HttpConnectionManager_RouteConfig{
HttpFilters: []*envoyhttp.HttpFilter{
&envoyhttp.HttpFilter{
Name: "envoy.router",
},
},
Tracing: &envoyhttp.HttpConnectionManager_Tracing{
OperationName: op,
// Don't trace any requests by default unless the client application
// explicitly propagates trace headers that indicate this should be
// sampled.
RandomSampling: &envoytype.Percent{Value: 0.0},
},
}
if useRDS {
if cluster != "" {
return envoylistener.Filter{}, fmt.Errorf("cannot specify cluster name when using RDS")
}
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_Rds{
Rds: &envoyhttp.Rds{
RouteConfigName: filterName,
ConfigSource: envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
}
} else {
if cluster == "" {
return envoylistener.Filter{}, fmt.Errorf("must specify cluster name when not using RDS")
}
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_RouteConfig{
RouteConfig: &envoy.RouteConfiguration{
Name: filterName,
VirtualHosts: []envoyroute.VirtualHost{
@ -418,19 +502,7 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2
},
},
},
},
HttpFilters: []*envoyhttp.HttpFilter{
&envoyhttp.HttpFilter{
Name: "envoy.router",
},
},
Tracing: &envoyhttp.HttpConnectionManager_Tracing{
OperationName: op,
// Don't trace any requests by default unless the client application
// explicitly propagates trace headers that indicate this should be
// sampled.
RandomSampling: &envoytype.Percent{Value: 0.0},
},
}
}
if http2 {

View File

@ -57,3 +57,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address {
func makeUint32Value(n int) *prototypes.UInt32Value {
return &prototypes.UInt32Value{Value: uint32(n)}
}
func makeBoolValue(n bool) *prototypes.BoolValue {
return &prototypes.BoolValue{Value: n}
}

View File

@ -2,18 +2,298 @@ package xds
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// routesFromSnapshot returns the xDS API representation of the "routes"
// in the snapshot.
// routesFromSnapshot returns the xDS API representation of the "routes" in the
// snapshot.
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
// We don't support routes yet but probably will later
return nil, nil
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return routesFromSnapshotConnectProxy(cfgSnap, token)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// routesFromSnapshotConnectProxy returns the xDS API representation of the
// "routes" in the snapshot.
func routesFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
var resources []proto.Message
for _, u := range cfgSnap.Proxy.Upstreams {
upstreamID := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[upstreamID]
}
if chain == nil || chain.IsDefault() {
// TODO(rb): make this do the old school stuff too
} else {
upstreamRoute, err := makeUpstreamRouteForDiscoveryChain(&u, chain, cfgSnap)
if err != nil {
return nil, err
}
if upstreamRoute != nil {
resources = append(resources, upstreamRoute)
}
}
}
// TODO(rb): make sure we don't generate an empty result
return resources, nil
}
func makeUpstreamRouteForDiscoveryChain(
u *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
) (*envoy.RouteConfiguration, error) {
upstreamID := u.Identifier()
routeName := upstreamID
var routes []envoyroute.Route
switch chain.Node.Type {
case structs.DiscoveryGraphNodeTypeRouter:
routes = make([]envoyroute.Route, 0, len(chain.Node.Routes))
for _, discoveryRoute := range chain.Node.Routes {
routeMatch := makeRouteMatchForDiscoveryRoute(discoveryRoute, chain.Protocol)
// TODO(rb): handle PrefixRewrite
// TODO(rb): handle RequestTimeout
// TODO(rb): handle Retries
var (
routeAction *envoyroute.Route_Route
err error
)
next := discoveryRoute.DestinationNode
if next.Type == structs.DiscoveryGraphNodeTypeSplitter {
routeAction, err = makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, next.Splits)
if err != nil {
return nil, err
}
} else if next.Type == structs.DiscoveryGraphNodeTypeGroupResolver {
groupResolver := next.GroupResolver
routeAction = makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target)
} else {
return nil, fmt.Errorf("unexpected graph node after route %q", next.Type)
}
routes = append(routes, envoyroute.Route{
Match: routeMatch,
Action: routeAction,
})
}
case structs.DiscoveryGraphNodeTypeSplitter:
routeAction, err := makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, chain.Node.Splits)
if err != nil {
return nil, err
}
defaultRoute := envoyroute.Route{
Match: makeDefaultRouteMatch(),
Action: routeAction,
}
routes = []envoyroute.Route{defaultRoute}
case structs.DiscoveryGraphNodeTypeGroupResolver:
groupResolver := chain.Node.GroupResolver
routeAction := makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target)
defaultRoute := envoyroute.Route{
Match: makeDefaultRouteMatch(),
Action: routeAction,
}
routes = []envoyroute.Route{defaultRoute}
default:
panic("unknown top node in discovery chain of type: " + chain.Node.Type)
}
return &envoy.RouteConfiguration{
Name: routeName,
VirtualHosts: []envoyroute.VirtualHost{
envoyroute.VirtualHost{
Name: routeName,
Domains: []string{"*"},
Routes: routes,
},
},
// ValidateClusters defaults to true when defined statically and false
// when done via RDS. Re-set the sane value of true to prevent
// null-routing traffic.
ValidateClusters: makeBoolValue(true),
}, nil
}
func makeRouteMatchForDiscoveryRoute(discoveryRoute *structs.DiscoveryRoute, protocol string) envoyroute.RouteMatch {
switch protocol {
case "http", "http2":
// The only match stanza is HTTP.
default:
return makeDefaultRouteMatch()
}
match := discoveryRoute.Definition.Match
if match == nil || match.IsEmpty() {
return makeDefaultRouteMatch()
}
em := envoyroute.RouteMatch{}
switch {
case match.HTTP.PathExact != "":
em.PathSpecifier = &envoyroute.RouteMatch_Path{Path: "/"}
case match.HTTP.PathPrefix != "":
em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: "/"}
case match.HTTP.PathRegex != "":
em.PathSpecifier = &envoyroute.RouteMatch_Regex{Regex: "/"}
default:
em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: "/"}
}
if len(match.HTTP.Header) > 0 {
em.Headers = make([]*envoyroute.HeaderMatcher, 0, len(match.HTTP.Header))
for _, hdr := range match.HTTP.Header {
eh := &envoyroute.HeaderMatcher{
Name: hdr.Name,
}
switch {
case hdr.Exact != "":
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_ExactMatch{
ExactMatch: hdr.Exact,
}
case hdr.Regex != "":
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_RegexMatch{
RegexMatch: hdr.Regex,
}
case hdr.Prefix != "":
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PrefixMatch{
PrefixMatch: hdr.Prefix,
}
case hdr.Suffix != "":
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_SuffixMatch{
SuffixMatch: hdr.Suffix,
}
case hdr.Present:
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
PresentMatch: true,
}
case hdr.Invert: // THIS HAS TO BE LAST
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
// We set this to the misleading value of 'true' here
// because we'll generically invert it next.
PresentMatch: true,
}
default:
continue // skip this impossible situation
}
if hdr.Invert {
eh.InvertMatch = true
}
em.Headers = append(em.Headers, eh)
}
}
if len(match.HTTP.QueryParam) > 0 {
em.QueryParameters = make([]*envoyroute.QueryParameterMatcher, 0, len(match.HTTP.QueryParam))
for _, qm := range match.HTTP.QueryParam {
eq := &envoyroute.QueryParameterMatcher{
Name: qm.Name,
Value: qm.Value,
Regex: makeBoolValue(qm.Regex),
}
em.QueryParameters = append(em.QueryParameters, eq)
}
}
return em
}
func makeDefaultRouteMatch() envoyroute.RouteMatch {
return envoyroute.RouteMatch{
PathSpecifier: &envoyroute.RouteMatch_Prefix{
Prefix: "/",
},
// TODO(banks) Envoy supports matching only valid GRPC
// requests which might be nice to add here for gRPC services
// but it's not supported in our current envoy SDK version
// although docs say it was supported by 1.8.0. Going to defer
// that until we've updated the deps.
}
}
func makeRouteActionForSingleCluster(upstreamID, currentDatacenter string, target structs.DiscoveryTarget) *envoyroute.Route_Route {
clusterName := makeClusterName(upstreamID, target, currentDatacenter)
return &envoyroute.Route_Route{
Route: &envoyroute.RouteAction{
ClusterSpecifier: &envoyroute.RouteAction_Cluster{
Cluster: clusterName,
},
},
}
}
func makeRouteActionForSplitter(upstreamID, currentDatacenter string, splits []*structs.DiscoverySplit) (*envoyroute.Route_Route, error) {
clusters := make([]*envoyroute.WeightedCluster_ClusterWeight, 0, len(splits))
for _, split := range splits {
if split.Node.Type != structs.DiscoveryGraphNodeTypeGroupResolver {
return nil, fmt.Errorf("unexpected splitter destination node type: %s", split.Node.Type)
}
groupResolver := split.Node.GroupResolver
target := groupResolver.Target
clusterName := makeClusterName(upstreamID, target, currentDatacenter)
// TODO(rb): scale up by 100 and adjust total weight
cw := &envoyroute.WeightedCluster_ClusterWeight{
Weight: makeUint32Value(int(split.Weight)),
Name: clusterName,
}
clusters = append(clusters, cw)
}
return &envoyroute.Route_Route{
Route: &envoyroute.RouteAction{
ClusterSpecifier: &envoyroute.RouteAction_WeightedClusters{
WeightedClusters: &envoyroute.WeightedCluster{
Clusters: clusters,
TotalWeight: makeUint32Value(100),
},
},
},
}, nil
}

View File

@ -0,0 +1 @@
/workdir

View File

@ -111,10 +111,23 @@ function get_envoy_stats_flush_interval {
echo "$output" | jq --raw-output '.configs[0].bootstrap.stats_flush_interval'
}
# snapshot_envoy_admin is meant to be used from a teardown scriptlet from the host.
function snapshot_envoy_admin {
local HOSTPORT=$1
local ENVOY_NAME=$2
docker_wget "http://${HOSTPORT}/config_dump" -q -O - > "./workdir/envoy/${ENVOY_NAME}-config_dump.json"
docker_wget "http://${HOSTPORT}/clusters" -q -O - > "./workdir/envoy/${ENVOY_NAME}-clusters.out"
}
function docker_consul {
docker run -ti --rm --network container:envoy_consul_1 consul-dev $@
}
function docker_wget {
docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@
}
function must_match_in_statsd_logs {
run cat /workdir/statsd/statsd.log
COUNT=$( echo "$output" | grep -Ec $1 )