Net 2229/rpc reduce max retries 2 (#16165)

* feat: calculate retry wait time with exponential back-off

* test: add test for getWaitTime method

* feat: enforce random jitter between min value from previous iteration and current

* extract randomStagger to simplify tests and use Milliseconds to avoid float math.

* rename variables

* add test and rename comment

---------

Co-authored-by: Poonam Jadhav <poonam.jadhav@hashicorp.com>
Dhia Ayachi 2023-02-06 14:07:41 -05:00 committed by GitHub
parent 528da14a21
commit 0f3e935228
5 changed files with 111 additions and 3 deletions
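
Reviewer note: before this change, every retry slept a fresh lib.RandomStagger(RPCHoldTimeout / structs.JitterFraction), so the wait between attempts never grew. The diff below makes the per-attempt cap double on every retry (getWaitTime) and draws the actual jitter between the previous wait and that cap (lib.RandomStaggerWithRange), so later waits dominate the RPCHoldTimeout budget and far fewer retries fit in it. A minimal, self-contained sketch of the pattern; the helper names waitTime and staggerWithRange and the jitterFraction value of 16 are illustrative assumptions, not the code in this commit:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// waitTime mirrors the idea of getWaitTime: the base back-off (rpcHoldTimeout divided by
// the jitter fraction, floored at 1ms) doubles on every retry. jitterFraction = 16 is an
// assumption here; the real code uses structs.JitterFraction.
func waitTime(rpcHoldTimeout time.Duration, retryCount int) time.Duration {
	const jitterFraction = 16
	baseMs := rpcHoldTimeout.Milliseconds() / jitterFraction
	if baseMs < 1 {
		baseMs = 1
	}
	// Shifting left by retryCount-1 is the integer form of multiplying by 2^(retryCount-1).
	return time.Duration(baseMs<<(retryCount-1)) * time.Millisecond
}

// staggerWithRange is a stand-in for lib.RandomStaggerWithRange: a random duration in [min, max).
func staggerWithRange(min, max time.Duration) time.Duration {
	if max <= min {
		return min
	}
	return min + time.Duration(rand.Int63n(int64(max-min)))
}

func main() {
	const rpcHoldTimeout = 7 * time.Second
	previousJitter := time.Duration(0)
	for retry := 1; retry <= 4; retry++ {
		// Each retry sleeps somewhere between the previous sleep and the current cap,
		// so waits grow instead of staying at a fixed RPCHoldTimeout/16.
		jitter := staggerWithRange(previousJitter, waitTime(rpcHoldTimeout, retry))
		previousJitter = jitter
		fmt.Printf("retry %d: cap %v, sleep %v\n", retry, waitTime(rpcHoldTimeout, retry), jitter)
	}
}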


@@ -272,8 +272,10 @@ func (c *Client) RPC(ctx context.Context, method string, args interface{}, reply
 	// starting the timer here we won't potentially double up the delay.
 	// TODO (slackpad) Plumb a deadline here with a context.
 	firstCheck := time.Now()
+	retryCount := 0
+	previousJitter := time.Duration(0)
 TRY:
+	retryCount++
 	manager, server := c.router.FindLANRoute()
 	if server == nil {
 		return structs.ErrNoServers
@@ -323,7 +325,9 @@ TRY:
 	)
 	// We can wait a bit and retry!
-	jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
+	jitter := lib.RandomStaggerWithRange(previousJitter, getWaitTime(c.config.RPCHoldTimeout, retryCount))
+	previousJitter = jitter
 	select {
 	case <-time.After(jitter):
 		goto TRY


@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"net"
 	"strings"
 	"sync/atomic"
@@ -556,6 +557,21 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
 	return c.lr.Read(b)
 }
 
+func getWaitTime(rpcHoldTimeout time.Duration, retryCount int) time.Duration {
+	const backoffMultiplier = 2.0
+	rpcHoldTimeoutInMilli := int(rpcHoldTimeout.Milliseconds())
+	initialBackoffInMilli := rpcHoldTimeoutInMilli / structs.JitterFraction
+	if initialBackoffInMilli < 1 {
+		initialBackoffInMilli = 1
+	}
+	waitTimeInMilli := initialBackoffInMilli * int(math.Pow(backoffMultiplier, float64(retryCount-1)))
+	return time.Duration(waitTimeInMilli) * time.Millisecond
+}
+
 // canRetry returns true if the request and error indicate that a retry is safe.
 func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool {
 	if info != nil {
@@ -714,7 +730,10 @@ func (s *Server) canServeReadRequest(info structs.RPCInfo) bool {
 // See the comment for forwardRPC for more details.
 func (s *Server) forwardRequestToLeader(info structs.RPCInfo, forwardToLeader func(leader *metadata.Server) error) (handled bool, err error) {
 	firstCheck := time.Now()
+	retryCount := 0
+	previousJitter := time.Duration(0)
 CHECK_LEADER:
+	retryCount++
 	// Fail fast if we are in the process of leaving
 	select {
 	case <-s.leaveCh:
@@ -747,7 +766,9 @@ CHECK_LEADER:
 	if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry {
 		// Gate the request until there is a leader
-		jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
+		jitter := lib.RandomStaggerWithRange(previousJitter, getWaitTime(s.config.RPCHoldTimeout, retryCount))
+		previousJitter = jitter
 		select {
 		case <-time.After(jitter):
 			goto CHECK_LEADER
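
The expected values in the test below follow directly from getWaitTime, assuming structs.JitterFraction is 16 (which the 437ms expectation implies): 7000ms / 16 truncates to 437ms for the first attempt, then doubles to 874ms, 1748ms, and 3496ms on subsequent attempts; with a 7ms hold timeout the quotient is 0, so it is floored to the 1ms minimum.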


@@ -1620,6 +1620,60 @@ func TestRPC_AuthorizeRaftRPC(t *testing.T) {
 	}
 }
 
+func TestGetWaitTime(t *testing.T) {
+	type testCase struct {
+		name           string
+		RPCHoldTimeout time.Duration
+		expected       time.Duration
+		retryCount     int
+	}
+	config := DefaultConfig()
+	run := func(t *testing.T, tc testCase) {
+		config.RPCHoldTimeout = tc.RPCHoldTimeout
+		require.Equal(t, tc.expected, getWaitTime(config.RPCHoldTimeout, tc.retryCount))
+	}
+
+	var testCases = []testCase{
+		{
+			name:           "init backoff small",
+			RPCHoldTimeout: 7 * time.Millisecond,
+			retryCount:     1,
+			expected:       1 * time.Millisecond,
+		},
+		{
+			name:           "first attempt",
+			RPCHoldTimeout: 7 * time.Second,
+			retryCount:     1,
+			expected:       437 * time.Millisecond,
+		},
+		{
+			name:           "second attempt",
+			RPCHoldTimeout: 7 * time.Second,
+			retryCount:     2,
+			expected:       874 * time.Millisecond,
+		},
+		{
+			name:           "third attempt",
+			RPCHoldTimeout: 7 * time.Second,
+			retryCount:     3,
+			expected:       1748 * time.Millisecond,
+		},
+		{
+			name:           "fourth attempt",
+			RPCHoldTimeout: 7 * time.Second,
+			retryCount:     4,
+			expected:       3496 * time.Millisecond,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			run(t, tc)
+		})
+	}
+}
+
 func doRaftRPC(conn net.Conn, leader string) (raft.AppendEntriesResponse, error) {
 	var resp raft.AppendEntriesResponse


@@ -45,6 +45,11 @@ func RandomStagger(intv time.Duration) time.Duration {
 	return time.Duration(uint64(rand.Int63()) % uint64(intv))
 }
 
+// RandomStaggerWithRange returns an interval between min and the max duration
+func RandomStaggerWithRange(min time.Duration, max time.Duration) time.Duration {
+	return RandomStagger(max-min) + min
+}
+
 // RateScaledInterval is used to choose an interval to perform an action in
 // order to target an aggregate number of actions per second across the whole
 // cluster.
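
One edge worth noting for the boundary cases in the test below: when min equals max, RandomStaggerWithRange calls RandomStagger with a zero interval and the result is exactly min. The "min-max 0" and "min-max big" cases can only pass if RandomStagger treats a zero interval as 0 rather than attempting the modulo; that guard is not visible in this hunk, so this is an inference from the tests, not shown code.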


@@ -1,6 +1,8 @@
 package lib
 
 import (
+	"github.com/stretchr/testify/require"
+	"math"
 	"testing"
 	"time"
 )
@@ -172,3 +174,25 @@ func TestRateScaledInterval(t *testing.T) {
 		t.Fatalf("Bad: %v", v)
 	}
 }
+
+func TestRandomStaggerWithRange(t *testing.T) {
+	type args struct {
+		min time.Duration
+		max time.Duration
+	}
+	tests := []struct {
+		name string
+		args args
+	}{
+		{"min-max 0", args{time.Duration(0), time.Duration(0)}},
+		{"min-max big", args{time.Duration(math.MaxInt64), time.Duration(math.MaxInt64)}},
+		{"normal case", args{time.Duration(3), time.Duration(7)}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := RandomStaggerWithRange(tt.args.min, tt.args.max)
+			require.GreaterOrEqual(t, got, tt.args.min)
+			require.LessOrEqual(t, got, tt.args.max)
+		})
+	}
+}