peering: emit exported services count metric (#13811)
Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
parent a253d7e49b
commit 7bd55578cc
@@ -8,6 +8,8 @@ import (
	"fmt"
	"time"

	"github.com/armon/go-metrics"
	"github.com/armon/go-metrics/prometheus"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-multierror"
@@ -27,8 +29,72 @@ import (
	"github.com/hashicorp/consul/proto/pbpeerstream"
)

var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"}
var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
	{
		Name: leaderExportedServicesCountKey,
		Help: "A gauge that tracks how many services are exported for the peering. " +
			"The labels are \"peering\" and, for enterprise, \"partition\". " +
			"We emit this metric every 9 seconds",
	},
}

func (s *Server) startPeeringStreamSync(ctx context.Context) {
	s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync)
	s.leaderRoutineManager.Start(ctx, peeringStreamsMetricsRoutineName, s.runPeeringMetrics)
}

func (s *Server) runPeeringMetrics(ctx context.Context) error {
	ticker := time.NewTicker(s.config.MetricsReportingInterval)
	defer ticker.Stop()

	logger := s.logger.Named(logging.PeeringMetrics)
	defaultMetrics := metrics.Default

	for {
		select {
		case <-ctx.Done():
			logger.Info("stopping peering metrics")

			// "Zero-out" the metric on exit so that when prometheus scrapes this
			// metric from a non-leader, it does not get a stale value.
			metrics.SetGauge(leaderExportedServicesCountKey, float32(0))
			return nil
		case <-ticker.C:
			if err := s.emitPeeringMetricsOnce(logger, defaultMetrics()); err != nil {
				s.logger.Error("error emitting peering stream metrics", "error", err)
			}
		}
	}
}

func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metrics.Metrics) error {
	_, peers, err := s.fsm.State().PeeringList(nil, *structs.NodeEnterpriseMetaInPartition(structs.WildcardSpecifier))
	if err != nil {
		return err
	}

	for _, peer := range peers {
		status, found := s.peerStreamServer.StreamStatus(peer.ID)
		if !found {
			logger.Trace("did not find status for", "peer_name", peer.Name)
			continue
		}

		esc := status.GetExportedServicesCount()
		part := peer.Partition
		labels := []metrics.Label{
			{Name: "peer_name", Value: peer.Name},
			{Name: "peer_id", Value: peer.ID},
		}
		if part != "" {
			labels = append(labels, metrics.Label{Name: "partition", Value: part})
		}

		metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
	}

	return nil
}

func (s *Server) runPeeringSync(ctx context.Context) error {
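With the routine above, each established peering gets one gauge sample per MetricsReportingInterval, labeled by peer (and by partition in enterprise). As an illustration only (the exact metric name mangling and any additional prefixes depend on the agent's telemetry configuration), a Prometheus scrape of a leader would surface series shaped roughly like:

    consul_peering_exported_services{peer_name="my-peer-s1",peer_id="<peer uuid>"} 3
    consul_peering_exported_services{peer_name="my-peer-s3",peer_id="<peer uuid>"} 2

On leadership loss the ctx.Done() branch zeroes the unlabeled gauge so that a former leader does not keep reporting a stale count.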
@@ -51,6 +117,7 @@ func (s *Server) runPeeringSync(ctx context.Context) error {
func (s *Server) stopPeeringStreamSync() {
	// will be a no-op when not started
	s.leaderRoutineManager.Stop(peeringStreamsRoutineName)
	s.leaderRoutineManager.Stop(peeringStreamsMetricsRoutineName)
}

// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
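For readers unfamiliar with the pattern: startPeeringStreamSync registers the metrics loop alongside the existing peering sync loop, and stopPeeringStreamSync tears both down when leadership is lost. A minimal, self-contained sketch of that start/stop-by-name idea (a hypothetical manager type written for illustration, not Consul's actual leaderRoutineManager) could look like:

    package main

    import (
    	"context"
    	"sync"
    )

    // routineManager is a simplified stand-in for a leader routine manager:
    // it runs named functions on goroutines bound to a cancelable context
    // and stops them by name; stopping a name that was never started is a no-op.
    type routineManager struct {
    	mu      sync.Mutex
    	cancels map[string]context.CancelFunc
    }

    func newRoutineManager() *routineManager {
    	return &routineManager{cancels: map[string]context.CancelFunc{}}
    }

    func (m *routineManager) Start(ctx context.Context, name string, fn func(context.Context) error) {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	ctx, cancel := context.WithCancel(ctx)
    	m.cancels[name] = cancel
    	go fn(ctx) // a real manager would also log the returned error
    }

    func (m *routineManager) Stop(name string) {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	if cancel, ok := m.cancels[name]; ok {
    		cancel()
    		delete(m.cancels, name)
    	}
    	// Unknown name: no-op, which is why stopPeeringStreamSync is safe to
    	// call even if the routines were never started.
    }

    func main() {
    	m := newRoutineManager()
    	m.Start(context.Background(), "peering metrics", func(ctx context.Context) error {
    		<-ctx.Done() // stands in for the ticker loop in runPeeringMetrics
    		return nil
    	})
    	m.Stop("peering metrics")
    	m.Stop("never started") // no-op
    }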
@@ -4,10 +4,12 @@ import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"testing"
	"time"

	"github.com/armon/go-metrics"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
@@ -615,7 +617,7 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
	return lastIdx
}

// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe
// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe
func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
@@ -904,3 +906,133 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
		})
	}
}

// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe
func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	var (
		s2PeerID1          = generateUUID()
		s2PeerID2          = generateUUID()
		testContextTimeout = 60 * time.Second
		lastIdx            = uint64(0)
	)

	// TODO(peering): Configure with TLS
	_, s1 := testServerWithConfig(t, func(c *Config) {
		c.NodeName = "s1.dc1"
		c.Datacenter = "dc1"
		c.TLSConfig.Domain = "consul"
	})
	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Create a peering by generating a token
	ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
	t.Cleanup(cancel)

	conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
		grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
		grpc.WithInsecure(),
		grpc.WithBlock())
	require.NoError(t, err)
	defer conn.Close()

	peeringClient := pbpeering.NewPeeringServiceClient(conn)

	req := pbpeering.GenerateTokenRequest{
		PeerName: "my-peer-s2",
	}
	resp, err := peeringClient.GenerateToken(ctx, &req)
	require.NoError(t, err)

	tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
	require.NoError(t, err)

	var token structs.PeeringToken
	require.NoError(t, json.Unmarshal(tokenJSON, &token))

	// Bring up s2 and store s1's token so that it attempts to dial.
	_, s2 := testServerWithConfig(t, func(c *Config) {
		c.NodeName = "s2.dc2"
		c.Datacenter = "dc2"
		c.PrimaryDatacenter = "dc2"
	})
	testrpc.WaitForLeader(t, s2.RPC, "dc2")

	// Simulate exporting services in the tracker
	{
		// Simulate a peering initiation event by writing a peering with data from a peering token.
		// Eventually the leader in dc2 should dial and connect to the leader in dc1.
		p := &pbpeering.Peering{
			ID:                  s2PeerID1,
			Name:                "my-peer-s1",
			PeerID:              token.PeerID,
			PeerCAPems:          token.CA,
			PeerServerName:      token.ServerName,
			PeerServerAddresses: token.ServerAddresses,
		}
		require.True(t, p.ShouldDial())
		lastIdx++
		require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p))

		p2 := &pbpeering.Peering{
			ID:                  s2PeerID2,
			Name:                "my-peer-s3",
			PeerID:              token.PeerID, // doesn't much matter what these values are
			PeerCAPems:          token.CA,
			PeerServerName:      token.ServerName,
			PeerServerAddresses: token.ServerAddresses,
		}
		require.True(t, p2.ShouldDial())
		lastIdx++
		require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p2))

		// connect the stream
		mst1, err := s2.peeringServer.Tracker.Connected(s2PeerID1)
		require.NoError(t, err)

		// mimic tracking exported services
		mst1.TrackExportedService(structs.ServiceName{Name: "a-service"})
		mst1.TrackExportedService(structs.ServiceName{Name: "b-service"})
		mst1.TrackExportedService(structs.ServiceName{Name: "c-service"})

		// connect the stream
		mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
		require.NoError(t, err)

		// mimic tracking exported services
		mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
		mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
	}

	// set up a metrics sink
	sink := metrics.NewInmemSink(testContextTimeout, testContextTimeout)
	cfg := metrics.DefaultConfig("us-west")
	cfg.EnableHostname = false
	met, err := metrics.New(cfg, sink)
	require.NoError(t, err)

	errM := s2.emitPeeringMetricsOnce(s2.logger, met)
	require.NoError(t, errM)

	retry.Run(t, func(r *retry.R) {
		intervals := sink.Data()
		require.Len(r, intervals, 1)
		intv := intervals[0]

		// the keys for a Gauge value look like: {serviceName}.{prefix}.{key_name};{label=value};...
		keyMetric1 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s1;peer_id=%s", s2PeerID1)
		metric1, ok := intv.Gauges[keyMetric1]
		require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric1))

		require.Equal(r, float32(3), metric1.Value) // for a, b, c services

		keyMetric2 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s3;peer_id=%s", s2PeerID2)
		metric2, ok := intv.Gauges[keyMetric2]
		require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2))

		require.Equal(r, float32(2), metric2.Value) // for d, e services
	})
}
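The flattened key asserted in the retry block above comes from go-metrics' in-memory sink: the configured service name, the key parts joined with ".", then ";"-separated label pairs. A standalone sketch of that behavior (assuming the go-metrics API used by the test; the "us-west" service name is just the arbitrary value the test picks):

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/armon/go-metrics"
    )

    func main() {
    	sink := metrics.NewInmemSink(time.Minute, time.Minute)
    	cfg := metrics.DefaultConfig("us-west")
    	cfg.EnableHostname = false // keep keys stable across machines
    	m, err := metrics.New(cfg, sink)
    	if err != nil {
    		panic(err)
    	}

    	m.SetGaugeWithLabels(
    		[]string{"consul", "peering", "exported_services"},
    		3,
    		[]metrics.Label{{Name: "peer_name", Value: "my-peer-s1"}},
    	)

    	for key, g := range sink.Data()[0].Gauges {
    		// Expected shape: us-west.consul.peering.exported_services;peer_name=my-peer-s1 -> 3
    		fmt.Printf("%s -> %v\n", key, g.Value)
    	}
    }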
@@ -127,6 +127,7 @@ const (
	virtualIPCheckRoutineName        = "virtual IP version check"
	peeringStreamsRoutineName        = "streaming peering resources"
	peeringDeletionRoutineName       = "peering deferred deletion"
	peeringStreamsMetricsRoutineName = "metrics for streaming peering resources"
)

var (
@@ -539,8 +539,8 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
	return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}

-func (s *Server) StreamStatus(peer string) (resp Status, found bool) {
-	return s.Tracker.StreamStatus(peer)
+func (s *Server) StreamStatus(peerID string) (resp Status, found bool) {
+	return s.Tracker.StreamStatus(peerID)
}

// ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
@@ -231,7 +231,8 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
	if isServer {
		gauges = append(gauges,
			consul.AutopilotGauges,
-			consul.LeaderCertExpirationGauges)
+			consul.LeaderCertExpirationGauges,
+			consul.LeaderPeeringMetrics)
	}

	// Flatten definitions
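Appending LeaderPeeringMetrics here is what pre-registers the gauge's name and help text with the server's Prometheus sink, so the definition exists even before the leader emits a first value. A minimal sketch of that registration step in isolation (assuming the go-metrics prometheus sink API, prometheus.PrometheusOpts and NewPrometheusSinkFrom; not Consul's actual setup code, and the Help string here is shortened):

    package main

    import (
    	"github.com/armon/go-metrics/prometheus"
    )

    func newSinkWithPeeringGauge() (*prometheus.PrometheusSink, error) {
    	opts := prometheus.PrometheusOpts{
    		GaugeDefinitions: []prometheus.GaugeDefinition{
    			{
    				// Same key parts as leaderExportedServicesCountKey above.
    				Name: []string{"consul", "peering", "exported_services"},
    				Help: "A gauge that tracks how many services are exported for the peering.",
    			},
    		},
    	}
    	return prometheus.NewPrometheusSinkFrom(opts)
    }

    func main() {
    	if _, err := newSinkWithPeeringGauge(); err != nil {
    		panic(err)
    	}
    }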
@@ -51,6 +51,7 @@ const (
	Snapshot           string = "snapshot"
	Partition          string = "partition"
	Peering            string = "peering"
	PeeringMetrics     string = "peering_metrics"
	TerminatingGateway string = "terminating_gateway"
	TLSUtil            string = "tlsutil"
	Transaction        string = "txn"