Ingress Gateways for TCP services (#7509)

* Implements a simple, tcp ingress gateway workflow

This adds a new type of gateway for allowing Ingress traffic into Connect from external services.

Co-authored-by: Chris Piraino <cpiraino@hashicorp.com>
This commit is contained in:
Kyle Havlovitz 2020-04-16 14:00:48 -07:00 committed by GitHub
parent 12b026db62
commit 6a5eba63ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 2567 additions and 478 deletions

View File

@ -4294,6 +4294,15 @@ func (a *Agent) registerCache() {
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{
RPC: a,
}, &cache.RegisterOptions{

View File

@ -19,7 +19,7 @@ type CatalogServices struct {
func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a DCSpecificRequest.
// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(

View File

@ -0,0 +1,55 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const GatewayServicesName = "gateway-services"
// GatewayUpstreams supports fetching upstreams for a given gateway name.
type GatewayServices struct {
RPC RPC
}
func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedGatewayServices
if err := g.RPC.RPC("Internal.GatewayServices", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (g *GatewayServices) SupportsBlocking() bool {
return true
}

View File

@ -0,0 +1,60 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestGatewayServices(t *testing.T) {
rpc := TestRPC(t)
typ := &GatewayServices{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedGatewayServices
rpc.On("RPC", "Internal.GatewayServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, "foo", req.ServiceName)
services := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindIngressGateway,
Port: 1234,
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
},
}
reply := args.Get(2).(*structs.IndexedGatewayServices)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "foo",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
rpc.AssertExpectations(t)
}

View File

@ -1417,6 +1417,8 @@ func (b *Builder) serviceKindVal(v *string) structs.ServiceKind {
return structs.ServiceKindMeshGateway
case string(structs.ServiceKindTerminatingGateway):
return structs.ServiceKindTerminatingGateway
case string(structs.ServiceKindIngressGateway):
return structs.ServiceKindIngressGateway
default:
return structs.ServiceKindTypical
}

View File

@ -213,24 +213,6 @@ func TestConfig_Apply_TerminatingGateway(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "!200 Response Code: %s", resp.Body.String())
// Attempt to create an entry for a separate gateway that also routes to web
body = bytes.NewBuffer([]byte(`
{
"Kind": "terminating-gateway",
"Name": "east-gw-01",
"Services": [
{
"Name": "web",
}
]
}`))
req, _ = http.NewRequest("PUT", "/v1/config", body)
resp = httptest.NewRecorder()
_, err = a.srv.ConfigApply(resp, req)
require.Error(t, err, "service \"web\" is associated with a different gateway")
require.Equal(t, 200, resp.Code, "!200 Response Code: %s", resp.Body.String())
// List all entries, there should only be one
{
args := structs.ConfigEntryQuery{
@ -258,6 +240,67 @@ func TestConfig_Apply_TerminatingGateway(t *testing.T) {
}
}
func TestConfig_Apply_IngressGateway(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Create some config entries.
body := bytes.NewBuffer([]byte(`
{
"Kind": "ingress-gateway",
"Name": "ingress",
"Listeners": [
{
"Port": 8080,
"Services": [
{ "Name": "web" }
]
}
]
}`))
req, _ := http.NewRequest("PUT", "/v1/config", body)
resp := httptest.NewRecorder()
_, err := a.srv.ConfigApply(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "!200 Response Code: %s", resp.Body.String())
// List all entries, there should only be one
{
args := structs.ConfigEntryQuery{
Kind: structs.IngressGateway,
Datacenter: "dc1",
}
var out structs.IndexedConfigEntries
require.NoError(t, a.RPC("ConfigEntry.List", &args, &out))
require.NotNil(t, out)
require.Len(t, out.Entries, 1)
got := out.Entries[0].(*structs.IngressGatewayConfigEntry)
// Ignore create and modify indices
got.CreateIndex = 0
got.ModifyIndex = 0
expect := &structs.IngressGatewayConfigEntry{
Name: "ingress",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "tcp",
Services: []structs.IngressService{
{Name: "web"},
},
},
},
}
require.Equal(t, expect, got)
}
}
func TestConfig_Apply_ProxyDefaultsMeshGateway(t *testing.T) {
t.Parallel()

View File

@ -187,6 +187,8 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
f = h.serviceNodesConnect
case args.TagFilter:
f = h.serviceNodesTagFilter
case args.Ingress:
f = h.serviceNodesIngress
default:
f = h.serviceNodesDefault
}
@ -201,9 +203,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
return err
}
// If we're doing a connect query, we need read access to the service
// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect {
if args.Connect || args.Ingress {
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// Just return nil, which will return an empty response (tested)
return nil
@ -249,6 +251,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
if args.Connect {
key = "connect"
}
if args.Ingress {
key = "ingress"
}
metrics.IncrCounterWithLabels([]string{"health", key, "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
@ -284,6 +289,10 @@ func (h *Health) serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *st
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}
func (h *Health) serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}
func (h *Health) serviceNodesTagFilter(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.

View File

@ -11,7 +11,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/net-rpc-msgpackrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -1142,6 +1142,214 @@ func TestHealth_ServiceNodes_Gateway(t *testing.T) {
assert.Equal(r, 443, resp.Nodes[1].Service.Port)
})
}
func TestHealth_ServiceNodes_Ingress(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
Kind: structs.ServiceKindIngressGateway,
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthPassing,
ServiceID: "ingress-gateway",
},
}
var out struct{}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
Kind: structs.ServiceKindIngressGateway,
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthWarning,
ServiceID: "ingress-gateway",
},
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
// Register ingress-gateway config entry
{
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Services: []structs.IngressService{
{Name: "db"},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var out bool
require.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
require.True(t, out)
}
var out2 structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
Ingress: true,
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2))
nodes := out2.Nodes
require.Len(t, nodes, 2)
require.Equal(t, nodes[0].Node.Node, "bar")
require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
require.Equal(t, nodes[1].Node.Node, "foo")
require.Equal(t, nodes[1].Checks[0].Status, api.HealthPassing)
}
func TestHealth_ServiceNodes_Ingress_ACL(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
token, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `
service "db" { policy = "read" }
service "ingress-gateway" { policy = "read" }
node_prefix "" { policy = "read" }`)
require.NoError(t, err)
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthPassing,
ServiceID: "ingress-gateway",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out struct{}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthWarning,
ServiceID: "ingress-gateway",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
// Register ingress-gateway config entry
{
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Protocol: "http",
Services: []structs.IngressService{
{Name: "db"},
{Name: "another"},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
require.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
require.True(t, out)
}
// No token used
var out2 structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
Ingress: true,
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2))
require.Len(t, out2.Nodes, 0)
// Requesting a service that is not covered by the token's policy
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "another",
Ingress: true,
QueryOptions: structs.QueryOptions{Token: token.SecretID},
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2))
require.Len(t, out2.Nodes, 0)
// Requesting service covered by the token's policy
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
Ingress: true,
QueryOptions: structs.QueryOptions{Token: token.SecretID},
}
require.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2))
nodes := out2.Nodes
require.Len(t, nodes, 2)
require.Equal(t, nodes[0].Node.Node, "bar")
require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
require.Equal(t, nodes[1].Node.Node, "foo")
require.Equal(t, nodes[1].Checks[0].Status, api.HealthPassing)
}
func TestHealth_NodeChecks_FilterACL(t *testing.T) {
t.Parallel()

View File

@ -323,12 +323,26 @@ func (m *Internal) GatewayServices(args *structs.ServiceSpecificRequest, reply *
var index uint64
var services structs.GatewayServices
switch args.ServiceKind {
case structs.ServiceKindTerminatingGateway:
index, services, err = state.TerminatingGatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway}
var found bool
for _, kind := range supportedGateways {
// We only use this call to validate the RPC call, don't add the watch set
_, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if entry != nil {
found = true
}
}
if !found {
return fmt.Errorf("service %q is not a configured terminating-gateway or ingress-gateway", args.ServiceName)
}
index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if err := m.srv.filterACL(args.Token, &services); err != nil {

View File

@ -2,11 +2,12 @@ package consul
import (
"encoding/base64"
"github.com/hashicorp/consul/sdk/testutil/retry"
"os"
"strings"
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
@ -752,7 +753,6 @@ func TestInternal_TerminatingGatewayServices(t *testing.T) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
ServiceKind: structs.ServiceKindTerminatingGateway,
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
@ -784,11 +784,175 @@ func TestInternal_TerminatingGatewayServices(t *testing.T) {
KeyFile: "client.key",
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}
func TestInternal_TerminatingGatewayServices_ACLFiltering(t *testing.T) {
func TestInternal_GatewayServices_BothGateways(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a terminating gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
},
},
},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register an ingress gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.2",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "ingress",
Port: 444,
},
Check: &structs.HealthCheck{
Name: "ingress",
Status: api.HealthPassing,
ServiceID: "ingress",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs = &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 8888,
Services: []structs.IngressService{
{Name: "db"},
},
},
},
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
retry.Run(t, func(r *retry.R) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
req.ServiceName = "ingress"
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect = structs.GatewayServices{
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("ingress", nil),
GatewayKind: structs.ServiceKindIngressGateway,
Port: 8888,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
// Test a non-gateway service being requested
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var resp structs.IndexedGatewayServices
err := msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)
assert.Error(t, err)
assert.Contains(t, err.Error(), `service "api" is not a configured terminating-gateway or ingress-gateway`)
}
func TestInternal_GatewayServices_ACLFiltering(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
@ -907,7 +1071,6 @@ service_prefix "db" {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
ServiceKind: structs.ServiceKindTerminatingGateway,
QueryOptions: structs.QueryOptions{Token: svcToken.SecretID},
}
var resp structs.IndexedGatewayServices
@ -928,7 +1091,6 @@ service "gateway" {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
ServiceKind: structs.ServiceKindTerminatingGateway,
QueryOptions: structs.QueryOptions{Token: gwToken.SecretID},
}
var resp structs.IndexedGatewayServices
@ -952,7 +1114,6 @@ service "gateway" {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
ServiceKind: structs.ServiceKindTerminatingGateway,
QueryOptions: structs.QueryOptions{Token: validToken.SecretID},
}
var resp structs.IndexedGatewayServices
@ -971,6 +1132,11 @@ service "gateway" {
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}

View File

@ -14,7 +14,7 @@ import (
const (
servicesTableName = "services"
terminatingGatewayServicesTableName = "terminating-gateway-services"
gatewayServicesTableName = "gateway-services"
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
@ -57,11 +57,11 @@ func nodesTableSchema() *memdb.TableSchema {
}
}
// terminatingGatewayServicesTableSchema returns a new table schema used to store information
// gatewayServicesTableNameSchema returns a new table schema used to store information
// about services associated with terminating gateways.
func terminatingGatewayServicesTableSchema() *memdb.TableSchema {
func gatewayServicesTableNameSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: terminatingGatewayServicesTableName,
Name: gatewayServicesTableName,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
@ -158,7 +158,7 @@ func init() {
registerSchema(nodesTableSchema)
registerSchema(servicesTableSchema)
registerSchema(checksTableSchema)
registerSchema(terminatingGatewayServicesTableSchema)
registerSchema(gatewayServicesTableNameSchema)
}
const (
@ -775,16 +775,23 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
return fmt.Errorf("Invalid Service Meta for node %s and serviceID %s: %v", node, svc.ID, err)
}
// Check if this service is covered by a terminating gateway's wildcard specifier
gateway, err := s.serviceTerminatingGateway(tx, structs.WildcardSpecifier, &svc.EnterpriseMeta)
// Check if this service is covered by a gateway's wildcard specifier
svcGateways, err := s.serviceGateways(tx, structs.WildcardSpecifier, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Service, err)
}
if gatewaySvc, ok := gateway.(*structs.GatewayService); ok && gatewaySvc != nil {
if err = s.updateTerminatingGatewayService(tx, idx, gatewaySvc.Gateway, svc.Service, &svc.EnterpriseMeta); err != nil {
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil {
// Copy the wildcard mapping and modify it
gatewaySvc := wildcardSvc.Clone()
gatewaySvc.Service = structs.NewServiceID(svc.Service, &svc.EnterpriseMeta)
if err = s.updateGatewayService(tx, idx, gatewaySvc); err != nil {
return fmt.Errorf("Failed to associate service %q with gateway %q", gatewaySvc.Service.String(), gatewaySvc.Gateway.String())
}
}
}
// Create the service node entry and populate the indexes. Note that
// conversion doesn't populate any of the node-specific information.
@ -863,6 +870,10 @@ func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta)
tx := s.db.Txn(false)
defer tx.Abort()
return s.serviceListTxn(tx, ws, entMeta)
}
func (s *Store) serviceListTxn(tx *memdb.Txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
idx := s.catalogServicesMaxIndex(tx, entMeta)
services, err := s.catalogServiceList(tx, entMeta, true)
@ -1040,11 +1051,14 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
// to the mesh with a mix of sidecars and gateways until all its instances have a sidecar.
if connect {
// Look up gateway nodes associated with the service
nodes, ch, err := s.serviceTerminatingGatewayNodes(tx, serviceName, entMeta)
_, nodes, chs, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
for _, ch := range chs {
ws.Add(ch)
}
for i := 0; i < len(nodes); i++ {
results = append(results, nodes[i])
}
@ -1459,18 +1473,12 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return err
}
// Clean up association between service name and gateway
gateway, err := s.serviceTerminatingGateway(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
if gateway != nil {
if err := tx.Delete(terminatingGatewayServicesTableName, gateway); err != nil {
return fmt.Errorf("failed to delete gateway mapping for %q: %v", svc.ServiceName, err)
}
if err := indexUpdateMaxTxn(tx, idx, terminatingGatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating terminating-gateway-services index: %v", err)
// Clean up association between service name and gateways
if _, err := tx.DeleteAll(gatewayServicesTableName, "service", structs.NewServiceID(svc.ServiceName, entMeta)); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
} else {
@ -1942,10 +1950,53 @@ func (s *Store) CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string,
return s.checkServiceNodes(ws, serviceName, true, entMeta)
}
// CheckIngressServiceNodes is used to query all nodes and checks for ingress
// endpoints for a given service.
func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
maxIdx, nodes, watchChs, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindIngressGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
// TODO(ingress) : Deal with incorporating index from mapping table
// Watch list of gateway nodes for changes
for _, ch := range watchChs {
ws.Add(ch)
}
// TODO(ingress): Test namespace functionality here
// De-dup services to lookup
serviceIDs := make(map[structs.ServiceID]struct{})
for _, n := range nodes {
serviceIDs[n.CompoundServiceName()] = struct{}{}
}
var results structs.CheckServiceNodes
for sid := range serviceIDs {
idx, n, err := s.checkServiceNodesTxn(tx, ws, sid.ID, false, &sid.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
results = append(results, n...)
}
return maxIdx, results, nil
}
func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta)
}
func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Function for lookup
index := "service"
if connect {
@ -1979,13 +2030,13 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
serviceNames[sn.ServiceName] = struct{}{}
}
// If we are querying for Connect nodes, the associated proxy might be a gateway.
// If we are querying for Connect nodes, the associated proxy might be a terminating-gateway.
// Gateways are tracked in a separate table, and we append them to the result set.
// We append rather than replace since it allows users to migrate a service
// to the mesh with a mix of sidecars and gateways until all its instances have a sidecar.
if connect {
// Look up gateway nodes associated with the service
nodes, _, err := s.serviceTerminatingGatewayNodes(tx, serviceName, entMeta)
_, nodes, _, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
}
@ -2095,12 +2146,12 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
}
// TerminatingGatewayServices is used to query all services associated with a terminating gateway
func (s *Store) TerminatingGatewayServices(ws memdb.WatchSet, gateway string, entMeta *structs.EnterpriseMeta) (uint64, structs.GatewayServices, error) {
// GatewayServices is used to query all services associated with a gateway
func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *structs.EnterpriseMeta) (uint64, structs.GatewayServices, error) {
tx := s.db.Txn(false)
defer tx.Abort()
iter, err := s.terminatingGatewayServices(tx, gateway, entMeta)
iter, err := s.gatewayServices(tx, gateway, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err)
}
@ -2115,7 +2166,7 @@ func (s *Store) TerminatingGatewayServices(ws memdb.WatchSet, gateway string, en
}
}
idx := maxIndexTxn(tx, terminatingGatewayServicesTableName)
idx := maxIndexTxn(tx, gatewayServicesTableName)
return idx, results, nil
}
@ -2363,79 +2414,127 @@ func checkSessionsTxn(tx *memdb.Txn, hc *structs.HealthCheck) ([]*sessionCheck,
return sessions, nil
}
// updateGatewayService associates services with gateways as specified in a terminating-gateway config entry
func (s *Store) updateTerminatingGatewayServices(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
entry, ok := conf.(*structs.TerminatingGatewayConfigEntry)
if !ok {
return fmt.Errorf("unexpected config entry type: %T", conf)
}
// updateGatewayServices associates services with gateways as specified in a gateway config entry
func (s *Store) updateGatewayServices(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
var gatewayServices structs.GatewayServices
var err error
// Check if service list matches the last known list for the config entry, if it does, skip the update
_, c, err := s.configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta)
gatewayID := structs.NewServiceID(conf.GetName(), conf.GetEnterpriseMeta())
switch conf.GetKind() {
case structs.IngressGateway:
gatewayServices, err = s.ingressConfigGatewayServices(tx, gatewayID, conf, entMeta)
case structs.TerminatingGateway:
gatewayServices, err = s.terminatingConfigGatewayServices(tx, gatewayID, conf, entMeta)
default:
return fmt.Errorf("config entry kind %q does not need gateway-services", conf.GetKind())
}
// Return early if there is an error OR we don't have any services to update
if err != nil {
return fmt.Errorf("failed to get config entry: %v", err)
}
if cfg, ok := c.(*structs.TerminatingGatewayConfigEntry); ok && cfg != nil {
if reflect.DeepEqual(cfg.Services, entry.Services) {
// Services are the same, nothing to update
return nil
}
return err
}
// Delete all associated with gateway first, to avoid keeping mappings that were removed
if _, err := tx.DeleteAll(terminatingGatewayServicesTableName, "gateway", structs.NewServiceID(entry.Name, entMeta)); err != nil {
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceID(conf.GetName(), entMeta)); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
gatewayID := structs.NewServiceID(entry.Name, &entry.EnterpriseMeta)
for _, svc := range entry.Services {
for _, svc := range gatewayServices {
// If the service is a wildcard we need to target all services within the namespace
if svc.Name == structs.WildcardSpecifier {
if err := s.updateTerminatingGatewayNamespace(tx, gatewayID, svc, entMeta); err != nil {
if svc.Service.ID == structs.WildcardSpecifier {
if err := s.updateGatewayNamespace(tx, idx, svc, entMeta); err != nil {
return fmt.Errorf("failed to associate gateway %q with wildcard: %v", gatewayID.String(), err)
}
// Skip service-specific update below if there was a wildcard update
continue
}
// Check if the non-wildcard service is already associated with a gateway
existing, err := s.serviceTerminatingGateway(tx, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
if gs, ok := existing.(*structs.GatewayService); ok && gs != nil {
// Only return an error if the stored gateway does not match the one from the config entry
if !gs.Gateway.Matches(&gatewayID) {
return fmt.Errorf("service %q is associated with different gateway, %q", gs.Service.String(), gs.Gateway.String())
}
}
// Since this service was specified on its own, and not with a wildcard,
// if there is an existing entry, we overwrite it. The service entry is the source of truth.
//
// By extension, if TLS creds are provided with a wildcard but are not provided in
// the service entry, the service does not inherit the creds from the wildcard.
err = s.updateGatewayService(tx, idx, svc)
if err != nil {
return err
}
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
return nil
}
func (s *Store) ingressConfigGatewayServices(tx *memdb.Txn, gateway structs.ServiceID, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) (structs.GatewayServices, error) {
entry, ok := conf.(*structs.IngressGatewayConfigEntry)
if !ok {
return nil, fmt.Errorf("unexpected config entry type: %T", conf)
}
// Check if service list matches the last known list for the config entry, if it does, skip the update
_, c, err := s.configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta)
if err != nil {
return nil, fmt.Errorf("failed to get config entry: %v", err)
}
if cfg, ok := c.(*structs.IngressGatewayConfigEntry); ok && cfg != nil {
if reflect.DeepEqual(cfg.Listeners, entry.Listeners) {
// Services are the same, nothing to update
return nil, nil
}
}
var gatewayServices structs.GatewayServices
for _, listener := range entry.Listeners {
for _, service := range listener.Services {
mapping := &structs.GatewayService{
Gateway: gatewayID,
Gateway: gateway,
Service: service.ToServiceID(),
GatewayKind: structs.ServiceKindIngressGateway,
Port: listener.Port,
}
gatewayServices = append(gatewayServices, mapping)
}
}
return gatewayServices, nil
}
func (s *Store) terminatingConfigGatewayServices(tx *memdb.Txn, gateway structs.ServiceID, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) (structs.GatewayServices, error) {
entry, ok := conf.(*structs.TerminatingGatewayConfigEntry)
if !ok {
return nil, fmt.Errorf("unexpected config entry type: %T", conf)
}
// Check if service list matches the last known list for the config entry, if it does, skip the update
_, c, err := s.configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta)
if err != nil {
return nil, fmt.Errorf("failed to get config entry: %v", err)
}
if cfg, ok := c.(*structs.TerminatingGatewayConfigEntry); ok && cfg != nil {
if reflect.DeepEqual(cfg.Services, entry.Services) {
// Services are the same, nothing to update
return nil, nil
}
}
var gatewayServices structs.GatewayServices
for _, svc := range entry.Services {
mapping := &structs.GatewayService{
Gateway: gateway,
Service: structs.NewServiceID(svc.Name, &svc.EnterpriseMeta),
GatewayKind: structs.ServiceKindTerminatingGateway,
KeyFile: svc.KeyFile,
CertFile: svc.CertFile,
CAFile: svc.CAFile,
}
if err := tx.Insert(terminatingGatewayServicesTableName, mapping); err != nil {
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
gatewayServices = append(gatewayServices, mapping)
}
return gatewayServices, nil
}
if err := indexUpdateMaxTxn(tx, idx, terminatingGatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating terminating-gateway-services index: %v", err)
}
return nil
}
// updateTerminatingGatewayNamespace is used to target all services within a namespace with a set of TLS certificates
func (s *Store) updateTerminatingGatewayNamespace(tx *memdb.Txn, gateway structs.ServiceID, service structs.LinkedService, entMeta *structs.EnterpriseMeta) error {
// updateGatewayNamespace is used to target all services within a namespace
func (s *Store) updateGatewayNamespace(tx *memdb.Txn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
services, err := s.catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta)
if err != nil {
return fmt.Errorf("failed querying services: %s", err)
@ -2450,125 +2549,108 @@ func (s *Store) updateTerminatingGatewayNamespace(tx *memdb.Txn, gateway structs
continue
}
existing, err := s.serviceTerminatingGateway(tx, sn.ServiceName, &sn.EnterpriseMeta)
existing, err := tx.First(gatewayServicesTableName, "id", service.Gateway, sn.CompoundServiceName())
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
if gs, ok := existing.(*structs.GatewayService); ok && gs != nil {
// Return an error if the wildcard is attempting to cover a service specified by a different gateway's config entry
if !gs.Gateway.Matches(&gateway) {
return fmt.Errorf("service %q is associated with different gateway, %q", gs.Service.String(), gs.Gateway.String())
}
if existing != nil {
// If there's an existing service associated with this gateway then we skip it.
// This means the service was specified on its own, and the service entry overrides the wildcard entry.
continue
}
mapping := &structs.GatewayService{
Gateway: gateway,
Service: structs.NewServiceID(sn.ServiceName, &service.EnterpriseMeta),
GatewayKind: structs.ServiceKindTerminatingGateway,
KeyFile: service.KeyFile,
CertFile: service.CertFile,
CAFile: service.CAFile,
}
if err := tx.Insert(terminatingGatewayServicesTableName, mapping); err != nil {
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
mapping := service.Clone()
mapping.Service = structs.NewServiceID(sn.ServiceName, &service.Service.EnterpriseMeta)
err = s.updateGatewayService(tx, idx, mapping)
if err != nil {
return err
}
}
// Also store a mapping for the wildcard so that the TLS creds can be pulled
// for new services registered in its namespace
mapping := &structs.GatewayService{
Gateway: gateway,
Service: structs.NewServiceID(service.Name, &service.EnterpriseMeta),
GatewayKind: structs.ServiceKindTerminatingGateway,
KeyFile: service.KeyFile,
CertFile: service.CertFile,
CAFile: service.CAFile,
}
if err := tx.Insert(terminatingGatewayServicesTableName, mapping); err != nil {
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
err = s.updateGatewayService(tx, idx, service)
if err != nil {
return err
}
return nil
}
// updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway
func (s *Store) updateTerminatingGatewayService(tx *memdb.Txn, idx uint64, gateway structs.ServiceID, service string, entMeta *structs.EnterpriseMeta) error {
mapping := &structs.GatewayService{
Gateway: gateway,
Service: structs.NewServiceID(service, entMeta),
GatewayKind: structs.ServiceKindTerminatingGateway,
}
// If a wildcard specifier is registered for that namespace, use its TLS config
wc, err := s.serviceTerminatingGateway(tx, structs.WildcardSpecifier, entMeta)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
if wc != nil {
cfg := wc.(*structs.GatewayService)
mapping.CAFile = cfg.CAFile
mapping.CertFile = cfg.CertFile
mapping.KeyFile = cfg.KeyFile
}
func (s *Store) updateGatewayService(tx *memdb.Txn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed
existing, err := s.serviceTerminatingGateway(tx, service, entMeta)
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
if gs, ok := existing.(*structs.GatewayService); ok && gs != nil {
mapping.CreateIndex = gs.CreateIndex
if gs.IsSame(mapping) {
return nil
}
} else {
// We have a new mapping
mapping.CreateIndex = idx
}
mapping.ModifyIndex = idx
if err := tx.Insert(terminatingGatewayServicesTableName, mapping); err != nil {
if err := tx.Insert(gatewayServicesTableName, mapping); err != nil {
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
}
if err := indexUpdateMaxTxn(tx, idx, terminatingGatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating terminating-gateway-services index: %v", err)
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
return nil
}
func (s *Store) serviceTerminatingGateway(tx *memdb.Txn, name string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
return tx.First(terminatingGatewayServicesTableName, "service", structs.NewServiceID(name, entMeta))
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service.
func (s *Store) serviceGateways(tx *memdb.Txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceID(name, entMeta))
}
func (s *Store) terminatingGatewayServices(tx *memdb.Txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(terminatingGatewayServicesTableName, "gateway", structs.NewServiceID(name, entMeta))
func (s *Store) gatewayServices(tx *memdb.Txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceID(name, entMeta))
}
func (s *Store) serviceTerminatingGatewayNodes(tx *memdb.Txn, service string, entMeta *structs.EnterpriseMeta) (structs.ServiceNodes, <-chan struct{}, error) {
// TODO(ingress): How to handle index rolling back when a config entry is
// deleted that references a service?
// We might need something like the service_last_extinction index?
func (s *Store) serviceGatewayNodes(tx *memdb.Txn, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, []<-chan struct{}, error) {
// Look up gateway name associated with the service
gw, err := s.serviceTerminatingGateway(tx, service, entMeta)
gws, err := s.serviceGateways(tx, service, entMeta)
if err != nil {
return nil, nil, fmt.Errorf("failed gateway lookup: %s", err)
return 0, nil, nil, fmt.Errorf("failed gateway lookup: %s", err)
}
var ret structs.ServiceNodes
var watchChan <-chan struct{}
var watchChans []<-chan struct{}
var maxIdx uint64
if gw != nil {
mapping := gw.(*structs.GatewayService)
for gateway := gws.Next(); gateway != nil; gateway = gws.Next() {
mapping := gateway.(*structs.GatewayService)
// TODO(ingress): Test this conditional
if mapping.GatewayKind != kind {
continue
}
if mapping.ModifyIndex > maxIdx {
maxIdx = mapping.ModifyIndex
}
// Look up nodes for gateway
gateways, err := s.catalogServiceNodeList(tx, mapping.Gateway.ID, "service", &mapping.Gateway.EnterpriseMeta)
gwServices, err := s.catalogServiceNodeList(tx, mapping.Gateway.ID, "service", &mapping.Gateway.EnterpriseMeta)
if err != nil {
return nil, nil, fmt.Errorf("failed service lookup: %s", err)
return 0, nil, nil, fmt.Errorf("failed service lookup: %s", err)
}
for gateway := gateways.Next(); gateway != nil; gateway = gateways.Next() {
sn := gateway.(*structs.ServiceNode)
for svc := gwServices.Next(); svc != nil; svc = gwServices.Next() {
sn := svc.(*structs.ServiceNode)
ret = append(ret, sn)
}
watchChan = gateways.WatchCh()
watchChans = append(watchChans, gwServices.WatchCh())
}
return ret, watchChan, nil
return maxIdx, ret, watchChans, nil
}

View File

@ -4383,12 +4383,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
tx.Commit()
}
func TestStateStore_TerminatingGatewayServices(t *testing.T) {
func TestStateStore_GatewayServices_Terminating(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.TerminatingGatewayServices(ws, "db", nil)
idx, nodes, err := s.GatewayServices(ws, "db", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(0))
assert.Len(t, nodes, 0)
@ -4444,7 +4444,7 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
// Read everything back.
ws = memdb.NewWatchSet()
idx, out, err := s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err := s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(21))
assert.Len(t, out, 2)
@ -4454,11 +4454,19 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 21,
ModifyIndex: 21,
},
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 21,
ModifyIndex: 21,
},
},
}
assert.Equal(t, expect, out)
@ -4489,7 +4497,7 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
// Read everything back.
ws = memdb.NewWatchSet()
idx, out, err = s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(22))
assert.Len(t, out, 2)
@ -4502,11 +4510,19 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
}
assert.Equal(t, expect, out)
@ -4515,7 +4531,7 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
assert.Nil(t, s.EnsureService(23, "bar", &structs.NodeService{ID: "redis", Service: "redis", Tags: nil, Address: "", Port: 6379}))
assert.True(t, watchFired(ws))
idx, out, err = s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(23))
assert.Len(t, out, 3)
@ -4528,11 +4544,19 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
{
Service: structs.NewServiceID("redis", nil),
@ -4541,6 +4565,10 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
RaftIndex: structs.RaftIndex{
CreateIndex: 23,
ModifyIndex: 23,
},
},
}
assert.Equal(t, expect, out)
@ -4549,7 +4577,7 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
assert.Nil(t, s.DeleteService(24, "bar", "redis", nil))
assert.True(t, watchFired(ws))
idx, out, err = s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(24))
assert.Len(t, out, 2)
@ -4562,16 +4590,24 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 22,
ModifyIndex: 22,
},
},
}
assert.Equal(t, expect, out)
// Create a new entry that only leaves one service
// Update the entry that only leaves one service
assert.Nil(t, s.EnsureConfigEntry(25, &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
@ -4583,7 +4619,7 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
}, nil))
assert.True(t, watchFired(ws))
idx, out, err = s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(25))
assert.Len(t, out, 1)
@ -4594,12 +4630,16 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 25,
ModifyIndex: 25,
},
},
}
assert.Equal(t, expect, out)
// Attempt to associate a different gateway with services that include db
assert.Error(t, s.EnsureConfigEntry(26, &structs.TerminatingGatewayConfigEntry{
assert.Nil(t, s.EnsureConfigEntry(26, &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway2",
Services: []structs.LinkedService{
@ -4607,14 +4647,307 @@ func TestStateStore_TerminatingGatewayServices(t *testing.T) {
Name: "*",
},
},
}, nil), "service \"db\" is associated with different gateway")
}, nil))
// Deleting the config entry should remove existing mappings
assert.Nil(t, s.DeleteConfigEntry(26, "terminating-gateway", "gateway", nil))
assert.True(t, watchFired(ws))
idx, out, err = s.TerminatingGatewayServices(ws, "gateway", nil)
idx, out, err = s.GatewayServices(ws, "gateway2", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(26))
assert.Len(t, out, 2)
expect = structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway2", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 26,
ModifyIndex: 26,
},
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway2", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
RaftIndex: structs.RaftIndex{
CreateIndex: 26,
ModifyIndex: 26,
},
},
}
assert.Equal(t, expect, out)
// Deleting the config entry should remove existing mappings
assert.Nil(t, s.DeleteConfigEntry(27, "terminating-gateway", "gateway", nil))
assert.True(t, watchFired(ws))
idx, out, err = s.GatewayServices(ws, "gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(27))
assert.Len(t, out, 0)
}
func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
s := testStateStore(t)
ws := setupIngressState(t, s)
require := require.New(t)
t.Run("check service1 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(err)
require.Equal(uint64(13), idx)
// Multiple instances of the ingress2 service
require.Len(results, 4)
ids := make(map[string]struct{})
for _, n := range results {
ids[n.Service.ID] = struct{}{}
}
expectedIds := map[string]struct{}{
"ingress1": struct{}{},
"ingress2": struct{}{},
"wildcardIngress": struct{}{},
}
require.Equal(expectedIds, ids)
})
t.Run("check service2 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(err)
require.Equal(uint64(12), idx)
require.Len(results, 2)
ids := make(map[string]struct{})
for _, n := range results {
ids[n.Service.ID] = struct{}{}
}
expectedIds := map[string]struct{}{
"ingress1": struct{}{},
"wildcardIngress": struct{}{},
}
require.Equal(expectedIds, ids)
})
t.Run("check service3 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(err)
require.Equal(uint64(11), idx)
require.Len(results, 1)
require.Equal("wildcardIngress", results[0].Service.ID)
})
t.Run("delete a wildcard entry", func(t *testing.T) {
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
require.True(watchFired(ws))
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(err)
require.Equal(uint64(13), idx)
require.Len(results, 3)
idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(err)
require.Equal(uint64(12), idx)
require.Len(results, 1)
idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(err)
require.Equal(uint64(0), idx)
// TODO(ingress): index goes backward when deleting last config entry
// require.Equal(uint64(11), idx)
require.Len(results, 0)
})
}
func TestStateStore_GatewayServices_Ingress(t *testing.T) {
s := testStateStore(t)
ws := setupIngressState(t, s)
require := require.New(t)
t.Run("ingress1 gateway services", func(t *testing.T) {
idx, results, err := s.GatewayServices(ws, "ingress1", nil)
require.NoError(err)
require.Equal(uint64(14), idx)
require.Len(results, 2)
require.Equal("ingress1", results[0].Gateway.ID)
require.Equal("service1", results[0].Service.ID)
require.Equal(1111, results[0].Port)
require.Equal("ingress1", results[1].Gateway.ID)
require.Equal("service2", results[1].Service.ID)
require.Equal(2222, results[1].Port)
})
t.Run("ingress2 gateway services", func(t *testing.T) {
idx, results, err := s.GatewayServices(ws, "ingress2", nil)
require.NoError(err)
require.Equal(uint64(14), idx)
require.Len(results, 1)
require.Equal("ingress2", results[0].Gateway.ID)
require.Equal("service1", results[0].Service.ID)
require.Equal(3333, results[0].Port)
})
t.Run("No gatway services associated", func(t *testing.T) {
idx, results, err := s.GatewayServices(ws, "nothingIngress", nil)
require.NoError(err)
require.Equal(uint64(14), idx)
require.Len(results, 0)
})
t.Run("wildcard gateway services", func(t *testing.T) {
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(14), idx)
require.Len(results, 3)
require.Equal("wildcardIngress", results[0].Gateway.ID)
require.Equal("service1", results[0].Service.ID)
require.Equal(4444, results[0].Port)
require.Equal("wildcardIngress", results[1].Gateway.ID)
require.Equal("service2", results[1].Service.ID)
require.Equal(4444, results[1].Port)
require.Equal("wildcardIngress", results[2].Gateway.ID)
require.Equal("service3", results[2].Service.ID)
require.Equal(4444, results[2].Port)
})
t.Run("deregistering a service", func(t *testing.T) {
require.Nil(s.DeleteService(18, "node1", "service1", nil))
require.True(watchFired(ws))
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(18), idx)
require.Len(results, 2)
})
// TODO(ingress): This test case fails right now because of a
// bug in DeleteService where we delete are entries associated
// to a service, not just an entry created by a wildcard.
// t.Run("check ingress2 gateway services again", func(t *testing.T) {
// idx, results, err := s.GatewayServices(ws, "ingress2", nil)
// require.NoError(err)
// require.Equal(uint64(18), idx)
// require.Len(results, 1)
// require.Equal("ingress2", results[0].Gateway.ID)
// require.Equal("service1", results[0].Service.ID)
// require.Equal(3333, results[0].Port)
// })
t.Run("deleting a wildcard config entry", func(t *testing.T) {
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
require.True(watchFired(ws))
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
require.NoError(err)
require.Equal(uint64(19), idx)
require.Len(results, 0)
})
t.Run("updating a config entry with zero listeners", func(t *testing.T) {
ingress1 := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress1",
Listeners: []structs.IngressListener{},
}
require.Nil(s.EnsureConfigEntry(20, ingress1, nil))
require.True(watchFired(ws))
idx, results, err := s.GatewayServices(ws, "ingress1", nil)
require.NoError(err)
require.Equal(uint64(20), idx)
require.Len(results, 0)
})
}
func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
// Querying with no matches gives an empty response
ws := memdb.NewWatchSet()
idx, res, err := s.GatewayServices(ws, "ingress1", nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register some nodes.
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
// Register a service against the nodes.
testRegisterIngressService(t, s, 3, "node1", "wildcardIngress")
testRegisterIngressService(t, s, 4, "node1", "ingress1")
testRegisterIngressService(t, s, 5, "node1", "ingress2")
testRegisterIngressService(t, s, 6, "node2", "ingress2")
testRegisterIngressService(t, s, 7, "node1", "nothingIngress")
testRegisterService(t, s, 8, "node1", "service1")
testRegisterService(t, s, 9, "node2", "service2")
testRegisterService(t, s, 10, "node2", "service3")
// Register some ingress config entries.
wildcardIngress := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "wildcardIngress",
Listeners: []structs.IngressListener{
{
Port: 4444,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "*",
},
},
},
},
}
assert.NoError(t, s.EnsureConfigEntry(11, wildcardIngress, nil))
assert.True(t, watchFired(ws))
ingress1 := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress1",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "service1",
},
},
},
{
Port: 2222,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "service2",
},
},
},
},
}
assert.NoError(t, s.EnsureConfigEntry(12, ingress1, nil))
assert.True(t, watchFired(ws))
ingress2 := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress2",
Listeners: []structs.IngressListener{
{
Port: 3333,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "service1",
},
},
},
},
}
assert.NoError(t, s.EnsureConfigEntry(13, ingress2, nil))
assert.True(t, watchFired(ws))
nothingIngress := &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "nothingIngress",
Listeners: []structs.IngressListener{},
}
assert.NoError(t, s.EnsureConfigEntry(14, nothingIngress, nil))
assert.True(t, watchFired(ws))
return ws
}

