Merge pull request #12532 from greut/feat/remove-consul-lib

feat: remove dependency on consul/lib
Seth Hoenig 2022-04-11 13:52:05 -05:00 committed by GitHub
commit f59488bda6
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 146 additions and 41 deletions
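
Summary: copies the handful of consul/lib helpers Nomad still depended on — RandomStagger, RateScaledInterval, IsErrEOF, MinInt/MaxInt, and EnsurePath — into new files under the nomad/helper package, then rewrites every lib.* call site to helper.*. Along the way, the client RPC retry wait switches from time.After to helper.NewSafeTimer so the jitter timer can be stopped on early return.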

View file

@@ -11,12 +11,12 @@ import (
"syscall"
"time"
- "github.com/hashicorp/consul/lib"
hclog "github.com/hashicorp/go-hclog"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -392,7 +392,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
p.logger.Error("error querying previous alloc", "error", err)
- retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
+ retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
select {
case <-time.After(retry):
continue
@@ -482,7 +482,7 @@ func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (strin
err := p.rpc.RPC("Node.GetNode", &req, &resp)
if err != nil {
p.logger.Error("failed to query node", "error", err, "node", nodeID)
- retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
+ retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
select {
case <-time.After(retry):
continue

View file

@@ -17,7 +17,6 @@ import (
metrics "github.com/armon/go-metrics"
consulapi "github.com/hashicorp/consul/api"
- "github.com/hashicorp/consul/lib"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
@@ -1595,7 +1594,7 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}
- return base + lib.RandomStagger(base)
+ return base + helper.RandomStagger(base)
}
// registerAndHeartbeat is a long lived goroutine used to register the client
@@ -1617,7 +1616,7 @@ func (c *Client) registerAndHeartbeat() {
if c.config.DevMode {
heartbeat = time.After(0)
} else {
- heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
+ heartbeat = time.After(helper.RandomStagger(initialHeartbeatStagger))
}
for {
@@ -1634,7 +1633,7 @@
// Re-register the node
c.logger.Info("re-registering node")
c.retryRegisterNode()
- heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
+ heartbeat = time.After(helper.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.getHeartbeatRetryIntv(err)
c.logger.Error("error heartbeating. retrying", "error", err, "period", intv)
@@ -1690,16 +1689,16 @@ func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
// Make left the absolute value so we delay and jitter properly.
left *= -1
case left < 0:
- return time.Second + lib.RandomStagger(time.Second)
+ return time.Second + helper.RandomStagger(time.Second)
default:
}
- stagger := lib.RandomStagger(left)
+ stagger := helper.RandomStagger(left)
switch {
case stagger < time.Second:
- return time.Second + lib.RandomStagger(time.Second)
+ return time.Second + helper.RandomStagger(time.Second)
case stagger > 30*time.Second:
- return 25*time.Second + lib.RandomStagger(5*time.Second)
+ return 25*time.Second + helper.RandomStagger(5*time.Second)
default:
return stagger
}
@@ -2780,7 +2779,7 @@ func (c *Client) consulDiscoveryImpl() error {
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
- dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
+ dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only

View file

@@ -9,9 +9,9 @@ import (
"time"
metrics "github.com/armon/go-metrics"
- "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/client/servers"
+ "github.com/hashicorp/nomad/helper"
inmem "github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
@@ -109,8 +109,11 @@ TRY:
}
// Wait to avoid thundering herd
+ timer, cancel := helper.NewSafeTimer(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction))
+ defer cancel()
select {
- case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
+ case <-timer.C:
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
newBlockTime := time.Until(deadline)
@@ -139,7 +142,7 @@ func canRetry(args interface{}, err error) bool {
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
- if ok && info.IsRead() && lib.IsErrEOF(err) {
+ if ok && info.IsRead() && helper.IsErrEOF(err) {
return true
}
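
Note: beyond the lib-to-helper rename, the hunk above also swaps time.After for helper.NewSafeTimer, so the jitter timer can be stopped when the RPC path returns early. A minimal, self-contained sketch of that jittered-wait pattern; newSafeTimer here is a stand-in whose contract (timer plus stop func) is assumed from the call site above:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger mirrors helper.RandomStagger as added by this PR.
func randomStagger(intv time.Duration) time.Duration {
	if intv == 0 {
		return 0
	}
	return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

// newSafeTimer mimics the assumed helper.NewSafeTimer contract: a timer plus
// a stop func so the caller can release the timer on early return.
func newSafeTimer(d time.Duration) (*time.Timer, func()) {
	t := time.NewTimer(d)
	return t, func() { t.Stop() }
}

func main() {
	const rpcHoldTimeout = 5 * time.Second
	const jitterFraction = 16 // stand-in for structs.JitterFraction (assumed value)

	// Wait a random fraction of the hold timeout before retrying, mirroring
	// the select in client/rpc.go above.
	timer, cancel := newSafeTimer(randomStagger(rpcHoldTimeout / jitterFraction))
	defer cancel()

	<-timer.C
	fmt.Println("retrying RPC after jittered wait")
}
```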

View file

@@ -11,8 +11,8 @@ import (
"sync"
"time"
- "github.com/hashicorp/consul/lib"
hclog "github.com/hashicorp/go-hclog"
+ "github.com/hashicorp/nomad/helper"
)
const (
@@ -345,8 +345,8 @@ func (m *Manager) refreshServerRebalanceTimer() time.Duration {
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
- connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes))
- connRebalanceTimeout += lib.RandomStagger(connRebalanceTimeout)
+ connRebalanceTimeout := helper.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes))
+ connRebalanceTimeout += helper.RandomStagger(connRebalanceTimeout)
m.rebalanceTimer.Reset(connRebalanceTimeout)
return connRebalanceTimeout

View file

@@ -16,7 +16,6 @@ import (
metrics "github.com/armon/go-metrics"
consulapi "github.com/hashicorp/consul/api"
- "github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
uuidparse "github.com/hashicorp/go-uuid"
"github.com/hashicorp/nomad/client"
@@ -25,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/command/agent/event"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/bufconndialer"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/uuid"
@@ -845,7 +845,7 @@ func (a *Agent) setupNodeID(config *nomad.Config) error {
return err
}
// Persist this configured nodeID to our data directory
- if err := lib.EnsurePath(fileID, false); err != nil {
+ if err := helper.EnsurePath(fileID, false); err != nil {
return err
}
if err := ioutil.WriteFile(fileID, []byte(config.NodeID), 0600); err != nil {
@@ -857,7 +857,7 @@ func (a *Agent) setupNodeID(config *nomad.Config) error {
// If we still don't have a valid node ID, make one.
if config.NodeID == "" {
id := uuid.Generate()
- if err := lib.EnsurePath(fileID, false); err != nil {
+ if err := helper.EnsurePath(fileID, false); err != nil {
return err
}
if err := ioutil.WriteFile(fileID, []byte(id), 0600); err != nil {

View file

@@ -19,7 +19,6 @@ import (
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog"
"github.com/armon/go-metrics/prometheus"
- "github.com/hashicorp/consul/lib"
checkpoint "github.com/hashicorp/go-checkpoint"
discover "github.com/hashicorp/go-discover"
hclog "github.com/hashicorp/go-hclog"
@@ -556,7 +555,7 @@ func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOu
// Do an immediate check within the next 30 seconds
go func() {
- time.Sleep(lib.RandomStagger(30 * time.Second))
+ time.Sleep(helper.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}

helper/cluster.go Normal file (+36)
View file

@@ -0,0 +1,36 @@
// These functions were copied from consul/lib/cluster.go.
package helper

import (
	"math/rand"
	"time"
)

const (
	// minRate is the minimum rate at which we allow an action to be performed
	// across the whole cluster. The value is once a day: 1 / (1 * time.Day)
	minRate = 1.0 / 86400
)

// RandomStagger returns an interval between 0 and the given duration.
func RandomStagger(intv time.Duration) time.Duration {
	if intv == 0 {
		return 0
	}
	return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

// RateScaledInterval is used to choose an interval to perform an action in
// order to target an aggregate number of actions per second across the whole
// cluster.
func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	if rate <= minRate {
		return min
	}
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min
	}
	return interval
}
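
Together, RandomStagger and RateScaledInterval produce the jittered, rate-limited intervals used throughout this PR (heartbeat TTLs, server-rebalance timers, retry backoff). A small sketch of the heartbeat-style computation from nomad/heartbeat.go, with made-up inputs:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	// Illustrative inputs only; Nomad's real values come from server config.
	maxHeartbeatsPerSecond := 50.0
	minHeartbeatTTL := 10 * time.Second
	numNodes := 5000

	// Stretch the per-node TTL so the aggregate heartbeat rate stays near
	// maxHeartbeatsPerSecond: 5000 nodes / 50 per second = 100s per node.
	ttl := helper.RateScaledInterval(maxHeartbeatsPerSecond, minHeartbeatTTL, numNodes)

	// Add up to 100% jitter so node heartbeats don't arrive in lockstep.
	ttl += helper.RandomStagger(ttl)

	fmt.Println("heartbeat TTL:", ttl) // in [100s, 200s) for these inputs
}
```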

helper/eof.go Normal file (+39)
View file

@@ -0,0 +1,39 @@
// These functions were copied from consul/lib/eof.go.
package helper

import (
	"errors"
	"fmt"
	"io"
	"net/rpc"
	"strings"

	"github.com/hashicorp/yamux"
)

var yamuxStreamClosed = yamux.ErrStreamClosed.Error()
var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()

// IsErrEOF returns true if we get an EOF error from the socket itself, or
// an EOF equivalent error from yamux.
func IsErrEOF(err error) bool {
	if err == nil {
		return false
	}

	if errors.Is(err, io.EOF) {
		return true
	}

	errStr := err.Error()
	if strings.Contains(errStr, yamuxStreamClosed) ||
		strings.Contains(errStr, yamuxSessionShutdown) {
		return true
	}

	var serverError rpc.ServerError
	if errors.As(err, &serverError) {
		return strings.HasSuffix(err.Error(), fmt.Sprintf(": %s", io.EOF.Error()))
	}

	return false
}
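
IsErrEOF exists because a dead connection can surface in several shapes: a bare or wrapped io.EOF, a yamux stream-closed or session-shutdown error matched by string, or an rpc.ServerError whose message ends in ": EOF". A quick sketch of the classification, mirroring its use in canRetry and ConnPool.RPC:

```go
package main

import (
	"errors"
	"fmt"
	"io"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	// A bare EOF from the socket is treated as connection loss.
	fmt.Println(helper.IsErrEOF(io.EOF)) // true

	// Wrapped EOFs are caught via errors.Is.
	fmt.Println(helper.IsErrEOF(fmt.Errorf("rpc failed: %w", io.EOF))) // true

	// Unrelated errors are not treated as retryable connection loss.
	fmt.Println(helper.IsErrEOF(errors.New("permission denied"))) // false
}
```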

helper/math.go Normal file (+16)
View file

@@ -0,0 +1,16 @@
// These functions were copied from consul/lib/math.go.
package helper

// MaxInt returns the larger of two ints.
func MaxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// MinInt returns the smaller of two ints.
func MinInt(a, b int) int {
	if a > b {
		return b
	}
	return a
}
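
In this PR, MinInt's job is to cap a slice bound, as at the datacenterQueryLimit call sites in client.go and server.go. A minimal illustration (the limit value here is assumed for the example):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	const datacenterQueryLimit = 9 // assumed value, for illustration only

	dcs := []string{"dc1", "dc2", "dc3"}

	// Query at most datacenterQueryLimit datacenters without slicing past len(dcs).
	dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
	fmt.Println(dcs) // [dc1 dc2 dc3]
}
```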

helper/path.go Normal file (+15)
View file

@@ -0,0 +1,15 @@
// This function was copied from consul/lib/path.go.
package helper

import (
	"os"
	"path/filepath"
)

// EnsurePath is used to make sure a path exists.
func EnsurePath(path string, dir bool) error {
	if !dir {
		path = filepath.Dir(path)
	}
	return os.MkdirAll(path, 0755)
}
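
The dir flag controls whether path itself is the directory to create or a file whose parent must exist; setupNodeID in agent.go passes false because fileID names a file. A short sketch with a hypothetical path:

```go
package main

import (
	"log"
	"os"
	"path/filepath"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	// Hypothetical data-dir layout, for illustration only.
	fileID := filepath.Join(os.TempDir(), "nomad-demo", "client", "node-id")

	// dir=false: ensure the parent directory exists, since fileID is a file.
	if err := helper.EnsurePath(fileID, false); err != nil {
		log.Fatal(err)
	}

	if err := os.WriteFile(fileID, []byte("example-node-id"), 0600); err != nil {
		log.Fatal(err)
	}
}
```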

View file

@@ -11,9 +11,9 @@ import (
"sync/atomic"
"time"
- "github.com/hashicorp/consul/lib"
hclog "github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
@@ -491,7 +491,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interfa
// If we read EOF, the session is toast. Clear it and open a
// new session next time
// See https://github.com/hashicorp/consul/blob/v1.6.3/agent/pool/pool.go#L471-L477
- if lib.IsErrEOF(err) {
+ if helper.IsErrEOF(err) {
p.clearConn(conn)
}

View file

@@ -5,7 +5,6 @@ import (
"time"
metrics "github.com/armon/go-metrics"
- "github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
@@ -529,7 +528,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
// because any node could potentially be feasible.
numEscaped := len(b.escaped)
numQuotaLimit := 0
- unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4))
+ unblocked := make(map[*structs.Evaluation]string, helper.MaxInt(numEscaped, 4))
if numEscaped != 0 && computedClass != "" {
for id, wrapped := range b.escaped {

View file

@@ -9,7 +9,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
- "github.com/hashicorp/consul/lib"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -101,8 +101,8 @@ func (h *nodeHeartbeater) resetHeartbeatTimer(id string) (time.Duration, error)
// Compute the target TTL value
n := len(h.heartbeatTimers)
- ttl := lib.RateScaledInterval(h.config.MaxHeartbeatsPerSecond, h.config.MinHeartbeatTTL, n)
- ttl += lib.RandomStagger(ttl)
+ ttl := helper.RateScaledInterval(h.config.MaxHeartbeatsPerSecond, h.config.MinHeartbeatTTL, n)
+ ttl += helper.RandomStagger(ttl)
// Reset the TTL
h.resetHeartbeatTimerLocked(id, ttl+h.config.HeartbeatGrace)

View file

@@ -15,11 +15,11 @@ import (
"time"
metrics "github.com/armon/go-metrics"
- "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-connlimit"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -595,7 +595,7 @@ CHECK_LEADER:
firstCheck = time.Now()
}
if time.Since(firstCheck) < r.config.RPCHoldTimeout {
- jitter := lib.RandomStagger(r.config.RPCHoldTimeout / structs.JitterFraction)
+ jitter := helper.RandomStagger(r.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto CHECK_LEADER
@@ -818,7 +818,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error {
opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock()
// Apply a small amount of jitter to the request
- opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
+ opts.queryOpts.MaxQueryTime += helper.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
// Setup a query timeout
ctx, cancel = context.WithTimeout(context.Background(), opts.queryOpts.MaxQueryTime)

View file

@@ -22,7 +22,6 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/autopilot"
consulapi "github.com/hashicorp/consul/api"
- "github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
@@ -885,7 +884,7 @@ func (s *Server) setupBootstrapHandler() error {
// `bootstrap_expect`.
raftPeers, err := s.numPeers()
if err != nil {
- peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+ peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
@@ -894,7 +893,7 @@ func (s *Server) setupBootstrapHandler() error {
// Consul. Let the normal timeout-based strategy
// take over.
if raftPeers >= bootstrapExpect {
- peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+ peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
}
@@ -904,7 +903,7 @@ func (s *Server) setupBootstrapHandler() error {
dcs, err := s.consulCatalog.Datacenters()
if err != nil {
- peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+ peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("server.nomad: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
@@ -914,7 +913,7 @@ func (s *Server) setupBootstrapHandler() error {
// walk all datacenter until it finds enough hosts to
// form a quorum.
shuffleStrings(dcs[1:])
- dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
+ dcs = dcs[0:helper.MinInt(len(dcs), datacenterQueryLimit)]
}
nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
@@ -953,13 +952,13 @@ func (s *Server) setupBootstrapHandler() error {
if len(nomadServerServices) == 0 {
if len(mErr.Errors) > 0 {
- peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+ peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
return mErr.ErrorOrNil()
}
// Log the error and return nil so future handlers
// can attempt to register the `nomad` service.
- pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
+ pollInterval := peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor)
s.logger.Trace("no Nomad Servers advertising Nomad service in Consul datacenters", "service_name", nomadServerServiceName, "datacenters", dcs, "retry", pollInterval)
peersTimeout.Reset(pollInterval)
return nil
@@ -967,7 +966,7 @@ func (s *Server) setupBootstrapHandler() error {
numServersContacted, err := s.Join(nomadServerServices)
if err != nil {
- peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+ peersTimeout.Reset(peersPollInterval + helper.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
}