peering: expose IsLeader, hung up on dialer if follower (#13164)
Signed-off-by: acpana <8968914+acpana@users.noreply.github.com> Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
This commit is contained in:
parent
83c8e1891e
commit
451dc50f4f
|
@ -15,6 +15,7 @@ import (
|
|||
)
|
||||
|
||||
type peeringBackend struct {
|
||||
// TODO(peering): accept a smaller interface; maybe just funcs from the server that we actually need: DC, IsLeader, etc
|
||||
srv *Server
|
||||
connPool GRPCClientConner
|
||||
apply *peeringApply
|
||||
|
@ -31,6 +32,7 @@ func NewPeeringBackend(srv *Server, connPool GRPCClientConner) peering.Backend {
|
|||
}
|
||||
}
|
||||
|
||||
// Forward should not be used to initiate forwarding over bidirectional streams
|
||||
func (b *peeringBackend) Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error) {
|
||||
// Only forward the request if the dc in the request matches the server's datacenter.
|
||||
if info.RequestDatacenter() != "" && info.RequestDatacenter() != b.srv.config.Datacenter {
|
||||
|
@ -103,6 +105,10 @@ func (b *peeringBackend) EnterpriseCheckPartitions(partition string) error {
|
|||
return b.enterpriseCheckPartitions(partition)
|
||||
}
|
||||
|
||||
func (b *peeringBackend) IsLeader() bool {
|
||||
return b.srv.IsLeader()
|
||||
}
|
||||
|
||||
type peeringApply struct {
|
||||
srv *Server
|
||||
}
|
||||
|
|
|
@ -91,6 +91,9 @@ type Backend interface {
|
|||
|
||||
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
||||
|
||||
// IsLeader indicates whether the consul server is in a leader state or not.
|
||||
IsLeader() bool
|
||||
|
||||
Store() Store
|
||||
Apply() Apply
|
||||
}
|
||||
|
@ -423,6 +426,14 @@ type BidirectionalStream interface {
|
|||
|
||||
// StreamResources handles incoming streaming connections.
|
||||
func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error {
|
||||
if !s.Backend.IsLeader() {
|
||||
// we are not the leader so we will hang up on the dialer
|
||||
|
||||
// TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking)
|
||||
s.logger.Error("cannot establish a peering stream on a follower node")
|
||||
return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
|
||||
}
|
||||
|
||||
// Initial message on a new stream must be a new subscription request.
|
||||
first, err := stream.Recv()
|
||||
if err != nil {
|
||||
|
@ -586,6 +597,14 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if !s.Backend.IsLeader() {
|
||||
// we are not the leader anymore so we will hang up on the dialer
|
||||
|
||||
// TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking)
|
||||
logger.Error("node is not a leader anymore; cannot continue streaming")
|
||||
return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
|
||||
}
|
||||
|
||||
if req := msg.GetRequest(); req != nil {
|
||||
switch {
|
||||
case req.Nonce == "":
|
||||
|
|
|
@ -30,6 +30,107 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
func TestStreamResources_Server_Follower(t *testing.T) {
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
|
||||
srv := NewService(testutil.Logger(t), &testStreamBackend{
|
||||
store: store,
|
||||
pub: publisher,
|
||||
leader: func() bool {
|
||||
return false
|
||||
},
|
||||
})
|
||||
|
||||
client := NewMockClient(context.Background())
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
client.ErrCh = errCh
|
||||
|
||||
go func() {
|
||||
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
|
||||
// This matches gRPC's behavior when an error is returned by a server.
|
||||
err := srv.StreamResources(client.ReplicationStream)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
msg, err := client.Recv()
|
||||
require.Nil(t, msg)
|
||||
require.Error(t, err)
|
||||
require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = cannot establish a peering stream on a follower node")
|
||||
}
|
||||
|
||||
// TestStreamResources_Server_LeaderBecomesFollower simulates a srv that is a leader when the
|
||||
// subscription request is sent but loses leadership status for subsequent messages.
|
||||
func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
store := newStateStore(t, publisher)
|
||||
|
||||
first := true
|
||||
leaderFunc := func() bool {
|
||||
if first {
|
||||
first = false
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
srv := NewService(testutil.Logger(t), &testStreamBackend{
|
||||
store: store,
|
||||
pub: publisher,
|
||||
leader: leaderFunc,
|
||||
})
|
||||
|
||||
client := NewMockClient(context.Background())
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
client.ErrCh = errCh
|
||||
|
||||
go func() {
|
||||
err := srv.StreamResources(client.ReplicationStream)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
p := writeInitiatedPeering(t, store, 1, "my-peer")
|
||||
peerID := p.ID
|
||||
|
||||
// Receive a subscription from a peer
|
||||
sub := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||
Request: &pbpeering.ReplicationMessage_Request{
|
||||
PeerID: peerID,
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
},
|
||||
},
|
||||
}
|
||||
err := client.Send(sub)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msg)
|
||||
|
||||
input2 := &pbpeering.ReplicationMessage{
|
||||
Payload: &pbpeering.ReplicationMessage_Request_{
|
||||
Request: &pbpeering.ReplicationMessage_Request{
|
||||
ResourceURL: pbpeering.TypeURLService,
|
||||
Nonce: "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err2 := client.Send(input2)
|
||||
require.NoError(t, err2)
|
||||
|
||||
msg2, err2 := client.Recv()
|
||||
require.Nil(t, msg2)
|
||||
require.Error(t, err2)
|
||||
require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
|
||||
}
|
||||
|
||||
func TestStreamResources_Server_FirstRequest(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
|
@ -694,8 +795,16 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
|||
}
|
||||
|
||||
type testStreamBackend struct {
|
||||
pub state.EventPublisher
|
||||
store *state.Store
|
||||
pub state.EventPublisher
|
||||
store *state.Store
|
||||
leader func() bool
|
||||
}
|
||||
|
||||
func (b *testStreamBackend) IsLeader() bool {
|
||||
if b.leader != nil {
|
||||
return b.leader()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (b *testStreamBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
|
||||
|
|
Loading…
Reference in New Issue