leader: move the virtual IP version check into a goroutine
This commit is contained in:
parent
336a234927
commit
2a52630067
|
@ -55,8 +55,6 @@ var (
|
|||
// minCentralizedConfigVersion is the minimum Consul version in which centralized
|
||||
// config is supported
|
||||
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
|
||||
|
||||
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
|
||||
)
|
||||
|
||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||
|
@ -188,10 +186,6 @@ RECONCILE:
|
|||
s.logger.Error("failed to reconcile", "error", err)
|
||||
goto WAIT
|
||||
}
|
||||
if err := s.setVirtualIPFlag(); err != nil {
|
||||
s.logger.Error("failed to set virtual IP flag", "error", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Initial reconcile worked, now we can process the channel
|
||||
// updates
|
||||
|
@ -219,7 +213,6 @@ WAIT:
|
|||
goto RECONCILE
|
||||
case member := <-reconcileCh:
|
||||
s.reconcileMember(member)
|
||||
s.setVirtualIPFlag()
|
||||
case index := <-s.tombstoneGC.ExpireCh():
|
||||
go s.reapTombstones(index)
|
||||
case errCh := <-s.reassertLeaderCh:
|
||||
|
@ -322,10 +315,6 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := s.setVirtualIPFlag(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.setConsistentReadReady()
|
||||
|
||||
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
|
||||
|
@ -892,25 +881,6 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) setVirtualIPFlag() error {
|
||||
// Return early if the flag is already set.
|
||||
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
|
||||
s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s",
|
||||
minVirtualIPVersion.String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true")
|
||||
}
|
||||
|
||||
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
||||
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
||||
// We generate a "reap" event to cause the node to be cleaned up.
|
||||
|
|
|
@ -2,12 +2,14 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-version"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -24,6 +26,14 @@ var (
|
|||
// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
|
||||
// queries when backing off.
|
||||
maxRetryBackoff = 256
|
||||
|
||||
// minVirtualIPVersion is the minimum version for all Consul servers for virtual IP
|
||||
// assignment to be enabled.
|
||||
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
|
||||
|
||||
// virtualIPVersionCheckInterval is the frequency we check whether all servers meet
|
||||
// the minimum version to enable virtual IP assignment for services.
|
||||
virtualIPVersionCheckInterval = time.Minute
|
||||
)
|
||||
|
||||
// startConnectLeader starts multi-dc connect leader routines.
|
||||
|
@ -36,6 +46,7 @@ func (s *Server) startConnectLeader(ctx context.Context) error {
|
|||
s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning)
|
||||
s.leaderRoutineManager.Start(ctx, caRootMetricRoutineName, rootCAExpiryMonitor(s).Monitor)
|
||||
s.leaderRoutineManager.Start(ctx, caSigningMetricRoutineName, signingCAExpiryMonitor(s).Monitor)
|
||||
s.leaderRoutineManager.Start(ctx, virtualIPCheckRoutineName, s.runVirtualIPVersionCheck)
|
||||
|
||||
return s.startIntentionConfigEntryMigration(ctx)
|
||||
}
|
||||
|
@ -47,6 +58,7 @@ func (s *Server) stopConnectLeader() {
|
|||
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
|
||||
s.leaderRoutineManager.Stop(caRootMetricRoutineName)
|
||||
s.leaderRoutineManager.Stop(caSigningMetricRoutineName)
|
||||
s.leaderRoutineManager.Stop(virtualIPCheckRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) runCARootPruning(ctx context.Context) error {
|
||||
|
@ -111,6 +123,54 @@ func (s *Server) pruneCARoots() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
|
||||
// Return early if the flag is already set.
|
||||
_, err := s.setVirtualIPVersionFlag()
|
||||
if err != nil {
|
||||
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(virtualIPVersionCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
ok, err := s.setVirtualIPVersionFlag()
|
||||
if err != nil {
|
||||
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
|
||||
continue
|
||||
}
|
||||
if ok {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) setVirtualIPVersionFlag() (bool, error) {
|
||||
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if val != "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
|
||||
return false, fmt.Errorf("can't allocate Virtual IPs until all servers >= %s",
|
||||
minVirtualIPVersion.String())
|
||||
}
|
||||
|
||||
if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
|
||||
// upon errors up to a maximum of maxRetryBackoff seconds.
|
||||
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
|
||||
|
|
|
@ -2126,6 +2126,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
|
|||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
virtualIPVersionCheckInterval = 50 * time.Millisecond
|
||||
|
||||
conf := func(c *Config) {
|
||||
c.Bootstrap = false
|
||||
c.BootstrapExpect = 3
|
||||
|
|
|
@ -116,6 +116,7 @@ const (
|
|||
secondaryCARootWatchRoutineName = "secondary CA roots watch"
|
||||
intermediateCertRenewWatchRoutineName = "intermediate cert renew watch"
|
||||
backgroundCAInitializationRoutineName = "CA initialization"
|
||||
virtualIPCheckRoutineName = "virtual IP version check"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
Loading…
Reference in New Issue