add more labels to RequestRecorder (#12727)

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
Signed-off-by: FFMMM <FFMMM@users.noreply.github.com>
This commit is contained in:
FFMMM 2022-04-12 10:50:25 -07:00 committed by GitHub
parent a2a9c963d6
commit cf7e6484aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 320 additions and 105 deletions

4
.changelog/12727.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:improvement
telemetry: Add new `leader` label to `consul.rpc.server.call` and optional `target_datacenter`, `locality`,
`allow_stale`, and `blocking` optional labels.
```

View File

@ -25,7 +25,7 @@ type Deps struct {
// the rpc server. // the rpc server.
GetNetRPCInterceptorFunc func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor GetNetRPCInterceptorFunc func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor
// NewRequestRecorderFunc provides a middleware.RequestRecorder for the server to use; it cannot be nil // NewRequestRecorderFunc provides a middleware.RequestRecorder for the server to use; it cannot be nil
NewRequestRecorderFunc func(logger hclog.Logger) *middleware.RequestRecorder NewRequestRecorderFunc func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder
EnterpriseDeps EnterpriseDeps
} }

View File

@ -386,24 +386,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer) serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
loggers := newLoggerStore(serverLogger) loggers := newLoggerStore(serverLogger)
var recorder *middleware.RequestRecorder
if flat.NewRequestRecorderFunc == nil {
return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider")
}
recorder = flat.NewRequestRecorderFunc(serverLogger)
if recorder == nil {
return nil, fmt.Errorf("cannot initialize server without a non nil RPC request recorder")
}
var rpcServer, insecureRPCServer *rpc.Server
if flat.GetNetRPCInterceptorFunc == nil {
rpcServer = rpc.NewServer()
insecureRPCServer = rpc.NewServer()
} else {
rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
}
eventPublisher := stream.NewEventPublisher(10 * time.Second) eventPublisher := stream.NewEventPublisher(10 * time.Second)
fsmDeps := fsm.Deps{ fsmDeps := fsm.Deps{
@ -427,9 +409,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
leaveCh: make(chan struct{}), leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize), reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.Router, router: flat.Router,
rpcRecorder: recorder,
rpcServer: rpcServer,
insecureRPCServer: insecureRPCServer,
tlsConfigurator: flat.TLSConfigurator, tlsConfigurator: flat.TLSConfigurator,
publicGRPCServer: publicGRPCServer, publicGRPCServer: publicGRPCServer,
reassertLeaderCh: make(chan chan error), reassertLeaderCh: make(chan chan error),
@ -443,6 +422,26 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
publisher: eventPublisher, publisher: eventPublisher,
} }
var recorder *middleware.RequestRecorder
if flat.NewRequestRecorderFunc != nil {
recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter)
} else {
return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider")
}
if recorder == nil {
return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder")
}
if flat.GetNetRPCInterceptorFunc == nil {
s.rpcServer = rpc.NewServer()
s.insecureRPCServer = rpc.NewServer()
} else {
s.rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
s.insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
}
s.rpcRecorder = recorder
go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
if s.config.ConnectMeshGatewayWANFederationEnabled { if s.config.ConnectMeshGatewayWANFederationEnabled {

View File

@ -1172,7 +1172,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
// note that there will be "internal" net/rpc calls made // note that there will be "internal" net/rpc calls made
// that will still show up; those don't go thru the net/rpc interceptor; // that will still show up; those don't go thru the net/rpc interceptor;
// see consul.agent.rpc.middleware.RPCTypeInternal for context // see consul.agent.rpc.middleware.RPCTypeInternal for context
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
// for the purposes of this test, we don't need isLeader or localDC
return &middleware.RequestRecorder{ return &middleware.RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc, RecorderFunc: simpleRecorderFunc,
@ -1205,7 +1206,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
// note that there will be "internal" net/rpc calls made // note that there will be "internal" net/rpc calls made
// that will still show up; those don't go thru the net/rpc interceptor; // that will still show up; those don't go thru the net/rpc interceptor;
// see consul.agent.rpc.middleware.RPCTypeInternal for context // see consul.agent.rpc.middleware.RPCTypeInternal for context
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
// for the purposes of this test, we don't need isLeader or localDC
return &middleware.RequestRecorder{ return &middleware.RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc, RecorderFunc: simpleRecorderFunc,
@ -1265,14 +1267,14 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
t.Run("test nil RequestRecorder", func(t *testing.T) { t.Run("test nil RequestRecorder", func(t *testing.T) {
_, conf := testServerConfig(t) _, conf := testServerConfig(t)
deps := newDefaultDeps(t, conf) deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
return nil return nil
} }
s2, err := NewServer(conf, deps, grpc.NewServer()) s2, err := NewServer(conf, deps, grpc.NewServer())
require.Error(t, err, "need err when RequestRecorder is nil") require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server without a non nil RPC request recorder") require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
t.Cleanup(func() { t.Cleanup(func() {
if s2 != nil { if s2 != nil {
@ -1308,7 +1310,8 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) {
simpleRecorderFunc := func(key []string, val float32, labels []metrics.Label) { simpleRecorderFunc := func(key []string, val float32, labels []metrics.Label) {
storage[keyMakingFunc(key, labels)] = val storage[keyMakingFunc(key, labels)] = val
} }
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
// for the purposes of this test, we don't need isLeader or localDC
return &middleware.RequestRecorder{ return &middleware.RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc, RecorderFunc: simpleRecorderFunc,
@ -1344,11 +1347,13 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) {
{Name: "errored", Value: "false"}, {Name: "errored", Value: "false"},
{Name: "request_type", Value: "read"}, {Name: "request_type", Value: "read"},
{Name: "rpc_type", Value: "test"}, {Name: "rpc_type", Value: "test"},
{Name: "server_role", Value: "unreported"},
} }
key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, expectedLabels) key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, expectedLabels)
if _, ok := storage[key]; !ok { if _, ok := storage[key]; !ok {
// the compound key will look like: "rpc+server+call+Status.Ping+false+read+test+unreported"
t.Fatalf("Did not find key %s in the metrics log, ", key) t.Fatalf("Did not find key %s in the metrics log, ", key)
} }
}) })

View File

@ -199,7 +199,7 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
recordPromMetrics(t, a, respRec) recordPromMetrics(t, a, respRec)
// make sure the labels exist for this metric // make sure the labels exist for this metric
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type"}) assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above // make sure we see 3 Status.Ping metrics corresponding to the calls we made above
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3) assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
}) })

View File

@ -35,31 +35,77 @@ var OneTwelveRPCSummary = []prometheus.SummaryDefinition{
type RequestRecorder struct { type RequestRecorder struct {
Logger hclog.Logger Logger hclog.Logger
RecorderFunc func(key []string, val float32, labels []metrics.Label) RecorderFunc func(key []string, val float32, labels []metrics.Label)
serverIsLeader func() bool
localDC string
} }
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder { func NewRequestRecorder(logger hclog.Logger, isLeader func() bool, localDC string) *RequestRecorder {
return &RequestRecorder{Logger: logger, RecorderFunc: metrics.AddSampleWithLabels} return &RequestRecorder{
Logger: logger,
RecorderFunc: metrics.AddSampleWithLabels,
serverIsLeader: isLeader,
localDC: localDC,
}
} }
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) { func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
elapsed := time.Since(start).Milliseconds() elapsed := time.Since(start).Milliseconds()
reqType := requestType(request) reqType := requestType(request)
isLeader := r.getServerLeadership()
labels := []metrics.Label{ labels := []metrics.Label{
{Name: "method", Value: requestName}, {Name: "method", Value: requestName},
{Name: "errored", Value: strconv.FormatBool(respErrored)}, {Name: "errored", Value: strconv.FormatBool(respErrored)},
{Name: "request_type", Value: reqType}, {Name: "request_type", Value: reqType},
{Name: "rpc_type", Value: rpcType}, {Name: "rpc_type", Value: rpcType},
{Name: "leader", Value: isLeader},
} }
labels = r.addOptionalLabels(request, labels)
// math.MaxInt64 < math.MaxFloat32 is true so we should be good! // math.MaxInt64 < math.MaxFloat32 is true so we should be good!
r.RecorderFunc(metricRPCRequest, float32(elapsed), labels) r.RecorderFunc(metricRPCRequest, float32(elapsed), labels)
r.Logger.Trace(requestLogName,
"method", requestName, labelsArr := flattenLabels(labels)
"errored", respErrored, r.Logger.Trace(requestLogName, labelsArr...)
"request_type", reqType,
"rpc_type", rpcType, }
"elapsed", elapsed)
func flattenLabels(labels []metrics.Label) []interface{} {
var labelArr []interface{}
for _, label := range labels {
labelArr = append(labelArr, label.Name, label.Value)
}
return labelArr
}
func (r *RequestRecorder) addOptionalLabels(request interface{}, labels []metrics.Label) []metrics.Label {
if rq, ok := request.(readQuery); ok {
labels = append(labels,
metrics.Label{
Name: "allow_stale",
Value: strconv.FormatBool(rq.AllowStaleRead()),
},
metrics.Label{
Name: "blocking",
Value: strconv.FormatBool(rq.GetMinQueryIndex() > 0),
})
}
if td, ok := request.(targetDC); ok {
requestDC := td.RequestDatacenter()
labels = append(labels, metrics.Label{Name: "target_datacenter", Value: requestDC})
if r.localDC == requestDC {
labels = append(labels, metrics.Label{Name: "locality", Value: "local"})
} else {
labels = append(labels, metrics.Label{Name: "locality", Value: "forwarded"})
}
}
return labels
} }
func requestType(req interface{}) string { func requestType(req interface{}) string {
@ -77,6 +123,30 @@ func requestType(req interface{}) string {
return "unreported" return "unreported"
} }
func (r *RequestRecorder) getServerLeadership() string {
if r.serverIsLeader != nil {
if r.serverIsLeader() {
return "true"
} else {
return "false"
}
}
// This logical branch should not happen. If it happens
// it means that we have not plumbed down a way to verify
// whether the server handling the request was a leader or not
return "unreported"
}
type readQuery interface {
GetMinQueryIndex() uint64
AllowStaleRead() bool
}
type targetDC interface {
RequestDatacenter() string
}
func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor { func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor {
return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) { return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) {
reqStart := time.Now() reqStart := time.Now()

View File

@ -48,6 +48,8 @@ var simpleRecorderFunc = func(key []string, val float32, labels []metrics.Label)
type readRequest struct{} type readRequest struct{}
type writeRequest struct{} type writeRequest struct{}
type readReqWithTD struct{}
type writeReqWithTD struct{}
func (rr readRequest) IsRead() bool { func (rr readRequest) IsRead() bool {
return true return true
@ -57,75 +59,210 @@ func (wr writeRequest) IsRead() bool {
return false return false
} }
// TestRequestRecorder_SimpleOK tests that the RequestRecorder can record a simple request. func (r readReqWithTD) IsRead() bool {
func TestRequestRecorder_SimpleOK(t *testing.T) { return true
t.Parallel()
r := RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc,
} }
start := time.Now() func (r readReqWithTD) RequestDatacenter() string {
r.Record("A.B", RPCTypeInternal, start, struct{}{}, false) return "dc3"
}
expectedLabels := []metrics.Label{ func (r readReqWithTD) GetMinQueryIndex() uint64 {
return 1
}
func (r readReqWithTD) AllowStaleRead() bool {
return false
}
func (w writeReqWithTD) IsRead() bool {
return false
}
func (w writeReqWithTD) RequestDatacenter() string {
return "dc2"
}
type testCase struct {
name string
// description is meant for human friendliness
description string
// requestName is encouraged to be unique across tests to
// avoid lock contention
requestName string
requestI interface{}
rpcType string
errored bool
isLeader func() bool
dc string
// the first element in expectedLabels should be the method name
expectedLabels []metrics.Label
}
var testCases = []testCase{
{
name: "simple ok",
description: "This is a simple happy path test case. We check for pass through and normal request processing",
requestName: "A.B",
requestI: struct{}{},
rpcType: RPCTypeInternal,
errored: false,
dc: "dc1",
expectedLabels: []metrics.Label{
{Name: "method", Value: "A.B"}, {Name: "method", Value: "A.B"},
{Name: "errored", Value: "false"}, {Name: "errored", Value: "false"},
{Name: "request_type", Value: "unreported"}, {Name: "request_type", Value: "unreported"},
{Name: "rpc_type", Value: RPCTypeInternal}, {Name: "rpc_type", Value: RPCTypeInternal},
} {Name: "leader", Value: "unreported"},
},
o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) },
require.Equal(t, o.key, metricRPCRequest) {
require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds())) name: "simple ok errored",
require.Equal(t, o.labels, expectedLabels) description: "Checks that the errored value is populated right.",
} requestName: "A.C",
requestI: struct{}{},
// TestRequestRecorder_ReadRequest tests that RequestRecorder can record a read request AND a responseErrored arg. rpcType: "test",
func TestRequestRecorder_ReadRequest(t *testing.T) { errored: true,
t.Parallel() dc: "dc1",
expectedLabels: []metrics.Label{
r := RequestRecorder{ {Name: "method", Value: "A.C"},
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc,
}
start := time.Now()
r.Record("B.A", RPCTypeNetRPC, start, readRequest{}, true)
expectedLabels := []metrics.Label{
{Name: "method", Value: "B.A"},
{Name: "errored", Value: "true"}, {Name: "errored", Value: "true"},
{Name: "request_type", Value: "read"}, {Name: "request_type", Value: "unreported"},
{Name: "rpc_type", Value: RPCTypeNetRPC}, {Name: "rpc_type", Value: "test"},
} {Name: "leader", Value: "unreported"},
},
o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) },
require.Equal(t, o.labels, expectedLabels) {
} name: "read request, rpc type internal",
description: "Checks for read request interface parsing",
// TestRequestRecorder_WriteRequest tests that RequestRecorder can record a write request. requestName: "B.C",
func TestRequestRecorder_WriteRequest(t *testing.T) { requestI: readRequest{},
t.Parallel() rpcType: RPCTypeInternal,
errored: false,
r := RequestRecorder{ dc: "dc1",
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), expectedLabels: []metrics.Label{
RecorderFunc: simpleRecorderFunc,
}
start := time.Now()
r.Record("B.C", RPCTypeNetRPC, start, writeRequest{}, true)
expectedLabels := []metrics.Label{
{Name: "method", Value: "B.C"}, {Name: "method", Value: "B.C"},
{Name: "errored", Value: "true"}, {Name: "errored", Value: "false"},
{Name: "request_type", Value: "read"},
{Name: "rpc_type", Value: RPCTypeInternal},
{Name: "leader", Value: "unreported"},
},
},
{
name: "write request, rpc type net/rpc",
description: "Checks for write request interface, different RPC type",
requestName: "D.E",
requestI: writeRequest{},
rpcType: RPCTypeNetRPC,
errored: false,
dc: "dc1",
expectedLabels: []metrics.Label{
{Name: "method", Value: "D.E"},
{Name: "errored", Value: "false"},
{Name: "request_type", Value: "write"}, {Name: "request_type", Value: "write"},
{Name: "rpc_type", Value: RPCTypeNetRPC}, {Name: "rpc_type", Value: RPCTypeNetRPC},
{Name: "leader", Value: "unreported"},
},
},
{
name: "read request with blocking stale and target dc",
description: "Checks for locality, blocking status and target dc",
requestName: "E.F",
requestI: readReqWithTD{},
rpcType: RPCTypeNetRPC,
errored: false,
dc: "dc1",
expectedLabels: []metrics.Label{
{Name: "method", Value: "E.F"},
{Name: "errored", Value: "false"},
{Name: "request_type", Value: "read"},
{Name: "rpc_type", Value: RPCTypeNetRPC},
{Name: "leader", Value: "unreported"},
{Name: "allow_stale", Value: "false"},
{Name: "blocking", Value: "true"},
{Name: "target_datacenter", Value: "dc3"},
{Name: "locality", Value: "forwarded"},
},
},
{
name: "write request with TD, locality local",
description: "Checks for write request with local forwarding and target dc",
requestName: "F.G",
requestI: writeReqWithTD{},
rpcType: RPCTypeNetRPC,
errored: false,
dc: "dc2",
expectedLabels: []metrics.Label{
{Name: "method", Value: "F.G"},
{Name: "errored", Value: "false"},
{Name: "request_type", Value: "write"},
{Name: "rpc_type", Value: RPCTypeNetRPC},
{Name: "leader", Value: "unreported"},
{Name: "target_datacenter", Value: "dc2"},
{Name: "locality", Value: "local"},
},
},
{
name: "is leader",
description: "checks for is leader",
requestName: "G.H",
requestI: struct{}{},
rpcType: "test",
errored: false,
isLeader: func() bool {
return true
},
expectedLabels: []metrics.Label{
{Name: "method", Value: "G.H"},
{Name: "errored", Value: "false"},
{Name: "request_type", Value: "unreported"},
{Name: "rpc_type", Value: "test"},
{Name: "leader", Value: "true"},
},
},
{
name: "is not leader",
description: "checks for is not leader",
requestName: "H.I",
requestI: struct{}{},
rpcType: "test",
errored: false,
isLeader: func() bool {
return false
},
expectedLabels: []metrics.Label{
{Name: "method", Value: "H.I"},
{Name: "errored", Value: "false"},
{Name: "request_type", Value: "unreported"},
{Name: "rpc_type", Value: "test"},
{Name: "leader", Value: "false"},
},
},
} }
o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) // TestRequestRecorder goes over all the parsing and reporting that RequestRecorder
require.Equal(t, o.labels, expectedLabels) // is expected to perform.
func TestRequestRecorder(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := RequestRecorder{
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
RecorderFunc: simpleRecorderFunc,
serverIsLeader: tc.isLeader,
localDC: tc.dc,
}
start := time.Now()
r.Record(tc.requestName, tc.rpcType, start, tc.requestI, tc.errored)
key := append(metricRPCRequest, tc.expectedLabels[0].Value)
o := store.get(key)
require.Equal(t, o.key, metricRPCRequest)
require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds()))
require.Equal(t, o.labels, tc.expectedLabels)
})
}
} }