From 5e8254bedaf21b88e13b285281f1e652b6701f39 Mon Sep 17 00:00:00 2001
From: Yoan Blanc
Date: Sat, 9 Apr 2022 13:22:44 +0200
Subject: [PATCH 1/2] feat: remove dependency to consul/lib

Signed-off-by: Yoan Blanc
---
 client/allocwatcher/alloc_watcher.go |  6 ++---
 client/client.go                     | 17 ++++++------
 client/rpc.go                        |  6 ++---
 client/servers/manager.go            |  6 ++---
 command/agent/agent.go               |  6 ++---
 command/agent/command.go             |  3 +--
 helper/cluster.go                    | 36 +++++++++++++++++++++++++
 helper/eof.go                        | 39 ++++++++++++++++++++++++++++
 helper/math.go                       | 16 ++++++++++++
 helper/path.go                       | 15 +++++++++++
 helper/pool/pool.go                  |  4 +--
 nomad/blocked_evals.go               |  3 +--
 nomad/heartbeat.go                   |  6 ++---
 nomad/rpc.go                         |  6 ++---
 nomad/server.go                      | 15 +++++------
 15 files changed, 143 insertions(+), 41 deletions(-)
 create mode 100644 helper/cluster.go
 create mode 100644 helper/eof.go
 create mode 100644 helper/math.go
 create mode 100644 helper/path.go

diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go
index beed670b4..5cb1dd75f 100644
--- a/client/allocwatcher/alloc_watcher.go
+++ b/client/allocwatcher/alloc_watcher.go
@@ -11,12 +11,12 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/hashicorp/consul/lib"
 	hclog "github.com/hashicorp/go-hclog"
 	nomadapi "github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/config"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -392,7 +392,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
 		err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
 		if err != nil {
 			p.logger.Error("error querying previous alloc", "error", err)
-			retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
+			retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
 			select {
 			case <-time.After(retry):
 				continue
@@ -482,7 +482,7 @@ func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (strin
 		err := p.rpc.RPC("Node.GetNode", &req, &resp)
 		if err != nil {
 			p.logger.Error("failed to query node", "error", err, "node", nodeID)
-			retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
+			retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
 			select {
 			case <-time.After(retry):
 				continue
diff --git a/client/client.go b/client/client.go
index acf905fcb..07beb2c11 100644
--- a/client/client.go
+++ b/client/client.go
@@ -17,7 +17,6 @@ import (
 
 	metrics "github.com/armon/go-metrics"
 	consulapi "github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/lib"
 	hclog "github.com/hashicorp/go-hclog"
 	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/allocdir"
@@ -1595,7 +1594,7 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
 	if c.config.DevMode {
 		return devModeRetryIntv
 	}
-	return base + lib.RandomStagger(base)
+	return base + helper.RandomStagger(base)
 }
 
 // registerAndHeartbeat is a long lived goroutine used to register the client
@@ -1617,7 +1616,7 @@ func (c *Client) registerAndHeartbeat() {
 	if c.config.DevMode {
 		heartbeat = time.After(0)
 	} else {
-		heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
+		heartbeat = time.After(helper.RandomStagger(initialHeartbeatStagger))
 	}
 
 	for {
@@ -1634,7 +1633,7 @@ func (c *Client) registerAndHeartbeat() {
 			// Re-register the node
 			c.logger.Info("re-registering node")
 			c.retryRegisterNode()
-			heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
+			heartbeat = time.After(helper.RandomStagger(initialHeartbeatStagger))
 		} else {
 			intv := c.getHeartbeatRetryIntv(err)
 			c.logger.Error("error heartbeating. retrying", "error", err, "period", intv)
@@ -1690,16 +1689,16 @@ func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
 		// Make left the absolute value so we delay and jitter properly.
 		left *= -1
 	case left < 0:
-		return time.Second + lib.RandomStagger(time.Second)
+		return time.Second + helper.RandomStagger(time.Second)
 	default:
 	}
 
-	stagger := lib.RandomStagger(left)
+	stagger := helper.RandomStagger(left)
 	switch {
 	case stagger < time.Second:
-		return time.Second + lib.RandomStagger(time.Second)
+		return time.Second + helper.RandomStagger(time.Second)
 	case stagger > 30*time.Second:
-		return 25*time.Second + lib.RandomStagger(5*time.Second)
+		return 25*time.Second + helper.RandomStagger(5*time.Second)
 	default:
 		return stagger
 	}
@@ -2780,7 +2779,7 @@ func (c *Client) consulDiscoveryImpl() error {
 		// datacenterQueryLimit, the next heartbeat will pick
 		// a new set of servers so it's okay.
 		shuffleStrings(dcs[1:])
-		dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
+		dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
 	}
 
 	// Query for servers in this client's region only
diff --git a/client/rpc.go b/client/rpc.go
index 2a94a1835..3dc33ef14 100644
--- a/client/rpc.go
+++ b/client/rpc.go
@@ -9,9 +9,9 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
-	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/go-msgpack/codec"
 	"github.com/hashicorp/nomad/client/servers"
+	"github.com/hashicorp/nomad/helper"
 	inmem "github.com/hashicorp/nomad/helper/codec"
 	"github.com/hashicorp/nomad/helper/pool"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -110,7 +110,7 @@ TRY:
 
 	// Wait to avoid thundering herd
 	select {
-	case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
+	case <-time.After(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
 		// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
 		if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
 			newBlockTime := time.Until(deadline)
@@ -139,7 +139,7 @@ func canRetry(args interface{}, err error) bool {
 	// Reads are safe to retry for stream errors, such as if a server was
 	// being shut down.
 	info, ok := args.(structs.RPCInfo)
-	if ok && info.IsRead() && lib.IsErrEOF(err) {
+	if ok && info.IsRead() && helper.IsErrEOF(err) {
 		return true
 	}
 
diff --git a/client/servers/manager.go b/client/servers/manager.go
index 246133daa..d99df578e 100644
--- a/client/servers/manager.go
+++ b/client/servers/manager.go
@@ -11,8 +11,8 @@ import (
 	"sync"
 	"time"
 
-	"github.com/hashicorp/consul/lib"
 	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/helper"
 )
 
 const (
@@ -345,8 +345,8 @@ func (m *Manager) refreshServerRebalanceTimer() time.Duration {
 	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
 	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
 
-	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes))
-	connRebalanceTimeout += lib.RandomStagger(connRebalanceTimeout)
+	connRebalanceTimeout := helper.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes))
+	connRebalanceTimeout += helper.RandomStagger(connRebalanceTimeout)
 
 	m.rebalanceTimer.Reset(connRebalanceTimeout)
 	return connRebalanceTimeout
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 3bd6117d4..76472869b 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -16,7 +16,6 @@ import (
 
 	metrics "github.com/armon/go-metrics"
 	consulapi "github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/lib"
 	log "github.com/hashicorp/go-hclog"
 	uuidparse "github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/nomad/client"
@@ -25,6 +24,7 @@ import (
 	"github.com/hashicorp/nomad/client/state"
 	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/command/agent/event"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/bufconndialer"
 	"github.com/hashicorp/nomad/helper/pluginutils/loader"
 	"github.com/hashicorp/nomad/helper/uuid"
@@ -845,7 +845,7 @@ func (a *Agent) setupNodeID(config *nomad.Config) error {
 			return err
 		}
 		// Persist this configured nodeID to our data directory
-		if err := lib.EnsurePath(fileID, false); err != nil {
+		if err := helper.EnsurePath(fileID, false); err != nil {
 			return err
 		}
 		if err := ioutil.WriteFile(fileID, []byte(config.NodeID), 0600); err != nil {
@@ -857,7 +857,7 @@ func (a *Agent) setupNodeID(config *nomad.Config) error {
 	// If we still don't have a valid node ID, make one.
 	if config.NodeID == "" {
 		id := uuid.Generate()
-		if err := lib.EnsurePath(fileID, false); err != nil {
+		if err := helper.EnsurePath(fileID, false); err != nil {
 			return err
 		}
 		if err := ioutil.WriteFile(fileID, []byte(id), 0600); err != nil {
diff --git a/command/agent/command.go b/command/agent/command.go
index 8821bc815..74cbb7ef0 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -19,7 +19,6 @@ import (
 	"github.com/armon/go-metrics/circonus"
 	"github.com/armon/go-metrics/datadog"
 	"github.com/armon/go-metrics/prometheus"
-	"github.com/hashicorp/consul/lib"
 	checkpoint "github.com/hashicorp/go-checkpoint"
 	discover "github.com/hashicorp/go-discover"
 	hclog "github.com/hashicorp/go-hclog"
@@ -556,7 +555,7 @@ func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOu
 
 		// Do an immediate check within the next 30 seconds
 		go func() {
-			time.Sleep(lib.RandomStagger(30 * time.Second))
+			time.Sleep(helper.RandomStagger(30 * time.Second))
 			c.checkpointResults(checkpoint.Check(updateParams))
 		}()
 	}
diff --git a/helper/cluster.go b/helper/cluster.go
new file mode 100644
index 000000000..e3fda8b23
--- /dev/null
+++ b/helper/cluster.go
@@ -0,0 +1,36 @@
+// These functions come from consul/lib/cluster.go
+package helper
+
+import (
+	"math/rand"
+	"time"
+)
+
+const (
+	// minRate is the minimum rate at which we allow an action to be performed
+	// across the whole cluster. The value is once a day: 1 / (1 * time.Day)
+	minRate = 1.0 / 86400
+)
+
+// RandomStagger returns an interval between 0 and the duration
+func RandomStagger(intv time.Duration) time.Duration {
+	if intv == 0 {
+		return 0
+	}
+	return time.Duration(uint64(rand.Int63()) % uint64(intv))
+}
+
+// RateScaledInterval is used to choose an interval to perform an action in
+// order to target an aggregate number of actions per second across the whole
+// cluster.
+func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
+	if rate <= minRate {
+		return min
+	}
+	interval := time.Duration(float64(time.Second) * float64(n) / rate)
+	if interval < min {
+		return min
+	}
+
+	return interval
+}
diff --git a/helper/eof.go b/helper/eof.go
new file mode 100644
index 000000000..5152fbdb3
--- /dev/null
+++ b/helper/eof.go
@@ -0,0 +1,39 @@
+// These functions come from consul/lib/eof.go
+package helper
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net/rpc"
+	"strings"
+
+	"github.com/hashicorp/yamux"
+)
+
+var yamuxStreamClosed = yamux.ErrStreamClosed.Error()
+var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()
+
+// IsErrEOF returns true if we get an EOF error from the socket itself, or
+// an EOF equivalent error from yamux.
+func IsErrEOF(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, io.EOF) {
+		return true
+	}
+
+	errStr := err.Error()
+	if strings.Contains(errStr, yamuxStreamClosed) ||
+		strings.Contains(errStr, yamuxSessionShutdown) {
+		return true
+	}
+
+	var serverError rpc.ServerError
+	if errors.As(err, &serverError) {
+		return strings.HasSuffix(err.Error(), fmt.Sprintf(": %s", io.EOF.Error()))
+	}
+
+	return false
+}
diff --git a/helper/math.go b/helper/math.go
new file mode 100644
index 000000000..b621b1bd6
--- /dev/null
+++ b/helper/math.go
@@ -0,0 +1,16 @@
+// These functions come from consul/lib/math.go
+package helper
+
+func MaxInt(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func MinInt(a, b int) int {
+	if a > b {
+		return b
+	}
+	return a
+}
diff --git a/helper/path.go b/helper/path.go
new file mode 100644
index 000000000..a4684a29b
--- /dev/null
+++ b/helper/path.go
@@ -0,0 +1,15 @@
+// These functions come from consul/lib/path.go
+package helper
+
+import (
+	"os"
+	"path/filepath"
+)
+
+// EnsurePath is used to make sure a path exists
+func EnsurePath(path string, dir bool) error {
+	if !dir {
+		path = filepath.Dir(path)
+	}
+	return os.MkdirAll(path, 0755)
+}
diff --git a/helper/pool/pool.go b/helper/pool/pool.go
index d173b7c04..e554c93d4 100644
--- a/helper/pool/pool.go
+++ b/helper/pool/pool.go
@@ -11,9 +11,9 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/hashicorp/consul/lib"
 	hclog "github.com/hashicorp/go-hclog"
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/tlsutil"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/yamux"
@@ -491,7 +491,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interfa
 		// If we read EOF, the session is toast. Clear it and open a
 		// new session next time
 		// See https://github.com/hashicorp/consul/blob/v1.6.3/agent/pool/pool.go#L471-L477
-		if lib.IsErrEOF(err) {
+		if helper.IsErrEOF(err) {
 			p.clearConn(conn)
 		}
 
diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go
index aac55e978..26b0406aa 100644
--- a/nomad/blocked_evals.go
+++ b/nomad/blocked_evals.go
@@ -5,7 +5,6 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
-	"github.com/hashicorp/consul/lib"
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -529,7 +528,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
 	// because any node could potentially be feasible.
 	numEscaped := len(b.escaped)
 	numQuotaLimit := 0
-	unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4))
+	unblocked := make(map[*structs.Evaluation]string, helper.MaxInt(numEscaped, 4))
 
 	if numEscaped != 0 && computedClass != "" {
 		for id, wrapped := range b.escaped {
diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go
index 995ca21f2..403bd83e2 100644
--- a/nomad/heartbeat.go
+++ b/nomad/heartbeat.go
@@ -9,7 +9,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 
-	"github.com/hashicorp/consul/lib"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -101,8 +101,8 @@ func (h *nodeHeartbeater) resetHeartbeatTimer(id string) (time.Duration, error)
 
 	// Compute the target TTL value
 	n := len(h.heartbeatTimers)
-	ttl := lib.RateScaledInterval(h.config.MaxHeartbeatsPerSecond, h.config.MinHeartbeatTTL, n)
-	ttl += lib.RandomStagger(ttl)
+	ttl := helper.RateScaledInterval(h.config.MaxHeartbeatsPerSecond, h.config.MinHeartbeatTTL, n)
+	ttl += helper.RandomStagger(ttl)
 
 	// Reset the TTL
 	h.resetHeartbeatTimerLocked(id, ttl+h.config.HeartbeatGrace)
diff --git a/nomad/rpc.go b/nomad/rpc.go
index 331e56631..0dcf434b3 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -15,11 +15,11 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
-	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/go-connlimit"
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-msgpack/codec"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/pool"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -595,7 +595,7 @@ CHECK_LEADER:
 		firstCheck = time.Now()
 	}
 	if time.Since(firstCheck) < r.config.RPCHoldTimeout {
-		jitter := lib.RandomStagger(r.config.RPCHoldTimeout / structs.JitterFraction)
+		jitter := helper.RandomStagger(r.config.RPCHoldTimeout / structs.JitterFraction)
 		select {
 		case <-time.After(jitter):
 			goto CHECK_LEADER
@@ -818,7 +818,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error {
 	opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock()
 
 	// Apply a small amount of jitter to the request
-	opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
+	opts.queryOpts.MaxQueryTime += helper.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
 
 	// Setup a query timeout
 	ctx, cancel = context.WithTimeout(context.Background(), opts.queryOpts.MaxQueryTime)
diff --git a/nomad/server.go b/nomad/server.go
index f09853f69..a89c7e662 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -22,7 +22,6 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/agent/consul/autopilot"
 	consulapi "github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/lib"
 	log "github.com/hashicorp/go-hclog"
 	multierror "github.com/hashicorp/go-multierror"
 	lru "github.com/hashicorp/golang-lru"
@@ -885,7 +884,7 @@ func (s *Server) setupBootstrapHandler() error {
 		// `bootstrap_expect`.
 		raftPeers, err := s.numPeers()
 		if err != nil {
-			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+			peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return nil
 		}
 
@@ -894,7 +893,7 @@ func (s *Server) setupBootstrapHandler() error {
 			// Consul. Let the normal timeout-based strategy
 			// take over.
 			if raftPeers >= bootstrapExpect {
-				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
 				return nil
 			}
 		}
@@ -904,7 +903,7 @@ func (s *Server) setupBootstrapHandler() error {
 
 		dcs, err := s.consulCatalog.Datacenters()
 		if err != nil {
-			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+			peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return fmt.Errorf("server.nomad: unable to query Consul datacenters: %v", err)
 		}
 		if len(dcs) > 2 {
@@ -914,7 +913,7 @@ func (s *Server) setupBootstrapHandler() error {
 			// walk all datacenter until it finds enough hosts to
 			// form a quorum.
 			shuffleStrings(dcs[1:])
-			dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
+			dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
 		}
 
 		nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
@@ -953,13 +952,13 @@ func (s *Server) setupBootstrapHandler() error {
 
 		if len(nomadServerServices) == 0 {
 			if len(mErr.Errors) > 0 {
-				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
 				return mErr.ErrorOrNil()
 			}
 
 			// Log the error and return nil so future handlers
 			// can attempt to register the `nomad` service.
-			pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
+			pollInterval := peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor)
 			s.logger.Trace("no Nomad Servers advertising Nomad service in Consul datacenters", "service_name", nomadServerServiceName, "datacenters", dcs, "retry", pollInterval)
 			peersTimeout.Reset(pollInterval)
 			return nil
@@ -967,7 +966,7 @@ func (s *Server) setupBootstrapHandler() error {
 
 		numServersContacted, err := s.Join(nomadServerServices)
 		if err != nil {
-			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+			peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
 		}

From 3e79d58e4ad5703bf24061ecf5f4bdfdcc1607d1 Mon Sep 17 00:00:00 2001
From: Yoan Blanc
Date: Mon, 11 Apr 2022 19:37:14 +0200
Subject: [PATCH 2/2] fix: use NewSafeTimer

Signed-off-by: Yoan Blanc
---
 client/rpc.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/client/rpc.go b/client/rpc.go
index 3dc33ef14..7d1dfb6e4 100644
--- a/client/rpc.go
+++ b/client/rpc.go
@@ -109,8 +109,11 @@ TRY:
 	}
 
 	// Wait to avoid thundering herd
+	timer, cancel := helper.NewSafeTimer(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction))
+	defer cancel()
+
 	select {
-	case <-time.After(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
+	case <-timer.C:
 		// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
 		if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
 			newBlockTime := time.Until(deadline)
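
Taken together, the helpers moved into the nomad helper package are drop-in replacements for their consul/lib counterparts. The sketch below is illustrative only and is not part of either commit: it assumes it is compiled inside the Nomad repository so the github.com/hashicorp/nomad/helper import resolves, the rate, node count, and datacenterQueryLimit values are made-up example numbers, and the timer usage simply mirrors the NewSafeTimer pattern that the second commit relies on.

package main

import (
	"fmt"
	"time"

	// RandomStagger, RateScaledInterval, and MinInt are added by this patch;
	// NewSafeTimer is assumed to already exist in the helper package, as the
	// second commit relies on it.
	"github.com/hashicorp/nomad/helper"
)

func main() {
	// Jitter a 10s base interval the way client.retryIntv does: the result
	// lies between 10s (inclusive) and 20s (exclusive).
	base := 10 * time.Second
	fmt.Println("retry interval:", base+helper.RandomStagger(base))

	// Pick a per-node heartbeat TTL targeting roughly 50 heartbeats/s across
	// 1000 nodes, but never shorter than 10s (example numbers only).
	ttl := helper.RateScaledInterval(50.0, 10*time.Second, 1000)
	fmt.Println("heartbeat TTL:", ttl) // 20s with these numbers

	// Cap a datacenter list the way consulDiscoveryImpl does.
	dcs := []string{"dc1", "dc2", "dc3"}
	datacenterQueryLimit := 2 // example value
	dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
	fmt.Println("queried datacenters:", dcs)

	// Wait with jitter before retrying, using the stoppable timer pattern
	// from the second commit instead of time.After.
	timer, cancel := helper.NewSafeTimer(helper.RandomStagger(time.Second))
	defer cancel()
	<-timer.C
	fmt.Println("staggered wait elapsed")
}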