View File

@ -2,6 +2,7 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
@ -214,10 +215,10 @@ func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.Con
return err // Err is already sufficiently decorated.
}
// If the config entry is for terminating gateways we update the memdb table
// If the config entry is for a terminating or ingress gateway we update the memdb table
// that associates gateways <-> services.
if conf.GetKind() == structs.TerminatingGateway {
err = s.updateTerminatingGatewayServices(tx, idx, conf, entMeta)
if conf.GetKind() == structs.TerminatingGateway || conf.GetKind() == structs.IngressGateway {
err = s.updateGatewayServices(tx, idx, conf, entMeta)
if err != nil {
return fmt.Errorf("failed to associate services to gateway: %v", err)
}
@ -282,14 +283,14 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
return nil
}
// If the config entry is for terminating gateways we delete entries from the memdb table
// If the config entry is for terminating or ingress gateways we delete entries from the memdb table
// that associates gateways <-> services.
if kind == structs.TerminatingGateway {
if _, err := tx.DeleteAll(terminatingGatewayServicesTableName, "gateway", structs.NewServiceID(name, entMeta)); err != nil {
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceID(name, entMeta)); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, terminatingGatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating terminating-gateway-services index: %v", err)
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
@ -345,7 +346,15 @@ func (s *Store) validateProposedConfigEntryInGraph(
case structs.ServiceSplitter:
case structs.ServiceResolver:
case structs.IngressGateway:
err := s.checkGatewayClash(tx, name, structs.IngressGateway, structs.TerminatingGateway, entMeta)
if err != nil {
return err
}
case structs.TerminatingGateway:
err := s.checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta)
if err != nil {
return err
}
default:
return fmt.Errorf("unhandled kind %q during validation of %q", kind, name)
}
@ -353,6 +362,22 @@ func (s *Store) validateProposedConfigEntryInGraph(
return s.validateProposedConfigEntryInServiceGraph(tx, idx, kind, name, next, validateAllChains, entMeta)
}
func (s *Store) checkGatewayClash(
tx *memdb.Txn,
name, selfKind, otherKind string,
entMeta *structs.EnterpriseMeta,
) error {
_, entry, err := s.configEntryTxn(tx, nil, otherKind, name, entMeta)
if err != nil {
return err
}
if entry != nil {
return fmt.Errorf("cannot create a %q config entry with name %q, "+
"a %q config entry with that name already exists", selfKind, name, otherKind)
}
return nil
}
var serviceGraphKinds = []string{
structs.ServiceRouter,
structs.ServiceSplitter,

View File

@ -1250,3 +1250,37 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
require.Len(t, entrySet.Resolvers, 1)
require.Len(t, entrySet.Services, 1)
}
// TODO(ingress): test that having the same name in different namespace is valid
func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
s := testStateStore(t)
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
}
require.NoError(t, s.EnsureConfigEntry(0, ingress, nil))
terminating := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "gateway",
}
// Cannot have 2 gateways with same service name
require.Error(t, s.EnsureConfigEntry(1, terminating, nil))
ingress = &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{Port: 8080},
},
}
require.NoError(t, s.EnsureConfigEntry(2, ingress, nil))
require.NoError(t, s.DeleteConfigEntry(3, structs.IngressGateway, "gateway", nil))
// Adding the terminating gateway with same name should now work
require.NoError(t, s.EnsureConfigEntry(4, terminating, nil))
// Cannot have 2 gateways with same service name
require.Error(t, s.EnsureConfigEntry(5, ingress, nil))
}

