xDS Load Balancing (#14397)
Prior to #13244, connect proxies and gateways could only be configured by an xDS session served by the local client agent. In an upcoming release, it will be possible to deploy a Consul service mesh without client agents. In this model, xDS sessions will be handled by the servers themselves, which necessitates load-balancing to prevent a single server from receiving a disproportionate amount of load and becoming overwhelmed. This introduces a simple form of load-balancing where Consul will attempt to achieve an even spread of load (xDS sessions) between all healthy servers. It does so by implementing a concurrent session limiter (limiter.SessionLimiter) and adjusting the limit according to autopilot state and proxy service registrations in the catalog. If a server is already over capacity (i.e. the session limit is lowered), Consul will begin draining sessions to rebalance the load. This will result in the client receiving a `RESOURCE_EXHAUSTED` status code. It is the client's responsibility to observe this response and reconnect to a different server. Users of the gRPC client connection brokered by the consul-server-connection-manager library will get this for free. The rate at which Consul will drain sessions to rebalance load is scaled dynamically based on the number of proxies in the catalog.
This commit is contained in:
parent
a5f4573c76
commit
9fe6c33c0d
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
xds: servers will limit the number of concurrent xDS streams they can handle to balance the load across all servers
|
||||
```
|
|
@ -707,6 +707,9 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Start a goroutine to terminate excess xDS sessions.
|
||||
go a.baseDeps.XDSStreamLimiter.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
|
||||
|
||||
// register watches
|
||||
if err := a.reloadWatches(a.config); err != nil {
|
||||
return err
|
||||
|
@ -791,6 +794,7 @@ func (a *Agent) listenAndServeGRPC() error {
|
|||
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
|
||||
},
|
||||
a,
|
||||
a.baseDeps.XDSStreamLimiter,
|
||||
)
|
||||
a.xdsServer.Register(a.externalGRPCServer)
|
||||
|
||||
|
|
|
@ -55,6 +55,14 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
|
|||
}
|
||||
|
||||
d.readyServersPublisher.PublishReadyServersEvents(state)
|
||||
|
||||
var readyServers uint32
|
||||
for _, server := range state.Servers {
|
||||
if autopilotevents.IsServerReady(server) {
|
||||
readyServers++
|
||||
}
|
||||
}
|
||||
d.server.xdsCapacityController.SetServerCount(readyServers)
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) {
|
||||
|
|
|
@ -198,13 +198,11 @@ func (r *ReadyServersEventPublisher) readyServersEvents(state *autopilot.State)
|
|||
return []stream.Event{r.newReadyServersEvent(servers)}, true
|
||||
}
|
||||
|
||||
// autopilotStateToReadyServers will iterate through all servers in the autopilot
|
||||
// state and compile a list of servers which are "ready". Readiness means that
|
||||
// they would be an acceptable target for stale queries.
|
||||
func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers {
|
||||
var servers EventPayloadReadyServers
|
||||
for _, srv := range state.Servers {
|
||||
// All healthy servers are caught up enough to be included in a ready servers.
|
||||
// IsServerReady determines whether the given server (from the autopilot state)
|
||||
// is "ready" - by which we mean that they would be an acceptable target for
|
||||
// stale queries.
|
||||
func IsServerReady(srv *autopilot.ServerState) bool {
|
||||
// All healthy servers are caught up enough to be considered ready.
|
||||
// Servers with voting rights that are still healthy according to Serf are
|
||||
// also included as they have likely just fallen behind the leader a little
|
||||
// after initially replicating state. They are still acceptable targets
|
||||
|
@ -216,7 +214,16 @@ func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopil
|
|||
// TODO (agentless) for a non-voting server that is still alive but fell
|
||||
// behind, should we cause it to be removed. For voters we know they were caught
|
||||
// up at some point but for non-voters we cannot know the same thing.
|
||||
if srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) {
|
||||
return srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive)
|
||||
}
|
||||
|
||||
// autopilotStateToReadyServers will iterate through all servers in the autopilot
|
||||
// state and compile a list of servers which are "ready". Readiness means that
|
||||
// they would be an acceptable target for stale queries.
|
||||
func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers {
|
||||
var servers EventPayloadReadyServers
|
||||
for _, srv := range state.Servers {
|
||||
if IsServerReady(srv) {
|
||||
// autopilot information contains addresses in the <host>:<port> form. We only care about the
|
||||
// the host so we parse it out here and discard the port.
|
||||
host, err := extractHost(string(srv.Server.Address))
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
grpc "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
|
@ -553,6 +554,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
|||
NewRequestRecorderFunc: middleware.NewRequestRecorder,
|
||||
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
|
||||
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
|
||||
XDSStreamLimiter: limiter.NewSessionLimiter(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/rpc/middleware"
|
||||
|
@ -23,6 +24,7 @@ type Deps struct {
|
|||
ConnPool *pool.ConnPool
|
||||
GRPCConnPool GRPCClientConner
|
||||
LeaderForwarder LeaderForwarder
|
||||
XDSStreamLimiter *limiter.SessionLimiter
|
||||
// GetNetRPCInterceptorFunc, if not nil, sets the net/rpc rpc.ServerServiceCallInterceptor on
|
||||
// the server side to record metrics around the RPC requests. If nil, no interceptor is added to
|
||||
// the rpc server.
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||
"github.com/hashicorp/consul/agent/consul/xdscapacity"
|
||||
aclgrpc "github.com/hashicorp/consul/agent/grpc-external/services/acl"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
|
||||
|
@ -374,6 +375,10 @@ type Server struct {
|
|||
// peeringServer handles peering RPC requests internal to this cluster, like generating peering tokens.
|
||||
peeringServer *peering.Server
|
||||
|
||||
// xdsCapacityController controls the number of concurrent xDS streams the
|
||||
// server is able to handle.
|
||||
xdsCapacityController *xdscapacity.Controller
|
||||
|
||||
// embedded struct to hold all the enterprise specific data
|
||||
EnterpriseServer
|
||||
}
|
||||
|
@ -749,6 +754,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
|
|||
s.grpcLeaderForwarder = flat.LeaderForwarder
|
||||
go s.trackLeaderChanges()
|
||||
|
||||
s.xdsCapacityController = xdscapacity.NewController(xdscapacity.Config{
|
||||
Logger: s.logger.Named(logging.XDSCapacityController),
|
||||
GetStore: func() xdscapacity.Store { return s.fsm.State() },
|
||||
SessionLimiter: flat.XDSStreamLimiter,
|
||||
})
|
||||
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
// Initialize Autopilot. This must happen before starting leadership monitoring
|
||||
// as establishing leadership could attempt to use autopilot and cause a panic.
|
||||
s.initAutopilot(config)
|
||||
|
|
|
@ -325,7 +325,7 @@ func (s *Store) NodeUsage() (uint64, NodeUsage, error) {
|
|||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
nodes, err := firstUsageEntry(tx, tableNodes)
|
||||
nodes, err := firstUsageEntry(nil, tx, tableNodes)
|
||||
if err != nil {
|
||||
return 0, NodeUsage{}, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -347,7 +347,7 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
|
|||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
peerings, err := firstUsageEntry(tx, tablePeering)
|
||||
peerings, err := firstUsageEntry(nil, tx, tablePeering)
|
||||
if err != nil {
|
||||
return 0, PeeringUsage{}, fmt.Errorf("failed peerings lookup: %s", err)
|
||||
}
|
||||
|
@ -365,23 +365,23 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
|
|||
|
||||
// ServiceUsage returns the latest seen Raft index, a compiled set of service
|
||||
// usage data, and any errors.
|
||||
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
||||
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
serviceInstances, err := firstUsageEntry(tx, tableServices)
|
||||
serviceInstances, err := firstUsageEntry(ws, tx, tableServices)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
services, err := firstUsageEntry(tx, serviceNamesUsageTable)
|
||||
services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
serviceKindInstances := make(map[string]int)
|
||||
for _, kind := range allConnectKind {
|
||||
usage, err := firstUsageEntry(tx, connectUsageTableName(kind))
|
||||
usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind))
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
@ -393,7 +393,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
|||
Services: services.Count,
|
||||
ConnectServiceInstances: serviceKindInstances,
|
||||
}
|
||||
results, err := compileEnterpriseServiceUsage(tx, usage)
|
||||
results, err := compileEnterpriseServiceUsage(ws, tx, usage)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
@ -405,7 +405,7 @@ func (s *Store) KVUsage() (uint64, KVUsage, error) {
|
|||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
kvs, err := firstUsageEntry(tx, "kvs")
|
||||
kvs, err := firstUsageEntry(nil, tx, "kvs")
|
||||
if err != nil {
|
||||
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
@ -428,7 +428,7 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) {
|
|||
configEntries := make(map[string]int)
|
||||
var maxIdx uint64
|
||||
for _, kind := range structs.AllConfigEntryKinds {
|
||||
configEntry, err := firstUsageEntry(tx, configEntryUsageTableName(kind))
|
||||
configEntry, err := firstUsageEntry(nil, tx, configEntryUsageTableName(kind))
|
||||
if configEntry.Index > maxIdx {
|
||||
maxIdx = configEntry.Index
|
||||
}
|
||||
|
@ -448,11 +448,12 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) {
|
|||
return maxIdx, results, nil
|
||||
}
|
||||
|
||||
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
|
||||
usage, err := tx.First(tableUsage, indexID, id)
|
||||
func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, error) {
|
||||
watch, usage, err := tx.FirstWatch(tableUsage, indexID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ws.Add(watch)
|
||||
|
||||
// If no elements have been inserted, the usage entry will not exist. We
|
||||
// return a valid value so that can be certain the return value is not nil
|
||||
|
|
|
@ -29,7 +29,7 @@ func addEnterpriseKVUsage(map[string]int, memdb.Change) {}
|
|||
|
||||
func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
||||
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -150,7 +152,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// No services have been registered, and thus no usage entry exists
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(0))
|
||||
require.Equal(t, usage.Services, 0)
|
||||
|
@ -174,13 +176,22 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) {
|
|||
testRegisterConnectNativeService(t, s, 14, "node2", "service-native")
|
||||
testRegisterConnectNativeService(t, s, 15, "node2", "service-native-1")
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, usage, err := s.ServiceUsage(ws)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(15))
|
||||
require.Equal(t, 5, usage.Services)
|
||||
require.Equal(t, 8, usage.ServiceInstances)
|
||||
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
|
||||
testRegisterSidecarProxy(t, s, 16, "node2", "service2")
|
||||
|
||||
select {
|
||||
case <-ws.WatchCh(context.Background()):
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting on WatchSet")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
||||
|
@ -207,7 +218,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
|||
testRegisterSidecarProxy(t, s, 3, "node1", "service2")
|
||||
testRegisterConnectNativeService(t, s, 4, "node1", "service-connect")
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(4))
|
||||
require.Equal(t, 3, usage.Services)
|
||||
|
@ -217,7 +228,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
|||
|
||||
require.NoError(t, s.DeleteNode(4, "node1", nil, ""))
|
||||
|
||||
idx, usage, err = s.ServiceUsage()
|
||||
idx, usage, err = s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(4))
|
||||
require.Equal(t, usage.Services, 0)
|
||||
|
@ -245,7 +256,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
|
|||
testRegisterConnectNativeService(t, s, 7, "node2", "service-native")
|
||||
|
||||
testutil.RunStep(t, "writes", func(t *testing.T) {
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(7), idx)
|
||||
require.Equal(t, 3, usage.Services)
|
||||
|
@ -257,7 +268,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
|
|||
testutil.RunStep(t, "deletes", func(t *testing.T) {
|
||||
require.NoError(t, s.DeleteNode(7, "node1", nil, peerName))
|
||||
require.NoError(t, s.DeleteNode(8, "node2", nil, ""))
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(8), idx)
|
||||
require.Equal(t, 0, usage.Services)
|
||||
|
@ -295,7 +306,7 @@ func TestStateStore_Usage_Restore(t *testing.T) {
|
|||
require.Equal(t, idx, uint64(9))
|
||||
require.Equal(t, nodeUsage.Nodes, 1)
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(9))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
|
@ -395,7 +406,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.NoError(t, s.EnsureService(2, "node1", svc))
|
||||
|
||||
// We renamed a service with a single instance, so we maintain 1 service.
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
|
@ -415,7 +426,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.NoError(t, s.EnsureService(3, "node1", svc))
|
||||
|
||||
// We renamed a service with a single instance, so we maintain 1 service.
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(3))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
|
@ -436,7 +447,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.NoError(t, s.EnsureService(4, "node1", svc))
|
||||
|
||||
// We renamed a service with a single instance, so we maintain 1 service.
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(4))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
|
@ -467,7 +478,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.EnsureService(6, "node1", svc3))
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(6))
|
||||
require.Equal(t, usage.Services, 2)
|
||||
|
@ -485,7 +496,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.EnsureService(7, "node1", update))
|
||||
|
||||
idx, usage, err = s.ServiceUsage()
|
||||
idx, usage, err = s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(7))
|
||||
require.Equal(t, usage.Services, 3)
|
||||
|
@ -511,7 +522,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
require.NoError(t, s.EnsureService(2, "node1", svc))
|
||||
|
||||
// We renamed a service with a single instance, so we maintain 1 service.
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
|
@ -537,7 +548,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.EnsureService(4, "node1", svc3))
|
||||
|
||||
idx, usage, err := s.ServiceUsage()
|
||||
idx, usage, err := s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(4))
|
||||
require.Equal(t, usage.Services, 2)
|
||||
|
@ -552,7 +563,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.EnsureService(5, "node1", update))
|
||||
|
||||
idx, usage, err = s.ServiceUsage()
|
||||
idx, usage, err = s.ServiceUsage(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(5))
|
||||
require.Equal(t, usage.Services, 3)
|
||||
|
|
|
@ -178,7 +178,7 @@ func (u *UsageMetricsReporter) runOnce() {
|
|||
|
||||
u.emitPeeringUsage(peeringUsage)
|
||||
|
||||
_, serviceUsage, err := state.ServiceUsage()
|
||||
_, serviceUsage, err := state.ServiceUsage(nil)
|
||||
if err != nil {
|
||||
u.logger.Warn("failed to retrieve services from state store", "error", err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
package xdscapacity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
var StatsGauges = []prometheus.GaugeDefinition{
|
||||
{
|
||||
Name: []string{"xds", "server", "idealStreamsMax"},
|
||||
Help: "The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers.",
|
||||
},
|
||||
}
|
||||
|
||||
// errorMargin is amount to which we allow a server to be over-occupied,
|
||||
// expressed as a percentage (between 0 and 1).
|
||||
//
|
||||
// We allow 10% more than the ideal number of streams per server.
|
||||
const errorMargin = 0.1
|
||||
|
||||
// Controller determines the ideal number of xDS streams for the server to
|
||||
// handle and enforces it using the given SessionLimiter.
|
||||
//
|
||||
// We aim for a roughly even spread of streams between servers in the cluster
|
||||
// and, to that end, limit the number of streams each server can handle to:
|
||||
//
|
||||
// (<number of proxies> / <number of healthy servers>) + <error margin>
|
||||
//
|
||||
// Controller receives changes to the number of healthy servers from the
|
||||
// autopilot delegate. It queries the state store's catalog tables to discover
|
||||
// the number of registered proxy (sidecar and gateway) services.
|
||||
type Controller struct {
|
||||
cfg Config
|
||||
|
||||
serverCh chan uint32
|
||||
doneCh chan struct{}
|
||||
|
||||
prevMaxSessions uint32
|
||||
prevRateLimit rate.Limit
|
||||
}
|
||||
|
||||
// Config contains the dependencies for Controller.
|
||||
type Config struct {
|
||||
Logger hclog.Logger
|
||||
GetStore func() Store
|
||||
SessionLimiter SessionLimiter
|
||||
}
|
||||
|
||||
// SessionLimiter is used to enforce the session limit to achieve the ideal
|
||||
// spread of xDS streams between servers.
|
||||
type SessionLimiter interface {
|
||||
SetMaxSessions(maxSessions uint32)
|
||||
SetDrainRateLimit(rateLimit rate.Limit)
|
||||
}
|
||||
|
||||
// NewController creates a new capacity controller with the given config.
|
||||
//
|
||||
// Call Run to start the control-loop.
|
||||
func NewController(cfg Config) *Controller {
|
||||
return &Controller{
|
||||
cfg: cfg,
|
||||
serverCh: make(chan uint32),
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Run the control-loop until the given context is canceled or reaches its
|
||||
// deadline.
|
||||
func (c *Controller) Run(ctx context.Context) {
|
||||
defer close(c.doneCh)
|
||||
|
||||
ws, numProxies, err := c.countProxies(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var numServers uint32
|
||||
for {
|
||||
select {
|
||||
case s := <-c.serverCh:
|
||||
numServers = s
|
||||
c.updateMaxSessions(numServers, numProxies)
|
||||
case <-ws.WatchCh(ctx):
|
||||
ws, numProxies, err = c.countProxies(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.updateDrainRateLimit(numProxies)
|
||||
c.updateMaxSessions(numServers, numProxies)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetServerCount updates the number of healthy servers that is used when
|
||||
// determining capacity. It is called by the autopilot delegate.
|
||||
func (c *Controller) SetServerCount(count uint32) {
|
||||
select {
|
||||
case c.serverCh <- count:
|
||||
case <-c.doneCh:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) updateDrainRateLimit(numProxies uint32) {
|
||||
rateLimit := calcRateLimit(numProxies)
|
||||
if rateLimit == c.prevRateLimit {
|
||||
return
|
||||
}
|
||||
|
||||
c.cfg.Logger.Debug("updating drain rate limit", "rate_limit", rateLimit)
|
||||
c.cfg.SessionLimiter.SetDrainRateLimit(rateLimit)
|
||||
c.prevRateLimit = rateLimit
|
||||
}
|
||||
|
||||
// We dynamically scale the rate at which excess sessions will be drained
|
||||
// according to the number of proxies in the catalog.
|
||||
//
|
||||
// The numbers here are pretty arbitrary (change them if you find better ones!)
|
||||
// but the logic is:
|
||||
//
|
||||
// 0-512 proxies: drain 1 per second
|
||||
// 513-2815 proxies: linearly scaled by 1/s for every additional 256 proxies
|
||||
// 2816+ proxies: drain 10 per second
|
||||
//
|
||||
func calcRateLimit(numProxies uint32) rate.Limit {
|
||||
perSecond := math.Floor((float64(numProxies) - 256) / 256)
|
||||
|
||||
if perSecond < 1 {
|
||||
return 1
|
||||
}
|
||||
|
||||
if perSecond > 10 {
|
||||
return 10
|
||||
}
|
||||
|
||||
return rate.Limit(perSecond)
|
||||
}
|
||||
|
||||
func (c *Controller) updateMaxSessions(numServers, numProxies uint32) {
|
||||
if numServers == 0 || numProxies == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
maxSessions := uint32(math.Ceil((float64(numProxies) / float64(numServers)) * (1 + errorMargin)))
|
||||
if maxSessions == c.prevMaxSessions {
|
||||
return
|
||||
}
|
||||
|
||||
c.cfg.Logger.Debug(
|
||||
"updating max sessions",
|
||||
"max_sessions", maxSessions,
|
||||
"num_servers", numServers,
|
||||
"num_proxies", numProxies,
|
||||
)
|
||||
metrics.SetGauge([]string{"xds", "server", "idealStreamsMax"}, float32(maxSessions))
|
||||
c.cfg.SessionLimiter.SetMaxSessions(maxSessions)
|
||||
c.prevMaxSessions = maxSessions
|
||||
}
|
||||
|
||||
// countProxies counts the number of registered proxy services, retrying on
|
||||
// error until the given context is cancelled.
|
||||
func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) {
|
||||
retryWaiter := &retry.Waiter{
|
||||
MinFailures: 1,
|
||||
MinWait: 1 * time.Second,
|
||||
MaxWait: 1 * time.Minute,
|
||||
}
|
||||
|
||||
for {
|
||||
store := c.cfg.GetStore()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
ws.Add(store.AbandonCh())
|
||||
|
||||
var count uint32
|
||||
_, usage, err := store.ServiceUsage(ws)
|
||||
|
||||
// Query failed? Wait for a while, and then go to the top of the loop to
|
||||
// retry (unless the context is cancelled).
|
||||
if err != nil {
|
||||
if err := retryWaiter.Wait(ctx); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for kind, kindCount := range usage.ConnectServiceInstances {
|
||||
if structs.ServiceKind(kind).IsProxy() {
|
||||
count += uint32(kindCount)
|
||||
}
|
||||
}
|
||||
return ws, count, nil
|
||||
}
|
||||
}
|
||||
|
||||
type Store interface {
|
||||
AbandonCh() <-chan struct{}
|
||||
ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error)
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
package xdscapacity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestController(t *testing.T) {
|
||||
const index = 123
|
||||
|
||||
store := state.NewStateStore(nil)
|
||||
|
||||
// This loop generates:
|
||||
//
|
||||
// 4 (service kind) * 5 (service) * 5 * (node) = 100 proxy services. And 25 non-proxy services.
|
||||
for _, kind := range []structs.ServiceKind{
|
||||
// These will be included in the count.
|
||||
structs.ServiceKindConnectProxy,
|
||||
structs.ServiceKindIngressGateway,
|
||||
structs.ServiceKindTerminatingGateway,
|
||||
structs.ServiceKindMeshGateway,
|
||||
|
||||
// This one will not.
|
||||
structs.ServiceKindTypical,
|
||||
} {
|
||||
for i := 0; i < 5; i++ {
|
||||
serviceName := fmt.Sprintf("%s-%d", kind, i)
|
||||
|
||||
for j := 0; j < 5; j++ {
|
||||
nodeName := fmt.Sprintf("%s-node-%d", serviceName, j)
|
||||
|
||||
require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{
|
||||
Node: nodeName,
|
||||
Service: &structs.NodeService{
|
||||
ID: serviceName,
|
||||
Service: serviceName,
|
||||
Kind: kind,
|
||||
},
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
limiter := newTestLimiter()
|
||||
|
||||
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
|
||||
cfg := metrics.DefaultConfig("consul")
|
||||
cfg.EnableHostname = false
|
||||
metrics.NewGlobal(cfg, sink)
|
||||
|
||||
t.Cleanup(func() {
|
||||
sink := &metrics.BlackholeSink{}
|
||||
metrics.NewGlobal(cfg, sink)
|
||||
})
|
||||
|
||||
adj := NewController(Config{
|
||||
Logger: testutil.Logger(t),
|
||||
GetStore: func() Store { return store },
|
||||
SessionLimiter: limiter,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
go adj.Run(ctx)
|
||||
|
||||
// Keen readers will notice the numbers here are off by one. This is due to
|
||||
// floating point math (because we multiply by 1.1).
|
||||
testutil.RunStep(t, "load split between 2 servers", func(t *testing.T) {
|
||||
adj.SetServerCount(2)
|
||||
require.Equal(t, 56, limiter.receive(t))
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "all load on 1 server", func(t *testing.T) {
|
||||
adj.SetServerCount(1)
|
||||
require.Equal(t, 111, limiter.receive(t))
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "delete proxy service", func(t *testing.T) {
|
||||
require.NoError(t, store.DeleteService(index+1, "ingress-gateway-0-node-0", "ingress-gateway-0", acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword))
|
||||
require.Equal(t, 109, limiter.receive(t))
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "check we're emitting gauge", func(t *testing.T) {
|
||||
data := sink.Data()
|
||||
require.Len(t, data, 1)
|
||||
|
||||
gauge, ok := data[0].Gauges["consul.xds.server.idealStreamsMax"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, float32(109), gauge.Value)
|
||||
})
|
||||
}
|
||||
|
||||
func newTestLimiter() *testLimiter {
|
||||
return &testLimiter{ch: make(chan uint32, 1)}
|
||||
}
|
||||
|
||||
type testLimiter struct{ ch chan uint32 }
|
||||
|
||||
func (tl *testLimiter) SetMaxSessions(max uint32) { tl.ch <- max }
|
||||
|
||||
func (tl *testLimiter) receive(t *testing.T) int {
|
||||
select {
|
||||
case v := <-tl.ch:
|
||||
return int(v)
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("timeout waiting for SetMaxSessions")
|
||||
}
|
||||
panic("this should never be reached")
|
||||
}
|
||||
|
||||
func (tl *testLimiter) SetDrainRateLimit(rateLimit rate.Limit) {}
|
||||
|
||||
func TestCalcRateLimit(t *testing.T) {
|
||||
for in, out := range map[uint32]rate.Limit{
|
||||
0: rate.Limit(1),
|
||||
1: rate.Limit(1),
|
||||
512: rate.Limit(1),
|
||||
768: rate.Limit(2),
|
||||
1024: rate.Limit(3),
|
||||
2816: rate.Limit(10),
|
||||
1000000000: rate.Limit(10),
|
||||
} {
|
||||
require.Equalf(t, out, calcRateLimit(in), "calcRateLimit(%d)", in)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,245 @@
|
|||
// package limiter provides primatives for limiting the number of concurrent
|
||||
// operations in-flight.
|
||||
package limiter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Unlimited can be used to allow an unlimited number of concurrent sessions.
|
||||
const Unlimited uint32 = 0
|
||||
|
||||
// ErrCapacityReached is returned when there is no capacity for additional sessions.
|
||||
var ErrCapacityReached = errors.New("active session limit reached")
|
||||
|
||||
// SessionLimiter is a session-based concurrency limiter, it provides the basis
|
||||
// of gRPC/xDS load balancing.
|
||||
//
|
||||
// Stream handlers obtain a session with BeginSession before they begin serving
|
||||
// resources - if the server has reached capacity ErrCapacityReached is returned,
|
||||
// otherwise a Session is returned.
|
||||
//
|
||||
// It is the session-holder's responsibility to:
|
||||
//
|
||||
// 1. Call End on the session when finished.
|
||||
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
|
||||
// stream) when it is closed.
|
||||
//
|
||||
// The maximum number of concurrent sessions is controlled with SetMaxSessions.
|
||||
// If there are more than the given maximum sessions already in-flight,
|
||||
// SessionLimiter will drain randomly-selected sessions at a rate controlled
|
||||
// by SetDrainRateLimit.
|
||||
type SessionLimiter struct {
|
||||
drainLimiter *rate.Limiter
|
||||
|
||||
// max and inFlight are read/written using atomic operations.
|
||||
max, inFlight uint32
|
||||
|
||||
// wakeCh is used to trigger the Run loop to start draining excess sessions.
|
||||
wakeCh chan struct{}
|
||||
|
||||
// Everything below here is guarded by mu.
|
||||
mu sync.Mutex
|
||||
maxSessionID uint64
|
||||
sessionIDs []uint64 // sessionIDs must be sorted so we can binary search it.
|
||||
sessions map[uint64]*session
|
||||
}
|
||||
|
||||
// NewSessionLimiter creates a new SessionLimiter.
|
||||
func NewSessionLimiter() *SessionLimiter {
|
||||
return &SessionLimiter{
|
||||
drainLimiter: rate.NewLimiter(rate.Inf, 1),
|
||||
max: Unlimited,
|
||||
wakeCh: make(chan struct{}, 1),
|
||||
sessionIDs: make([]uint64, 0),
|
||||
sessions: make(map[uint64]*session),
|
||||
}
|
||||
}
|
||||
|
||||
// Run the SessionLimiter's drain loop, which terminates excess sessions if the
|
||||
// limit is lowered. It will exit when the given context is canceled or reaches
|
||||
// its deadline.
|
||||
func (l *SessionLimiter) Run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-l.wakeCh:
|
||||
for {
|
||||
if !l.overCapacity() {
|
||||
break
|
||||
}
|
||||
|
||||
if err := l.drainLimiter.Wait(ctx); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if !l.overCapacity() {
|
||||
break
|
||||
}
|
||||
|
||||
l.terminateSession()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetMaxSessions controls the maximum number of concurrent sessions. If it is
|
||||
// lower, randomly-selected sessions will be drained.
|
||||
func (l *SessionLimiter) SetMaxSessions(max uint32) {
|
||||
atomic.StoreUint32(&l.max, max)
|
||||
|
||||
// Send on wakeCh without blocking if the Run loop is busy. wakeCh has a
|
||||
// buffer of 1, so no triggers will be missed.
|
||||
select {
|
||||
case l.wakeCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// SetDrainRateLimit controls the rate at which excess sessions will be drained.
|
||||
func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit) {
|
||||
l.drainLimiter.SetLimit(limit)
|
||||
}
|
||||
|
||||
// BeginSession begins a new session, or returns ErrCapacityReached if the
|
||||
// concurrent session limit has been reached.
|
||||
//
|
||||
// It is the session-holder's responsibility to:
|
||||
//
|
||||
// 1. Call End on the session when finished.
|
||||
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
|
||||
// stream) when it is closed.
|
||||
func (l *SessionLimiter) BeginSession() (Session, error) {
|
||||
if !l.hasCapacity() {
|
||||
return nil, ErrCapacityReached
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.createSessionLocked(), nil
|
||||
}
|
||||
|
||||
// Note: hasCapacity is *best effort*. As we do not hold l.mu it's possible that:
|
||||
//
|
||||
// - max has changed by the time we compare it to inFlight.
|
||||
// - inFlight < max now, but increases before we create a new session.
|
||||
//
|
||||
// This is acceptable for our uses, especially because excess sessions will
|
||||
// eventually be drained.
|
||||
func (l *SessionLimiter) hasCapacity() bool {
|
||||
max := atomic.LoadUint32(&l.max)
|
||||
if max == Unlimited {
|
||||
return true
|
||||
}
|
||||
|
||||
cur := atomic.LoadUint32(&l.inFlight)
|
||||
return max > cur
|
||||
}
|
||||
|
||||
// Note: overCapacity is *best effort*. As we do not hold l.mu it's possible that:
|
||||
//
|
||||
// - max has changed by the time we compare it to inFlight.
|
||||
// - inFlight > max now, but decreases before we terminate a session.
|
||||
func (l *SessionLimiter) overCapacity() bool {
|
||||
max := atomic.LoadUint32(&l.max)
|
||||
if max == Unlimited {
|
||||
return false
|
||||
}
|
||||
|
||||
cur := atomic.LoadUint32(&l.inFlight)
|
||||
return cur > max
|
||||
}
|
||||
|
||||
func (l *SessionLimiter) terminateSession() {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
idx := rand.Intn(len(l.sessionIDs))
|
||||
id := l.sessionIDs[idx]
|
||||
l.sessions[id].terminate()
|
||||
l.deleteSessionLocked(idx, id)
|
||||
}
|
||||
|
||||
func (l *SessionLimiter) createSessionLocked() *session {
|
||||
session := &session{
|
||||
l: l,
|
||||
id: l.maxSessionID,
|
||||
termCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
l.maxSessionID++
|
||||
l.sessionIDs = append(l.sessionIDs, session.id)
|
||||
l.sessions[session.id] = session
|
||||
|
||||
atomic.AddUint32(&l.inFlight, 1)
|
||||
|
||||
return session
|
||||
}
|
||||
|
||||
func (l *SessionLimiter) deleteSessionLocked(idx int, id uint64) {
|
||||
delete(l.sessions, id)
|
||||
|
||||
// Note: it's important that we preserve the order here (which most allocation
|
||||
// free deletion tricks don't) because we binary search the slice.
|
||||
l.sessionIDs = append(l.sessionIDs[:idx], l.sessionIDs[idx+1:]...)
|
||||
|
||||
atomic.AddUint32(&l.inFlight, ^uint32(0))
|
||||
}
|
||||
|
||||
func (l *SessionLimiter) deleteSessionWithID(id uint64) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
idx := sort.Search(len(l.sessionIDs), func(i int) bool {
|
||||
return l.sessionIDs[i] >= id
|
||||
})
|
||||
|
||||
if idx == len(l.sessionIDs) || l.sessionIDs[idx] != id {
|
||||
// It's possible that we weren't able to find the id because the session has
|
||||
// already been deleted. This could be because the session-holder called End
|
||||
// more than once, or because the session was drained. In either case there's
|
||||
// nothing more to do.
|
||||
return
|
||||
}
|
||||
|
||||
l.deleteSessionLocked(idx, id)
|
||||
}
|
||||
|
||||
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
|
||||
// concurrenly with other session-holders. Sessions may be terminated abruptly
|
||||
// by the SessionLimiter, so it is the responsibility of the holder to receive
|
||||
// on the Terminated channel and halt the operation when it is closed.
|
||||
type Session interface {
|
||||
// End the session.
|
||||
//
|
||||
// This MUST be called when the session-holder is done (e.g. the gRPC stream
|
||||
// is closed).
|
||||
End()
|
||||
|
||||
// Terminated is a channel that is closed when the session is terminated.
|
||||
//
|
||||
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
|
||||
// when it is closed.
|
||||
Terminated() <-chan struct{}
|
||||
}
|
||||
|
||||
type session struct {
|
||||
l *SessionLimiter
|
||||
|
||||
id uint64
|
||||
termCh chan struct{}
|
||||
}
|
||||
|
||||
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
|
||||
|
||||
func (s *session) Terminated() <-chan struct{} { return s.termCh }
|
||||
|
||||
func (s *session) terminate() { close(s.termCh) }
|
|
@ -0,0 +1,81 @@
|
|||
package limiter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
func init() { lib.SeedMathRand() }
|
||||
|
||||
func TestSessionLimiter(t *testing.T) {
|
||||
lim := NewSessionLimiter()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
go lim.Run(ctx)
|
||||
|
||||
// doneCh is used to shut the goroutines down at the end of the test.
|
||||
doneCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(doneCh) })
|
||||
|
||||
// Start 10 sessions, and increment the counter when they are terminated.
|
||||
var (
|
||||
terminations uint32
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
sess, err := lim.BeginSession()
|
||||
require.NoError(t, err)
|
||||
defer sess.End()
|
||||
|
||||
wg.Done()
|
||||
|
||||
select {
|
||||
case <-sess.Terminated():
|
||||
atomic.AddUint32(&terminations, 1)
|
||||
case <-doneCh:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for all the sessions to begin.
|
||||
wg.Wait()
|
||||
|
||||
// Lowering max sessions to 5 should result in 5 sessions being terminated.
|
||||
lim.SetMaxSessions(5)
|
||||
require.Eventually(t, func() bool {
|
||||
return atomic.LoadUint32(&terminations) == 5
|
||||
}, 2*time.Second, 50*time.Millisecond)
|
||||
|
||||
// Attempting to start a new session should fail immediately.
|
||||
_, err := lim.BeginSession()
|
||||
require.Equal(t, ErrCapacityReached, err)
|
||||
|
||||
// Raising MaxSessions should make room for a new session.
|
||||
lim.SetMaxSessions(6)
|
||||
sess, err := lim.BeginSession()
|
||||
require.NoError(t, err)
|
||||
|
||||
// ...but trying to start another new one should fail
|
||||
_, err = lim.BeginSession()
|
||||
require.Equal(t, ErrCapacityReached, err)
|
||||
|
||||
// ...until another session ends.
|
||||
sess.End()
|
||||
_, err = lim.BeginSession()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Calling End twice is a no-op.
|
||||
sess.End()
|
||||
_, err = lim.BeginSession()
|
||||
require.Equal(t, ErrCapacityReached, err)
|
||||
}
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
external "github.com/hashicorp/consul/agent/grpc-external"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
grpc "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
|
@ -1446,6 +1447,7 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
|
|||
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
|
||||
NewRequestRecorderFunc: middleware.NewRequestRecorder,
|
||||
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
|
||||
XDSStreamLimiter: limiter.NewSessionLimiter(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/consul/xdscapacity"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
grpc "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
|
@ -150,6 +152,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
|
||||
d.EventPublisher = stream.NewEventPublisher(10 * time.Second)
|
||||
|
||||
d.XDSStreamLimiter = limiter.NewSessionLimiter()
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
|
@ -232,7 +236,9 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
|
|||
gauges = append(gauges,
|
||||
consul.AutopilotGauges,
|
||||
consul.LeaderCertExpirationGauges,
|
||||
consul.LeaderPeeringMetrics)
|
||||
consul.LeaderPeeringMetrics,
|
||||
xdscapacity.StatsGauges,
|
||||
)
|
||||
}
|
||||
|
||||
// Flatten definitions
|
||||
|
@ -275,6 +281,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
|
|||
consul.RPCCounters,
|
||||
grpc.StatsCounters,
|
||||
local.StateCounters,
|
||||
xds.StatsCounters,
|
||||
raftCounters,
|
||||
}
|
||||
// Flatten definitions
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
||||
|
@ -29,6 +30,8 @@ import (
|
|||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
|
||||
|
||||
type deltaRecvResponse int
|
||||
|
||||
const (
|
||||
|
@ -86,6 +89,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
return err
|
||||
}
|
||||
|
||||
session, err := s.SessionLimiter.BeginSession()
|
||||
if err != nil {
|
||||
return errOverwhelmed
|
||||
}
|
||||
defer session.End()
|
||||
|
||||
// Loop state
|
||||
var (
|
||||
cfgSnap *proxycfg.ConfigSnapshot
|
||||
|
@ -159,6 +168,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-session.Terminated():
|
||||
generator.Logger.Debug("draining stream to rebalance load")
|
||||
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
|
||||
return errOverwhelmed
|
||||
case <-authTimer:
|
||||
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
|
||||
if err := checkStreamACLs(cfgSnap); err != nil {
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/xdscommon"
|
||||
|
@ -36,7 +37,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -238,7 +239,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -369,7 +370,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -522,7 +523,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
|
||||
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
|
||||
|
@ -663,7 +664,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -799,7 +800,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -1059,7 +1060,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
|
|||
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
|
||||
}
|
||||
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
@ -1137,6 +1138,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
|
|||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
|
||||
100*time.Millisecond, // Make this short.
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
|
@ -1236,6 +1238,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
|
|||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
|
||||
100*time.Millisecond, // Make this short.
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
|
@ -1316,7 +1319,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
|
|||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false)
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("ingress-gateway", nil)
|
||||
|
@ -1368,6 +1371,115 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
|
||||
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
|
||||
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{})
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
mgr.RegisterProxy(t, sid)
|
||||
|
||||
snap := newTestSnapshot(t, nil, "")
|
||||
|
||||
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
InitialResourceVersions: mustMakeVersionMap(t,
|
||||
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
type capacityReachedLimiter struct{}
|
||||
|
||||
func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {
|
||||
return nil, limiter.ErrCapacityReached
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
|
||||
limiter := &testLimiter{}
|
||||
|
||||
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
mgr.RegisterProxy(t, sid)
|
||||
|
||||
testutil.RunStep(t, "successful request/response", func(t *testing.T) {
|
||||
snap := newTestSnapshot(t, nil, "")
|
||||
|
||||
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
InitialResourceVersions: mustMakeVersionMap(t,
|
||||
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: xdscommon.ClusterType,
|
||||
Nonce: hexString(1),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestCluster(t, snap, "tcp:local_app"),
|
||||
makeTestCluster(t, snap, "tcp:db"),
|
||||
),
|
||||
})
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "terminate limiter session", func(t *testing.T) {
|
||||
limiter.TerminateSession()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "check drain counter incremeted", func(t *testing.T) {
|
||||
data := scenario.sink.Data()
|
||||
require.Len(t, data, 1)
|
||||
|
||||
item := data[0]
|
||||
require.Len(t, item.Counters, 1)
|
||||
|
||||
val, ok := item.Counters["consul.xds.test.xds.server.streamDrained"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, val.Count)
|
||||
})
|
||||
}
|
||||
|
||||
type testLimiter struct {
|
||||
termCh chan struct{}
|
||||
}
|
||||
|
||||
func (t *testLimiter) BeginSession() (limiter.Session, error) {
|
||||
t.termCh = make(chan struct{})
|
||||
return &testSession{termCh: t.termCh}, nil
|
||||
}
|
||||
|
||||
func (t *testLimiter) TerminateSession() { close(t.termCh) }
|
||||
|
||||
type testSession struct {
|
||||
termCh chan struct{}
|
||||
}
|
||||
|
||||
func (t *testSession) Terminated() <-chan struct{} { return t.termCh }
|
||||
|
||||
func (*testSession) End() {}
|
||||
|
||||
func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) {
|
||||
t.Helper()
|
||||
select {
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
external "github.com/hashicorp/consul/agent/grpc-external"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/xdscommon"
|
||||
|
@ -29,6 +30,13 @@ var StatsGauges = []prometheus.GaugeDefinition{
|
|||
},
|
||||
}
|
||||
|
||||
var StatsCounters = []prometheus.CounterDefinition{
|
||||
{
|
||||
Name: []string{"xds", "server", "streamDrained"},
|
||||
Help: "Counts the number of xDS streams that are drained when rebalancing the load between servers.",
|
||||
},
|
||||
}
|
||||
|
||||
// ADSStream is a shorter way of referring to this thing...
|
||||
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
||||
|
||||
|
@ -97,6 +105,12 @@ type ProxyConfigSource interface {
|
|||
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
|
||||
}
|
||||
|
||||
// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend
|
||||
// on an interface rather than the concrete type so we can mock it in tests.
|
||||
type SessionLimiter interface {
|
||||
BeginSession() (limiter.Session, error)
|
||||
}
|
||||
|
||||
// Server represents a gRPC server that can handle xDS requests from Envoy. All
|
||||
// of it's public members must be set before the gRPC server is started.
|
||||
//
|
||||
|
@ -108,6 +122,7 @@ type Server struct {
|
|||
CfgSrc ProxyConfigSource
|
||||
ResolveToken ACLResolverFunc
|
||||
CfgFetcher ConfigFetcher
|
||||
SessionLimiter SessionLimiter
|
||||
|
||||
// AuthCheckFrequency is how often we should re-check the credentials used
|
||||
// during a long-lived gRPC Stream after it has been initially established.
|
||||
|
@ -159,6 +174,7 @@ func NewServer(
|
|||
cfgMgr ProxyConfigSource,
|
||||
resolveToken ACLResolverFunc,
|
||||
cfgFetcher ConfigFetcher,
|
||||
limiter SessionLimiter,
|
||||
) *Server {
|
||||
return &Server{
|
||||
NodeName: nodeName,
|
||||
|
@ -166,6 +182,7 @@ func NewServer(
|
|||
CfgSrc: cfgMgr,
|
||||
ResolveToken: resolveToken,
|
||||
CfgFetcher: cfgFetcher,
|
||||
SessionLimiter: limiter,
|
||||
AuthCheckFrequency: DefaultAuthCheckFrequency,
|
||||
activeStreams: &activeStreamCounters{},
|
||||
serverlessPluginEnabled: serverlessPluginEnabled,
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||||
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||
|
@ -136,6 +137,7 @@ func newTestServerDeltaScenario(
|
|||
token string,
|
||||
authCheckFrequency time.Duration,
|
||||
serverlessPluginEnabled bool,
|
||||
sessionLimiter SessionLimiter,
|
||||
) *testServerScenario {
|
||||
mgr := newTestManager(t)
|
||||
envoy := NewTestEnvoy(t, proxyID, token)
|
||||
|
@ -154,6 +156,10 @@ func newTestServerDeltaScenario(
|
|||
metrics.NewGlobal(cfg, sink)
|
||||
})
|
||||
|
||||
if sessionLimiter == nil {
|
||||
sessionLimiter = limiter.NewSessionLimiter()
|
||||
}
|
||||
|
||||
s := NewServer(
|
||||
"node-123",
|
||||
testutil.Logger(t),
|
||||
|
@ -161,6 +167,7 @@ func newTestServerDeltaScenario(
|
|||
mgr,
|
||||
resolveToken,
|
||||
nil, /*cfgFetcher ConfigFetcher*/
|
||||
sessionLimiter,
|
||||
)
|
||||
if authCheckFrequency > 0 {
|
||||
s.AuthCheckFrequency = authCheckFrequency
|
||||
|
|
|
@ -61,6 +61,7 @@ const (
|
|||
WAN string = "wan"
|
||||
Watch string = "watch"
|
||||
XDS string = "xds"
|
||||
XDSCapacityController string = "xds_capacity_controller"
|
||||
Vault string = "vault"
|
||||
Health string = "health"
|
||||
)
|
||||
|
|
|
@ -542,6 +542,8 @@ These metrics are used to monitor the health of the Consul servers.
|
|||
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
|
||||
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
|
||||
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
|
||||
| `consul.xds.server.idealStreamsMax` | The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers. | streams | gauge |
|
||||
| `consul.xds.server.streamDrained` | Counts the number of xDS streams that are drained when rebalancing the load between servers. | streams | counter |
|
||||
|
||||
|
||||
## Server Workload
|
||||
|
|
Loading…
Reference in New Issue