3e7ea1931c
This will help make it clearer what happens when we add some registration plumbing for the different operations and snapshots.
220 lines
4.7 KiB
Go
220 lines
4.7 KiB
Go
package consul
|
|
|
|
import (
|
|
"bytes"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
"github.com/hashicorp/consul/testutil/retry"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
|
)
|
|
|
|
func TestRPC_NoLeader_Fail(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.RPCHoldTimeout = 1 * time.Millisecond
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
arg := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
var out struct{}
|
|
|
|
// Make sure we eventually fail with a no leader error, which we should
|
|
// see given the short timeout.
|
|
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
|
if err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
|
|
// Now make sure it goes through.
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
|
if err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRPC_NoLeader_Retry(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.RPCHoldTimeout = 10 * time.Second
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
arg := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
var out struct{}
|
|
|
|
// This isn't sure-fire but tries to check that we don't have a
|
|
// leader going into the RPC, so we exercise the retry logic.
|
|
if ok, _ := s1.getLeader(); ok {
|
|
t.Fatalf("should not have a leader yet")
|
|
}
|
|
|
|
// The timeout is long enough to ride out any reasonable leader
|
|
// election.
|
|
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
|
if err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
}
|
|
|
|
// MockSink is a test double that stores written bytes in an embedded
// bytes.Buffer and records whether Cancel was called. In this file it is
// handed to a snapshot's Persist method and then to the FSM's Restore method.
type MockSink struct {
	// Buffer holds the data written to (and later read back from) the sink.
	*bytes.Buffer
	// cancel is set to true once Cancel has been invoked.
	cancel bool
}

// ID returns the fixed identifier "Mock".
func (ms *MockSink) ID() string {
	const id = "Mock"
	return id
}

// Cancel marks the sink as cancelled and always returns nil.
func (ms *MockSink) Cancel() error {
	ms.cancel = true
	return nil
}

// Close does nothing and always returns nil.
func (ms *MockSink) Close() error {
	return nil
}
|
|
|
|
func TestRPC_blockingQuery(t *testing.T) {
|
|
t.Parallel()
|
|
dir, s := testServer(t)
|
|
defer os.RemoveAll(dir)
|
|
defer s.Shutdown()
|
|
|
|
// Perform a non-blocking query.
|
|
{
|
|
var opts structs.QueryOptions
|
|
var meta structs.QueryMeta
|
|
var calls int
|
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
|
calls++
|
|
return nil
|
|
}
|
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if calls != 1 {
|
|
t.Fatalf("bad: %d", calls)
|
|
}
|
|
}
|
|
|
|
// Perform a blocking query that gets woken up and loops around once.
|
|
{
|
|
opts := structs.QueryOptions{
|
|
MinQueryIndex: 3,
|
|
}
|
|
var meta structs.QueryMeta
|
|
var calls int
|
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
|
if calls == 0 {
|
|
meta.Index = 3
|
|
|
|
fakeCh := make(chan struct{})
|
|
close(fakeCh)
|
|
ws.Add(fakeCh)
|
|
} else {
|
|
meta.Index = 4
|
|
}
|
|
calls++
|
|
return nil
|
|
}
|
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if calls != 2 {
|
|
t.Fatalf("bad: %d", calls)
|
|
}
|
|
}
|
|
|
|
// Perform a query that blocks and gets interrupted when the state store
|
|
// is abandoned.
|
|
{
|
|
opts := structs.QueryOptions{
|
|
MinQueryIndex: 3,
|
|
}
|
|
var meta structs.QueryMeta
|
|
var calls int
|
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
|
if calls == 0 {
|
|
meta.Index = 3
|
|
|
|
snap, err := s.fsm.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer snap.Release()
|
|
|
|
buf := bytes.NewBuffer(nil)
|
|
sink := &MockSink{buf, false}
|
|
if err := snap.Persist(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if err := s.fsm.Restore(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
calls++
|
|
return nil
|
|
}
|
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if calls != 1 {
|
|
t.Fatalf("bad: %d", calls)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRPC_ReadyForConsistentReads(t *testing.T) {
|
|
t.Parallel()
|
|
dir, s := testServerWithConfig(t, func(c *Config) {
|
|
c.RPCHoldTimeout = 2 * time.Millisecond
|
|
})
|
|
defer os.RemoveAll(dir)
|
|
defer s.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s.RPC, "dc1")
|
|
|
|
if !s.isReadyForConsistentReads() {
|
|
t.Fatal("Server should be ready for consistent reads")
|
|
}
|
|
|
|
s.resetConsistentReadReady()
|
|
err := s.consistentRead()
|
|
if err.Error() != "Not ready to serve consistent reads" {
|
|
t.Fatal("Server should NOT be ready for consistent reads")
|
|
}
|
|
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
s.setConsistentReadReady()
|
|
}()
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
if err := s.consistentRead(); err != nil {
|
|
r.Fatalf("Expected server to be ready for consistent reads, got error %v", err)
|
|
}
|
|
})
|
|
}
|