View File

@ -126,6 +126,31 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
}
func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
Kind: structs.ServiceKindIngressGateway,
Address: "1.1.1.1",
Port: 1111,
}
if err := s.EnsureService(idx, nodeID, svc); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
if err != nil {
t.Fatalf("err: %s", err)
}
if result, ok := service.(*structs.ServiceNode); !ok ||
result.Node != nodeID ||
result.ServiceID != serviceID {
t.Fatalf("bad service: %#v", result)
}
}
func testRegisterCheck(t *testing.T, s *Store, idx uint64,
nodeID string, serviceID string, checkID types.CheckID, state string) {
chk := &structs.HealthCheck{

View File

@ -77,6 +77,17 @@ type dnsConfig struct {
enterpriseDNSConfig
}
type serviceLookup struct {
Network string
Datacenter string
Service string
Tag string
MaxRecursionLevel int
Connect bool
Ingress bool
structs.EnterpriseMeta
}
// DNSServer is used to wrap an Agent and expose various
// service discovery endpoints using a DNS interface.
type DNSServer struct {
@ -501,7 +512,13 @@ func (d *DNSServer) addSOA(cfg *dnsConfig, msg *dns.Msg) {
// in the current cluster which serve as authoritative name servers for zone.
func (d *DNSServer) nameservers(cfg *dnsConfig, maxRecursionLevel int) (ns []dns.RR, extra []dns.RR) {
out, err := d.lookupServiceNodes(cfg, d.agent.config.Datacenter, structs.ConsulServiceName, "", structs.DefaultEnterpriseMeta(), false)
out, err := d.lookupServiceNodes(cfg, serviceLookup{
Datacenter: d.agent.config.Datacenter,
Service: structs.ConsulServiceName,
Connect: false,
Ingress: false,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
})
if err != nil {
d.logger.Warn("Unable to get list of servers", "error", err)
return nil, nil
@ -598,7 +615,7 @@ func (d *DNSServer) doDispatch(network string, remoteAddr net.Addr, req, resp *d
done := false
for i := len(labels) - 1; i >= 0 && !done; i-- {
switch labels[i] {
case "service", "connect", "node", "query", "addr":
case "service", "connect", "ingress", "node", "query", "addr":
queryParts = labels[:i]
querySuffixes = labels[i+1:]
queryKind = labels[i]
@ -630,6 +647,14 @@ func (d *DNSServer) doDispatch(network string, remoteAddr net.Addr, req, resp *d
goto INVALID
}
lookup := serviceLookup{
Network: network,
Datacenter: datacenter,
Connect: false,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: entMeta,
}
// Support RFC 2782 style syntax
if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") {
@ -641,8 +666,10 @@ func (d *DNSServer) doDispatch(network string, remoteAddr net.Addr, req, resp *d
tag = ""
}
lookup.Tag = tag
lookup.Service = queryParts[0][1:]
// _name._tag.service.consul
d.serviceLookup(cfg, network, datacenter, queryParts[0][1:], tag, &entMeta, false, req, resp, maxRecursionLevel)
d.serviceLookup(cfg, lookup, req, resp)
// Consul 0.3 and prior format for SRV queries
} else {
@ -653,8 +680,11 @@ func (d *DNSServer) doDispatch(network string, remoteAddr net.Addr, req, resp *d
tag = strings.Join(queryParts[:n-1], ".")
}
lookup.Tag = tag
lookup.Service = queryParts[n-1]
// tag[.tag].name.service.consul
d.serviceLookup(cfg, network, datacenter, queryParts[n-1], tag, &entMeta, false, req, resp, maxRecursionLevel)
d.serviceLookup(cfg, lookup, req, resp)
}
case "connect":
if len(queryParts) < 1 {
@ -665,8 +695,37 @@ func (d *DNSServer) doDispatch(network string, remoteAddr net.Addr, req, resp *d
goto INVALID
}
lookup := serviceLookup{
Network: network,
Datacenter: datacenter,
Service: queryParts[len(queryParts)-1],
Connect: true,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: entMeta,
}
// name.connect.consul
d.serviceLookup(cfg, network, datacenter, queryParts[len(queryParts)-1], "", &entMeta, true, req, resp, maxRecursionLevel)
d.serviceLookup(cfg, lookup, req, resp)
case "ingress":
if len(queryParts) < 1 {
goto INVALID
}
if !d.parseDatacenterAndEnterpriseMeta(querySuffixes, cfg, &datacenter, &entMeta) {
goto INVALID
}
lookup := serviceLookup{
Network: network,
Datacenter: datacenter,
Service: queryParts[len(queryParts)-1],
Connect: false,
Ingress: true,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: entMeta,
}
// name.ingress.consul
d.serviceLookup(cfg, lookup, req, resp)
case "node":
if len(queryParts) < 1 {
goto INVALID
@ -1076,22 +1135,20 @@ func (d *DNSServer) trimDNSResponse(cfg *dnsConfig, network string, req, resp *d
}
// lookupServiceNodes returns nodes with a given service.
func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, datacenter, service, tag string, entMeta *structs.EnterpriseMeta, connect bool) (structs.IndexedCheckServiceNodes, error) {
func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (structs.IndexedCheckServiceNodes, error) {
args := structs.ServiceSpecificRequest{
Connect: connect,
Datacenter: datacenter,
ServiceName: service,
ServiceTags: []string{tag},
TagFilter: tag != "",
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
ServiceName: lookup.Service,
ServiceTags: []string{lookup.Tag},
TagFilter: lookup.Tag != "",
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
AllowStale: cfg.AllowStale,
MaxAge: cfg.CacheMaxAge,
},
}
if entMeta != nil {
args.EnterpriseMeta = *entMeta
EnterpriseMeta: lookup.EnterpriseMeta,
}
var out structs.IndexedCheckServiceNodes
@ -1108,7 +1165,7 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, datacenter, service, tag
}
d.logger.Trace("cache results for service",
"cache_hit", m.Hit,
"service", service,
"service", lookup.Service,
)
out = *reply
@ -1141,8 +1198,8 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, datacenter, service, tag
}
// serviceLookup is used to handle a service query
func (d *DNSServer) serviceLookup(cfg *dnsConfig, network, datacenter, service, tag string, entMeta *structs.EnterpriseMeta, connect bool, req, resp *dns.Msg, maxRecursionLevel int) {
out, err := d.lookupServiceNodes(cfg, datacenter, service, tag, entMeta, connect)
func (d *DNSServer) serviceLookup(cfg *dnsConfig, lookup serviceLookup, req, resp *dns.Msg) {
out, err := d.lookupServiceNodes(cfg, lookup)
if err != nil {
d.logger.Error("rpc error", "error", err)
resp.SetRcode(req, dns.RcodeServerFailure)
@ -1160,17 +1217,17 @@ func (d *DNSServer) serviceLookup(cfg *dnsConfig, network, datacenter, service,
out.Nodes.Shuffle()
// Determine the TTL
ttl, _ := cfg.GetTTLForService(service)
ttl, _ := cfg.GetTTLForService(lookup.Service)
// Add various responses depending on the request
qType := req.Question[0].Qtype
if qType == dns.TypeSRV {
d.serviceSRVRecords(cfg, datacenter, out.Nodes, req, resp, ttl, maxRecursionLevel)
d.serviceSRVRecords(cfg, lookup.Datacenter, out.Nodes, req, resp, ttl, lookup.MaxRecursionLevel)
} else {
d.serviceNodeRecords(cfg, datacenter, out.Nodes, req, resp, ttl, maxRecursionLevel)
d.serviceNodeRecords(cfg, lookup.Datacenter, out.Nodes, req, resp, ttl, lookup.MaxRecursionLevel)
}
d.trimDNSResponse(cfg, network, req, resp)
d.trimDNSResponse(cfg, lookup.Network, req, resp)
// If the answer is empty and the response isn't truncated, return not found
if len(resp.Answer) == 0 && !resp.Truncated {

View File

@ -1636,6 +1636,90 @@ func TestDNS_ConnectServiceLookup(t *testing.T) {
}
}
func TestDNS_IngressServiceLookup(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register ingress-gateway service
{
args := structs.TestRegisterIngressGateway(t)
var out struct{}
require.Nil(t, a.RPC("Catalog.Register", args, &out))
}
// Register db service
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "db",
Address: "",
Port: 80,
},
}
var out struct{}
require.Nil(t, a.RPC("Catalog.Register", args, &out))
}
// Register ingress-gateway config entry
{
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Protocol: "http",
Services: []structs.IngressService{
{Name: "db"},
{Name: "api"},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var out bool
require.Nil(t, a.RPC("ConfigEntry.Apply", req, &out))
require.True(t, out)
}
// Look up the service
questions := []string{
"api.ingress.consul.",
"api.ingress.dc1.consul.",
"db.ingress.consul.",
"db.ingress.dc1.consul.",
}
for _, question := range questions {
t.Run(question, func(t *testing.T) {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeA)
c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
require.Nil(t, err)
require.Len(t, in.Answer, 1)
cnameRec, ok := in.Answer[0].(*dns.A)
require.True(t, ok)
require.Equal(t, question, cnameRec.Hdr.Name)
require.Equal(t, uint32(0), cnameRec.Hdr.Ttl)
require.Equal(t, "127.0.0.1", cnameRec.A.String())
})
}
}
func TestDNS_ExternalServiceLookup(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")

View File

@ -133,7 +133,9 @@ func (m *Manager) syncState() {
// Traverse the local state and ensure all proxy services are registered
services := m.State.Services(structs.WildcardEnterpriseMeta())
for sid, svc := range services {
if svc.Kind != structs.ServiceKindConnectProxy && svc.Kind != structs.ServiceKindMeshGateway {
if svc.Kind != structs.ServiceKindConnectProxy &&
svc.Kind != structs.ServiceKindMeshGateway &&
svc.Kind != structs.ServiceKindIngressGateway {
continue
}
// TODO(banks): need to work out when to default some stuff. For example

View File

@ -202,6 +202,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
TaggedAddresses: make(map[string]structs.ServiceAddress),
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbDefaultChain(),
@ -216,6 +217,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
},
@ -247,6 +249,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
TaggedAddresses: make(map[string]structs.ServiceAddress),
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbSplitChain(),
@ -262,6 +265,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
},

View File

@ -7,15 +7,39 @@ import (
"github.com/mitchellh/copystructure"
)
type configSnapshotConnectProxy struct {
// TODO(ingress): Can we think of a better for this bag of data?
// A shared data structure that contains information about discovered upstreams
type ConfigSnapshotUpstreams struct {
Leaf *structs.IssuedCert
DiscoveryChain map[string]*structs.CompiledDiscoveryChain // this is keyed by the Upstream.Identifier(), not the chain name
WatchedUpstreams map[string]map[string]context.CancelFunc
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
WatchedGateways map[string]map[string]context.CancelFunc
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
WatchedServiceChecks map[structs.ServiceID][]structs.CheckType // TODO: missing garbage collection
// DiscoveryChain is a map of upstream.Identifier() ->
// CompiledDiscoveryChain's, and is used to determine what services could be
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain
// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
WatchedUpstreams map[string]map[string]context.CancelFunc
// WatchedUpstreamEndpoints is a map of upstream.Identifier() -> (map of
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of an upstream.
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
// WatchedGateways is a map of upstream.Identifier() -> (map of
// TargetID -> CancelFunc) in order to cancel watches for mesh gateways
WatchedGateways map[string]map[string]context.CancelFunc
// WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
}
type configSnapshotConnectProxy struct {
ConfigSnapshotUpstreams
WatchedServiceChecks map[structs.ServiceID][]structs.CheckType // TODO: missing garbage collection
PreparedQueryEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
}
@ -108,6 +132,31 @@ func (c *configSnapshotMeshGateway) IsEmpty() bool {
len(c.ConsulServers) == 0
}
type configSnapshotIngressGateway struct {
ConfigSnapshotUpstreams
// Upstreams is a list of upstreams this ingress gateway should serve traffic
// to. This is constructed from the ingress-gateway config entry, and uses
// the GatewayServices RPC to retrieve them.
Upstreams []structs.Upstream
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the ingress gateway configuration is
// changed. Ingress gateways need this because discovery chain watches are
// added and removed through the lifecycle of single proxycfg.state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
}
func (c *configSnapshotIngressGateway) IsEmpty() bool {
if c == nil {
return true
}
return len(c.Upstreams) == 0 &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0
}
// ConfigSnapshot captures all the resulting config needed for a proxy instance.
// It is meant to be point-in-time coherent and is used to deliver the current
// config state to observers who need it to be pushed in (e.g. XDS server).
@ -131,6 +180,9 @@ type ConfigSnapshot struct {
// mesh-gateway specific
MeshGateway configSnapshotMeshGateway
// ingress-gateway specific
IngressGateway configSnapshotIngressGateway
// Skip intentions for now as we don't push those down yet, just pre-warm them.
}
@ -146,6 +198,9 @@ func (s *ConfigSnapshot) Valid() bool {
}
}
return s.Roots != nil && (s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0)
case structs.ServiceKindIngressGateway:
return s.Roots != nil &&
s.IngressGateway.Leaf != nil
default:
return false
}
@ -169,7 +224,21 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
case structs.ServiceKindMeshGateway:
snap.MeshGateway.WatchedDatacenters = nil
snap.MeshGateway.WatchedServices = nil
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedUpstreams = nil
snap.IngressGateway.WatchedDiscoveryChains = nil
}
return snap, nil
}
func (s *ConfigSnapshot) Leaf() *structs.IssuedCert {
switch s.Kind {
case structs.ServiceKindConnectProxy:
return s.ConnectProxy.Leaf
case structs.ServiceKindIngressGateway:
return s.IngressGateway.Leaf
default:
return nil
}
}

View File

@ -32,6 +32,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
gatewayServicesWatchID = "gateway-services"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
@ -106,8 +107,12 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
// The returned state needs its required dependencies to be set before Watch
// can be called.
func newState(ns *structs.NodeService, token string) (*state, error) {
if ns.Kind != structs.ServiceKindConnectProxy && ns.Kind != structs.ServiceKindMeshGateway {
return nil, errors.New("not a connect-proxy or mesh-gateway")
switch ns.Kind {
case structs.ServiceKindConnectProxy:
case structs.ServiceKindMeshGateway:
case structs.ServiceKindIngressGateway:
default:
return nil, errors.New("not a connect-proxy, mesh-gateway, or ingress-gateway")
}
proxyCfg, err := copyProxyConfig(ns)
@ -181,6 +186,8 @@ func (s *state) initWatches() error {
return s.initWatchesConnectProxy()
case structs.ServiceKindMeshGateway:
return s.initWatchesMeshGateway()
case structs.ServiceKindIngressGateway:
return s.initWatchesIngressGateway()
default:
return fmt.Errorf("Unsupported service kind")
}
@ -432,6 +439,42 @@ func (s *state) initWatchesMeshGateway() error {
return err
}
func (s *state) initWatchesIngressGateway() error {
// Watch for root changes
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return err
}
// Watch the leaf cert
err = s.cache.Notify(s.ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: s.service,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, leafWatchID, s.ch)
if err != nil {
return err
}
// Watch the ingress-gateway's list of upstreams
err = s.cache.Notify(s.ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.service,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayServicesWatchID, s.ch)
if err != nil {
return err
}
return nil
}
func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap := ConfigSnapshot{
Kind: s.kind,
@ -464,6 +507,13 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceID]*structs.ServiceResolverConfigEntry)
// there is no need to initialize the map of service resolvers as we
// fully rebuild it every time we get updates
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
}
return snap
@ -563,6 +613,8 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
return s.handleUpdateConnectProxy(u, snap)
case structs.ServiceKindMeshGateway:
return s.handleUpdateMeshGateway(u, snap)
case structs.ServiceKindIngressGateway:
return s.handleUpdateIngressGateway(u, snap)
default:
return fmt.Errorf("Unsupported service kind")
}
@ -580,64 +632,9 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.Roots = roots
case u.CorrelationID == leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.ConnectProxy.Leaf = leaf
case u.CorrelationID == intentionsWatchID:
// Not in snapshot currently, no op
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
if err := s.resetWatchesFromChain(svc, resp.Chain, snap); err != nil {
return err
}
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
targetID, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreamEndpoints[svc] = m
}
snap.ConnectProxy.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
dc, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
m, ok := snap.ConnectProxy.WatchedGatewayEndpoints[svc]
if !ok {
m = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGatewayEndpoints[svc] = m
}
snap.ConnectProxy.WatchedGatewayEndpoints[svc][dc] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
@ -653,7 +650,71 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
}
svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
default:
return s.handleUpdateUpstreams(u, &snap.ConnectProxy.ConfigSnapshotUpstreams)
}
return nil
}
func (s *state) handleUpdateUpstreams(u cache.UpdateEvent, snap *ConfigSnapshotUpstreams) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
switch {
case u.CorrelationID == leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.Leaf = leaf
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
snap.DiscoveryChain[svc] = resp.Chain
if err := s.resetWatchesFromChain(svc, resp.Chain, snap); err != nil {
return err
}
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
targetID, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
m, ok := snap.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[string]structs.CheckServiceNodes)
snap.WatchedUpstreamEndpoints[svc] = m
}
snap.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
dc, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
m, ok := snap.WatchedGatewayEndpoints[svc]
if !ok {
m = make(map[string]structs.CheckServiceNodes)
snap.WatchedGatewayEndpoints[svc] = m
}
snap.WatchedGatewayEndpoints[svc][dc] = resp.Nodes
default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
}
@ -671,7 +732,7 @@ func removeColonPrefix(s string) (string, string, bool) {
func (s *state) resetWatchesFromChain(
id string,
chain *structs.CompiledDiscoveryChain,
snap *ConfigSnapshot,
snap *ConfigSnapshotUpstreams,
) error {
s.logger.Trace("resetting watches for discovery chain", "id", id)
if chain == nil {
@ -679,17 +740,17 @@ func (s *state) resetWatchesFromChain(
}
// Initialize relevant sub maps.
if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok {
snap.ConnectProxy.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
if _, ok := snap.WatchedUpstreams[id]; !ok {
snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
}
if _, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[id]; !ok {
snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok {
snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
}
if _, ok := snap.ConnectProxy.WatchedGateways[id]; !ok {
snap.ConnectProxy.WatchedGateways[id] = make(map[string]context.CancelFunc)
if _, ok := snap.WatchedGateways[id]; !ok {
snap.WatchedGateways[id] = make(map[string]context.CancelFunc)
}
if _, ok := snap.ConnectProxy.WatchedGatewayEndpoints[id]; !ok {
snap.ConnectProxy.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes)
if _, ok := snap.WatchedGatewayEndpoints[id]; !ok {
snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes)
}
// We could invalidate this selectively based on a hash of the relevant
@ -697,14 +758,14 @@ func (s *state) resetWatchesFromChain(
// upstream when the chain changes in any way.
//
// TODO(rb): content hash based add/remove
for targetID, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] {
for targetID, cancelFn := range snap.WatchedUpstreams[id] {
s.logger.Trace("stopping watch of target",
"upstream", id,
"chain", chain.ServiceName,
"target", targetID,
)
delete(snap.ConnectProxy.WatchedUpstreams[id], targetID)
delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], targetID)
delete(snap.WatchedUpstreams[id], targetID)
delete(snap.WatchedUpstreamEndpoints[id], targetID)
cancelFn()
}
@ -740,11 +801,11 @@ func (s *state) resetWatchesFromChain(
return err
}
snap.ConnectProxy.WatchedUpstreams[id][target.ID] = cancel
snap.WatchedUpstreams[id][target.ID] = cancel
}
for dc, _ := range needGateways {
if _, ok := snap.ConnectProxy.WatchedGateways[id][dc]; ok {
if _, ok := snap.WatchedGateways[id][dc]; ok {
continue
}
@ -761,10 +822,10 @@ func (s *state) resetWatchesFromChain(
return err
}
snap.ConnectProxy.WatchedGateways[id][dc] = cancel
snap.WatchedGateways[id][dc] = cancel
}
for dc, cancelFn := range snap.ConnectProxy.WatchedGateways[id] {
for dc, cancelFn := range snap.WatchedGateways[id] {
if _, ok := needGateways[dc]; ok {
continue
}
@ -773,8 +834,8 @@ func (s *state) resetWatchesFromChain(
"chain", chain.ServiceName,
"datacenter", dc,
)
delete(snap.ConnectProxy.WatchedGateways[id], dc)
delete(snap.ConnectProxy.WatchedGatewayEndpoints[id], dc)
delete(snap.WatchedGateways[id], dc)
delete(snap.WatchedGatewayEndpoints[id], dc)
cancelFn()
}
@ -969,6 +1030,89 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
return nil
}
func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
switch {
case u.CorrelationID == rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.Roots = roots
case u.CorrelationID == gatewayServicesWatchID:
services, ok := u.Result.(*structs.IndexedGatewayServices)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
var upstreams structs.Upstreams
watchedSvcs := make(map[string]struct{})
for _, service := range services.Services {
u := makeUpstream(service, s.address)
err := s.watchIngressDiscoveryChain(snap, u)
if err != nil {
return err
}
watchedSvcs[u.Identifier()] = struct{}{}
upstreams = append(upstreams, u)
}
snap.IngressGateway.Upstreams = upstreams
for id, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains {
if _, ok := watchedSvcs[id]; !ok {
cancelFn()
delete(snap.IngressGateway.WatchedDiscoveryChains, id)
}
}
default:
return s.handleUpdateUpstreams(u, &snap.IngressGateway.ConfigSnapshotUpstreams)
}
return nil
}
func makeUpstream(g *structs.GatewayService, bindAddr string) structs.Upstream {
upstream := structs.Upstream{
DestinationName: g.Service.ID,
DestinationNamespace: g.Service.NamespaceOrDefault(),
LocalBindPort: g.Port,
}
upstream.LocalBindAddress = bindAddr
if bindAddr == "" {
upstream.LocalBindAddress = "0.0.0.0"
}
return upstream
}
func (s *state) watchIngressDiscoveryChain(snap *ConfigSnapshot, u structs.Upstream) error {
if _, ok := snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()]; ok {
return nil
}
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: u.DestinationName,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: u.DestinationNamespace,
// TODO(ingress): Deal with MeshGateway and Protocol overrides here
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
cancel()
return err
}
snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()] = cancel
return nil
}
// CurrentSnapshot synchronously returns the current ConfigSnapshot if there is
// one ready. If we don't have one yet because not all necessary parts have been
// returned (i.e. both roots and leaf cert), nil is returned.

