open-consul/agent/consul/xdscapacity/capacity.go
Dan Upton 9fe6c33c0d
xDS Load Balancing (#14397)
Prior to #13244, connect proxies and gateways could only be configured by an
xDS session served by the local client agent.

In an upcoming release, it will be possible to deploy a Consul service mesh
without client agents. In this model, xDS sessions will be handled by the
servers themselves, which necessitates load-balancing to prevent a single
server from receiving a disproportionate amount of load and becoming
overwhelmed.

This introduces a simple form of load-balancing where Consul will attempt to
achieve an even spread of load (xDS sessions) between all healthy servers.
It does so by implementing a concurrent session limiter (limiter.SessionLimiter)
and adjusting the limit according to autopilot state and proxy service
registrations in the catalog.

If a server is already over capacity (i.e. the session limit is lowered),
Consul will begin draining sessions to rebalance the load. This will result
in the client receiving a `RESOURCE_EXHAUSTED` status code. It is the client's
responsibility to observe this response and reconnect to a different server.

Users of the gRPC client connection brokered by the
consul-server-connection-manager library will get this for free.

The rate at which Consul will drain sessions to rebalance load is scaled
dynamically based on the number of proxies in the catalog.
2022-09-09 15:02:01 +01:00

211 lines
5.4 KiB
Go

package xdscapacity
import (
"context"
"math"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"
)
// StatsGauges is the set of gauge definitions emitted by this package, for
// registration with the telemetry sink. The idealStreamsMax gauge is set by
// updateMaxSessions whenever the computed per-server stream limit changes.
var StatsGauges = []prometheus.GaugeDefinition{
	{
		Name: []string{"xds", "server", "idealStreamsMax"},
		Help: "The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers.",
	},
}
// errorMargin is the amount by which we allow a server to be over-occupied,
// expressed as a percentage (between 0 and 1).
//
// We allow 10% more than the ideal number of streams per server.
const errorMargin = 0.1
// Controller determines the ideal number of xDS streams for the server to
// handle and enforces it using the given SessionLimiter.
//
// We aim for a roughly even spread of streams between servers in the cluster
// and, to that end, limit the number of streams each server can handle to:
//
//	(<number of proxies> / <number of healthy servers>) + <error margin>
//
// Controller receives changes to the number of healthy servers from the
// autopilot delegate. It queries the state store's catalog tables to discover
// the number of registered proxy (sidecar and gateway) services.
type Controller struct {
	cfg Config

	// serverCh carries healthy-server counts from SetServerCount to the
	// Run loop. It is unbuffered; see SetServerCount for how senders avoid
	// blocking forever.
	serverCh chan uint32

	// doneCh is closed when Run returns, releasing any goroutine blocked
	// in SetServerCount.
	doneCh chan struct{}

	// prevMaxSessions and prevRateLimit cache the values most recently
	// pushed to the SessionLimiter, so redundant updates (and debug log
	// lines) are suppressed. Within this file they are read and written
	// only by methods invoked from the Run goroutine.
	prevMaxSessions uint32
	prevRateLimit   rate.Limit
}
// Config contains the dependencies for Controller.
type Config struct {
	// Logger receives debug logs whenever the max-session or drain-rate
	// values change.
	Logger hclog.Logger

	// GetStore returns the state store to query for proxy counts. It is
	// called afresh on every recount rather than captured once —
	// presumably because the store instance can be replaced at runtime
	// (see Store.AbandonCh); confirm against the caller.
	GetStore func() Store

	// SessionLimiter is where the computed session limit and drain rate
	// are enforced.
	SessionLimiter SessionLimiter
}
// SessionLimiter is used to enforce the session limit to achieve the ideal
// spread of xDS streams between servers.
type SessionLimiter interface {
	// SetMaxSessions sets the maximum number of concurrent xDS sessions
	// this server will accept.
	SetMaxSessions(maxSessions uint32)

	// SetDrainRateLimit sets the rate (sessions per second) at which
	// excess sessions are drained when the server is over capacity.
	SetDrainRateLimit(rateLimit rate.Limit)
}
// NewController creates a new capacity controller with the given config.
//
// Call Run to start the control-loop.
func NewController(cfg Config) *Controller {
	c := &Controller{cfg: cfg}
	c.serverCh = make(chan uint32)
	c.doneCh = make(chan struct{})
	return c
}
// Run the control-loop until the given context is canceled or reaches its
// deadline.
func (c *Controller) Run(ctx context.Context) {
	// Unblock any goroutine waiting in SetServerCount once the loop exits.
	defer close(c.doneCh)

	// Initial proxy count. countProxies retries internally and only
	// returns a non-nil error once ctx is done, so bailing out here is the
	// normal shutdown path, not a failure.
	ws, numProxies, err := c.countProxies(ctx)
	if err != nil {
		return
	}

	// Zero until the autopilot delegate reports a count; updateMaxSessions
	// no-ops while either count is zero.
	var numServers uint32
	for {
		select {
		case s := <-c.serverCh:
			// Healthy-server count changed.
			numServers = s
			c.updateMaxSessions(numServers, numProxies)
		case <-ws.WatchCh(ctx):
			// Catalog changed: recount proxies and refresh both the
			// drain rate and the session limit.
			//
			// NOTE(review): the drain rate limit is first applied
			// only here, after a catalog change fires the watch —
			// not from the initial count above. Confirm this is
			// intentional.
			ws, numProxies, err = c.countProxies(ctx)
			if err != nil {
				return
			}
			c.updateDrainRateLimit(numProxies)
			c.updateMaxSessions(numServers, numProxies)
		case <-ctx.Done():
			return
		}
	}
}
// SetServerCount updates the number of healthy servers that is used when
// determining capacity. It is called by the autopilot delegate.
func (c *Controller) SetServerCount(count uint32) {
	// serverCh is unbuffered and only the Run loop receives from it, so
	// also select on doneCh to avoid blocking forever after Run exits.
	select {
	case c.serverCh <- count:
	case <-c.doneCh:
	}
}
// updateDrainRateLimit recomputes the drain rate for the given proxy count
// and pushes it to the session limiter, skipping the update (and the log
// line) when the value is unchanged.
func (c *Controller) updateDrainRateLimit(numProxies uint32) {
	limit := calcRateLimit(numProxies)
	if limit != c.prevRateLimit {
		c.cfg.Logger.Debug("updating drain rate limit", "rate_limit", limit)
		c.cfg.SessionLimiter.SetDrainRateLimit(limit)
		c.prevRateLimit = limit
	}
}
// calcRateLimit maps the number of proxies in the catalog to the rate at
// which excess sessions will be drained, so rebalancing speed scales with
// cluster size.
//
// The numbers here are pretty arbitrary (change them if you find better ones!)
// but the logic is:
//
//	0-512 proxies:    drain 1 per second
//	513-2815 proxies: linearly scaled by 1/s for every additional 256 proxies
//	2816+ proxies:    drain 10 per second
func calcRateLimit(numProxies uint32) rate.Limit {
	perSecond := math.Floor((float64(numProxies) - 256) / 256)

	// Clamp to the [1, 10] sessions-per-second band.
	switch {
	case perSecond < 1:
		return 1
	case perSecond > 10:
		return 10
	default:
		return rate.Limit(perSecond)
	}
}
// updateMaxSessions recomputes the per-server stream limit from the current
// server and proxy counts and applies it to the session limiter. It no-ops
// while either count is still unknown (zero) or when the computed limit has
// not changed.
func (c *Controller) updateMaxSessions(numServers, numProxies uint32) {
	if numServers == 0 || numProxies == 0 {
		return
	}

	// Each server's ideal share of the streams, padded by the error margin
	// and rounded up.
	idealShare := float64(numProxies) / float64(numServers)
	maxSessions := uint32(math.Ceil(idealShare * (1 + errorMargin)))
	if maxSessions == c.prevMaxSessions {
		return
	}

	c.cfg.Logger.Debug(
		"updating max sessions",
		"max_sessions", maxSessions,
		"num_servers", numServers,
		"num_proxies", numProxies,
	)
	metrics.SetGauge([]string{"xds", "server", "idealStreamsMax"}, float32(maxSessions))
	c.cfg.SessionLimiter.SetMaxSessions(maxSessions)
	c.prevMaxSessions = maxSessions
}
// countProxies counts the number of registered proxy services, retrying on
// error until the given context is cancelled. On success it returns a watch
// set that fires when the underlying catalog data (or the store itself)
// changes.
func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) {
	waiter := &retry.Waiter{
		MinFailures: 1,
		MinWait:     1 * time.Second,
		MaxWait:     1 * time.Minute,
	}

	for {
		store := c.cfg.GetStore()

		watch := memdb.NewWatchSet()
		watch.Add(store.AbandonCh())

		_, usage, err := store.ServiceUsage(watch)
		if err != nil {
			// Back off before retrying; Wait returns an error only
			// when ctx is cancelled, which ends the retry loop.
			if waitErr := waiter.Wait(ctx); waitErr != nil {
				return nil, 0, waitErr
			}
			continue
		}

		// Tally only the proxy kinds (sidecars and gateways).
		var proxies uint32
		for kind, n := range usage.ConnectServiceInstances {
			if structs.ServiceKind(kind).IsProxy() {
				proxies += uint32(n)
			}
		}
		return watch, proxies, nil
	}
}
// Store is the subset of the state store's interface that Controller needs.
type Store interface {
	// AbandonCh returns a channel that is closed when the store is
	// abandoned; watching it lets the controller recount against the
	// replacement store.
	AbandonCh() <-chan struct{}

	// ServiceUsage returns usage counters for registered services,
	// registering the relevant tables on ws so changes fire the watch.
	// The uint64 return is ignored by this package — presumably a memdb
	// index; confirm against the state package.
	ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error)
}