diff --git a/.changelog/11924.txt b/.changelog/11924.txt
new file mode 100644
index 000000000..d76445017
--- /dev/null
+++ b/.changelog/11924.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+xds: fix a deadlock when the snapshot channel already has a snapshot to be consumed.
+```
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index 083291c13..d5d102b1e 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -127,7 +127,7 @@ func (m *Manager) Run() error {
 	defer m.State.StopNotify(stateCh)
 
 	for {
-		m.syncState()
+		m.syncState(m.notifyBroadcast)
 
 		// Wait for a state change
 		_, ok := <-stateCh
@@ -140,7 +140,7 @@ func (m *Manager) Run() error {
 
 // syncState is called whenever the local state notifies a change. It holds the
 // lock while finding any new or updated proxies and removing deleted ones.
-func (m *Manager) syncState() {
+func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
@@ -160,7 +160,7 @@ func (m *Manager) syncState() {
 		// know that so we'd need to set it here if not during registration of the
 		// proxy service. Sidecar Service in the interim can do that, but we should
 		// validate more generally that that is always true.
-		err := m.ensureProxyServiceLocked(svc)
+		err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
 		if err != nil {
 			m.Logger.Error("failed to watch proxy service",
 				"service", sid.String(),
@@ -179,7 +179,7 @@ func (m *Manager) syncState() {
 }
 
 // ensureProxyServiceLocked adds or changes the proxy to our state.
-func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
+func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
 	sid := ns.CompoundServiceID()
 
 	// Retrieve the token used to register the service, or fallback to the
@@ -227,16 +227,18 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
 	m.proxies[sid] = state
 
 	// Start a goroutine that will wait for changes and broadcast them to watchers.
-	go func(ch <-chan ConfigSnapshot) {
-		// Run until ch is closed
-		for snap := range ch {
-			m.notify(&snap)
-		}
-	}(ch)
+	go notifyBroadcast(ch)
 
 	return nil
 }
 
+func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
+	// Run until ch is closed
+	for snap := range ch {
+		m.notify(&snap)
+	}
+}
+
 // removeProxyService is called when a service deregisters and frees all
 // resources for that service.
 func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
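The manager.go change above is a refactor that opens an injection seam: the anonymous goroutine that drained each proxy's snapshot channel becomes the named notifyBroadcast method, and syncState/ensureProxyServiceLocked now receive the consumer as a parameter. Run still passes m.notifyBroadcast, so production behavior is unchanged, but a test can substitute a gated consumer and deliberately leave snapshots unread. Below is a minimal self-contained sketch of that seam; Snapshot, Manager, and watch are illustrative stand-ins, not Consul's actual types.

```go
// Sketch of the injection seam introduced by the diff (stand-in types).
package main

import "fmt"

type Snapshot struct{ Seq int }

type Manager struct{}

func (m *Manager) notify(s *Snapshot) { fmt.Println("broadcast snapshot", s.Seq) }

// notifyBroadcast mirrors the extracted method: drain ch until it closes.
func (m *Manager) notifyBroadcast(ch <-chan Snapshot) {
	for snap := range ch {
		m.notify(&snap)
	}
}

// watch stands in for ensureProxyServiceLocked's tail: instead of a
// hard-coded goroutine body, it starts whatever consumer it is handed.
func (m *Manager) watch(consume func(<-chan Snapshot)) chan<- Snapshot {
	ch := make(chan Snapshot, 1)
	go consume(ch)
	return ch
}

func main() {
	m := &Manager{}

	// A test can substitute a gated consumer (as the new test below does)
	// so it controls exactly when the channel is drained.
	gate := make(chan struct{})
	done := make(chan struct{})
	ch := m.watch(func(c <-chan Snapshot) {
		<-gate // block until the test decides to read
		snap := <-c
		m.notify(&snap)
		close(done)
	})

	ch <- Snapshot{Seq: 1} // fits in the buffer even though nothing reads yet
	close(gate)            // release the consumer
	<-done
	// Production code instead passes the real loop: m.watch(m.notifyBroadcast).
}
```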
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go
index 9e659b5f0..cdb271ee3 100644
--- a/agent/proxycfg/manager_test.go
+++ b/agent/proxycfg/manager_test.go
@@ -598,7 +598,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {
 	err = state.AddServiceWithChecks(srv, nil, "")
 	require.NoError(t, err)
 
-	m.syncState()
+	m.syncState(m.notifyBroadcast)
 
 	require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token)
 }
+
+func TestManager_SyncState_No_Notify(t *testing.T) {
+	types := NewTestCacheTypes(t)
+	c := TestCacheWithTypes(t, types)
+	logger := testutil.Logger(t)
+	tokens := new(token.Store)
+	tokens.UpdateUserToken("default-token", token.TokenSourceConfig)
+
+	state := local.NewState(local.Config{}, logger, tokens)
+	state.TriggerSyncChanges = func() {}
+
+	m, err := NewManager(ManagerConfig{
+		Cache:  c,
+		Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
+		State:  state,
+		Tokens: tokens,
+		Source: &structs.QuerySource{Datacenter: "dc1"},
+		Logger: logger,
+	})
+	require.NoError(t, err)
+	defer m.Close()
+
+	srv := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-sidecar-proxy",
+		Service: "web-sidecar-proxy",
+		Port:    9999,
+		Meta:    map[string]string{},
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceID:   "web",
+			DestinationServiceName: "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8080,
+			Config: map[string]interface{}{
+				"foo": "bar",
+			},
+		},
+	}
+
+	err = state.AddServiceWithChecks(srv, nil, "")
+	require.NoError(t, err)
+
+	readEvent := make(chan bool, 1)
+	snapSent := make(chan bool, 1)
+
+	m.syncState(func(ch <-chan ConfigSnapshot) {
+		for {
+			<-readEvent
+			snap := <-ch
+			m.notify(&snap)
+			snapSent <- true
+		}
+	})
+
+	// Grab the relevant notification channel; there should only be one proxy.
+	notifyCH := m.proxies[srv.CompoundServiceID()].ch
+
+	// Update the leaf certs.
+	roots, issuedCert := TestCerts(t)
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: leafWatchID,
+		Result:        issuedCert,
+		Err:           nil,
+	}
+	// At this point the snapshot is not yet valid, so it should not be sent.
+	after := time.After(200 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be valid")
+	case <-after:
+	}
+
+	// Update the root certs.
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: rootsWatchID,
+		Result:        roots,
+		Err:           nil,
+	}
+
+	// The snapshot is still not valid, so it should not be sent.
+	after = time.After(200 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be valid")
+	case <-after:
+	}
+
+	// Prepare to read a snapshot update, as the next update should make the snapshot valid.
+	readEvent <- true
+
+	// Update the intentions.
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: intentionsWatchID,
+		Result:        &structs.IndexedIntentionMatches{},
+		Err:           nil,
+	}
+
+	// At this point we have a valid snapshot.
+	after = time.After(500 * time.Millisecond)
+	select {
+	case <-snapSent:
+	case <-after:
+		t.Fatal("snap should be valid")
+	}
+
+	// Send two snapshots back-to-back without reading them, to overflow the
+	// snapshot channel and reach the select's default case.
+	for i := 0; i < 2; i++ {
+		time.Sleep(250 * time.Millisecond)
+		notifyCH <- cache.UpdateEvent{
+			CorrelationID: leafWatchID,
+			Result:        issuedCert,
+			Err:           nil,
+		}
+	}
+
+	// Make sure that we are not receiving any snapshot, and wait for the
+	// snapshots to be processed.
+	after = time.After(500 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be sent")
+	case <-after:
+	}
+
+	// Now make sure that both snapshots get propagated.
+	for i := 0; i < 2; i++ {
+		readEvent <- true
+		after = time.After(500 * time.Millisecond)
+		select {
+		case <-snapSent:
+		case <-after:
+			t.Fatal("snap should be valid")
+		}
+	}
+}
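The overflow step in this test works because snapshot delivery is a non-blocking send on a buffered channel: once the channel holds an unread snapshot, a `select` with a `default` branch falls through instead of blocking. A tiny self-contained illustration of that mechanic; the capacity of 1 here is an assumption matching the test's pattern of sending two back-to-back updates to force the default case.

```go
// Demonstrates the non-blocking send the test drives into its default branch.
package main

import "fmt"

func main() {
	ch := make(chan int, 1) // assumed capacity, mirroring one unread snapshot

	for i := 1; i <= 2; i++ {
		select {
		case ch <- i:
			fmt.Println("delivered", i)
		default:
			// With the buffer full, the send falls through here instead of
			// blocking; this is the point where state.run must arm a retry.
			fmt.Println("channel full, would schedule a retry for", i)
		}
	}
}
```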
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 0b535d0e4..586bc3938 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -294,6 +294,8 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 			}
 
 		case <-sendCh:
+			// Allow the next change to trigger a send
+			coalesceTimer = nil
 			// Make a deep copy of snap so we don't mutate any of the embedded structs
 			// etc on future updates.
 			snapCopy, err := snap.Clone()
@@ -307,9 +309,6 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 			case s.snapCh <- *snapCopy:
 				s.logger.Trace("Delivered new snapshot to proxy config watchers")
 
-				// Allow the next change to trigger a send
-				coalesceTimer = nil
-
 				// Skip rest of loop - there is nothing to send since nothing changed on
 				// this iteration
 				continue
@@ -320,11 +319,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 				s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
 
 				// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
-				if coalesceTimer == nil {
-					coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
-						sendCh <- struct{}{}
-					})
-				}
+				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
+					sendCh <- struct{}{}
+				})
 
 				// Do not reset coalesceTimer since we just queued a timer-based refresh
 				continue
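The state.go hunks are the actual deadlock fix. Previously, coalesceTimer was cleared only after a successful delivery, and the default branch re-armed the retry timer only `if coalesceTimer == nil`. So when sendCh fired from a timer while snapCh still held an unread snapshot, the stale non-nil timer suppressed the retry, and nothing would ever attempt delivery again. Clearing the timer at the moment sendCh is consumed, and re-arming unconditionally on a failed send, guarantees a redelivery attempt. Below is a simplified, runnable model of the fixed loop; the types are stand-ins, and the real loop also handles watch updates, context cancellation, and snapshot cloning.

```go
// Simplified model of the corrected delivery loop in state.run.
package main

import (
	"fmt"
	"time"
)

func main() {
	const coalesceTimeout = 50 * time.Millisecond

	snapCh := make(chan int, 1)      // watcher-facing channel; full means a slow consumer
	sendCh := make(chan struct{}, 1) // signals "a coalesced change is ready to send"
	var coalesceTimer *time.Timer

	snapCh <- 0          // pre-fill: the watcher has not read the last snapshot
	sendCh <- struct{}{} // a change was coalesced and is ready for delivery

	for attempt := 1; ; attempt++ {
		<-sendCh
		// Fix 1: clear the timer as soon as the send trigger is consumed,
		// so the next change can always arm a fresh send.
		coalesceTimer = nil

		select {
		case snapCh <- 42:
			fmt.Println("delivered on attempt", attempt)
			if coalesceTimer == nil {
				fmt.Println("timer is clear; a new change can trigger the next send")
			}
			return
		default:
			fmt.Println("channel still full on attempt", attempt, "- scheduling retry")
			// Fix 2: always re-arm. The old `if coalesceTimer == nil` guard
			// could skip this and strand the snapshot forever.
			coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
				sendCh <- struct{}{}
			})
			<-snapCh // model the watcher finally reading the stale snapshot
		}
	}
}
```

Run as written, the model fails its first delivery, re-arms, and succeeds on the second attempt, which is exactly the redelivery path the old guard could skip.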