View File

@ -665,6 +665,139 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"ingress-gateway": testCase{
ns: structs.NodeService{
Kind: structs.ServiceKindIngressGateway,
ID: "ingress-gateway",
Service: "ingress-gateway",
Address: "10.0.1.1",
},
sourceDC: "dc1",
stages: []verificationStage{
verificationStage{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
leafWatchID: genVerifyLeafWatch("ingress-gateway", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
require.True(t, snap.IngressGateway.IsEmpty())
},
},
verificationStage{
events: []cache.UpdateEvent{
rootWatchEvent(),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without leaf is not valid")
require.Equal(t, indexedRoots, snap.Roots)
},
},
verificationStage{
events: []cache.UpdateEvent{
cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "gateway with root and leaf certs is valid")
require.Equal(t, issuedCert, snap.IngressGateway.Leaf)
},
},
verificationStage{
events: []cache.UpdateEvent{
cache.UpdateEvent{
CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{
Services: structs.GatewayServices{
{
Gateway: structs.NewServiceID("ingress-gateway", nil),
Service: structs.NewServiceID("api", nil),
Port: 9999,
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.IngressGateway.Upstreams, 1)
require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1)
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, "api")
},
},
verificationStage{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:api": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
}),
},
events: []cache.UpdateEvent{
cache.UpdateEvent{
CorrelationID: "discovery-chain:api",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "dc1", "trustdomain.consul", "dc1", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.IngressGateway.WatchedUpstreams, 1)
require.Len(t, snap.IngressGateway.WatchedUpstreams["api"], 1)
},
},
verificationStage{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:api.default.dc1:api": genVerifyServiceWatch("api", "", "dc1", true),
},
events: []cache.UpdateEvent{
cache.UpdateEvent{
CorrelationID: "upstream-target:api.default.dc1:api",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "api1",
Service: "api",
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, "api")
require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"], 1)
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"], "api.default.dc1")
require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"]["api.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "api1",
Service: "api",
},
},
},
)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}

