diff --git a/.changelog/16497.txt b/.changelog/16497.txt
new file mode 100644
index 000000000..3aa3633ac
--- /dev/null
+++ b/.changelog/16497.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher
+```
diff --git a/agent/agent.go b/agent/agent.go
index 7b218174a..0d11e462d 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -721,11 +721,12 @@ func (a *Agent) Start(ctx context.Context) error {
 	go localproxycfg.Sync(
 		&lib.StopChannelContext{StopCh: a.shutdownCh},
 		localproxycfg.SyncConfig{
-			Manager:  a.proxyConfig,
-			State:    a.State,
-			Logger:   a.proxyConfig.Logger.Named("agent-state"),
-			Tokens:   a.baseDeps.Tokens,
-			NodeName: a.config.NodeName,
+			Manager:         a.proxyConfig,
+			State:           a.State,
+			Logger:          a.proxyConfig.Logger.Named("agent-state"),
+			Tokens:          a.baseDeps.Tokens,
+			NodeName:        a.config.NodeName,
+			ResyncFrequency: a.config.LocalProxyConfigResyncInterval,
 		},
 	)
 
diff --git a/agent/config/builder.go b/agent/config/builder.go
index f682bf7b1..5d697b027 100644
--- a/agent/config/builder.go
+++ b/agent/config/builder.go
@@ -1091,6 +1091,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
 		Watches:                          c.Watches,
 		XDSUpdateRateLimit:               limitVal(c.XDS.UpdateMaxPerSecond),
 		AutoReloadConfigCoalesceInterval: 1 * time.Second,
+		LocalProxyConfigResyncInterval:   30 * time.Second,
 	}
 
 	rt.TLS, err = b.buildTLSConfig(rt, c.TLS)
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index 627c1e564..b0d9cf436 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -1475,6 +1475,10 @@ type RuntimeConfig struct {
 	// AutoReloadConfigCoalesceInterval Coalesce Interval for auto reload config
 	AutoReloadConfigCoalesceInterval time.Duration
 
+	// LocalProxyConfigResyncInterval is not a user-configurable value and exists
+	// here so that tests can use a smaller value.
+	LocalProxyConfigResyncInterval time.Duration
+
 	EnterpriseRuntimeConfig
 }
 
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index 2844dd3a7..a8208f0ec 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -5995,12 +5995,13 @@ func TestLoad_FullConfig(t *testing.T) {
 	nodeEntMeta := structs.NodeEnterpriseMetaInDefaultPartition()
 	expected := &RuntimeConfig{
 		// non-user configurable values
-		AEInterval:                 time.Minute,
-		CheckDeregisterIntervalMin: time.Minute,
-		CheckReapInterval:          30 * time.Second,
-		SegmentNameLimit:           64,
-		SyncCoordinateIntervalMin:  15 * time.Second,
-		SyncCoordinateRateTarget:   64,
+		AEInterval:                     time.Minute,
+		CheckDeregisterIntervalMin:     time.Minute,
+		CheckReapInterval:              30 * time.Second,
+		SegmentNameLimit:               64,
+		SyncCoordinateIntervalMin:      15 * time.Second,
+		SyncCoordinateRateTarget:       64,
+		LocalProxyConfigResyncInterval: 30 * time.Second,
 
 		Revision:   "JNtPSav3",
 		Version:    "R909Hblt",
diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
index 75d216fab..2c5b91c98 100644
--- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
+++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -233,6 +233,7 @@
     "KVMaxValueSize": 1234567800000000,
     "LeaveDrainTime": "0s",
     "LeaveOnTerm": false,
+    "LocalProxyConfigResyncInterval": "0s",
     "Logging": {
         "EnableSyslog": false,
         "LogFilePath": "",
diff --git a/agent/proxycfg-sources/local/sync.go b/agent/proxycfg-sources/local/sync.go
index c6cee8c61..5702d2f36 100644
--- a/agent/proxycfg-sources/local/sync.go
+++ b/agent/proxycfg-sources/local/sync.go
@@ -2,6 +2,7 @@ package local
 
 import (
 	"context"
+	"time"
 
 	"github.com/hashicorp/go-hclog"
 
@@ -11,6 +12,8 @@ import (
 	"github.com/hashicorp/consul/agent/token"
 )
 
+const resyncFrequency = 30 * time.Second
+
 const source proxycfg.ProxySource = "local"
 
 // SyncConfig contains the dependencies required by Sync.
@@ -30,6 +33,10 @@ type SyncConfig struct {
 
 	// Logger will be used to write log messages.
 	Logger hclog.Logger
+
+	// ResyncFrequency is how often to do a resync and recreate any terminated
+	// watches.
+	ResyncFrequency time.Duration
 }
 
 // Sync watches the agent's local state and registers/deregisters services with
@@ -50,12 +57,19 @@ func Sync(ctx context.Context, cfg SyncConfig) {
 	cfg.State.Notify(stateCh)
 	defer cfg.State.StopNotify(stateCh)
 
+	var resyncCh <-chan time.Time
 	for {
 		sync(cfg)
 
+		if resyncCh == nil && cfg.ResyncFrequency > 0 {
+			resyncCh = time.After(cfg.ResyncFrequency)
+		}
+
 		select {
 		case <-stateCh:
 			// Wait for a state change.
+		case <-resyncCh:
+			resyncCh = nil
 		case <-ctx.Done():
 			return
 		}
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index c58268e7e..d21ff4f1e 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 
 func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
 	state, ok := m.proxies[id]
-	if ok {
+	if ok && !state.stoppedRunning() {
 		if state.source != source && !overwrite {
 			// Registered by a different source, leave as-is.
 			return nil
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index d312c3b4c..2347b04cc 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -83,10 +83,20 @@ type state struct {
 	ch     chan UpdateEvent
 	snapCh chan ConfigSnapshot
 	reqCh  chan chan *ConfigSnapshot
+	doneCh chan struct{}
 
 	rateLimiter *rate.Limiter
 }
 
+func (s *state) stoppedRunning() bool {
+	select {
+	case <-s.doneCh:
+		return true
+	default:
+		return false
+	}
+}
+
 // failed returns whether run exited because a data source is in an
 // irrecoverable state.
 func (s *state) failed() bool {
@@ -182,6 +192,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
 		ch:          ch,
 		snapCh:      make(chan ConfigSnapshot, 1),
 		reqCh:       make(chan chan *ConfigSnapshot, 1),
+		doneCh:      make(chan struct{}),
 		rateLimiter: rateLimiter,
 	}, nil
 }
@@ -265,6 +276,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
 
 // Close discards the state and stops any long-running watches.
 func (s *state) Close(failed bool) error {
+	if s.stoppedRunning() {
+		return nil
+	}
 	if s.cancel != nil {
 		s.cancel()
 	}
@@ -314,6 +328,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 }
 
 func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
+	// Closing the done channel signals that this entire state is no longer
+	// going to be updated.
+	defer close(s.doneCh)
 	// Close the channel we return from Watch when we stop so consumers can stop
 	// watching and clean up their goroutines. It's important we do this here and
 	// not in Close since this routine sends on this chan and so might panic if it
@@ -429,9 +446,20 @@ func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
 func (s *state) CurrentSnapshot() *ConfigSnapshot {
 	// Make a chan for the response to be sent on
 	ch := make(chan *ConfigSnapshot, 1)
-	s.reqCh <- ch
+
+	select {
+	case <-s.doneCh:
+		return nil
+	case s.reqCh <- ch:
+	}
+
 	// Wait for the response
-	return <-ch
+	select {
+	case <-s.doneCh:
+		return nil
+	case resp := <-ch:
+		return resp
+	}
 }
 
 // Changed returns whether or not the passed NodeService has had any of the
diff --git a/agent/proxycfg_test.go b/agent/proxycfg_test.go
new file mode 100644
index 000000000..18a5c5862
--- /dev/null
+++ b/agent/proxycfg_test.go
@@ -0,0 +1,138 @@
+package agent
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/agent/grpc-external/limiter"
+	"github.com/hashicorp/consul/agent/proxycfg"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testrpc"
+)
+
+func TestAgent_local_proxycfg(t *testing.T) {
+	a := NewTestAgent(t, TestACLConfig())
+	defer a.Shutdown()
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	token := generateUUID()
+
+	svc := &structs.NodeService{
+		ID:             "db",
+		Service:        "db",
+		Port:           5000,
+		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+	}
+	require.NoError(t, a.State.AddServiceWithChecks(svc, nil, token, true))
+
+	proxy := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "db-sidecar-proxy",
+		Service: "db-sidecar-proxy",
+		Port:    5000,
+		// Set this internal state that we expect sidecar registrations to have.
+		LocallyRegisteredAsSidecar: true,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "db",
+			Upstreams:              structs.TestUpstreams(t),
+		},
+		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+	}
+	require.NoError(t, a.State.AddServiceWithChecks(proxy, nil, token, true))
+
+	// This is a little gross, but this gives us the layered pair of
+	// local/catalog sources for now.
+	cfg := a.xdsServer.CfgSrc
+
+	var (
+		timer      = time.After(100 * time.Millisecond)
+		timerFired = false
+		finalTimer <-chan time.Time
+	)
+
+	var (
+		firstTime = true
+		ch        <-chan *proxycfg.ConfigSnapshot
+		stc       limiter.SessionTerminatedChan
+		cancel    proxycfg.CancelFunc
+	)
+	defer func() {
+		if cancel != nil {
+			cancel()
+		}
+	}()
+	for {
+		if ch == nil {
+			// Sign up for a stream of config snapshots, in the same manner as the xds server.
+			sid := proxy.CompoundServiceID()
+
+			if firstTime {
+				firstTime = false
+			} else {
+				t.Logf("re-creating watch")
+			}
+
+			// Prior to fixes in https://github.com/hashicorp/consul/pull/16497
+			// this call to Watch() would deadlock.
+			var err error
+			ch, stc, cancel, err = cfg.Watch(sid, a.config.NodeName, token)
+			require.NoError(t, err)
+		}
+		select {
+		case <-stc:
+			t.Fatal("session unexpectedly terminated")
+		case snap, ok := <-ch:
+			if !ok {
+				t.Logf("channel is closed")
+				cancel()
+				ch, stc, cancel = nil, nil, nil
+				continue
+			}
+			require.NotNil(t, snap)
+			if !timerFired {
+				t.Fatal("should not have gotten snapshot until after we manifested the token")
+			}
+			return
+		case <-timer:
+			timerFired = true
+			finalTimer = time.After(1 * time.Second)
+
+			// This simulates the eventual consistency of a token
+			// showing up on a server after its creation by
+			// pre-creating the UUID and later using that as the
+			// initial SecretID for a real token.
+			gotToken := testWriteToken(t, a, &api.ACLToken{
+				AccessorID:  generateUUID(),
+				SecretID:    token,
+				Description: "my token",
+				ServiceIdentities: []*api.ACLServiceIdentity{{
+					ServiceName: "db",
+				}},
+			})
+			require.Equal(t, token, gotToken)
+		case <-finalTimer:
+			t.Fatal("did not receive a snapshot after the token manifested")
+		}
+	}
+
+}
+
+func testWriteToken(t *testing.T, a *TestAgent, tok *api.ACLToken) string {
+	req, _ := http.NewRequest("PUT", "/v1/acl/token", jsonReader(tok))
+	req.Header.Add("X-Consul-Token", "root")
+	resp := httptest.NewRecorder()
+	a.srv.h.ServeHTTP(resp, req)
+	require.Equal(t, http.StatusOK, resp.Code)
+
+	dec := json.NewDecoder(resp.Body)
+	aclResp := &structs.ACLToken{}
+	require.NoError(t, dec.Decode(aclResp))
+	return aclResp.SecretID
+}
diff --git a/agent/testagent.go b/agent/testagent.go
index 54db5c72b..76d82a2f8 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -214,6 +214,9 @@ func (a *TestAgent) Start(t *testing.T) error {
 		// Lower the maximum backoff period of a cache refresh just for
 		// tests see #14956 for more.
 		result.RuntimeConfig.Cache.CacheRefreshMaxWait = 1 * time.Second
+
+		// Lower the resync interval for tests.
+		result.RuntimeConfig.LocalProxyConfigResyncInterval = 250 * time.Millisecond
 	}
 	return result, err
 }
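Note on the resync loop added to agent/proxycfg-sources/local/sync.go: the time.After channel is armed lazily and set back to nil after it fires, so a full sync runs at most once per ResyncFrequency even when no local state change arrives, and a nil channel simply never becomes ready in the select. The following is a minimal, self-contained sketch of that pattern; the names (resyncLoop, doSync) are illustrative and are not part of the patch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// resyncLoop re-runs doSync whenever a state change arrives, and also at
// least once per resyncFrequency so that any terminated watches get recreated.
func resyncLoop(ctx context.Context, stateCh <-chan struct{}, resyncFrequency time.Duration, doSync func()) {
	var resyncCh <-chan time.Time
	for {
		doSync()

		// Arm the timer only when it is not already pending; a nil channel
		// blocks forever in the select below, so it is effectively disabled.
		if resyncCh == nil && resyncFrequency > 0 {
			resyncCh = time.After(resyncFrequency)
		}

		select {
		case <-stateCh:
			// A state change arrived; loop around and sync again.
		case <-resyncCh:
			// Periodic resync; clear the channel so it is re-armed next loop.
			resyncCh = nil
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	stateCh := make(chan struct{}) // no state changes in this example
	resyncLoop(ctx, stateCh, 100*time.Millisecond, func() { fmt.Println("sync") })
}
```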
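Note on the doneCh plumbing in agent/proxycfg/state.go: previously CurrentSnapshot sent on reqCh unconditionally, so it could block forever once the run goroutine had exited, which is the deadlock the new agent/proxycfg_test.go test exercises. With doneCh closed when run exits, every blocking step also selects on doneCh and returns nil instead. Below is a standalone sketch of that shape under assumed names (worker, request) rather than the real proxycfg types.

```go
package main

import "fmt"

// worker mimics the proxycfg state: requests on reqCh are only serviced while
// the run goroutine is alive, and doneCh is closed when it exits so callers
// can bail out instead of blocking forever.
type worker struct {
	reqCh  chan chan string
	doneCh chan struct{}
}

func newWorker() *worker {
	return &worker{
		reqCh:  make(chan chan string, 1),
		doneCh: make(chan struct{}),
	}
}

func (w *worker) run(stopCh <-chan struct{}) {
	defer close(w.doneCh) // signal that no more requests will be answered
	for {
		select {
		case ch := <-w.reqCh:
			ch <- "snapshot"
		case <-stopCh:
			return
		}
	}
}

// request has the same shape as CurrentSnapshot after the patch: each blocking
// step also selects on doneCh and returns the zero value once run has exited.
func (w *worker) request() string {
	ch := make(chan string, 1)
	select {
	case <-w.doneCh:
		return ""
	case w.reqCh <- ch:
	}
	select {
	case <-w.doneCh:
		return ""
	case resp := <-ch:
		return resp
	}
}

func main() {
	w := newWorker()
	stopCh := make(chan struct{})
	go w.run(stopCh)

	fmt.Println(w.request()) // "snapshot" while run is alive

	close(stopCh)
	<-w.doneCh // wait for run to exit

	fmt.Println(w.request() == "") // true: returns immediately instead of hanging
}
```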