digest the proxy-defaults protocol into the graph (#6050)

This commit is contained in:
R.B. Boyer 2019-07-02 11:01:17 -05:00 committed by GitHub
parent bccbb2b4ae
commit a1900754db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 354 additions and 57 deletions

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/mitchellh/mapstructure"
) )
type CompileRequest struct { type CompileRequest struct {
@ -128,9 +129,22 @@ func (c *compiler) recordServiceProtocol(serviceName string) error {
if serviceDefault := c.entries.GetService(serviceName); serviceDefault != nil { if serviceDefault := c.entries.GetService(serviceName); serviceDefault != nil {
return c.recordProtocol(serviceName, serviceDefault.Protocol) return c.recordProtocol(serviceName, serviceDefault.Protocol)
} }
if c.entries.GlobalProxy != nil {
var cfg proxyConfig
// Ignore errors and fallback on defaults if it does happen.
_ = mapstructure.WeakDecode(c.entries.GlobalProxy.Config, &cfg)
if cfg.Protocol != "" {
return c.recordProtocol(serviceName, cfg.Protocol)
}
}
return c.recordProtocol(serviceName, "") return c.recordProtocol(serviceName, "")
} }
// proxyConfig is a snippet from agent/xds/config.go:ProxyConfig
type proxyConfig struct {
Protocol string `mapstructure:"protocol"`
}
func (c *compiler) recordProtocol(fromService, protocol string) error { func (c *compiler) recordProtocol(fromService, protocol string) error {
if protocol == "" { if protocol == "" {
protocol = "tcp" protocol = "tcp"
@ -522,8 +536,9 @@ RESOLVE_AGAIN:
// Digest mesh gateway settings. // Digest mesh gateway settings.
if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil { if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil {
groupResolver.MeshGateway = serviceDefault.MeshGateway groupResolver.MeshGateway = serviceDefault.MeshGateway
} else if c.entries.GlobalProxy != nil {
groupResolver.MeshGateway = c.entries.GlobalProxy.MeshGateway
} }
// TODO(rb): thread proxy-defaults version through here as well
// Retain this target even if we may not retain the group resolver. // Retain this target even if we may not retain the group resolver.
c.targets[target] = struct{}{} c.targets[target] = struct{}{}

View File

@ -40,6 +40,7 @@ func TestCompile(t *testing.T) {
"router with defaults and noop split and resolver": testcase_RouterWithDefaults_WithNoopSplit_WithResolver(), "router with defaults and noop split and resolver": testcase_RouterWithDefaults_WithNoopSplit_WithResolver(),
"route bypasses splitter": testcase_RouteBypassesSplit(), "route bypasses splitter": testcase_RouteBypassesSplit(),
"noop split": testcase_NoopSplit_DefaultResolver(), "noop split": testcase_NoopSplit_DefaultResolver(),
"noop split with protocol from proxy defaults": testcase_NoopSplit_DefaultResolver_ProcotolFromProxyDefaults(),
"noop split with resolver": testcase_NoopSplit_WithResolver(), "noop split with resolver": testcase_NoopSplit_WithResolver(),
"subset split": testcase_SubsetSplit(), "subset split": testcase_SubsetSplit(),
"service split": testcase_ServiceSplit(), "service split": testcase_ServiceSplit(),
@ -53,6 +54,7 @@ func TestCompile(t *testing.T) {
"noop split to resolver with default subset": testcase_NoopSplit_WithDefaultSubset(), "noop split to resolver with default subset": testcase_NoopSplit_WithDefaultSubset(),
"resolver with default subset": testcase_Resolve_WithDefaultSubset(), "resolver with default subset": testcase_Resolve_WithDefaultSubset(),
"resolver with no entries and inferring defaults": testcase_DefaultResolver(), "resolver with no entries and inferring defaults": testcase_DefaultResolver(),
"default resolver with proxy defaults": testcase_DefaultResolver_WithProxyDefaults(),
// TODO(rb): handle this case better: "circular split": testcase_CircularSplit(), // TODO(rb): handle this case better: "circular split": testcase_CircularSplit(),
"all the bells and whistles": testcase_AllBellsAndWhistles(), "all the bells and whistles": testcase_AllBellsAndWhistles(),
@ -296,6 +298,72 @@ func testcase_RouterWithDefaults_WithNoopSplit_DefaultResolver() compileTestCase
return compileTestCase{entries: entries, expect: expect} return compileTestCase{entries: entries, expect: expect}
} }
func testcase_NoopSplit_DefaultResolver_ProcotolFromProxyDefaults() compileTestCase {
entries := newEntries()
setGlobalProxyProtocol(entries, "http")
entries.AddRouters(
&structs.ServiceRouterConfigEntry{
Kind: "service-router",
Name: "main",
},
)
entries.AddSplitters(
&structs.ServiceSplitterConfigEntry{
Kind: "service-splitter",
Name: "main",
Splits: []structs.ServiceSplit{
{Weight: 100},
},
},
)
resolver := newDefaultServiceResolver("main")
expect := &structs.CompiledDiscoveryChain{
Protocol: "http",
Node: &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeRouter,
Name: "main",
Routes: []*structs.DiscoveryRoute{
{
Definition: newDefaultServiceRoute("main"),
DestinationNode: &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeSplitter,
Name: "main",
Splits: []*structs.DiscoverySplit{
{
Weight: 100,
Node: &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeGroupResolver,
Name: "main",
GroupResolver: &structs.DiscoveryGroupResolver{
Definition: resolver,
Default: true,
ConnectTimeout: 5 * time.Second,
Target: newTarget("main", "", "default", "dc1"),
},
},
},
},
},
},
},
},
Resolvers: map[string]*structs.ServiceResolverConfigEntry{
"main": resolver,
},
Targets: []structs.DiscoveryTarget{
newTarget("main", "", "default", "dc1"),
},
GroupResolverNodes: map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode{
newTarget("main", "", "default", "dc1"): nil,
},
}
return compileTestCase{entries: entries, expect: expect}
}
func testcase_RouterWithDefaults_WithNoopSplit_WithResolver() compileTestCase { func testcase_RouterWithDefaults_WithNoopSplit_WithResolver() compileTestCase {
entries := newEntries() entries := newEntries()
setServiceProtocol(entries, "main", "http") setServiceProtocol(entries, "main", "http")
@ -1165,6 +1233,49 @@ func testcase_DefaultResolver() compileTestCase {
return compileTestCase{entries: entries, expect: expect} return compileTestCase{entries: entries, expect: expect}
} }
func testcase_DefaultResolver_WithProxyDefaults() compileTestCase {
entries := newEntries()
entries.GlobalProxy = &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
}
resolver := newDefaultServiceResolver("main")
expect := &structs.CompiledDiscoveryChain{
Protocol: "grpc",
Node: &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeGroupResolver,
Name: "main",
GroupResolver: &structs.DiscoveryGroupResolver{
Definition: resolver,
Default: true,
ConnectTimeout: 5 * time.Second,
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Target: newTarget("main", "", "default", "dc1"),
},
},
Resolvers: map[string]*structs.ServiceResolverConfigEntry{
"main": resolver,
},
Targets: []structs.DiscoveryTarget{
newTarget("main", "", "default", "dc1"),
},
GroupResolverNodes: map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode{
newTarget("main", "", "default", "dc1"): nil,
},
}
return compileTestCase{entries: entries, expect: expect}
}
func testcase_Resolve_WithDefaultSubset() compileTestCase { func testcase_Resolve_WithDefaultSubset() compileTestCase {
entries := newEntries() entries := newEntries()
entries.AddResolvers( entries.AddResolvers(
@ -1744,6 +1855,16 @@ func newSimpleRoute(name string, muts ...func(*structs.ServiceRoute)) structs.Se
return r return r
} }
func setGlobalProxyProtocol(entries *structs.DiscoveryChainConfigEntries, protocol string) {
entries.GlobalProxy = &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": protocol,
},
}
}
func setServiceProtocol(entries *structs.DiscoveryChainConfigEntries, name, protocol string) { func setServiceProtocol(entries *structs.DiscoveryChainConfigEntries, name, protocol string) {
entries.AddServices(&structs.ServiceConfigEntry{ entries.AddServices(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,

View File

@ -188,7 +188,10 @@ func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry,
func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) { func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
return s.configEntriesByKindTxn(tx, ws, kind)
}
func (s *Store) configEntriesByKindTxn(tx *memdb.Txn, ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
@ -243,17 +246,11 @@ func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.Con
} }
raftIndex.ModifyIndex = idx raftIndex.ModifyIndex = idx
var existingConf structs.ConfigEntry
if existing != nil {
existingConf = existing.(structs.ConfigEntry)
}
err = s.validateProposedConfigEntryInGraph( err = s.validateProposedConfigEntryInGraph(
tx, tx,
idx, idx,
conf.GetKind(), conf.GetKind(),
conf.GetName(), conf.GetName(),
existingConf,
conf, conf,
) )
if err != nil { if err != nil {
@ -319,17 +316,11 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error {
return nil return nil
} }
var existingConf structs.ConfigEntry
if existing != nil {
existingConf = existing.(structs.ConfigEntry)
}
err = s.validateProposedConfigEntryInGraph( err = s.validateProposedConfigEntryInGraph(
tx, tx,
idx, idx,
kind, kind,
name, name,
existingConf,
nil, nil,
) )
if err != nil { if err != nil {
@ -359,43 +350,72 @@ func (s *Store) validateProposedConfigEntryInGraph(
tx *memdb.Txn, tx *memdb.Txn,
idx uint64, idx uint64,
kind, name string, kind, name string,
prev, next structs.ConfigEntry, next structs.ConfigEntry,
) error { ) error {
validateAllChains := false
switch kind { switch kind {
case structs.ProxyDefaults: case structs.ProxyDefaults:
return nil // no validation if name != structs.ProxyConfigGlobal {
return nil
}
validateAllChains = true
case structs.ServiceDefaults: case structs.ServiceDefaults:
fallthrough
case structs.ServiceRouter: case structs.ServiceRouter:
fallthrough
case structs.ServiceSplitter: case structs.ServiceSplitter:
fallthrough
case structs.ServiceResolver: case structs.ServiceResolver:
return s.validateProposedConfigEntryInServiceGraph(tx, idx, kind, name, prev, next)
default: default:
return fmt.Errorf("unhandled kind %q during validation of %q", kind, name) return fmt.Errorf("unhandled kind %q during validation of %q", kind, name)
} }
return s.validateProposedConfigEntryInServiceGraph(tx, idx, kind, name, next, validateAllChains)
}
var serviceGraphKinds = []string{
structs.ServiceRouter,
structs.ServiceSplitter,
structs.ServiceResolver,
} }
func (s *Store) validateProposedConfigEntryInServiceGraph( func (s *Store) validateProposedConfigEntryInServiceGraph(
tx *memdb.Txn, tx *memdb.Txn,
idx uint64, idx uint64,
kind, name string, kind, name string,
prev, next structs.ConfigEntry, next structs.ConfigEntry,
validateAllChains bool,
) error { ) error {
// Collect all of the chains that could be affected by this change // Collect all of the chains that could be affected by this change
// including our own. // including our own.
checkChains := map[string]struct{}{ checkChains := make(map[string]struct{})
name: struct{}{},
}
iter, err := tx.Get(configTableName, "link", name) if validateAllChains {
for raw := iter.Next(); raw != nil; raw = iter.Next() { // Must be proxy-defaults/global.
entry := raw.(structs.ConfigEntry)
checkChains[entry.GetName()] = struct{}{} // Check anything that has a discovery chain entry. In the future we could
} // somehow omit the ones that have a default protocol configured.
if err != nil {
return err for _, kind := range serviceGraphKinds {
_, entries, err := s.configEntriesByKindTxn(tx, nil, kind)
if err != nil {
return err
}
for _, entry := range entries {
checkChains[entry.GetName()] = struct{}{}
}
}
} else {
// Must be a single chain.
checkChains[name] = struct{}{}
iter, err := tx.Get(configTableName, "link", name)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
checkChains[entry.GetName()] = struct{}{}
}
if err != nil {
return err
}
} }
overrides := map[structs.ConfigEntryKindName]structs.ConfigEntry{ overrides := map[structs.ConfigEntryKindName]structs.ConfigEntry{
@ -403,27 +423,7 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
} }
for chainName, _ := range checkChains { for chainName, _ := range checkChains {
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides) if err := s.testCompileDiscoveryChain(tx, nil, chainName, overrides); err != nil {
if err != nil {
return err
}
// fmt.Printf("SPEC: %s/%s chain=%q, prev=%v, next=%v, ent=%+v\n",
// kind, name,
// chainName,
// prev != nil,
// next != nil, speculativeEntries)
// TODO(rb): is this ok that we execute the compiler in the state store?
// Note we use an arbitrary namespace and datacenter as those would not
// currently affect the graph compilation in ways that matter here.
req := discoverychain.CompileRequest{
ServiceName: chainName,
CurrentNamespace: "default",
CurrentDatacenter: "dc1",
Entries: speculativeEntries,
}
if _, err := discoverychain.Compile(req); err != nil {
return err return err
} }
} }
@ -431,6 +431,31 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
return nil return nil
} }
func (s *Store) testCompileDiscoveryChain(
tx *memdb.Txn,
ws memdb.WatchSet,
chainName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
) error {
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides)
if err != nil {
return err
}
// TODO(rb): is this ok that we execute the compiler in the state store?
// Note we use an arbitrary namespace and datacenter as those would not
// currently affect the graph compilation in ways that matter here.
req := discoverychain.CompileRequest{
ServiceName: chainName,
CurrentNamespace: "default",
CurrentDatacenter: "dc1",
Entries: speculativeEntries,
}
_, err = discoverychain.Compile(req)
return err
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for // ReadDiscoveryChainConfigEntries will query for the full discovery chain for
// the provided service name. All relevant config entries will be recursively // the provided service name. All relevant config entries will be recursively
// fetched and included in the result. // fetched and included in the result.
@ -489,12 +514,19 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
// the end of this function to indicate "no such entry". // the end of this function to indicate "no such entry".
var ( var (
idx uint64
todoSplitters = make(map[string]struct{}) todoSplitters = make(map[string]struct{})
todoResolvers = make(map[string]struct{}) todoResolvers = make(map[string]struct{})
todoDefaults = make(map[string]struct{}) todoDefaults = make(map[string]struct{})
) )
// Grab the proxy defaults if they exist.
idx, proxy, err := s.getProxyConfigEntryTxn(tx, ws, structs.ProxyConfigGlobal, overrides)
if err != nil {
return 0, nil, err
} else if proxy != nil {
res.GlobalProxy = proxy
}
// At every step we'll need service defaults. // At every step we'll need service defaults.
todoDefaults[serviceName] = struct{}{} todoDefaults[serviceName] = struct{}{}
@ -644,6 +676,30 @@ func anyKey(m map[string]struct{}) (string, bool) {
return "", false return "", false
} }
// getProxyConfigEntryTxn is a convenience method for fetching a
// proxy-defaults kind of config entry.
//
// If an override is returned the index returned will be 0.
func (s *Store) getProxyConfigEntryTxn(
tx *memdb.Txn,
ws memdb.WatchSet,
name string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
) (uint64, *structs.ProxyConfigEntry, error) {
idx, entry, err := s.configEntryWithOverridesTxn(tx, ws, structs.ProxyDefaults, name, overrides)
if err != nil {
return 0, nil, err
} else if entry == nil {
return idx, nil, nil
}
proxy, ok := entry.(*structs.ProxyConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("invalid service config type %T", entry)
}
return idx, proxy, nil
}
// getServiceConfigEntryTxn is a convenience method for fetching a // getServiceConfigEntryTxn is a convenience method for fetching a
// service-defaults kind of config entry. // service-defaults kind of config entry.
// //

View File

@ -224,6 +224,57 @@ func TestStore_ConfigEntry_GraphValidation(t *testing.T) {
expectErr: "does not permit advanced routing or splitting behavior", expectErr: "does not permit advanced routing or splitting behavior",
expectGraphErr: true, expectGraphErr: true,
}, },
{
name: "splitter works with http protocol",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "tcp", // loses
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "main",
Protocol: "http",
},
},
op: func(t *testing.T, s *Store) error {
entry := &structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "main",
Splits: []structs.ServiceSplit{
{Weight: 90, Namespace: "v1"},
{Weight: 10, Namespace: "v2"},
},
}
return s.EnsureConfigEntry(0, entry)
},
},
{
name: "splitter works with http protocol (from proxy-defaults)",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
op: func(t *testing.T, s *Store) error {
entry := &structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "main",
Splits: []structs.ServiceSplit{
{Weight: 90, Namespace: "v1"},
{Weight: 10, Namespace: "v2"},
},
}
return s.EnsureConfigEntry(0, entry)
},
},
{ {
name: "router fails with tcp protocol", name: "router fails with tcp protocol",
entries: []structs.ConfigEntry{ entries: []structs.ConfigEntry{
@ -304,6 +355,59 @@ func TestStore_ConfigEntry_GraphValidation(t *testing.T) {
expectErr: "does not permit advanced routing or splitting behavior", expectErr: "does not permit advanced routing or splitting behavior",
expectGraphErr: true, expectGraphErr: true,
}, },
{
name: "cannot remove global default protocol after splitter created",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "main",
Splits: []structs.ServiceSplit{
{Weight: 90, Namespace: "v1"},
{Weight: 10, Namespace: "v2"},
},
},
},
op: func(t *testing.T, s *Store) error {
return s.DeleteConfigEntry(0, structs.ProxyDefaults, structs.ProxyConfigGlobal)
},
expectErr: "does not permit advanced routing or splitting behavior",
expectGraphErr: true,
},
{
name: "can remove global default protocol after splitter created if service default overrides it",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "main",
Protocol: "http",
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "main",
Splits: []structs.ServiceSplit{
{Weight: 90, Namespace: "v1"},
{Weight: 10, Namespace: "v2"},
},
},
},
op: func(t *testing.T, s *Store) error {
return s.DeleteConfigEntry(0, structs.ProxyDefaults, structs.ProxyConfigGlobal)
},
},
{ {
name: "cannot change to tcp protocol after splitter created", name: "cannot change to tcp protocol after splitter created",
entries: []structs.ConfigEntry{ entries: []structs.ConfigEntry{

View File

@ -838,10 +838,11 @@ func canWriteDiscoveryChain(entry discoveryChainConfigEntry, rule acl.Authorizer
// DiscoveryChainConfigEntries wraps just the raw cross-referenced config // DiscoveryChainConfigEntries wraps just the raw cross-referenced config
// entries. None of these are defaulted. // entries. None of these are defaulted.
type DiscoveryChainConfigEntries struct { type DiscoveryChainConfigEntries struct {
Routers map[string]*ServiceRouterConfigEntry Routers map[string]*ServiceRouterConfigEntry
Splitters map[string]*ServiceSplitterConfigEntry Splitters map[string]*ServiceSplitterConfigEntry
Resolvers map[string]*ServiceResolverConfigEntry Resolvers map[string]*ServiceResolverConfigEntry
Services map[string]*ServiceConfigEntry Services map[string]*ServiceConfigEntry
GlobalProxy *ProxyConfigEntry
} }
func (e *DiscoveryChainConfigEntries) GetRouter(name string) *ServiceRouterConfigEntry { func (e *DiscoveryChainConfigEntries) GetRouter(name string) *ServiceRouterConfigEntry {
@ -913,7 +914,7 @@ func (e *DiscoveryChainConfigEntries) AddServices(entries ...*ServiceConfigEntry
} }
func (e *DiscoveryChainConfigEntries) IsEmpty() bool { func (e *DiscoveryChainConfigEntries) IsEmpty() bool {
return e.IsChainEmpty() && len(e.Services) == 0 return e.IsChainEmpty() && len(e.Services) == 0 && e.GlobalProxy == nil
} }
func (e *DiscoveryChainConfigEntries) IsChainEmpty() bool { func (e *DiscoveryChainConfigEntries) IsChainEmpty() bool {