View File

@ -586,19 +586,21 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
},
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{
"prepared_query:geo-cache": TestUpstreamNodes(t),
},
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": map[string]structs.CheckServiceNodes{
"db.default.dc1": TestUpstreamNodes(t),
},
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{
"prepared_query:geo-cache": TestUpstreamNodes(t),
},
},
Datacenter: "dc1",
}
}
@ -881,6 +883,7 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
},
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
@ -891,6 +894,7 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
},
},
},
},
Datacenter: "dc1",
}
@ -1036,6 +1040,54 @@ func testConfigSnapshotMeshGateway(t testing.T, populateServices bool, useFedera
return snap
}
func TestConfigSnapshotIngressGateway(t testing.T) *ConfigSnapshot {
return testConfigSnapshotIngressGateway(t, true)
}
func TestConfigSnapshotIngressGatewayNoServices(t testing.T) *ConfigSnapshot {
return testConfigSnapshotIngressGateway(t, false)
}
func testConfigSnapshotIngressGateway(t testing.T, populateServices bool) *ConfigSnapshot {
roots, leaf := TestCerts(t)
dbChain := discoverychain.TestCompileConfigEntries(
t, "db", "default", "dc1",
connect.TestClusterID+".consul", "dc1", nil)
snap := &ConfigSnapshot{
Kind: structs.ServiceKindIngressGateway,
Service: "ingress-gateway",
ProxyID: structs.NewServiceID("ingress-gateway", nil),
Address: "1.2.3.4",
Roots: roots,
Datacenter: "dc1",
}
if populateServices {
snap.IngressGateway = configSnapshotIngressGateway{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": map[string]structs.CheckServiceNodes{
"db.default.dc1": TestUpstreamNodes(t),
},
},
},
Upstreams: structs.Upstreams{
{
// We rely on this one having default type in a few tests...
DestinationName: "db",
LocalBindPort: 9191,
LocalBindAddress: "2.3.4.5",
},
},
}
}
return snap
}
func TestConfigSnapshotExposeConfig(t testing.T) *ConfigSnapshot {
return &ConfigSnapshot{
Kind: structs.ServiceKindConnectProxy,

View File

@ -77,11 +77,19 @@ func (e *IngressGatewayConfigEntry) Normalize() error {
}
e.Kind = IngressGateway
for _, listener := range e.Listeners {
for i, listener := range e.Listeners {
if listener.Protocol == "" {
listener.Protocol = "tcp"
}
listener.Protocol = strings.ToLower(listener.Protocol)
for i := range listener.Services {
listener.Services[i].EnterpriseMeta.Normalize()
}
// Make sure to set the item back into the array, since we are not using
// pointers to structs
e.Listeners[i] = listener
}
e.EnterpriseMeta.Normalize()
@ -135,7 +143,7 @@ func (e *IngressGatewayConfigEntry) Validate() error {
func (e *IngressGatewayConfigEntry) CanRead(authz acl.Authorizer) bool {
var authzContext acl.AuthorizerContext
e.FillAuthzContext(&authzContext)
return authz.OperatorRead(&authzContext) == acl.Allow
return authz.ServiceRead(e.Name, &authzContext) == acl.Allow
}
func (e *IngressGatewayConfigEntry) CanWrite(authz acl.Authorizer) bool {
@ -160,6 +168,10 @@ func (e *IngressGatewayConfigEntry) GetEnterpriseMeta() *EnterpriseMeta {
return &e.EnterpriseMeta
}
func (s *IngressService) ToServiceID() ServiceID {
return NewServiceID(s.Name, &s.EnterpriseMeta)
}
// TerminatingGatewayConfigEntry manages the configuration for a terminating service
// with the given name.
type TerminatingGatewayConfigEntry struct {
@ -283,9 +295,11 @@ type GatewayService struct {
Gateway ServiceID
Service ServiceID
GatewayKind ServiceKind
Port int
CAFile string
CertFile string
KeyFile string
RaftIndex
}
type GatewayServices []*GatewayService
@ -294,7 +308,21 @@ func (g *GatewayService) IsSame(o *GatewayService) bool {
return g.Gateway.Matches(&o.Gateway) &&
g.Service.Matches(&o.Service) &&
g.GatewayKind == o.GatewayKind &&
g.Port == o.Port &&
g.CAFile == o.CAFile &&
g.CertFile == o.CertFile &&
g.KeyFile == o.KeyFile
}
func (g *GatewayService) Clone() *GatewayService {
return &GatewayService{
Gateway: g.Gateway,
Service: g.Service,
GatewayKind: g.GatewayKind,
Port: g.Port,
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
RaftIndex: g.RaftIndex,
}
}

View File

@ -6,6 +6,89 @@ import (
"github.com/stretchr/testify/require"
)
func TestIngressConfigEntry_Normalize(t *testing.T) {
t.Parallel()
cases := []struct {
name string
entry IngressGatewayConfigEntry
expected IngressGatewayConfigEntry
}{
{
name: "empty protocol",
entry: IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "",
Services: []IngressService{},
},
},
},
expected: IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{},
},
},
},
},
{
name: "lowercase protocols",
entry: IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "TCP",
Services: []IngressService{},
},
{
Port: 1112,
Protocol: "HtTP",
Services: []IngressService{},
},
},
},
expected: IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{},
},
{
Port: 1112,
Protocol: "http",
Services: []IngressService{},
},
},
},
},
}
for _, test := range cases {
// We explicitly copy the variable for the range statement so that can run
// tests in parallel.
tc := test
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
err := tc.entry.Normalize()
require.NoError(t, err)
require.Equal(t, tc.expected, tc.entry)
})
}
}
func TestIngressConfigEntry_Validate(t *testing.T) {
t.Parallel()
@ -333,6 +416,7 @@ func TestTerminatingConfigEntry_Validate(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
err := tc.entry.Validate()
if tc.expectErr != "" {
require.Error(t, err)

View File

@ -502,7 +502,6 @@ type ServiceSpecificRequest struct {
Datacenter string
NodeMetaFilters map[string]string
ServiceName string
ServiceKind ServiceKind
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
ServiceTag string
@ -514,6 +513,12 @@ type ServiceSpecificRequest struct {
// Connect if true will only search for Connect-compatible services.
Connect bool
// TODO(ingress): Add corresponding API changes after figuring out what the
// HTTP endpoint looks like
// Ingress if true will only search for Ingress gateways for the given service.
Ingress bool
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
QueryOptions
}

View File

@ -29,6 +29,17 @@ func TestRegisterRequestProxy(t testing.T) *RegisterRequest {
}
}
// TestRegisterIngressGateway returns a RegisterRequest for registering an
// ingress gateway
func TestRegisterIngressGateway(t testing.T) *RegisterRequest {
return &RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: TestNodeServiceIngressGateway(t, ""),
}
}
// TestNodeService returns a *NodeService representing a valid regular service.
func TestNodeService(t testing.T) *NodeService {
return &NodeService{

View File

@ -32,6 +32,8 @@ func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ string
return s.clustersFromSnapshotConnectProxy(cfgSnap)
case structs.ServiceKindMeshGateway:
return s.clustersFromSnapshotMeshGateway(cfgSnap)
case structs.ServiceKindIngressGateway:
return s.clustersFromSnapshotIngressGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
@ -63,7 +65,13 @@ func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapsh
} else {
chain := cfgSnap.ConnectProxy.DiscoveryChain[id]
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, cfgSnap)
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
@ -192,6 +200,34 @@ func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsho
return clusters, nil
}
func (s *Server) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var clusters []proto.Message
for _, u := range cfgSnap.IngressGateway.Upstreams {
id := u.Identifier()
chain, ok := cfgSnap.IngressGateway.DiscoveryChain[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no discovery chain for upstream %q", id)
}
chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
for _, c := range upstreamClusters {
clusters = append(clusters, c)
}
}
return clusters, nil
}
func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, name, pathProtocol string, port int) (*envoy.Cluster, error) {
var c *envoy.Cluster
var err error
@ -299,6 +335,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
func (s *Server) makeUpstreamClustersForDiscoveryChain(
upstream structs.Upstream,
chain *structs.CompiledDiscoveryChain,
chainEndpoints map[string]structs.CheckServiceNodes,
cfgSnap *proxycfg.ConfigSnapshot,
) ([]*envoy.Cluster, error) {
if chain == nil {
@ -329,15 +366,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
}
}
id := upstream.Identifier()
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
var out []*envoy.Cluster
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
@ -356,7 +385,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
if failoverThroughMeshGateway {
actualTargetID := firstHealthyTarget(
chain.Targets,
chainEndpointMap,
chainEndpoints,
targetID,
failover.Targets,
)

View File

@ -343,6 +343,16 @@ func TestClustersFromSnapshot(t *testing.T) {
}
},
},
{
name: "ingress-gateway",
create: proxycfg.TestConfigSnapshotIngressGateway,
setup: nil,
},
{
name: "ingress-gateway-no-services",
create: proxycfg.TestConfigSnapshotIngressGatewayNoServices,
setup: nil,
},
}
for _, tt := range tests {
@ -355,13 +365,7 @@ func TestClustersFromSnapshot(t *testing.T) {
// We need to replace the TLS certs with deterministic ones to make golden
// files workable. Note we don't update these otherwise they'd change
// golder files for every test case and so not be any use!
if snap.ConnectProxy.Leaf != nil {
snap.ConnectProxy.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.ConnectProxy.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
}
if snap.Roots != nil {
snap.Roots.Roots[0].RootCert = golden(t, "test-root-cert", "")
}
setupTLSRootsAndLeaf(t, snap)
if tt.setup != nil {
tt.setup(snap)
@ -537,3 +541,19 @@ func customAppClusterJSON(t *testing.T, opts customClusterJSONOptions) string {
require.NoError(t, err)
return buf.String()
}
func setupTLSRootsAndLeaf(t *testing.T, snap *proxycfg.ConfigSnapshot) {
if snap.Leaf() != nil {
switch snap.Kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.ConnectProxy.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
case structs.ServiceKindIngressGateway:
snap.IngressGateway.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.IngressGateway.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
}
}
if snap.Roots != nil {
snap.Roots.Roots[0].RootCert = golden(t, "test-root-cert", "")
}
}

View File

@ -32,6 +32,8 @@ func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ strin
return s.endpointsFromSnapshotConnectProxy(cfgSnap)
case structs.ServiceKindMeshGateway:
return s.endpointsFromSnapshotMeshGateway(cfgSnap)
case structs.ServiceKindIngressGateway:
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
@ -74,79 +76,13 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
} else {
// Newfangled discovery chain plumbing.
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
failover := node.Resolver.Failover
targetID := node.Resolver.Target
target := chain.Targets[targetID]
clusterName := CustomizeClusterName(target.Name, chain)
// Determine if we have to generate the entire cluster differently.
failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)
if failoverThroughMeshGateway {
actualTargetID := firstHealthyTarget(
chain.Targets,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
targetID,
failover.Targets,
)
if actualTargetID != targetID {
targetID = actualTargetID
target = chain.Targets[actualTargetID]
}
failover = nil
}
primaryGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
es := s.endpointsFromDiscoveryChain(
chain,
cfgSnap.Datacenter,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
targetID,
cfgSnap.Datacenter,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
}
var endpointGroups []loadAssignmentEndpointGroup
if failover != nil && len(failover.Targets) > 0 {
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
endpointGroups = append(endpointGroups, primaryGroup)
for _, failTargetID := range failover.Targets {
failoverGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
failTargetID,
cfgSnap.Datacenter,
)
if !valid {
continue // skip the failover target if we're still populating the snapshot
}
endpointGroups = append(endpointGroups, failoverGroup)
}
} else {
endpointGroups = append(endpointGroups, primaryGroup)
}
la := makeLoadAssignment(
clusterName,
endpointGroups,
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
resources = append(resources, es...)
}
}
@ -297,6 +233,22 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
return resources, nil
}
func (s *Server) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
for _, u := range cfgSnap.IngressGateway.Upstreams {
id := u.Identifier()
es := s.endpointsFromDiscoveryChain(
cfgSnap.IngressGateway.DiscoveryChain[id],
cfgSnap.Datacenter,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
nil,
)
resources = append(resources, es...)
}
return resources, nil
}
func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
return envoyendpoint.LbEndpoint{
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
@ -307,6 +259,93 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
}
}
func (s *Server) endpointsFromDiscoveryChain(
chain *structs.CompiledDiscoveryChain,
datacenter string,
upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
) []proto.Message {
var resources []proto.Message
if chain == nil {
return resources
}
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
failover := node.Resolver.Failover
targetID := node.Resolver.Target
target := chain.Targets[targetID]
clusterName := CustomizeClusterName(target.Name, chain)
// Determine if we have to generate the entire cluster differently.
failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)
if failoverThroughMeshGateway {
actualTargetID := firstHealthyTarget(
chain.Targets,
upstreamEndpoints,
targetID,
failover.Targets,
)
if actualTargetID != targetID {
targetID = actualTargetID
target = chain.Targets[actualTargetID]
}
failover = nil
}
primaryGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
targetID,
datacenter,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
}
var endpointGroups []loadAssignmentEndpointGroup
if failover != nil && len(failover.Targets) > 0 {
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
endpointGroups = append(endpointGroups, primaryGroup)
for _, failTargetID := range failover.Targets {
failoverGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
failTargetID,
datacenter,
)
if !valid {
continue // skip the failover target if we're still populating the snapshot
}
endpointGroups = append(endpointGroups, failoverGroup)
}
} else {
endpointGroups = append(endpointGroups, primaryGroup)
}
la := makeLoadAssignment(
clusterName,
endpointGroups,
datacenter,
)
resources = append(resources, la)
}
return resources
}
type loadAssignmentEndpointGroup struct {
Endpoints structs.CheckServiceNodes
OnlyPassing bool

View File

@ -381,6 +381,16 @@ func Test_endpointsFromSnapshot(t *testing.T) {
}
},
},
{
name: "ingress-gateway",
create: proxycfg.TestConfigSnapshotIngressGateway,
setup: nil,
},
{
name: "ingress-gateway-no-services",
create: proxycfg.TestConfigSnapshotIngressGatewayNoServices,
setup: nil,
},
}
for _, tt := range tests {
@ -393,13 +403,7 @@ func Test_endpointsFromSnapshot(t *testing.T) {
// We need to replace the TLS certs with deterministic ones to make golden
// files workable. Note we don't update these otherwise they'd change
// golden files for every test case and so not be any use!
if snap.ConnectProxy.Leaf != nil {
snap.ConnectProxy.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.ConnectProxy.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
}
if snap.Roots != nil {
snap.Roots.Roots[0].RootCert = golden(t, "test-root-cert", "")
}
setupTLSRootsAndLeaf(t, snap)
if tt.setup != nil {
tt.setup(snap)

View File

@ -40,6 +40,8 @@ func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s
return s.listenersFromSnapshotConnectProxy(cfgSnap, token)
case structs.ServiceKindMeshGateway:
return s.listenersFromSnapshotMeshGateway(cfgSnap)
case structs.ServiceKindIngressGateway:
return s.listenersFromSnapshotIngressGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
@ -226,6 +228,34 @@ func (s *Server) listenersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
return resources, err
}
// TODO(ingress): Support configured bind addresses from similar to mesh gateways
// See: https://www.consul.io/docs/connect/proxies/envoy.html#mesh-gateway-options
func (s *Server) listenersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
// TODO(ingress): We give each upstream a distinct listener at the moment,
// for http listeners we will need to multiplex upstreams on a single
// listener.
for _, u := range cfgSnap.IngressGateway.Upstreams {
id := u.Identifier()
chain := cfgSnap.IngressGateway.DiscoveryChain[id]
var upstreamListener proto.Message
var err error
if chain == nil || chain.IsDefault() {
upstreamListener, err = s.makeUpstreamListenerIgnoreDiscoveryChain(&u, chain, cfgSnap)
} else {
upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(&u, chain, cfgSnap)
}
if err != nil {
return nil, err
}
resources = append(resources, upstreamListener)
}
return resources, nil
}
// makeListener returns a listener with name and bind details set. Filters must
// be added before it's useful.
//
@ -862,18 +892,19 @@ func makeCommonTLSContext(cfgSnap *proxycfg.ConfigSnapshot) *envoyauth.CommonTls
rootPEMS += root.RootCert
}
leaf := cfgSnap.Leaf()
return &envoyauth.CommonTlsContext{
TlsParams: &envoyauth.TlsParameters{},
TlsCertificates: []*envoyauth.TlsCertificate{
&envoyauth.TlsCertificate{
CertificateChain: &envoycore.DataSource{
Specifier: &envoycore.DataSource_InlineString{
InlineString: cfgSnap.ConnectProxy.Leaf.CertPEM,
InlineString: leaf.CertPEM,
},
},
PrivateKey: &envoycore.DataSource{
Specifier: &envoycore.DataSource_InlineString{
InlineString: cfgSnap.ConnectProxy.Leaf.PrivateKeyPEM,
InlineString: leaf.PrivateKeyPEM,
},
},
},

View File

@ -263,6 +263,16 @@ func TestListenersFromSnapshot(t *testing.T) {
}
},
},
{
name: "ingress-gateway",
create: proxycfg.TestConfigSnapshotIngressGateway,
setup: nil,
},
{
name: "ingress-gateway-no-services",
create: proxycfg.TestConfigSnapshotIngressGatewayNoServices,
setup: nil,
},
}
for _, tt := range tests {
@ -275,13 +285,7 @@ func TestListenersFromSnapshot(t *testing.T) {
// We need to replace the TLS certs with deterministic ones to make golden
// files workable. Note we don't update these otherwise they'd change
// golder files for every test case and so not be any use!
if snap.ConnectProxy.Leaf != nil {
snap.ConnectProxy.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.ConnectProxy.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
}
if snap.Roots != nil {
snap.Roots.Roots[0].RootCert = golden(t, "test-root-cert", "")
}
setupTLSRootsAndLeaf(t, snap)
if tt.setup != nil {
tt.setup(snap)

View File

@ -335,13 +335,7 @@ func TestRoutesFromSnapshot(t *testing.T) {
// We need to replace the TLS certs with deterministic ones to make golden
// files workable. Note we don't update these otherwise they'd change
// golden files for every test case and so not be any use!
if snap.ConnectProxy.Leaf != nil {
snap.ConnectProxy.Leaf.CertPEM = golden(t, "test-leaf-cert", "")
snap.ConnectProxy.Leaf.PrivateKeyPEM = golden(t, "test-leaf-key", "")
}
if snap.Roots != nil {
snap.Roots.Roots[0].RootCert = golden(t, "test-root-cert", "")
}
setupTLSRootsAndLeaf(t, snap)
if tt.setup != nil {
tt.setup(snap)

View File

@ -267,6 +267,11 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
if rule != nil && rule.ServiceWrite(cfgSnap.Service, &authzContext) != acl.Allow {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
case structs.ServiceKindIngressGateway:
cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext)
if rule != nil && rule.ServiceWrite(cfgSnap.Service, &authzContext) != acl.Allow {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
default:
return status.Errorf(codes.Internal, "Invalid service kind")
}

View File

@ -344,10 +344,10 @@ func expectedTLSContextJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, require
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "` + strings.Replace(snap.ConnectProxy.Leaf.CertPEM, "\n", "\\n", -1) + `"
"inlineString": "` + strings.Replace(snap.Leaf().CertPEM, "\n", "\\n", -1) + `"
},
"privateKey": {
"inlineString": "` + strings.Replace(snap.ConnectProxy.Leaf.PrivateKeyPEM, "\n", "\\n", -1) + `"
"inlineString": "` + strings.Replace(snap.Leaf().PrivateKeyPEM, "\n", "\\n", -1) + `"
}
}
],
@ -400,6 +400,7 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
acl string
token string
wantDenied bool
cfgSnap *proxycfg.ConfigSnapshot
}{
// Note that although we've stubbed actual ACL checks in the testManager
// ConnectAuthorize mock, by asserting against specific reason strings here
@ -437,6 +438,14 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
token: "service-write-on-not-web",
wantDenied: true,
},
{
name: "ingress default deny, write token on different service",
defaultDeny: true,
acl: `service "not-ingress" { policy = "write" }`,
token: "service-write-on-not-ingress",
wantDenied: true,
cfgSnap: proxycfg.TestConfigSnapshotIngressGateway(t),
},
}
for _, tt := range tests {
@ -480,7 +489,10 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
mgr.RegisterProxy(t, sid)
// Deliver a new snapshot
snap := proxycfg.TestConfigSnapshot(t)
snap := tt.cfgSnap
if snap == nil {
snap = proxycfg.TestConfigSnapshot(t)
}
mgr.DeliverConfig(t, sid, snap)
// Send initial listener discover, in real life Envoy always sends cluster

View File

@ -0,0 +1,7 @@
{
"versionInfo": "00000001",
"resources": [
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,55 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
}
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"tlsContext": {
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"sni": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
},
"outlierDetection": {
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
}
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,7 @@
{
"versionInfo": "00000001",
"resources": [
],
"typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,41 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"clusterName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,7 @@
{
"versionInfo": "00000001",
"resources": [
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Listener",
"nonce": "00000001"
}

View File

@ -0,0 +1,30 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "db:2.3.4.5:9191",
"address": {
"socketAddress": {
"address": "2.3.4.5",
"portValue": 9191
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.tcp_proxy",
"config": {
"cluster": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"stat_prefix": "upstream_db_tcp"
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Listener",
"nonce": "00000001"
}

View File

@ -195,23 +195,6 @@ func TestAPI_ConfigEntries_TerminatingGateway(t *testing.T) {
require.NotNil(t, wm)
require.NotEqual(t, 0, wm.RequestTime)
// web is associated with the other gateway, should get an error
terminating2.Services = []LinkedService{
{
Name: "*",
CAFile: "/etc/certs/ca.crt",
CertFile: "/etc/certs/client.crt",
KeyFile: "/etc/certs/tls.key",
},
{
Name: "web",
},
}
_, wm, err = configEntries.Set(terminating2, nil)
require.Error(t, err, "service \"web\" is associated with a different gateway")
require.Nil(t, wm)
// try again without web
terminating2.Services = []LinkedService{
{
Name: "*",

View File

@ -0,0 +1,20 @@
enable_central_service_config = true
config_entries {
bootstrap {
kind = "ingress-gateway"
name = "ingress-gateway"
listeners = [
{
port = 9999
protocol = "tcp"
services = [
{
name = "s1"
}
]
}
]
}
}

View File

@ -0,0 +1,4 @@
services {
name = "ingress-gateway"
kind = "ingress-gateway"
}

View File

@ -0,0 +1,10 @@
#!/bin/bash
set -euo pipefail
# wait for bootstrap to apply config entries
wait_for_config_entry ingress-gateway ingress-gateway
gen_envoy_bootstrap ingress-gateway 20000 primary true
gen_envoy_bootstrap s1 19000
gen_envoy_bootstrap s2 19001

View File

@ -0,0 +1,3 @@
#!/bin/bash
export REQUIRED_SERVICES="$DEFAULT_REQUIRED_SERVICES ingress-gateway-primary"

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bats
load helpers
@test "ingress proxy admin is up on :20000" {
retry_default curl -f -s localhost:20000/stats -o /dev/null
}
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "s2 proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "ingress-gateway should have healthy endpoints for s1" {
assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s1 HEALTHY 1
}
@test "ingress should be able to connect to s1 via configured port" {
run retry_default curl -s -f -d hello localhost:9999
[ "$status" -eq 0 ]
[ "$output" = "hello" ]
}

View File

@ -563,6 +563,23 @@ services:
- *workdir-volume
network_mode: service:consul-secondary
ingress-gateway-primary:
depends_on:
- consul-primary
image: "envoyproxy/envoy:v${ENVOY_VERSION}"
command:
- "envoy"
- "-c"
- "/workdir/primary/envoy/ingress-gateway-bootstrap.json"
- "-l"
- "debug"
- "--disable-hot-restart"
- "--drain-time-s"
- "1"
volumes:
- *workdir-volume
network_mode: service:consul-primary
verify-primary:
depends_on:
- consul-primary

View File

@ -528,11 +528,11 @@ function gen_envoy_bootstrap {
SERVICE=$1
ADMIN_PORT=$2
DC=${3:-primary}
IS_MGW=${4:-0}
IS_GW=${4:-0}
EXTRA_ENVOY_BS_ARGS="${5-}"
PROXY_ID="$SERVICE"
if ! is_set "$IS_MGW"
if ! is_set "$IS_GW"
then
PROXY_ID="$SERVICE-sidecar-proxy"
fi