From 5dd131fee8f658b065cf2a1f6cb537598aa94cdb Mon Sep 17 00:00:00 2001 From: Eric Haberkorn Date: Thu, 8 Dec 2022 14:46:42 -0500 Subject: [PATCH] Remove the `connect.enable_serverless_plugin` agent configuration option (#15710) --- .changelog/15710.txt | 4 + agent/agent.go | 1 - agent/config/builder.go | 2 - agent/config/config.go | 1 - agent/config/runtime.go | 6 - agent/config/runtime_test.go | 1 - .../TestRuntimeConfig_Sanitize.golden | 7 +- agent/config/testdata/full-config.hcl | 1 - agent/config/testdata/full-config.json | 3 +- agent/xds/delta.go | 8 +- agent/xds/delta_test.go | 404 +++++++++--------- agent/xds/server.go | 21 +- agent/xds/xds_protocol_helpers_test.go | 2 - .../envoy/case-mesh-to-lambda/serverless.hcl | 3 - .../docs/agent/config/config-files.mdx | 5 - .../docs/lambda/registration/index.mdx | 10 +- .../docs/upgrading/upgrade-specific.mdx | 8 +- 17 files changed, 226 insertions(+), 261 deletions(-) create mode 100644 .changelog/15710.txt delete mode 100644 test/integration/connect/envoy/case-mesh-to-lambda/serverless.hcl diff --git a/.changelog/15710.txt b/.changelog/15710.txt new file mode 100644 index 000000000..fbc335258 --- /dev/null +++ b/.changelog/15710.txt @@ -0,0 +1,4 @@ +```release-note:breaking-change +xds: Remove the `connect.enable_serverless_plugin` agent configuration option. Now +Lambda integration is enabled by default. +``` diff --git a/agent/agent.go b/agent/agent.go index 7c1750266..52d799e7d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -848,7 +848,6 @@ func (a *Agent) listenAndServeGRPC() error { a.xdsServer = xds.NewServer( a.config.NodeName, a.logger.Named(logging.Envoy), - a.config.ConnectServerlessPluginEnabled, cfg, func(id string) (acl.Authorizer, error) { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) diff --git a/agent/config/builder.go b/agent/config/builder.go index f77054db7..f071bd206 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -666,7 +666,6 @@ func (b *builder) build() (rt RuntimeConfig, err error) { connectEnabled := boolVal(c.Connect.Enabled) connectCAProvider := stringVal(c.Connect.CAProvider) connectCAConfig := c.Connect.CAConfig - serverlessPluginEnabled := boolVal(c.Connect.EnableServerlessPlugin) // autoEncrypt and autoConfig implicitly turns on connect which is why // they need to be above other settings that rely on connect. @@ -969,7 +968,6 @@ func (b *builder) build() (rt RuntimeConfig, err error) { ConnectCAProvider: connectCAProvider, ConnectCAConfig: connectCAConfig, ConnectMeshGatewayWANFederationEnabled: connectMeshGatewayWANFederationEnabled, - ConnectServerlessPluginEnabled: serverlessPluginEnabled, ConnectSidecarMinPort: sidecarMinPort, ConnectSidecarMaxPort: sidecarMaxPort, ConnectTestCALeafRootChangeSpread: b.durationVal("connect.test_ca_leaf_root_change_spread", c.Connect.TestCALeafRootChangeSpread), diff --git a/agent/config/config.go b/agent/config/config.go index 83d0548fe..e19922541 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -613,7 +613,6 @@ type Connect struct { CAProvider *string `mapstructure:"ca_provider" json:"ca_provider,omitempty"` CAConfig map[string]interface{} `mapstructure:"ca_config" json:"ca_config,omitempty"` MeshGatewayWANFederationEnabled *bool `mapstructure:"enable_mesh_gateway_wan_federation" json:"enable_mesh_gateway_wan_federation,omitempty"` - EnableServerlessPlugin *bool `mapstructure:"enable_serverless_plugin" json:"enable_serverless_plugin,omitempty"` // TestCALeafRootChangeSpread controls how long after a CA roots change before new leaf certs will be generated. // This is only tuned in tests, generally set to 1ns to make tests deterministic with when to expect updated leaf diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 5388f3d7c..199e2ac09 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -494,12 +494,6 @@ type RuntimeConfig struct { // and servers in a cluster for correct connect operation. ConnectEnabled bool - // ConnectServerlessPluginEnabled opts the agent into the serverless plugin. - // This plugin allows services to be configured as AWS Lambdas. After the - // Lambda service is configured, Connect services can invoke the Lambda - // service like any other upstream. - ConnectServerlessPluginEnabled bool - // ConnectSidecarMinPort is the inclusive start of the range of ports // allocated to the agent for asigning to sidecar services where no port is // specified. diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 576873bfa..e5e571fee 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -6064,7 +6064,6 @@ func TestLoad_FullConfig(t *testing.T) { "CSRMaxConcurrent": float64(2), }, ConnectMeshGatewayWANFederationEnabled: false, - ConnectServerlessPluginEnabled: true, Cloud: hcpconfig.CloudConfig{ ResourceID: "N43DsscE", ClientID: "6WvsDZCP", diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 23aff3a88..98317d3cf 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -110,9 +110,9 @@ "Method": "", "Name": "zoo", "Notes": "", + "OSService": "", "OutputMaxSize": 4096, "ScriptArgs": [], - "OSService": "", "ServiceID": "", "Shell": "", "Status": "", @@ -140,7 +140,6 @@ "ConnectCAProvider": "", "ConnectEnabled": false, "ConnectMeshGatewayWANFederationEnabled": false, - "ConnectServerlessPluginEnabled": false, "ConnectSidecarMaxPort": 0, "ConnectSidecarMinPort": 0, "ConnectTestCALeafRootChangeSpread": "0s", @@ -258,6 +257,7 @@ "PrimaryGatewaysInterval": "0s", "RPCAdvertiseAddr": "", "RPCBindAddr": "", + "RPCClientTimeout": "0s", "RPCConfig": { "EnableStreaming": false }, @@ -267,7 +267,6 @@ "RPCMaxConnsPerClient": 0, "RPCProtocol": 0, "RPCRateLimit": 0, - "RPCClientTimeout": "0s", "RaftBoltDBConfig": { "NoFreelistSync": false }, @@ -331,6 +330,7 @@ "Method": "", "Name": "blurb", "Notes": "", + "OSService": "", "OutputMaxSize": 4096, "ProxyGRPC": "", "ProxyHTTP": "", @@ -338,7 +338,6 @@ "Shell": "", "Status": "", "SuccessBeforePassing": 0, - "OSService": "", "TCP": "", "TLSServerName": "", "TLSSkipVerify": false, diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index 1e38cd48e..f49946419 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -225,7 +225,6 @@ connect { } enable_mesh_gateway_wan_federation = false enabled = true - enable_serverless_plugin = true } gossip_lan { gossip_nodes = 6 diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index 507aef157..e578e6f33 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -224,8 +224,7 @@ "csr_max_concurrent": 2 }, "enable_mesh_gateway_wan_federation": false, - "enabled": true, - "enable_serverless_plugin": true + "enabled": true }, "gossip_lan" : { "gossip_nodes": 6, diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 4c042eeb9..b90c10e0f 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -251,11 +251,9 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove s.ResourceMapMutateFn(newResourceMap) } - if s.serverlessPluginEnabled { - newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap)) - if err != nil { - return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the serverless plugin: %v", err) - } + newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap)) + if err != nil { + return status.Errorf(codes.Unavailable, "failed to patch xDS resources in the serverless plugin: %v", err) } if err := populateChildIndexMap(newResourceMap); err != nil { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 7e57e3dba..803c19a78 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -2,7 +2,6 @@ package xds import ( "errors" - "fmt" "strings" "sync/atomic" "testing" @@ -30,206 +29,201 @@ import ( // Stick to very straightforward stuff in xds_protocol_helpers_test.go. func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { - for _, serverlessPluginEnabled := range []bool{false, true} { - t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy - aclResolve := func(id string) (acl.Authorizer, error) { - // Allow all - return acl.RootAuthorizer("manage"), nil - } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil) - mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + sid := structs.NewServiceID("web-sidecar-proxy", nil) - sid := structs.NewServiceID("web-sidecar-proxy", nil) + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) - // Register the proxy to create state needed to Watch() on - mgr.RegisterProxy(t, sid) + var snap *proxycfg.ConfigSnapshot - var snap *proxycfg.ConfigSnapshot + testutil.RunStep(t, "initial setup", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") - testutil.RunStep(t, "initial setup", func(t *testing.T) { - snap = newTestSnapshot(t, nil, "") - - // Send initial cluster discover. We'll assume we are testing a partial - // reconnect and include some initial resource versions that will be - // cleaned up. - envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - InitialResourceVersions: mustMakeVersionMap(t, - makeTestCluster(t, snap, "tcp:geo-cache"), - ), - }) - - // Check no response sent yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - requireProtocolVersionGauge(t, scenario, "v3", 1) - - // Deliver a new snapshot (tcp with one tcp upstream) - mgr.DeliverConfig(t, sid, snap) - }) - - testutil.RunStep(t, "first sync", func(t *testing.T) { - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.ClusterType, - Nonce: hexString(1), - Resources: makeTestResources(t, - makeTestCluster(t, snap, "tcp:local_app"), - makeTestCluster(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), - ), - }) - - // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - // We'll assume we are testing a partial "reconnect" - InitialResourceVersions: mustMakeVersionMap(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - ResourceNamesSubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - // - // Include "fake-endpoints" here to test subscribing to an unknown - // thing and have consul tell us there's no data for it. - "fake-endpoints", - }, - }) - - // We should get a response immediately since the config is already present in - // the server for endpoints. Note that this should not be racy if the server - // is behaving well since the Cluster send above should be blocked until we - // deliver a new config version. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(2), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), - // SAME_AS_INITIAL_VERSION: "fake-endpoints", - ), - }) - - // After receiving the endpoints Envoy sends an ACK for the cluster - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - - // We are caught up, so there should be nothing queued to send. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // Envoy now sends listener request - envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) - - // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) - - // And should get a response immediately. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.ListenerType, - Nonce: hexString(3), - Resources: makeTestResources(t, - makeTestListener(t, snap, "tcp:public_listener"), - makeTestListener(t, snap, "tcp:db"), - makeTestListener(t, snap, "tcp:geo-cache"), - ), - }) - - // We are caught up, so there should be nothing queued to send. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // ACKs the listener - envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) - - // If Envoy re-subscribes to something even if there are no changes we send a - // fresh copy. - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesSubscribe: []string{ - "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - }, - }) - - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(4), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - }) - - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) - - // We are caught up, so there should be nothing queued to send. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] - } - - testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesUnsubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - }, - }) - - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db. - snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") - mgr.DeliverConfig(t, sid, snap) - - // We never send an EDS reply about this change because Envoy is not subscribed to db anymore. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) { - // Restore db's deleted endpoints by generating a new snapshot. - snap = newTestSnapshot(t, snap, "") - mgr.DeliverConfig(t, sid, snap) - - // We never send an EDS reply about this change because Envoy is still not subscribed to db. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // When Envoy re-subscribes to db we send the endpoints for it. - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesSubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - }, - }) - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(5), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db"), - ), - }) - - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) - - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - // NOTE: this has to be the last subtest since it kills the stream - testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { - // Force sends to fail - envoy.SetSendErr(errors.New("test error")) - - // Trigger only an EDS update by deleting endpoints again. - deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") - - // We never send any replies about this change because we died. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - envoy.Close() - select { - case err := <-errCh: - require.NoError(t, err) - case <-time.After(50 * time.Millisecond): - t.Fatalf("timed out waiting for handler to finish") - } + // Send initial cluster discover. We'll assume we are testing a partial + // reconnect and include some initial resource versions that will be + // cleaned up. + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + }) + + testutil.RunStep(t, "first sync", func(t *testing.T) { + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + // We'll assume we are testing a partial "reconnect" + InitialResourceVersions: mustMakeVersionMap(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + // + // Include "fake-endpoints" here to test subscribing to an unknown + // thing and have consul tell us there's no data for it. + "fake-endpoints", + }, + }) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), + // SAME_AS_INITIAL_VERSION: "fake-endpoints", + ), + }) + + // After receiving the endpoints Envoy sends an ACK for the cluster + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // We are caught up, so there should be nothing queued to send. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) + + // If Envoy re-subscribes to something even if there are no changes we send a + // fresh copy. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(4), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) + + // We are caught up, so there should be nothing queued to send. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] + } + + testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesUnsubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db. + snap = newTestSnapshot(t, snap, "") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") + mgr.DeliverConfig(t, sid, snap) + + // We never send an EDS reply about this change because Envoy is not subscribed to db anymore. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) { + // Restore db's deleted endpoints by generating a new snapshot. + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) + + // We never send an EDS reply about this change because Envoy is still not subscribed to db. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // When Envoy re-subscribes to db we send the endpoints for it. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + ), + }) + + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + // NOTE: this has to be the last subtest since it kills the stream + testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { + // Force sends to fail + envoy.SetSendErr(errors.New("test error")) + + // Trigger only an EDS update by deleting endpoints again. + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") + + // We never send any replies about this change because we died. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") } } @@ -238,7 +232,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -370,7 +364,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -522,7 +516,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy // This mutateFn causes any endpoint with a name containing "geo-cache" to be @@ -667,7 +661,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -804,7 +798,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1062,7 +1056,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1139,7 +1133,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. - false, nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1239,7 +1232,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. - false, nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1321,7 +1313,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("ingress-gateway", nil) @@ -1376,7 +1368,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{}) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, capacityReachedLimiter{}) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1410,7 +1402,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { limiter := &testLimiter{} aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, limiter) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) diff --git a/agent/xds/server.go b/agent/xds/server.go index 828a4e202..f3d36bdad 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -140,8 +140,7 @@ type Server struct { // ResourceMapMutateFn exclusively exists for testing purposes. ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources) - activeStreams *activeStreamCounters - serverlessPluginEnabled bool + activeStreams *activeStreamCounters } // activeStreamCounters simply encapsulates two counters accessed atomically to @@ -177,22 +176,20 @@ func (c *activeStreamCounters) Increment(xdsVersion string) func() { func NewServer( nodeName string, logger hclog.Logger, - serverlessPluginEnabled bool, cfgMgr ProxyConfigSource, resolveToken ACLResolverFunc, cfgFetcher ConfigFetcher, limiter SessionLimiter, ) *Server { return &Server{ - NodeName: nodeName, - Logger: logger, - CfgSrc: cfgMgr, - ResolveToken: resolveToken, - CfgFetcher: cfgFetcher, - SessionLimiter: limiter, - AuthCheckFrequency: DefaultAuthCheckFrequency, - activeStreams: &activeStreamCounters{}, - serverlessPluginEnabled: serverlessPluginEnabled, + NodeName: nodeName, + Logger: logger, + CfgSrc: cfgMgr, + ResolveToken: resolveToken, + CfgFetcher: cfgFetcher, + SessionLimiter: limiter, + AuthCheckFrequency: DefaultAuthCheckFrequency, + activeStreams: &activeStreamCounters{}, } } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 7cb413def..b905035df 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -136,7 +136,6 @@ func newTestServerDeltaScenario( proxyID string, token string, authCheckFrequency time.Duration, - serverlessPluginEnabled bool, sessionLimiter SessionLimiter, ) *testServerScenario { mgr := newTestManager(t) @@ -163,7 +162,6 @@ func newTestServerDeltaScenario( s := NewServer( "node-123", testutil.Logger(t), - serverlessPluginEnabled, mgr, resolveToken, nil, /*cfgFetcher ConfigFetcher*/ diff --git a/test/integration/connect/envoy/case-mesh-to-lambda/serverless.hcl b/test/integration/connect/envoy/case-mesh-to-lambda/serverless.hcl deleted file mode 100644 index 41447a466..000000000 --- a/test/integration/connect/envoy/case-mesh-to-lambda/serverless.hcl +++ /dev/null @@ -1,3 +0,0 @@ -connect { - enable_serverless_plugin = true -} diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index aa418ccfb..31809605a 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -1068,11 +1068,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `enable_mesh_gateway_wan_federation` ((#connect_enable_mesh_gateway_wan_federation)) (Defaults to `false`) Controls whether cross-datacenter federation traffic between servers is funneled through mesh gateways. This was added in Consul 1.8.0. - - `enable_serverless_plugin` ((#connect_enable_serverless_plugin)) (Defaults to `false`) Determines whether the serverless plugin - is enabled. The serverless plugin supports [AWS - Lambda](https://aws.amazon.com/lambda/). For additional information on invoking Lambda functions - from mesh services, refer to the [Lambda documentation](/docs/lambda). - - `ca_provider` ((#connect_ca_provider)) Controls which CA provider to use for Connect's CA. Currently only the `aws-pca`, `consul`, and `vault` providers are supported. This is only used when initially bootstrapping the cluster. For an existing cluster, diff --git a/website/content/docs/lambda/registration/index.mdx b/website/content/docs/lambda/registration/index.mdx index 8b64afd81..de4f12da4 100644 --- a/website/content/docs/lambda/registration/index.mdx +++ b/website/content/docs/lambda/registration/index.mdx @@ -20,14 +20,6 @@ Consul v1.12.1 and later Complete the following prerequisites prior to registering your Lambda functions. You only need to perform these steps once. -### Enable the Serverless Plugin - -Add the following configuration to all Consul clients: - -`connect { enable_serverless_plugin = true, connect = true }` - -Refer to the [`enable_serverless_plugin`](/docs/agent/config/config-files#connect_enable_serverless_plugin) configuration documentation for additional information. - ### Configure IAM Permissions for Envoy The Envoy proxy that invokes Lambda must have the `lambda:InvokeFunction` AWS IAM @@ -84,4 +76,4 @@ Refer to the following documentation and tutorials for instructions on how to se To register a Lambda service with a terminating gateway, add the service to the `Services` field of the terminating gateway's `terminating-gateway` -configuration entry. \ No newline at end of file +configuration entry. diff --git a/website/content/docs/upgrading/upgrade-specific.mdx b/website/content/docs/upgrading/upgrade-specific.mdx index 2b0369db6..d63fc5d27 100644 --- a/website/content/docs/upgrading/upgrade-specific.mdx +++ b/website/content/docs/upgrading/upgrade-specific.mdx @@ -14,6 +14,12 @@ provided for their upgrades as a result of new features or changed behavior. This page is used to document those details separately from the standard upgrade flow. +## Consul 1.15.x + +#### Removing configuration options + +The `connect.enable_serverless_plugin` configuration option was removed. Lambda integration is now enabled by default. + ## Consul 1.14.x ### Service Mesh Compatibility @@ -25,7 +31,7 @@ A breaking change was made in Consul 1.14 that: [`connect.enabled`](/docs/agent/config/config-files#connect_enabled) to `false`. The changes to Consul service mesh in version 1.14 are incompatible with Nomad 1.4.2 and -earlier. If you operate Consul service mesh using Nomad 1.4.2 or earlier, do not upgrade to Consul 1.14 until +earlier. If you operate Consul service mesh using Nomad 1.4.2 or earlier, do not upgrade to Consul 1.14 until [hashicorp/nomad#15266](https://github.com/hashicorp/nomad/issues/15266) is fixed.