From f24f8c681f44c0c2f61993cd043494148d41ccde Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Mon, 9 Jan 2023 10:20:05 +0000 Subject: [PATCH] Rate limit improvements and fixes (#15917) - Fixes a panic when Operation.SourceAddr is nil (internal net/rpc calls) - Adds proper HTTP response codes (429 and 503) for rate limit errors - Makes the error messages clearer - Enables automatic retries for rate-limit errors in the net/rpc stack --- agent/consul/rate/handler.go | 6 ++-- agent/consul/rpc.go | 17 +++++++-- agent/consul/rpc_test.go | 11 ++++++ agent/http.go | 68 +++++++++++++++++++++++++++--------- 4 files changed, 80 insertions(+), 22 deletions(-) diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index 25a184e26..c28a4562c 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -19,14 +19,14 @@ var ( // rate limit was exhausted, but may succeed on a different server. // // Results in a RESOURCE_EXHAUSTED or "429 Too Many Requests" response. - ErrRetryElsewhere = errors.New("rate limit exceeded, try a different server") + ErrRetryElsewhere = errors.New("rate limit exceeded, try again later or against a different server") // ErrRetryLater indicates that the operation was not allowed because the rate // limit was exhausted, and trying a different server won't help (e.g. because // the operation can only be performed on the leader). // // Results in an UNAVAILABLE or "503 Service Unavailable" response. - ErrRetryLater = errors.New("rate limit exceeded, try again later") + ErrRetryLater = errors.New("rate limit exceeded for operation that can only be performed by the leader, try again later") ) // Mode determines the action that will be taken when a rate limit has been @@ -209,7 +209,7 @@ func (h *Handler) Allow(op Operation) error { enforced := l.mode == ModeEnforcing h.logger.Trace("RPC exceeded allowed rate limit", "rpc", op.Name, - "source_addr", op.SourceAddr.String(), + "source_addr", op.SourceAddr, "limit_type", l.desc, "limit_enforced", enforced, ) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 7e95e8ae9..695b9f818 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -26,6 +26,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/wanfed" "github.com/hashicorp/consul/agent/metadata" @@ -570,9 +571,19 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) return true } - // If we are chunking and it doesn't seem to have completed, try again. - if err != nil && strings.Contains(err.Error(), ErrChunkingResubmit.Error()) { - return true + retryableMessages := []error{ + // If we are chunking and it doesn't seem to have completed, try again. + ErrChunkingResubmit, + + // These rate limit errors are returned before the handler is called, so are + // safe to retry. + rate.ErrRetryElsewhere, + rate.ErrRetryLater, + } + for _, m := range retryableMessages { + if err != nil && strings.Contains(err.Error(), m.Error()) { + return true + } } // Reads are safe to retry for stream errors, such as if a server was diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 512d9a4c3..b08dd995b 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" agent_grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/pool" @@ -1310,6 +1311,16 @@ func TestCanRetry(t *testing.T) { err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader), expected: true, }, + { + name: "ErrRetryElsewhere", + err: fmt.Errorf("some wrapping: %w", rate.ErrRetryElsewhere), + expected: true, + }, + { + name: "ErrRetryLater", + err: fmt.Errorf("some wrapping: %w", rate.ErrRetryLater), + expected: true, + }, { name: "EOF on read request", req: isReadRequest{}, diff --git a/agent/http.go b/agent/http.go index 401e94ecd..2c793ec58 100644 --- a/agent/http.go +++ b/agent/http.go @@ -29,6 +29,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/uiserver" "github.com/hashicorp/consul/api" @@ -393,16 +394,49 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc return false } + isTooManyRequests := func(err error) bool { + if err == nil { + return false + } + + // Client-side RPC limits. + if structs.IsErrRPCRateExceeded(err) { + return true + } + + // Connect CA rate limiter. + if err.Error() == consul.ErrRateLimited.Error() { + return true + } + + // gRPC server rate limit interceptor. + if status.Code(err) == codes.ResourceExhausted { + return true + } + + // net/rpc server rate limit interceptor. + return strings.Contains(err.Error(), rate.ErrRetryElsewhere.Error()) + } + + isServiceUnavailable := func(err error) bool { + if err == nil { + return false + } + + // gRPC server rate limit interceptor. + if status.Code(err) == codes.Unavailable { + return true + } + + // net/rpc server rate limit interceptor. + return strings.Contains(err.Error(), rate.ErrRetryLater.Error()) + } + isMethodNotAllowed := func(err error) bool { _, ok := err.(MethodNotAllowedError) return ok } - isTooManyRequests := func(err error) bool { - // Sadness net/rpc can't do nice typed errors so this is all we got - return err.Error() == consul.ErrRateLimited.Error() - } - addAllowHeader := func(methods []string) { resp.Header().Add("Allow", strings.Join(methods, ",")) } @@ -427,12 +461,19 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc "error", err) } + // If the error came from gRPC, unpack it to get the real message. + msg := err.Error() + if s, ok := status.FromError(err); ok { + msg = s.Message() + } + switch { case isForbidden(err): resp.WriteHeader(http.StatusForbidden) - fmt.Fprint(resp, err.Error()) - case structs.IsErrRPCRateExceeded(err): + case isTooManyRequests(err): resp.WriteHeader(http.StatusTooManyRequests) + case isServiceUnavailable(err): + resp.WriteHeader(http.StatusServiceUnavailable) case isMethodNotAllowed(err): // RFC2616 states that for 405 Method Not Allowed the response // MUST include an Allow header containing the list of valid @@ -440,26 +481,21 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc // https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html addAllowHeader(err.(MethodNotAllowedError).Allow) resp.WriteHeader(http.StatusMethodNotAllowed) // 405 - fmt.Fprint(resp, err.Error()) case isHTTPError(err): err := err.(HTTPError) code := http.StatusInternalServerError if err.StatusCode != 0 { code = err.StatusCode } - reason := "An unexpected error occurred" - if err.Error() != "" { - reason = err.Error() + if msg == "" { + msg = "An unexpected error occurred" } resp.WriteHeader(code) - fmt.Fprint(resp, reason) - case isTooManyRequests(err): - resp.WriteHeader(http.StatusTooManyRequests) - fmt.Fprint(resp, err.Error()) default: resp.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(resp, err.Error()) } + + fmt.Fprint(resp, msg) } start := time.Now()