From 2a52630067043292cb3481ad316438cc00cb6fe1 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 9 Dec 2021 16:40:26 -0800 Subject: [PATCH 1/2] leader: move the virtual IP version check into a goroutine --- agent/consul/leader.go | 30 ----------------- agent/consul/leader_connect.go | 60 ++++++++++++++++++++++++++++++++++ agent/consul/leader_test.go | 2 ++ agent/consul/server.go | 1 + 4 files changed, 63 insertions(+), 30 deletions(-) diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 2861c6cbe..8ee6f5a54 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -55,8 +55,6 @@ var ( // minCentralizedConfigVersion is the minimum Consul version in which centralized // config is supported minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0")) - - minVirtualIPVersion = version.Must(version.NewVersion("1.11.0")) ) // monitorLeadership is used to monitor if we acquire or lose our role @@ -188,10 +186,6 @@ RECONCILE: s.logger.Error("failed to reconcile", "error", err) goto WAIT } - if err := s.setVirtualIPFlag(); err != nil { - s.logger.Error("failed to set virtual IP flag", "error", err) - goto WAIT - } // Initial reconcile worked, now we can process the channel // updates @@ -219,7 +213,6 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) - s.setVirtualIPFlag() case index := <-s.tombstoneGC.ExpireCh(): go s.reapTombstones(index) case errCh := <-s.reassertLeaderCh: @@ -322,10 +315,6 @@ func (s *Server) establishLeadership(ctx context.Context) error { return err } - if err := s.setVirtualIPFlag(); err != nil { - return err - } - s.setConsistentReadReady() s.logger.Debug("successfully established leadership", "duration", time.Since(start)) @@ -892,25 +881,6 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error { return nil } -func (s *Server) setVirtualIPFlag() error { - // Return early if the flag is already set. - val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) - if err != nil { - return err - } - if val != "" { - return nil - } - - if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok { - s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s", - minVirtualIPVersion.String())) - return nil - } - - return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true") -} - // reconcileReaped is used to reconcile nodes that have failed and been reaped // from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered. // We generate a "reap" event to cause the node to be cleaned up. diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index a90194ec5..c2209b5da 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -2,12 +2,14 @@ package consul import ( "context" + "fmt" "time" "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-version" ) const ( @@ -24,6 +26,14 @@ var ( // maxRetryBackoff is the maximum number of seconds to wait between failed blocking // queries when backing off. maxRetryBackoff = 256 + + // minVirtualIPVersion is the minimum version for all Consul servers for virtual IP + // assignment to be enabled. + minVirtualIPVersion = version.Must(version.NewVersion("1.11.0")) + + // virtualIPVersionCheckInterval is the frequency we check whether all servers meet + // the minimum version to enable virtual IP assignment for services. + virtualIPVersionCheckInterval = time.Minute ) // startConnectLeader starts multi-dc connect leader routines. @@ -36,6 +46,7 @@ func (s *Server) startConnectLeader(ctx context.Context) error { s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning) s.leaderRoutineManager.Start(ctx, caRootMetricRoutineName, rootCAExpiryMonitor(s).Monitor) s.leaderRoutineManager.Start(ctx, caSigningMetricRoutineName, signingCAExpiryMonitor(s).Monitor) + s.leaderRoutineManager.Start(ctx, virtualIPCheckRoutineName, s.runVirtualIPVersionCheck) return s.startIntentionConfigEntryMigration(ctx) } @@ -47,6 +58,7 @@ func (s *Server) stopConnectLeader() { s.leaderRoutineManager.Stop(caRootPruningRoutineName) s.leaderRoutineManager.Stop(caRootMetricRoutineName) s.leaderRoutineManager.Stop(caSigningMetricRoutineName) + s.leaderRoutineManager.Stop(virtualIPCheckRoutineName) } func (s *Server) runCARootPruning(ctx context.Context) error { @@ -111,6 +123,54 @@ func (s *Server) pruneCARoots() error { return err } +func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { + // Return early if the flag is already set. + _, err := s.setVirtualIPVersionFlag() + if err != nil { + s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) + } + + ticker := time.NewTicker(virtualIPVersionCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + ok, err := s.setVirtualIPVersionFlag() + if err != nil { + s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) + continue + } + if ok { + return nil + } + } + } +} + +func (s *Server) setVirtualIPVersionFlag() (bool, error) { + val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) + if err != nil { + return false, err + } + if val != "" { + return true, nil + } + + if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok { + return false, fmt.Errorf("can't allocate Virtual IPs until all servers >= %s", + minVirtualIPVersion.String()) + } + + if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil { + return false, nil + } + + return true, nil +} + // retryLoopBackoff loops a given function indefinitely, backing off exponentially // upon errors up to a maximum of maxRetryBackoff seconds. func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) { diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index dd748b370..af008358f 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -2126,6 +2126,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { t.Skip("too slow for testing.Short") } + virtualIPVersionCheckInterval = 50 * time.Millisecond + conf := func(c *Config) { c.Bootstrap = false c.BootstrapExpect = 3 diff --git a/agent/consul/server.go b/agent/consul/server.go index 57ac4eb62..91082a7d6 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -116,6 +116,7 @@ const ( secondaryCARootWatchRoutineName = "secondary CA roots watch" intermediateCertRenewWatchRoutineName = "intermediate cert renew watch" backgroundCAInitializationRoutineName = "CA initialization" + virtualIPCheckRoutineName = "virtual IP version check" ) var ( From ccc119c54907282ae49baf0ec4d00f36ab9d8359 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 10 Dec 2021 13:58:43 -0800 Subject: [PATCH 2/2] Exit before starting the vip check routine if possible --- agent/consul/leader_connect.go | 10 +++++++--- agent/consul/leader_test.go | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index c2209b5da..2f796c691 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -125,10 +125,14 @@ func (s *Server) pruneCARoots() error { func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { // Return early if the flag is already set. - _, err := s.setVirtualIPVersionFlag() + done, err := s.setVirtualIPVersionFlag() if err != nil { s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) } + if done { + s.leaderRoutineManager.Stop(virtualIPCheckRoutineName) + return nil + } ticker := time.NewTicker(virtualIPVersionCheckInterval) defer ticker.Stop() @@ -138,12 +142,12 @@ func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - ok, err := s.setVirtualIPVersionFlag() + done, err := s.setVirtualIPVersionFlag() if err != nil { s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) continue } - if ok { + if done { return nil } } diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index af008358f..63b99b3a9 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -2126,7 +2126,9 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { t.Skip("too slow for testing.Short") } + orig := virtualIPVersionCheckInterval virtualIPVersionCheckInterval = 50 * time.Millisecond + t.Cleanup(func() { virtualIPVersionCheckInterval = orig }) conf := func(c *Config) { c.Bootstrap = false