consul: avoid extra sync operations when no action required
This PR makes it so the Consul sync logic will ignore operations that do not specify an action to take (i.e. [de-]register [services|checks]). Ideally such noops would be discarded at the callsites (i.e. users of [Create|Update|Remove]Workload], but we can also be defensive at the commit point. Also adds 2 trace logging statements which are helpful for diagnosing sync operations with Consul - when they happen and why. Fixes #10797
This commit is contained in:
parent
dc3b13548b
commit
56a6a1b1df
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
consul: avoid extra sync operations when no action required
|
||||||
|
```
|
|
@ -348,6 +348,27 @@ type operations struct {
|
||||||
deregChecks []string
|
deregChecks []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *operations) empty() bool {
|
||||||
|
switch {
|
||||||
|
case o == nil:
|
||||||
|
return true
|
||||||
|
case len(o.regServices) > 0:
|
||||||
|
return false
|
||||||
|
case len(o.regChecks) > 0:
|
||||||
|
return false
|
||||||
|
case len(o.deregServices) > 0:
|
||||||
|
return false
|
||||||
|
case len(o.deregChecks) > 0:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o operations) String() string {
|
||||||
|
return fmt.Sprintf("<%d, %d, %d, %d>", len(o.regServices), len(o.regChecks), len(o.deregServices), len(o.deregChecks))
|
||||||
|
}
|
||||||
|
|
||||||
// AllocRegistration holds the status of services registered for a particular
|
// AllocRegistration holds the status of services registered for a particular
|
||||||
// allocations by task.
|
// allocations by task.
|
||||||
type AllocRegistration struct {
|
type AllocRegistration struct {
|
||||||
|
@ -560,11 +581,24 @@ func (c *ServiceClient) hasSeen() bool {
|
||||||
type syncReason byte
|
type syncReason byte
|
||||||
|
|
||||||
const (
|
const (
|
||||||
syncPeriodic = iota
|
syncPeriodic syncReason = iota
|
||||||
syncShutdown
|
syncShutdown
|
||||||
syncNewOps
|
syncNewOps
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (sr syncReason) String() string {
|
||||||
|
switch sr {
|
||||||
|
case syncPeriodic:
|
||||||
|
return "periodic"
|
||||||
|
case syncShutdown:
|
||||||
|
return "shutdown"
|
||||||
|
case syncNewOps:
|
||||||
|
return "operations"
|
||||||
|
default:
|
||||||
|
return "unexpected"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run the Consul main loop which retries operations against Consul. It should
|
// Run the Consul main loop which retries operations against Consul. It should
|
||||||
// be called exactly once.
|
// be called exactly once.
|
||||||
func (c *ServiceClient) Run() {
|
func (c *ServiceClient) Run() {
|
||||||
|
@ -680,6 +714,24 @@ INIT:
|
||||||
|
|
||||||
// commit operations unless already shutting down.
|
// commit operations unless already shutting down.
|
||||||
func (c *ServiceClient) commit(ops *operations) {
|
func (c *ServiceClient) commit(ops *operations) {
|
||||||
|
c.logger.Trace("commit sync operations", "ops", ops)
|
||||||
|
|
||||||
|
// Ignore empty operations - ideally callers will optimize out syncs with
|
||||||
|
// nothing to do, but be defensive anyway. Sending an empty ops on the chan
|
||||||
|
// will trigger an unnecessary sync with Consul.
|
||||||
|
if ops.empty() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prioritize doing nothing if we are being signaled to shutdown.
|
||||||
|
select {
|
||||||
|
case <-c.shutdownCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the ops down the ops chan, triggering a sync with Consul. Unless we
|
||||||
|
// receive a signal to shutdown.
|
||||||
select {
|
select {
|
||||||
case c.opCh <- ops:
|
case c.opCh <- ops:
|
||||||
case <-c.shutdownCh:
|
case <-c.shutdownCh:
|
||||||
|
@ -713,6 +765,8 @@ func (c *ServiceClient) merge(ops *operations) {
|
||||||
|
|
||||||
// sync enqueued operations.
|
// sync enqueued operations.
|
||||||
func (c *ServiceClient) sync(reason syncReason) error {
|
func (c *ServiceClient) sync(reason syncReason) error {
|
||||||
|
c.logger.Trace("execute sync", "reason", reason)
|
||||||
|
|
||||||
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
|
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -599,3 +600,27 @@ func TestSyncLogic_proxyUpstreamsDifferent(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncReason_String(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require.Equal(t, "periodic", fmt.Sprintf("%s", syncPeriodic))
|
||||||
|
require.Equal(t, "shutdown", fmt.Sprintf("%s", syncShutdown))
|
||||||
|
require.Equal(t, "operations", fmt.Sprintf("%s", syncNewOps))
|
||||||
|
require.Equal(t, "unexpected", fmt.Sprintf("%s", syncReason(128)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncOps_empty(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
try := func(ops *operations, exp bool) {
|
||||||
|
require.Equal(t, exp, ops.empty())
|
||||||
|
}
|
||||||
|
|
||||||
|
try(&operations{regServices: make([]*api.AgentServiceRegistration, 1)}, false)
|
||||||
|
try(&operations{regChecks: make([]*api.AgentCheckRegistration, 1)}, false)
|
||||||
|
try(&operations{deregServices: make([]string, 1)}, false)
|
||||||
|
try(&operations{deregChecks: make([]string, 1)}, false)
|
||||||
|
try(&operations{}, true)
|
||||||
|
try(nil, true)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue