[sync oss] add net/rpc interceptor implementation (#12573)

* sync ent changes from 866dcb0667

Signed-off-by: FFMMM <FFMMM@users.noreply.github.com>

* update oss go.mod

Signed-off-by: FFMMM <FFMMM@users.noreply.github.com>
FFMMM 2022-03-17 16:02:26 -07:00 committed by GitHub
parent 6b3c506b4e
commit 3c08843847
15 changed files with 188 additions and 35 deletions


@@ -86,7 +86,7 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error
TokenIDs: batch,
}
- _, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, &req)
return err
}
@@ -110,7 +110,7 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
FromReplication: true,
}
- _, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.TokenSet", structs.ACLTokenSetRequestType, &req)
return err
}
@@ -186,7 +186,7 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro
PolicyIDs: batch,
}
- _, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.PolicyDelete", structs.ACLPolicyDeleteRequestType, &req)
return err
}
@@ -207,7 +207,7 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
Policies: r.updated[start:end],
}
- _, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.PolicySet", structs.ACLPolicySetRequestType, &req)
return err
}
@@ -307,7 +307,7 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error
RoleIDs: batch,
}
- _, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.RoleDelete", structs.ACLRoleDeleteRequestType, &req)
return err
}
@@ -329,6 +329,7 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s
AllowMissingLinks: true,
}
- _, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
+ _, err := srv.leaderRaftApply("ACL.RoleSet", structs.ACLRoleSetRequestType, &req)
return err
}
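The change repeated across the leader and replication files below is mechanical: raft writes issued by the leader's own background work now go through the new leaderRaftApply wrapper, whose extra first argument is an RPC-style method label used only for metrics and logging. A minimal sketch of the shape of the change, mirroring DeleteLocalBatch above (the helper name is illustrative, not part of the commit):

// Before: the write went straight to raft with no instrumentation.
//   _, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
//
// After: the same write, labeled so the RequestRecorder can report it
// under rpc_type="internal".
func deleteTokenBatchOnLeader(srv *Server, batch []string) error {
	req := structs.ACLTokenBatchDeleteRequest{TokenIDs: batch}
	_, err := srv.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, &req)
	return err
}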


@@ -100,7 +100,8 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
"amount", len(req.TokenIDs),
"locality", locality,
)
- _, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req)
+ _, err = s.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, &req)
if err != nil {
return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err)
}


@@ -92,6 +92,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit))
defer ticker.Stop()
+ rpcServiceMethod := "ConfigEntry.Apply"
+ if op == structs.ConfigEntryDelete || op == structs.ConfigEntryDeleteCAS {
+ rpcServiceMethod = "ConfigEntry.Delete"
+ }
var merr error
for i, entry := range configs {
// Exported services only apply to the primary datacenter.
@@ -104,7 +109,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
Entry: entry,
}
- _, err := s.raftApply(structs.ConfigEntryRequestType, &req)
+ _, err := s.leaderRaftApply(rpcServiceMethod, structs.ConfigEntryRequestType, &req)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("Failed to apply config entry %s: %w", op, err))
}


@@ -154,7 +154,7 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti
State: state,
}
- _, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
+ _, err := r.srv.leaderRaftApply("FederationState.Delete", structs.FederationStateRequestType, &req)
if err != nil {
return false, err
}
@@ -195,7 +195,7 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR
State: state2,
}
- _, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
+ _, err := r.srv.leaderRaftApply("FederationState.Apply", structs.FederationStateRequestType, &req)
if err != nil {
return false, err
}


@@ -796,7 +796,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
- if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
+ if _, err = s.leaderRaftApply("AutopilotRequest.Apply", structs.AutopilotRequestType, req); err != nil {
logger.Error("failed to initialize config", "error", err)
return nil
}
@@ -871,7 +871,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
Entry: entry,
}
- _, err := s.raftApply(structs.ConfigEntryRequestType, &req)
+ _, err := s.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
}


@@ -118,7 +118,7 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error {
if s.config.Datacenter == s.config.PrimaryDatacenter {
// We are the primary, so we can't do an RPC as we don't have a replication token.
- _, err := s.raftApply(structs.FederationStateRequestType, args)
+ _, err := s.leaderRaftApply("FederationState.Apply", structs.FederationStateRequestType, args)
if err != nil {
return err
}
@@ -223,7 +223,8 @@ func (s *Server) pruneStaleFederationStates() error {
Datacenter: dc,
},
}
- _, err := s.raftApply(structs.FederationStateRequestType, &req)
+ _, err := s.leaderRaftApply("FederationState.Delete", structs.FederationStateRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to delete federation state %s: %v", dc, err)
}


@@ -170,7 +170,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error {
req := structs.IntentionRequest{
Op: structs.IntentionOpDeleteAll,
}
- if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
+ if _, err := s.leaderRaftApply("Intentions.DeleteAll", structs.IntentionRequestType, req); err != nil {
return err
}
@@ -410,7 +411,9 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd
for _, ops := range txnOpSets {
txnReq := structs.TxnRequest{Ops: ops}
- resp, err := s.raftApply(structs.TxnRequestType, &txnReq)
+ // TODO(rpc-metrics-improv) -- verify labels
+ resp, err := s.leaderRaftApply("Txn.Apply", structs.TxnRequestType, &txnReq)
if err != nil {
return 0, false, err
}


@@ -30,6 +30,7 @@ import (
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
+ "github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
@@ -842,6 +843,17 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st
type raftEncoder func(structs.MessageType, interface{}) ([]byte, error)
+ // leaderRaftApply is used by the leader to persist data to Raft for internal cluster management activities.
+ // This method MUST not be called from RPC endpoints, since it would result in duplicated RPC metrics.
+ func (s *Server) leaderRaftApply(method string, t structs.MessageType, msg interface{}) (interface{}, error) {
+ start := time.Now()
+ resp, err := s.raftApplyMsgpack(t, msg)
+ s.rpcRecorder.Record(method, middleware.RPCTypeInternal, start, &msg, err != nil)
+ return resp, err
+ }
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// raftApplyWithEncoder.
// Deprecated: use raftApplyMsgpack
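The doc comment on leaderRaftApply above carries the key invariant of this change: a request served over net/rpc is already timed once by the interceptor installed on the server, so an endpoint handler that also called leaderRaftApply would be recorded twice. A sketch of the two paths under that assumption (the endpoint type and method are illustrative, not from this commit):

// Leader-internal work (expiry reapers, replication loops, reconciliation):
// there is no net/rpc hop, so leaderRaftApply itself records the operation
// with rpc_type="internal".
func (s *Server) exampleLeaderCleanup(req *structs.ACLTokenBatchDeleteRequest) error {
	_, err := s.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, req)
	return err
}

// An RPC endpoint, by contrast, is wrapped by the net/rpc interceptor, which
// records the whole handler with rpc_type="net/rpc"; the handler keeps using
// the plain raft apply helpers to avoid emitting a duplicate sample.
type ExampleEndpoint struct{ srv *Server }

func (e *ExampleEndpoint) Apply(args *structs.ACLTokenBatchDeleteRequest, reply *bool) error {
	_, err := e.srv.raftApply(structs.ACLTokenDeleteRequestType, args)
	return err
}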


@@ -16,6 +16,8 @@ import (
"sync/atomic"
"time"
+ "github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/go-version"
"go.etcd.io/bbolt"
@@ -255,6 +257,9 @@ type Server struct {
// Endpoint that is available at the time of writing.
insecureRPCServer *rpc.Server
+ // rpcRecorder is a middleware component that can emit RPC request metrics.
+ rpcRecorder *middleware.RequestRecorder
// tlsConfigurator holds the agent configuration relevant to TLS and
// configures everything related to it.
tlsConfigurator *tlsutil.Configurator
@@ -363,6 +368,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
loggers := newLoggerStore(serverLogger)
+ recorder := middleware.NewRequestRecorder(serverLogger)
// Create server.
s := &Server{
config: config,
@@ -376,8 +382,10 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.Router,
- rpcServer: rpc.NewServer(),
- insecureRPCServer: rpc.NewServer(),
+ rpcRecorder: recorder,
+ // TODO(rpc-metrics-improv): consider pulling out the interceptor from config in order to isolate testing
+ rpcServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))),
+ insecureRPCServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))),
tlsConfigurator: flat.TLSConfigurator,
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),


@@ -115,7 +115,8 @@ func (s *Server) invalidateSession(id string, entMeta *structs.EnterpriseMeta) {
// Retry with exponential backoff to invalidate the session
for attempt := uint(0); attempt < maxInvalidateAttempts; attempt++ {
- _, err := s.raftApply(structs.SessionRequestType, args)
+ // TODO(rpc-metrics-improv): Double check request name here
+ _, err := s.leaderRaftApply("Session.Check", structs.SessionRequestType, args)
if err == nil {
s.logger.Debug("Session TTL expired", "session", id)
return


@@ -22,7 +22,9 @@ func (s *Server) setSystemMetadataKey(key, val string) error {
Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
}
- _, err := s.raftApply(structs.SystemMetadataRequestType, args)
+ // TODO(rpc-metrics-improv): Double check request name here
+ _, err := s.leaderRaftApply("SystemMetadata.Upsert", structs.SystemMetadataRequestType, args)
return err
}
@@ -32,6 +34,8 @@ func (s *Server) deleteSystemMetadataKey(key string) error {
Entry: &structs.SystemMetadataEntry{Key: key},
}
- _, err := s.raftApply(structs.SystemMetadataRequestType, args)
+ // TODO(rpc-metrics-improv): Double check request name here
+ _, err := s.leaderRaftApply("SystemMetadata.Delete", structs.SystemMetadataRequestType, args)
return err
}


@@ -33,6 +33,16 @@ func recordPromMetrics(t *testing.T, a *TestAgent, respRec *httptest.ResponseRec
}
+ func assertMetricExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) {
+ if respRec.Body.String() == "" {
+ t.Fatalf("Response body is empty.")
+ }
+ if !strings.Contains(respRec.Body.String(), metric) {
+ t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric)
+ }
+ }
func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorder, metric string, value string) {
if respRec.Body.String() == "" {
t.Fatalf("Response body is empty.")
@@ -56,6 +66,36 @@ func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, met
}
}
+ // TestAgent_NewRPCMetrics test for the new RPC metrics presence. These are the labeled metrics coming from
+ // agent.rpc.middleware.interceptors package.
+ func TestAgent_NewRPCMetrics(t *testing.T) {
+ skipIfShortTesting(t)
+ // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
+ t.Run("Check new rpc metrics are being emitted", func(t *testing.T) {
+ metricsPrefix := "new_rpc_metrics"
+ hcl := fmt.Sprintf(`
+ telemetry = {
+ prometheus_retention_time = "5s"
+ disable_hostname = true
+ metrics_prefix = "%s"
+ }
+ `, metricsPrefix)
+ a := StartTestAgent(t, TestAgent{HCL: hcl})
+ defer a.Shutdown()
+ var out struct{}
+ err := a.RPC("Status.Ping", struct{}{}, &out)
+ require.NoError(t, err)
+ respRec := httptest.NewRecorder()
+ recordPromMetrics(t, a, respRec)
+ assertMetricExists(t, respRec, metricsPrefix+"_rpc_server_request")
+ })
+ }
// TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus adds testing around
// the published autopilot metrics on https://www.consul.io/docs/agent/telemetry#autopilot
func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus(t *testing.T) {

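For context on what the new assertion matches: each recorded request becomes a labeled series under <metrics_prefix>_rpc_server_request, carrying the labels built in RequestRecorder.Record. A hedged illustration of the kind of output the Status.Ping call in the test should produce at /v1/agent/metrics?format=prometheus (the exact suffix and quantile lines depend on the go-metrics prometheus sink, and the sample values are made up):

new_rpc_metrics_rpc_server_request{errored="false",method="Status.Ping",request_type="write",rpc_type="net/rpc",quantile="0.5"} 0.21
new_rpc_metrics_rpc_server_request_count{errored="false",method="Status.Ping",request_type="write",rpc_type="net/rpc"} 1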

@@ -0,0 +1,77 @@
package middleware
import (
"reflect"
"strconv"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/go-hclog"
)
// RPCTypeInternal identifies the "RPC" request as coming from some internal
// operation that runs on the cluster leader. Technically this is not an RPC
// request, but these raft.Apply operations have the same impact on blocking
// queries, and streaming subscriptions, so need to be tracked by the same metric
// and logs.
// Really what we are measuring here is a "cluster operation". The term we have
// used for this historically is "RPC", so we continue to use that here.
const RPCTypeInternal = "internal"
var metricRPCRequest = []string{"rpc", "server", "request"}
var requestLogName = "rpc.server.request"
type RequestRecorder struct {
Logger hclog.Logger
}
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
return &RequestRecorder{Logger: logger}
}
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
elapsed := time.Since(start)
reqType := requestType(request)
labels := []metrics.Label{
{Name: "method", Value: requestName},
{Name: "errored", Value: strconv.FormatBool(respErrored)},
{Name: "request_type", Value: reqType},
{Name: "rpc_type", Value: rpcType},
}
// TODO(rpc-metrics-improv): consider using Telemetry API call here
// It'd be neat if we could actually pass the elapsed observed above
metrics.MeasureSinceWithLabels(metricRPCRequest, start, labels)
r.Logger.Debug(requestLogName,
"method", requestName,
"errored", respErrored,
"request_type", reqType,
"rpc_type", rpcType,
"elapsed", elapsed)
}
func requestType(req interface{}) string {
if r, ok := req.(interface{ IsRead() bool }); ok && r.IsRead() {
return "read"
}
return "write"
}
func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor {
return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) {
reqStart := time.Now()
err := handler()
responseErr := false
if err != nil {
responseErr = true
}
recorder.Record(reqServiceMethod, "net/rpc", reqStart, argv.Interface(), responseErr)
}
}
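Taken together with the NewServer change above, this is how the package is consumed; a minimal, self-contained sketch (the Ping service, logger, and listener are illustrative and not part of the commit):

package main

import (
	"log"
	"net"

	"github.com/hashicorp/consul-net-rpc/net/rpc"
	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/rpc/middleware"
)

// Ping is an illustrative net/rpc service; Consul registers its real
// endpoints (Status, Catalog, ...) on the intercepted server the same way.
type Ping struct{}

func (p *Ping) Ping(args struct{}, reply *struct{}) error { return nil }

func main() {
	// The recorder turns every intercepted call into a labeled
	// rpc.server.request metric plus a debug log line.
	recorder := middleware.NewRequestRecorder(hclog.Default())

	srv := rpc.NewServerWithOpts(
		rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder)),
	)
	if err := srv.Register(&Ping{}); err != nil {
		log.Fatal(err)
	}

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	// Every ServiceMethod dispatched from this listener now runs inside the
	// interceptor, which times the handler and calls recorder.Record with
	// rpc_type="net/rpc".
	srv.Accept(ln)
}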

go.mod (2 changed lines)

@@ -31,7 +31,7 @@ require (
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
- github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29
+ github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4
github.com/hashicorp/consul/api v1.11.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/go-bexpr v0.1.2

go.sum (4 changed lines)

@@ -222,8 +222,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmo
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
- github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29 h1:0BbXmAgzy5vx2rjixiO1FLJBdYJfEvSixcjWOli2w+Q=
- github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29/go.mod h1:vWEAHAeAqfOwB3pSgHMQpIu8VH1jL+Ltg54Tw0wt/NI=
+ github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4 h1:Com/5n/omNSBusX11zdyIYtidiqewLIanchbm//McZA=
+ github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4/go.mod h1:vWEAHAeAqfOwB3pSgHMQpIu8VH1jL+Ltg54Tw0wt/NI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-bexpr v0.1.2 h1:ijMXI4qERbzxbCnkxmfUtwMyjrrk3y+Vt0MxojNCbBs=