connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams (#8470)

Fixes #8466

Since Consul 1.8.0 there was a bug in how ingress gateway protocol
compatibility was enforced. At the point in time that an ingress-gateway
config entry was modified the discovery chain for each upstream was
checked to ensure the ingress gateway protocol matched. Unfortunately
future modifications of other config entries were not validated against
existing ingress-gateway definitions, such as:

1. create tcp ingress-gateway pointing to 'api' (ok)
2. create service-defaults for 'api' setting protocol=http (worked, but not ok)
3. create service-splitter or service-router for 'api' (worked, but caused an agent panic)

If you were to do these in a different order, it would fail without a
crash:

1. create service-defaults for 'api' setting protocol=http (ok)
2. create service-splitter or service-router for 'api' (ok)
3. create tcp ingress-gateway pointing to 'api' (fail with message about
   protocol mismatch)

This PR introduces the missing validation. The two new behaviors are:

1. create tcp ingress-gateway pointing to 'api' (ok)
2. (NEW) create service-defaults for 'api' setting protocol=http ("ok" for back compat)
3. (NEW) create service-splitter or service-router for 'api' (fail with
   message about protocol mismatch)

In consideration for any existing users that may be inadvertently be
falling into item (2) above, that is now officiall a valid configuration
to be in. For anyone falling into item (3) above while you cannot use
the API to manufacture that scenario anymore, anyone that has old (now
bad) data will still be able to have the agent use them just enough to
generate a new agent/proxycfg error message rather than a panic.
Unfortunately we just don't have enough information to properly fix the
config entries.
This commit is contained in:
R.B. Boyer 2020-08-12 11:19:20 -05:00 committed by GitHub
parent 77de9bbe22
commit 63422ca9c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 353 additions and 93 deletions

View File

@ -337,10 +337,6 @@ func (s *Store) validateProposedConfigEntryInGraph(
if err != nil {
return err
}
err = validateProposedIngressProtocolsInServiceGraph(tx, next, entMeta)
if err != nil {
return err
}
case structs.TerminatingGateway:
err := checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta)
if err != nil {
@ -384,7 +380,11 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
) error {
// Collect all of the chains that could be affected by this change
// including our own.
checkChains := make(map[structs.ServiceID]struct{})
var (
checkChains = make(map[structs.ServiceID]struct{})
checkIngress []*structs.IngressGatewayConfigEntry
enforceIngressProtocolsMatch bool
)
if validateAllChains {
// Must be proxy-defaults/global.
@ -401,6 +401,37 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
}
}
_, entries, err := configEntriesByKindTxn(tx, nil, structs.IngressGateway, structs.WildcardEnterpriseMeta())
if err != nil {
return err
}
for _, entry := range entries {
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}
} else if kind == structs.IngressGateway {
// Checking an ingress pointing to multiple chains.
// This is the case for deleting a config entry
if next == nil {
return nil
}
ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}
checkIngress = append(checkIngress, ingress)
// When editing an ingress-gateway directly we are stricter about
// validating the protocol equivalence.
enforceIngressProtocolsMatch = true
} else {
// Must be a single chain.
@ -413,7 +444,25 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
switch entry.GetKind() {
case structs.ServiceRouter, structs.ServiceSplitter, structs.ServiceResolver:
svcID := structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())
checkChains[svcID] = struct{}{}
case structs.IngressGateway:
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}
}
}
// Ensure if any ingress is affected that we fetch all of the chains needed
// to fully validate that ingress.
for _, ingress := range checkIngress {
for _, svcID := range ingress.ListRelatedServices() {
checkChains[svcID] = struct{}{}
}
}
@ -421,24 +470,69 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
{Kind: kind, Name: name}: next,
}
var (
svcProtocols = make(map[structs.ServiceID]string)
svcTopNodeType = make(map[structs.ServiceID]string)
)
for chain := range checkChains {
if err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta); err != nil {
protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta)
if err != nil {
return err
}
svcProtocols[chain] = protocol
svcTopNodeType[chain] = topNode.Type
}
// Now validate all of our ingress gateways.
for _, e := range checkIngress {
for _, listener := range e.Listeners {
expectedProto := listener.Protocol
for _, service := range listener.Services {
if service.Name == structs.WildcardSpecifier {
continue
}
svcID := structs.NewServiceID(service.Name, &service.EnterpriseMeta)
svcProto := svcProtocols[svcID]
if svcProto != expectedProto {
// The only time an ingress gateway and its upstreams can
// have differing protocols is when:
//
// 1. ingress is tcp and the target is not-tcp
// AND
// 2. the disco chain has a resolver as the top node
topNodeType := svcTopNodeType[svcID]
if enforceIngressProtocolsMatch ||
(expectedProto != "tcp") ||
(expectedProto == "tcp" && topNodeType != structs.DiscoveryGraphNodeTypeResolver) {
return fmt.Errorf(
"service %q has protocol %q, which does not match defined listener protocol %q",
svcID.String(),
svcProto,
expectedProto,
)
}
}
}
}
}
return nil
}
// testCompileDiscoveryChain speculatively compiles a discovery chain with
// pending modifications to see if it would be valid. Also returns the computed
// protocol and topmost discovery chain node.
func (s *Store) testCompileDiscoveryChain(
tx *txn,
chainName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
) (string, *structs.DiscoveryGraphNode, error) {
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta)
if err != nil {
return err
return "", nil, err
}
// Note we use an arbitrary namespace and datacenter as those would not
@ -453,8 +547,12 @@ func (s *Store) testCompileDiscoveryChain(
UseInDatacenter: "dc1",
Entries: speculativeEntries,
}
_, err = discoverychain.Compile(req)
return err
chain, err := discoverychain.Compile(req)
if err != nil {
return "", nil, err
}
return chain.Protocol, chain.Nodes[chain.StartNode], nil
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
@ -822,48 +920,6 @@ func configEntryWithOverridesTxn(
return configEntryTxn(tx, ws, kind, name, entMeta)
}
func validateProposedIngressProtocolsInServiceGraph(
tx *txn,
next structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
// This is the case for deleting a config entry
if next == nil {
return nil
}
ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}
validationFn := func(svc structs.ServiceName, expectedProto string) error {
_, svcProto, err := protocolForService(tx, nil, svc)
if err != nil {
return err
}
if svcProto != expectedProto {
return fmt.Errorf("service %q has protocol %q, which does not match defined listener protocol %q",
svc.String(), svcProto, expectedProto)
}
return nil
}
for _, l := range ingress.Listeners {
for _, s := range l.Services {
if s.Name == structs.WildcardSpecifier {
continue
}
err := validationFn(s.ToServiceName(), l.Protocol)
if err != nil {
return err
}
}
}
return nil
}
// protocolForService returns the service graph protocol associated to the
// provided service, checking all relevant config entries.
func protocolForService(

View File

@ -1285,29 +1285,120 @@ func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
}
func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
s := testStateStore(t)
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "http",
Services: []structs.IngressService{
{Name: "web"},
newIngress := func(protocol, name string) *structs.IngressGatewayConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: protocol,
Services: []structs.IngressService{
{Name: name},
},
},
},
},
}
}
t.Run("default to tcp", func(t *testing.T) {
err := s.EnsureConfigEntry(0, ingress, nil)
t.Run("http ingress fails with http upstream later changed to tcp", func(t *testing.T) {
s := testStateStore(t)
// First set the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(0, expected, nil))
// Next configure http ingress to route to the http service
require.NoError(t, s.EnsureConfigEntry(1, newIngress("http", "web"), nil))
t.Run("via modification", func(t *testing.T) {
// Now redefine the target service as tcp
expected = &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "tcp",
}
err := s.EnsureConfigEntry(2, expected, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
t.Run("via deletion", func(t *testing.T) {
// This will fall back to the default tcp.
err := s.DeleteConfigEntry(2, structs.ServiceDefaults, "web", nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
})
t.Run("tcp ingress ok with tcp upstream (defaulted) later changed to http", func(t *testing.T) {
s := testStateStore(t)
// First configure tcp ingress to route to a defaulted tcp service
require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil))
// Now redefine the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
})
t.Run("tcp ingress fails with tcp upstream (defaulted) later changed to http", func(t *testing.T) {
s := testStateStore(t)
// First configure tcp ingress to route to a defaulted tcp service
require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil))
// Now redefine the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
t.Run("and a router defined", func(t *testing.T) {
// This part should fail.
expected2 := &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
}
err := s.EnsureConfigEntry(2, expected2, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http"`)
})
t.Run("and a splitter defined", func(t *testing.T) {
// This part should fail.
expected2 := &structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 100},
},
}
err := s.EnsureConfigEntry(2, expected2, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http"`)
})
})
t.Run("http ingress fails with tcp upstream (defaulted)", func(t *testing.T) {
s := testStateStore(t)
err := s.EnsureConfigEntry(0, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
t.Run("with proxy-default", func(t *testing.T) {
t.Run("http ingress fails with http2 upstream (via proxy-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
@ -1317,51 +1408,43 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
}
require.NoError(t, s.EnsureConfigEntry(0, expected, nil))
err := s.EnsureConfigEntry(1, ingress, nil)
err := s.EnsureConfigEntry(1, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http2"`)
})
t.Run("with service-defaults override", func(t *testing.T) {
t.Run("http ingress fails with grpc upstream (via service-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "grpc",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
err := s.EnsureConfigEntry(2, ingress, nil)
err := s.EnsureConfigEntry(2, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "grpc"`)
})
t.Run("with service-defaults correct protocol", func(t *testing.T) {
t.Run("http ingress ok with http upstream (via service-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(2, expected, nil))
require.NoError(t, s.EnsureConfigEntry(3, ingress, nil))
require.NoError(t, s.EnsureConfigEntry(3, newIngress("http", "web"), nil))
})
t.Run("ignores wildcard specifier", func(t *testing.T) {
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "http",
Services: []structs.IngressService{
{Name: "*"},
},
},
},
}
require.NoError(t, s.EnsureConfigEntry(4, ingress, nil))
t.Run("http ingress ignores wildcard specifier", func(t *testing.T) {
s := testStateStore(t)
require.NoError(t, s.EnsureConfigEntry(4, newIngress("http", "*"), nil))
})
t.Run("deleting a config entry", func(t *testing.T) {
t.Run("deleting ingress config entry ok", func(t *testing.T) {
s := testStateStore(t)
require.NoError(t, s.EnsureConfigEntry(1, newIngress("tcp", "web"), nil))
require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil))
})
}

View File

@ -2,6 +2,7 @@ package structs
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/consul/acl"
@ -203,6 +204,44 @@ func validateHost(tlsEnabled bool, host string) error {
return nil
}
// ListRelatedServices implements discoveryChainConfigEntry
//
// For ingress-gateway config entries this only finds services that are
// explicitly linked in the ingress-gateway config entry. Wildcards will not
// expand to all services.
//
// This function is used during discovery chain graph validation to prevent
// erroneous sets of config entries from being created. Wildcard ingress
// filters out sets with protocol mismatch elsewhere so it isn't an issue here
// that needs fixing.
func (e *IngressGatewayConfigEntry) ListRelatedServices() []ServiceID {
found := make(map[ServiceID]struct{})
for _, listener := range e.Listeners {
for _, service := range listener.Services {
if service.Name == WildcardSpecifier {
continue
}
svcID := NewServiceID(service.Name, &service.EnterpriseMeta)
found[svcID] = struct{}{}
}
}
if len(found) == 0 {
return nil
}
out := make([]ServiceID, 0, len(found))
for svc := range found {
out = append(out, svc)
}
sort.Slice(out, func(i, j int) bool {
return out[i].EnterpriseMeta.LessThan(&out[j].EnterpriseMeta) ||
out[i].ID < out[j].ID
})
return out
}
func (e *IngressGatewayConfigEntry) CanRead(authz acl.Authorizer) bool {
var authzContext acl.AuthorizerContext
e.FillAuthzContext(&authzContext)

View File

@ -86,6 +86,88 @@ func TestIngressConfigEntry_Normalize(t *testing.T) {
}
}
func TestIngressConfigEntry_ListRelatedServices(t *testing.T) {
type testcase struct {
entry IngressGatewayConfigEntry
expectServices []ServiceID
}
cases := map[string]testcase{
"one exact": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "web"},
},
},
},
},
expectServices: []ServiceID{NewServiceID("web", nil)},
},
"one wild": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "*"},
},
},
},
},
expectServices: nil,
},
"kitchen sink": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "api"},
{Name: "web"},
},
},
{
Port: 2222,
Protocol: "tcp",
Services: []IngressService{
{Name: "web"},
{Name: "*"},
{Name: "db"},
{Name: "blah"},
},
},
},
},
expectServices: []ServiceID{
NewServiceID("api", nil),
NewServiceID("blah", nil),
NewServiceID("db", nil),
NewServiceID("web", nil),
},
},
}
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
got := tc.entry.ListRelatedServices()
require.Equal(t, tc.expectServices, got)
})
}
}
func TestIngressConfigEntry_Validate(t *testing.T) {
cases := []struct {

View File

@ -780,9 +780,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
} else if cfg.Protocol == "tcp" {
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName)
} else if startNode.Type != structs.DiscoveryGraphNodeTypeResolver {
panic(fmt.Sprintf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type))
return nil, fmt.Errorf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type)
}
targetID := startNode.Resolver.Target
target := chain.Targets[targetID]

View File

@ -170,7 +170,7 @@ func makeUpstreamRouteForDiscoveryChain(
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName)
}
switch startNode.Type {
@ -265,7 +265,7 @@ func makeUpstreamRouteForDiscoveryChain(
routes = []*envoyroute.Route{defaultRoute}
default:
panic("unknown first node in discovery chain of type: " + startNode.Type)
return nil, fmt.Errorf("unknown first node in discovery chain of type: %s", startNode.Type)
}
host := &envoyroute.VirtualHost{