reset `coalesceTimer` to nil as soon as the event is consumed (#11924)

* reset `coalesceTimer` to nil as soon as the event is consumed

* add change log

* refactor to add relevant test.

* fix linter

* Apply suggestions from code review

Co-authored-by: Freddy <freddygv@users.noreply.github.com>

* remove non needed check

Co-authored-by: Freddy <freddygv@users.noreply.github.com>
This commit is contained in:
Dhia Ayachi 2022-01-05 12:17:47 -05:00 committed by GitHub
parent dc18933cc2
commit 5f6bf369af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 160 additions and 19 deletions

3
.changelog/11924.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: fix a deadlock when the snapshot channel already have a snapshot to be consumed.
```

View File

@ -127,7 +127,7 @@ func (m *Manager) Run() error {
defer m.State.StopNotify(stateCh) defer m.State.StopNotify(stateCh)
for { for {
m.syncState() m.syncState(m.notifyBroadcast)
// Wait for a state change // Wait for a state change
_, ok := <-stateCh _, ok := <-stateCh
@ -140,7 +140,7 @@ func (m *Manager) Run() error {
// syncState is called whenever the local state notifies a change. It holds the // syncState is called whenever the local state notifies a change. It holds the
// lock while finding any new or updated proxies and removing deleted ones. // lock while finding any new or updated proxies and removing deleted ones.
func (m *Manager) syncState() { func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -160,7 +160,7 @@ func (m *Manager) syncState() {
// know that so we'd need to set it here if not during registration of the // know that so we'd need to set it here if not during registration of the
// proxy service. Sidecar Service in the interim can do that, but we should // proxy service. Sidecar Service in the interim can do that, but we should
// validate more generally that that is always true. // validate more generally that that is always true.
err := m.ensureProxyServiceLocked(svc) err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
if err != nil { if err != nil {
m.Logger.Error("failed to watch proxy service", m.Logger.Error("failed to watch proxy service",
"service", sid.String(), "service", sid.String(),
@ -179,7 +179,7 @@ func (m *Manager) syncState() {
} }
// ensureProxyServiceLocked adds or changes the proxy to our state. // ensureProxyServiceLocked adds or changes the proxy to our state.
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error { func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
sid := ns.CompoundServiceID() sid := ns.CompoundServiceID()
// Retrieve the token used to register the service, or fallback to the // Retrieve the token used to register the service, or fallback to the
@ -227,14 +227,16 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
m.proxies[sid] = state m.proxies[sid] = state
// Start a goroutine that will wait for changes and broadcast them to watchers. // Start a goroutine that will wait for changes and broadcast them to watchers.
go func(ch <-chan ConfigSnapshot) { go notifyBroadcast(ch)
return nil
}
func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
// Run until ch is closed // Run until ch is closed
for snap := range ch { for snap := range ch {
m.notify(&snap) m.notify(&snap)
} }
}(ch)
return nil
} }
// removeProxyService is called when a service deregisters and frees all // removeProxyService is called when a service deregisters and frees all

View File

@ -598,7 +598,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {
err = state.AddServiceWithChecks(srv, nil, "") err = state.AddServiceWithChecks(srv, nil, "")
require.NoError(t, err) require.NoError(t, err)
m.syncState() m.syncState(m.notifyBroadcast)
require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token) require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token)
} }
func TestManager_SyncState_No_Notify(t *testing.T) {
types := NewTestCacheTypes(t)
c := TestCacheWithTypes(t, types)
logger := testutil.Logger(t)
tokens := new(token.Store)
tokens.UpdateUserToken("default-token", token.TokenSourceConfig)
state := local.NewState(local.Config{}, logger, tokens)
state.TriggerSyncChanges = func() {}
m, err := NewManager(ManagerConfig{
Cache: c,
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
State: state,
Tokens: tokens,
Source: &structs.QuerySource{Datacenter: "dc1"},
Logger: logger,
})
require.NoError(t, err)
defer m.Close()
srv := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 9999,
Meta: map[string]string{},
Proxy: structs.ConnectProxyConfig{
DestinationServiceID: "web",
DestinationServiceName: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8080,
Config: map[string]interface{}{
"foo": "bar",
},
},
}
err = state.AddServiceWithChecks(srv, nil, "")
require.NoError(t, err)
readEvent := make(chan bool, 1)
snapSent := make(chan bool, 1)
m.syncState(func(ch <-chan ConfigSnapshot) {
for {
<-readEvent
snap := <-ch
m.notify(&snap)
snapSent <- true
}
})
// Get the relevant notification Channel, should only have 1
notifyCH := m.proxies[srv.CompoundServiceID()].ch
// update the leaf certs
roots, issuedCert := TestCerts(t)
notifyCH <- cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
}
// at this point the snapshot should not be valid and not be sent
after := time.After(200 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be valid")
case <-after:
}
// update the root certs
notifyCH <- cache.UpdateEvent{
CorrelationID: rootsWatchID,
Result: roots,
Err: nil,
}
// at this point the snapshot should not be valid and not be sent
after = time.After(200 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be valid")
case <-after:
}
// prepare to read a snapshot update as the next update should make the snapshot valid
readEvent <- true
// update the intentions
notifyCH <- cache.UpdateEvent{
CorrelationID: intentionsWatchID,
Result: &structs.IndexedIntentionMatches{},
Err: nil,
}
// at this point we have a valid snapshot
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
case <-after:
t.Fatal("snap should be valid")
}
// send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case
for i := 0; i < 2; i++ {
time.Sleep(250 * time.Millisecond)
notifyCH <- cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
}
}
// make sure that we are not receiving any snapshot and wait for the snapshots to be processed
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
t.Fatal("snap should not be sent")
case <-after:
}
// now make sure that both snapshots got propagated
for i := 0; i < 2; i++ {
readEvent <- true
after = time.After(500 * time.Millisecond)
select {
case <-snapSent:
case <-after:
t.Fatal("snap should be valid")
}
}
}

View File

@ -294,6 +294,8 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
} }
case <-sendCh: case <-sendCh:
// Allow the next change to trigger a send
coalesceTimer = nil
// Make a deep copy of snap so we don't mutate any of the embedded structs // Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates. // etc on future updates.
snapCopy, err := snap.Clone() snapCopy, err := snap.Clone()
@ -307,9 +309,6 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
case s.snapCh <- *snapCopy: case s.snapCh <- *snapCopy:
s.logger.Trace("Delivered new snapshot to proxy config watchers") s.logger.Trace("Delivered new snapshot to proxy config watchers")
// Allow the next change to trigger a send
coalesceTimer = nil
// Skip rest of loop - there is nothing to send since nothing changed on // Skip rest of loop - there is nothing to send since nothing changed on
// this iteration // this iteration
continue continue
@ -320,11 +319,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly. // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceTimeout, func() { coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{} sendCh <- struct{}{}
}) })
}
// Do not reset coalesceTimer since we just queued a timer-based refresh // Do not reset coalesceTimer since we just queued a timer-based refresh
continue continue