Rate limit improvements and fixes (#15917)

- Fixes a panic when Operation.SourceAddr is nil (internal net/rpc calls)
- Adds proper HTTP response codes (429 and 503) for rate limit errors
- Makes the error messages clearer
- Enables automatic retries for rate-limit errors in the net/rpc stack
This commit is contained in:
Dan Upton 2023-01-09 10:20:05 +00:00 committed by GitHub
parent 2a90faa4b1
commit f24f8c681f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 22 deletions

View File

@ -19,14 +19,14 @@ var (
// rate limit was exhausted, but may succeed on a different server. // rate limit was exhausted, but may succeed on a different server.
// //
// Results in a RESOURCE_EXHAUSTED or "429 Too Many Requests" response. // Results in a RESOURCE_EXHAUSTED or "429 Too Many Requests" response.
ErrRetryElsewhere = errors.New("rate limit exceeded, try a different server") ErrRetryElsewhere = errors.New("rate limit exceeded, try again later or against a different server")
// ErrRetryLater indicates that the operation was not allowed because the rate // ErrRetryLater indicates that the operation was not allowed because the rate
// limit was exhausted, and trying a different server won't help (e.g. because // limit was exhausted, and trying a different server won't help (e.g. because
// the operation can only be performed on the leader). // the operation can only be performed on the leader).
// //
// Results in an UNAVAILABLE or "503 Service Unavailable" response. // Results in an UNAVAILABLE or "503 Service Unavailable" response.
ErrRetryLater = errors.New("rate limit exceeded, try again later") ErrRetryLater = errors.New("rate limit exceeded for operation that can only be performed by the leader, try again later")
) )
// Mode determines the action that will be taken when a rate limit has been // Mode determines the action that will be taken when a rate limit has been
@ -209,7 +209,7 @@ func (h *Handler) Allow(op Operation) error {
enforced := l.mode == ModeEnforcing enforced := l.mode == ModeEnforcing
h.logger.Trace("RPC exceeded allowed rate limit", h.logger.Trace("RPC exceeded allowed rate limit",
"rpc", op.Name, "rpc", op.Name,
"source_addr", op.SourceAddr.String(), "source_addr", op.SourceAddr,
"limit_type", l.desc, "limit_type", l.desc,
"limit_enforced", enforced, "limit_enforced", enforced,
) )

View File

@ -26,6 +26,7 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed" "github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
@ -570,9 +571,19 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config)
return true return true
} }
// If we are chunking and it doesn't seem to have completed, try again. retryableMessages := []error{
if err != nil && strings.Contains(err.Error(), ErrChunkingResubmit.Error()) { // If we are chunking and it doesn't seem to have completed, try again.
return true ErrChunkingResubmit,
// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rate.ErrRetryElsewhere,
rate.ErrRetryLater,
}
for _, m := range retryableMessages {
if err != nil && strings.Contains(err.Error(), m.Error()) {
return true
}
} }
// Reads are safe to retry for stream errors, such as if a server was // Reads are safe to retry for stream errors, such as if a server was

View File

@ -30,6 +30,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
agent_grpc "github.com/hashicorp/consul/agent/grpc-internal" agent_grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
@ -1310,6 +1311,16 @@ func TestCanRetry(t *testing.T) {
err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader), err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader),
expected: true, expected: true,
}, },
{
name: "ErrRetryElsewhere",
err: fmt.Errorf("some wrapping: %w", rate.ErrRetryElsewhere),
expected: true,
},
{
name: "ErrRetryLater",
err: fmt.Errorf("some wrapping: %w", rate.ErrRetryLater),
expected: true,
},
{ {
name: "EOF on read request", name: "EOF on read request",
req: isReadRequest{}, req: isReadRequest{},

View File

@ -29,6 +29,7 @@ import (
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/uiserver" "github.com/hashicorp/consul/agent/uiserver"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -393,16 +394,49 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc
return false return false
} }
isTooManyRequests := func(err error) bool {
if err == nil {
return false
}
// Client-side RPC limits.
if structs.IsErrRPCRateExceeded(err) {
return true
}
// Connect CA rate limiter.
if err.Error() == consul.ErrRateLimited.Error() {
return true
}
// gRPC server rate limit interceptor.
if status.Code(err) == codes.ResourceExhausted {
return true
}
// net/rpc server rate limit interceptor.
return strings.Contains(err.Error(), rate.ErrRetryElsewhere.Error())
}
isServiceUnavailable := func(err error) bool {
if err == nil {
return false
}
// gRPC server rate limit interceptor.
if status.Code(err) == codes.Unavailable {
return true
}
// net/rpc server rate limit interceptor.
return strings.Contains(err.Error(), rate.ErrRetryLater.Error())
}
isMethodNotAllowed := func(err error) bool { isMethodNotAllowed := func(err error) bool {
_, ok := err.(MethodNotAllowedError) _, ok := err.(MethodNotAllowedError)
return ok return ok
} }
isTooManyRequests := func(err error) bool {
// Sadness net/rpc can't do nice typed errors so this is all we got
return err.Error() == consul.ErrRateLimited.Error()
}
addAllowHeader := func(methods []string) { addAllowHeader := func(methods []string) {
resp.Header().Add("Allow", strings.Join(methods, ",")) resp.Header().Add("Allow", strings.Join(methods, ","))
} }
@ -427,12 +461,19 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc
"error", err) "error", err)
} }
// If the error came from gRPC, unpack it to get the real message.
msg := err.Error()
if s, ok := status.FromError(err); ok {
msg = s.Message()
}
switch { switch {
case isForbidden(err): case isForbidden(err):
resp.WriteHeader(http.StatusForbidden) resp.WriteHeader(http.StatusForbidden)
fmt.Fprint(resp, err.Error()) case isTooManyRequests(err):
case structs.IsErrRPCRateExceeded(err):
resp.WriteHeader(http.StatusTooManyRequests) resp.WriteHeader(http.StatusTooManyRequests)
case isServiceUnavailable(err):
resp.WriteHeader(http.StatusServiceUnavailable)
case isMethodNotAllowed(err): case isMethodNotAllowed(err):
// RFC2616 states that for 405 Method Not Allowed the response // RFC2616 states that for 405 Method Not Allowed the response
// MUST include an Allow header containing the list of valid // MUST include an Allow header containing the list of valid
@ -440,26 +481,21 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html // https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
addAllowHeader(err.(MethodNotAllowedError).Allow) addAllowHeader(err.(MethodNotAllowedError).Allow)
resp.WriteHeader(http.StatusMethodNotAllowed) // 405 resp.WriteHeader(http.StatusMethodNotAllowed) // 405
fmt.Fprint(resp, err.Error())
case isHTTPError(err): case isHTTPError(err):
err := err.(HTTPError) err := err.(HTTPError)
code := http.StatusInternalServerError code := http.StatusInternalServerError
if err.StatusCode != 0 { if err.StatusCode != 0 {
code = err.StatusCode code = err.StatusCode
} }
reason := "An unexpected error occurred" if msg == "" {
if err.Error() != "" { msg = "An unexpected error occurred"
reason = err.Error()
} }
resp.WriteHeader(code) resp.WriteHeader(code)
fmt.Fprint(resp, reason)
case isTooManyRequests(err):
resp.WriteHeader(http.StatusTooManyRequests)
fmt.Fprint(resp, err.Error())
default: default:
resp.WriteHeader(http.StatusInternalServerError) resp.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(resp, err.Error())
} }
fmt.Fprint(resp, msg)
} }
start := time.Now() start := time.Now()