Remove the `connect.enable_serverless_plugin` agent configuration option (#15710)

This commit is contained in:
Eric Haberkorn 2022-12-08 14:46:42 -05:00 committed by GitHub
parent b459d58e8d
commit 5dd131fee8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 226 additions and 261 deletions

4
.changelog/15710.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:breaking-change
xds: Remove the `connect.enable_serverless_plugin` agent configuration option. Now
Lambda integration is enabled by default.
```

View File

@ -848,7 +848,6 @@ func (a *Agent) listenAndServeGRPC() error {
a.xdsServer = xds.NewServer( a.xdsServer = xds.NewServer(
a.config.NodeName, a.config.NodeName,
a.logger.Named(logging.Envoy), a.logger.Named(logging.Envoy),
a.config.ConnectServerlessPluginEnabled,
cfg, cfg,
func(id string) (acl.Authorizer, error) { func(id string) (acl.Authorizer, error) {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)

View File

@ -666,7 +666,6 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
connectEnabled := boolVal(c.Connect.Enabled) connectEnabled := boolVal(c.Connect.Enabled)
connectCAProvider := stringVal(c.Connect.CAProvider) connectCAProvider := stringVal(c.Connect.CAProvider)
connectCAConfig := c.Connect.CAConfig connectCAConfig := c.Connect.CAConfig
serverlessPluginEnabled := boolVal(c.Connect.EnableServerlessPlugin)
// autoEncrypt and autoConfig implicitly turns on connect which is why // autoEncrypt and autoConfig implicitly turns on connect which is why
// they need to be above other settings that rely on connect. // they need to be above other settings that rely on connect.
@ -969,7 +968,6 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ConnectCAProvider: connectCAProvider, ConnectCAProvider: connectCAProvider,
ConnectCAConfig: connectCAConfig, ConnectCAConfig: connectCAConfig,
ConnectMeshGatewayWANFederationEnabled: connectMeshGatewayWANFederationEnabled, ConnectMeshGatewayWANFederationEnabled: connectMeshGatewayWANFederationEnabled,
ConnectServerlessPluginEnabled: serverlessPluginEnabled,
ConnectSidecarMinPort: sidecarMinPort, ConnectSidecarMinPort: sidecarMinPort,
ConnectSidecarMaxPort: sidecarMaxPort, ConnectSidecarMaxPort: sidecarMaxPort,
ConnectTestCALeafRootChangeSpread: b.durationVal("connect.test_ca_leaf_root_change_spread", c.Connect.TestCALeafRootChangeSpread), ConnectTestCALeafRootChangeSpread: b.durationVal("connect.test_ca_leaf_root_change_spread", c.Connect.TestCALeafRootChangeSpread),

View File

@ -613,7 +613,6 @@ type Connect struct {
CAProvider *string `mapstructure:"ca_provider" json:"ca_provider,omitempty"` CAProvider *string `mapstructure:"ca_provider" json:"ca_provider,omitempty"`
CAConfig map[string]interface{} `mapstructure:"ca_config" json:"ca_config,omitempty"` CAConfig map[string]interface{} `mapstructure:"ca_config" json:"ca_config,omitempty"`
MeshGatewayWANFederationEnabled *bool `mapstructure:"enable_mesh_gateway_wan_federation" json:"enable_mesh_gateway_wan_federation,omitempty"` MeshGatewayWANFederationEnabled *bool `mapstructure:"enable_mesh_gateway_wan_federation" json:"enable_mesh_gateway_wan_federation,omitempty"`
EnableServerlessPlugin *bool `mapstructure:"enable_serverless_plugin" json:"enable_serverless_plugin,omitempty"`
// TestCALeafRootChangeSpread controls how long after a CA roots change before new leaf certs will be generated. // TestCALeafRootChangeSpread controls how long after a CA roots change before new leaf certs will be generated.
// This is only tuned in tests, generally set to 1ns to make tests deterministic with when to expect updated leaf // This is only tuned in tests, generally set to 1ns to make tests deterministic with when to expect updated leaf

View File

@ -494,12 +494,6 @@ type RuntimeConfig struct {
// and servers in a cluster for correct connect operation. // and servers in a cluster for correct connect operation.
ConnectEnabled bool ConnectEnabled bool
// ConnectServerlessPluginEnabled opts the agent into the serverless plugin.
// This plugin allows services to be configured as AWS Lambdas. After the
// Lambda service is configured, Connect services can invoke the Lambda
// service like any other upstream.
ConnectServerlessPluginEnabled bool
// ConnectSidecarMinPort is the inclusive start of the range of ports // ConnectSidecarMinPort is the inclusive start of the range of ports
// allocated to the agent for asigning to sidecar services where no port is // allocated to the agent for asigning to sidecar services where no port is
// specified. // specified.

View File

@ -6064,7 +6064,6 @@ func TestLoad_FullConfig(t *testing.T) {
"CSRMaxConcurrent": float64(2), "CSRMaxConcurrent": float64(2),
}, },
ConnectMeshGatewayWANFederationEnabled: false, ConnectMeshGatewayWANFederationEnabled: false,
ConnectServerlessPluginEnabled: true,
Cloud: hcpconfig.CloudConfig{ Cloud: hcpconfig.CloudConfig{
ResourceID: "N43DsscE", ResourceID: "N43DsscE",
ClientID: "6WvsDZCP", ClientID: "6WvsDZCP",

View File

@ -110,9 +110,9 @@
"Method": "", "Method": "",
"Name": "zoo", "Name": "zoo",
"Notes": "", "Notes": "",
"OSService": "",
"OutputMaxSize": 4096, "OutputMaxSize": 4096,
"ScriptArgs": [], "ScriptArgs": [],
"OSService": "",
"ServiceID": "", "ServiceID": "",
"Shell": "", "Shell": "",
"Status": "", "Status": "",
@ -140,7 +140,6 @@
"ConnectCAProvider": "", "ConnectCAProvider": "",
"ConnectEnabled": false, "ConnectEnabled": false,
"ConnectMeshGatewayWANFederationEnabled": false, "ConnectMeshGatewayWANFederationEnabled": false,
"ConnectServerlessPluginEnabled": false,
"ConnectSidecarMaxPort": 0, "ConnectSidecarMaxPort": 0,
"ConnectSidecarMinPort": 0, "ConnectSidecarMinPort": 0,
"ConnectTestCALeafRootChangeSpread": "0s", "ConnectTestCALeafRootChangeSpread": "0s",
@ -258,6 +257,7 @@
"PrimaryGatewaysInterval": "0s", "PrimaryGatewaysInterval": "0s",
"RPCAdvertiseAddr": "", "RPCAdvertiseAddr": "",
"RPCBindAddr": "", "RPCBindAddr": "",
"RPCClientTimeout": "0s",
"RPCConfig": { "RPCConfig": {
"EnableStreaming": false "EnableStreaming": false
}, },
@ -267,7 +267,6 @@
"RPCMaxConnsPerClient": 0, "RPCMaxConnsPerClient": 0,
"RPCProtocol": 0, "RPCProtocol": 0,
"RPCRateLimit": 0, "RPCRateLimit": 0,
"RPCClientTimeout": "0s",
"RaftBoltDBConfig": { "RaftBoltDBConfig": {
"NoFreelistSync": false "NoFreelistSync": false
}, },
@ -331,6 +330,7 @@
"Method": "", "Method": "",
"Name": "blurb", "Name": "blurb",
"Notes": "", "Notes": "",
"OSService": "",
"OutputMaxSize": 4096, "OutputMaxSize": 4096,
"ProxyGRPC": "", "ProxyGRPC": "",
"ProxyHTTP": "", "ProxyHTTP": "",
@ -338,7 +338,6 @@
"Shell": "", "Shell": "",
"Status": "", "Status": "",
"SuccessBeforePassing": 0, "SuccessBeforePassing": 0,
"OSService": "",
"TCP": "", "TCP": "",
"TLSServerName": "", "TLSServerName": "",
"TLSSkipVerify": false, "TLSSkipVerify": false,

View File

@ -225,7 +225,6 @@ connect {
} }
enable_mesh_gateway_wan_federation = false enable_mesh_gateway_wan_federation = false
enabled = true enabled = true
enable_serverless_plugin = true
} }
gossip_lan { gossip_lan {
gossip_nodes = 6 gossip_nodes = 6

View File

@ -224,8 +224,7 @@
"csr_max_concurrent": 2 "csr_max_concurrent": 2
}, },
"enable_mesh_gateway_wan_federation": false, "enable_mesh_gateway_wan_federation": false,
"enabled": true, "enabled": true
"enable_serverless_plugin": true
}, },
"gossip_lan" : { "gossip_lan" : {
"gossip_nodes": 6, "gossip_nodes": 6,

View File

@ -251,11 +251,9 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
s.ResourceMapMutateFn(newResourceMap) s.ResourceMapMutateFn(newResourceMap)
} }
if s.serverlessPluginEnabled { newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap))
newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap)) if err != nil {
if err != nil { return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the serverless plugin: %v", err)
return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the serverless plugin: %v", err)
}
} }
if err := populateChildIndexMap(newResourceMap); err != nil { if err := populateChildIndexMap(newResourceMap); err != nil {

View File

@ -2,7 +2,6 @@ package xds
import ( import (
"errors" "errors"
"fmt"
"strings" "strings"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -30,206 +29,201 @@ import (
// Stick to very straightforward stuff in xds_protocol_helpers_test.go. // Stick to very straightforward stuff in xds_protocol_helpers_test.go.
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
for _, serverlessPluginEnabled := range []bool{false, true} { aclResolve := func(id string) (acl.Authorizer, error) {
t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) { // Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
aclResolve := func(id string) (acl.Authorizer, error) { sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) // Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Register the proxy to create state needed to Watch() on var snap *proxycfg.ConfigSnapshot
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
testutil.RunStep(t, "initial setup", func(t *testing.T) { // Send initial cluster discover. We'll assume we are testing a partial
snap = newTestSnapshot(t, nil, "") // reconnect and include some initial resource versions that will be
// cleaned up.
// Send initial cluster discover. We'll assume we are testing a partial envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// reconnect and include some initial resource versions that will be InitialResourceVersions: mustMakeVersionMap(t,
// cleaned up. makeTestCluster(t, snap, "tcp:geo-cache"),
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ),
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
testutil.RunStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
})
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
),
})
// After receiving the endpoints Envoy sends an ACK for the cluster
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If Envoy re-subscribes to something even if there are no changes we send a
// fresh copy.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
}
testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change because Envoy is not subscribed to db anymore.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
// Restore db's deleted endpoints by generating a new snapshot.
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change because Envoy is still not subscribed to db.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// When Envoy re-subscribes to db we send the endpoints for it.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// NOTE: this has to be the last subtest since it kills the stream
testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update by deleting endpoints again.
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}) })
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
testutil.RunStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
})
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
),
})
// After receiving the endpoints Envoy sends an ACK for the cluster
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If Envoy re-subscribes to something even if there are no changes we send a
// fresh copy.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
}
testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change because Envoy is not subscribed to db anymore.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
// Restore db's deleted endpoints by generating a new snapshot.
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change because Envoy is still not subscribed to db.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// When Envoy re-subscribes to db we send the endpoints for it.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// NOTE: this has to be the last subtest since it kills the stream
testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update by deleting endpoints again.
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
} }
} }
@ -238,7 +232,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -370,7 +364,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -522,7 +516,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be // This mutateFn causes any endpoint with a name containing "geo-cache" to be
@ -667,7 +661,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -804,7 +798,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1062,7 +1056,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1139,7 +1133,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short. 100*time.Millisecond, // Make this short.
false,
nil, nil,
) )
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1239,7 +1232,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short. 100*time.Millisecond, // Make this short.
false,
nil, nil,
) )
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1321,7 +1313,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
// Allow all // Allow all
return acl.RootAuthorizer("manage"), nil return acl.RootAuthorizer("manage"), nil
} }
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil) scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil) sid := structs.NewServiceID("ingress-gateway", nil)
@ -1376,7 +1368,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{}) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, capacityReachedLimiter{})
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1410,7 +1402,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
limiter := &testLimiter{} limiter := &testLimiter{}
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter) scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, limiter)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil) sid := structs.NewServiceID("web-sidecar-proxy", nil)

View File

@ -140,8 +140,7 @@ type Server struct {
// ResourceMapMutateFn exclusively exists for testing purposes. // ResourceMapMutateFn exclusively exists for testing purposes.
ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources) ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources)
activeStreams *activeStreamCounters activeStreams *activeStreamCounters
serverlessPluginEnabled bool
} }
// activeStreamCounters simply encapsulates two counters accessed atomically to // activeStreamCounters simply encapsulates two counters accessed atomically to
@ -177,22 +176,20 @@ func (c *activeStreamCounters) Increment(xdsVersion string) func() {
func NewServer( func NewServer(
nodeName string, nodeName string,
logger hclog.Logger, logger hclog.Logger,
serverlessPluginEnabled bool,
cfgMgr ProxyConfigSource, cfgMgr ProxyConfigSource,
resolveToken ACLResolverFunc, resolveToken ACLResolverFunc,
cfgFetcher ConfigFetcher, cfgFetcher ConfigFetcher,
limiter SessionLimiter, limiter SessionLimiter,
) *Server { ) *Server {
return &Server{ return &Server{
NodeName: nodeName, NodeName: nodeName,
Logger: logger, Logger: logger,
CfgSrc: cfgMgr, CfgSrc: cfgMgr,
ResolveToken: resolveToken, ResolveToken: resolveToken,
CfgFetcher: cfgFetcher, CfgFetcher: cfgFetcher,
SessionLimiter: limiter, SessionLimiter: limiter,
AuthCheckFrequency: DefaultAuthCheckFrequency, AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{}, activeStreams: &activeStreamCounters{},
serverlessPluginEnabled: serverlessPluginEnabled,
} }
} }

View File

@ -136,7 +136,6 @@ func newTestServerDeltaScenario(
proxyID string, proxyID string,
token string, token string,
authCheckFrequency time.Duration, authCheckFrequency time.Duration,
serverlessPluginEnabled bool,
sessionLimiter SessionLimiter, sessionLimiter SessionLimiter,
) *testServerScenario { ) *testServerScenario {
mgr := newTestManager(t) mgr := newTestManager(t)
@ -163,7 +162,6 @@ func newTestServerDeltaScenario(
s := NewServer( s := NewServer(
"node-123", "node-123",
testutil.Logger(t), testutil.Logger(t),
serverlessPluginEnabled,
mgr, mgr,
resolveToken, resolveToken,
nil, /*cfgFetcher ConfigFetcher*/ nil, /*cfgFetcher ConfigFetcher*/

View File

@ -1,3 +0,0 @@
connect {
enable_serverless_plugin = true
}

View File

@ -1068,11 +1068,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `enable_mesh_gateway_wan_federation` ((#connect_enable_mesh_gateway_wan_federation)) (Defaults to `false`) Controls whether cross-datacenter federation traffic between servers is funneled - `enable_mesh_gateway_wan_federation` ((#connect_enable_mesh_gateway_wan_federation)) (Defaults to `false`) Controls whether cross-datacenter federation traffic between servers is funneled
through mesh gateways. This was added in Consul 1.8.0. through mesh gateways. This was added in Consul 1.8.0.
- `enable_serverless_plugin` ((#connect_enable_serverless_plugin)) (Defaults to `false`) Determines whether the serverless plugin
is enabled. The serverless plugin supports [AWS
Lambda](https://aws.amazon.com/lambda/). For additional information on invoking Lambda functions
from mesh services, refer to the [Lambda documentation](/docs/lambda).
- `ca_provider` ((#connect_ca_provider)) Controls which CA provider to - `ca_provider` ((#connect_ca_provider)) Controls which CA provider to
use for Connect's CA. Currently only the `aws-pca`, `consul`, and `vault` providers are supported. use for Connect's CA. Currently only the `aws-pca`, `consul`, and `vault` providers are supported.
This is only used when initially bootstrapping the cluster. For an existing cluster, This is only used when initially bootstrapping the cluster. For an existing cluster,

View File

@ -20,14 +20,6 @@ Consul v1.12.1 and later
Complete the following prerequisites prior to registering your Lambda functions. You only need to perform these steps once. Complete the following prerequisites prior to registering your Lambda functions. You only need to perform these steps once.
### Enable the Serverless Plugin
Add the following configuration to all Consul clients:
`connect { enable_serverless_plugin = true, connect = true }`
Refer to the [`enable_serverless_plugin`](/docs/agent/config/config-files#connect_enable_serverless_plugin) configuration documentation for additional information.
### Configure IAM Permissions for Envoy ### Configure IAM Permissions for Envoy
The Envoy proxy that invokes Lambda must have the `lambda:InvokeFunction` AWS IAM The Envoy proxy that invokes Lambda must have the `lambda:InvokeFunction` AWS IAM
@ -84,4 +76,4 @@ Refer to the following documentation and tutorials for instructions on how to se
To register a Lambda service with a terminating gateway, add the service to the To register a Lambda service with a terminating gateway, add the service to the
`Services` field of the terminating gateway's `terminating-gateway` `Services` field of the terminating gateway's `terminating-gateway`
configuration entry. configuration entry.

View File

@ -14,6 +14,12 @@ provided for their upgrades as a result of new features or changed behavior.
This page is used to document those details separately from the standard This page is used to document those details separately from the standard
upgrade flow. upgrade flow.
## Consul 1.15.x
#### Removing configuration options
The `connect.enable_serverless_plugin` configuration option was removed. Lambda integration is now enabled by default.
## Consul 1.14.x ## Consul 1.14.x
### Service Mesh Compatibility ### Service Mesh Compatibility
@ -25,7 +31,7 @@ A breaking change was made in Consul 1.14 that:
[`connect.enabled`](/docs/agent/config/config-files#connect_enabled) to `false`. [`connect.enabled`](/docs/agent/config/config-files#connect_enabled) to `false`.
The changes to Consul service mesh in version 1.14 are incompatible with Nomad 1.4.2 and The changes to Consul service mesh in version 1.14 are incompatible with Nomad 1.4.2 and
earlier. If you operate Consul service mesh using Nomad 1.4.2 or earlier, do not upgrade to Consul 1.14 until earlier. If you operate Consul service mesh using Nomad 1.4.2 or earlier, do not upgrade to Consul 1.14 until
[hashicorp/nomad#15266](https://github.com/hashicorp/nomad/issues/15266) is [hashicorp/nomad#15266](https://github.com/hashicorp/nomad/issues/15266) is
fixed. fixed.