rpc: add tests for canRetry
Also accept an RPCInfo instead of interface{}. Accepting an interface lead to a bug where the caller was expecting the arg to be the response when in fact it was always passed the request. By accepting RPCInfo it should indicate that this is actually the request value. One caller of canRetry already passed an RPCInfo, the second handles the type assertion before calling canRetry.
This commit is contained in:
parent
c38f4869ad
commit
4905ac6f44
|
@ -10,6 +10,10 @@ import (
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/armon/go-metrics/prometheus"
|
"github.com/armon/go-metrics/prometheus"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -17,9 +21,6 @@ import (
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var ClientCounters = []prometheus.CounterDefinition{
|
var ClientCounters = []prometheus.CounterDefinition{
|
||||||
|
@ -287,7 +288,10 @@ TRY:
|
||||||
)
|
)
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
|
||||||
manager.NotifyFailedServer(server)
|
manager.NotifyFailedServer(server)
|
||||||
if retry := canRetry(args, rpcErr); !retry {
|
|
||||||
|
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
|
||||||
|
info, _ := args.(structs.RPCInfo)
|
||||||
|
if retry := canRetry(info, rpcErr); !retry {
|
||||||
return rpcErr
|
return rpcErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,9 +92,7 @@ const (
|
||||||
enqueueLimit = 30 * time.Second
|
enqueueLimit = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var ErrChunkingResubmit = errors.New("please resubmit call for rechunking")
|
||||||
ErrChunkingResubmit = errors.New("please resubmit call for rechunking")
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Server) rpcLogger() hclog.Logger {
|
func (s *Server) rpcLogger() hclog.Logger {
|
||||||
return s.loggers.Named(logging.RPC)
|
return s.loggers.Named(logging.RPC)
|
||||||
|
@ -527,8 +525,8 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
|
||||||
return c.lr.Read(b)
|
return c.lr.Read(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// canRetry returns true if the given situation is safe for a retry.
|
// canRetry returns true if the request and error indicate that a retry is safe.
|
||||||
func canRetry(args interface{}, err error) bool {
|
func canRetry(info structs.RPCInfo, err error) bool {
|
||||||
// No leader errors are always safe to retry since no state could have
|
// No leader errors are always safe to retry since no state could have
|
||||||
// been changed.
|
// been changed.
|
||||||
if structs.IsErrNoLeader(err) {
|
if structs.IsErrNoLeader(err) {
|
||||||
|
@ -542,12 +540,7 @@ func canRetry(args interface{}, err error) bool {
|
||||||
|
|
||||||
// Reads are safe to retry for stream errors, such as if a server was
|
// Reads are safe to retry for stream errors, such as if a server was
|
||||||
// being shut down.
|
// being shut down.
|
||||||
info, ok := args.(structs.RPCInfo)
|
return info != nil && info.IsRead() && lib.IsErrEOF(err)
|
||||||
if ok && info.IsRead() && lib.IsErrEOF(err) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
|
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
|
||||||
|
@ -790,11 +783,6 @@ func (s *Server) raftApplyWithEncoder(
|
||||||
// In this case we didn't apply all chunks successfully, possibly due
|
// In this case we didn't apply all chunks successfully, possibly due
|
||||||
// to a term change; resubmit
|
// to a term change; resubmit
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
// This returns the error in the interface because the raft library
|
|
||||||
// returns errors from the FSM via the future, not via err from the
|
|
||||||
// apply function. Downstream client code expects to see any error
|
|
||||||
// from the FSM (as opposed to the apply itself) and decide whether
|
|
||||||
// it can retry in the future's response.
|
|
||||||
return nil, ErrChunkingResubmit
|
return nil, ErrChunkingResubmit
|
||||||
}
|
}
|
||||||
// We expect that this conversion should always work
|
// We expect that this conversion should always work
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
@ -12,6 +14,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
@ -20,10 +27,6 @@ import (
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRPC_NoLeader_Fail(t *testing.T) {
|
func TestRPC_NoLeader_Fail(t *testing.T) {
|
||||||
|
@ -952,3 +955,59 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
|
require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCanRetry(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
req structs.RPCInfo
|
||||||
|
err error
|
||||||
|
expected bool
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
require.Equal(t, tc.expected, canRetry(tc.req, tc.err))
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "unexpected error",
|
||||||
|
err: fmt.Errorf("some arbitrary error"),
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "checking error",
|
||||||
|
err: fmt.Errorf("some wrapping :%w", ErrChunkingResubmit),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no leader error",
|
||||||
|
err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "EOF on read request",
|
||||||
|
req: isReadRequest{},
|
||||||
|
err: io.EOF,
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "EOF on write request",
|
||||||
|
err: io.EOF,
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type isReadRequest struct {
|
||||||
|
structs.RPCInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r isReadRequest) IsRead() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue