package consul import ( "bytes" "os" "testing" "time" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestRPC_NoLeader_Fail(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.RPCHoldTimeout = 1 * time.Millisecond }) defer os.RemoveAll(dir1) defer s1.Shutdown() codec := rpcClient(t, s1) defer codec.Close() arg := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } var out struct{} // Make sure we eventually fail with a no leader error, which we should // see given the short timeout. err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) if err == nil || err.Error() != structs.ErrNoLeader.Error() { t.Fatalf("bad: %v", err) } // Now make sure it goes through. testrpc.WaitForLeader(t, s1.RPC, "dc1") err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) if err != nil { t.Fatalf("bad: %v", err) } } func TestRPC_NoLeader_Retry(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.RPCHoldTimeout = 10 * time.Second }) defer os.RemoveAll(dir1) defer s1.Shutdown() codec := rpcClient(t, s1) defer codec.Close() arg := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } var out struct{} // This isn't sure-fire but tries to check that we don't have a // leader going into the RPC, so we exercise the retry logic. if ok, _ := s1.getLeader(); ok { t.Fatalf("should not have a leader yet") } // The timeout is long enough to ride out any reasonable leader // election. err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) if err != nil { t.Fatalf("bad: %v", err) } } func TestRPC_blockingQuery(t *testing.T) { dir, s := testServer(t) defer os.RemoveAll(dir) defer s.Shutdown() // Perform a non-blocking query. { var opts structs.QueryOptions var meta structs.QueryMeta var calls int fn := func(ws memdb.WatchSet, state *state.Store) error { calls++ return nil } if err := s.blockingQuery(&opts, &meta, fn); err != nil { t.Fatalf("err: %v", err) } if calls != 1 { t.Fatalf("bad: %d", calls) } } // Perform a blocking query that gets woken up and loops around once. { opts := structs.QueryOptions{ MinQueryIndex: 3, } var meta structs.QueryMeta var calls int fn := func(ws memdb.WatchSet, state *state.Store) error { if calls == 0 { meta.Index = 3 fakeCh := make(chan struct{}) close(fakeCh) ws.Add(fakeCh) } else { meta.Index = 4 } calls++ return nil } if err := s.blockingQuery(&opts, &meta, fn); err != nil { t.Fatalf("err: %v", err) } if calls != 2 { t.Fatalf("bad: %d", calls) } } // Perform a query that blocks and gets interrupted when the state store // is abandoned. { opts := structs.QueryOptions{ MinQueryIndex: 3, } var meta structs.QueryMeta var calls int fn := func(ws memdb.WatchSet, state *state.Store) error { if calls == 0 { meta.Index = 3 snap, err := s.fsm.Snapshot() if err != nil { t.Fatalf("err: %v", err) } defer snap.Release() buf := bytes.NewBuffer(nil) sink := &MockSink{buf, false} if err := snap.Persist(sink); err != nil { t.Fatalf("err: %v", err) } if err := s.fsm.Restore(sink); err != nil { t.Fatalf("err: %v", err) } } calls++ return nil } if err := s.blockingQuery(&opts, &meta, fn); err != nil { t.Fatalf("err: %v", err) } if calls != 1 { t.Fatalf("bad: %d", calls) } } } func TestReadyForConsistentReads(t *testing.T) { dir, s := testServerWithConfig(t, func(c *Config) { c.RPCHoldTimeout = 2 * time.Millisecond }) defer os.RemoveAll(dir) defer s.Shutdown() testrpc.WaitForLeader(t, s.RPC, "dc1") if !s.isReadyForConsistentReads() { t.Fatal("Server should be ready for consistent reads") } s.resetConsistentReadReady() setConsistentFunc := func() { time.Sleep(3 * time.Millisecond) s.setConsistentReadReady() } go setConsistentFunc() //set some time to wait for the goroutine above to finish waitUntil := time.Now().Add(time.Millisecond * 5) err := s.consistentRead() if err.Error() != "Not ready to serve consistent reads" { t.Fatal("Server should NOT be ready for consistent reads") } for time.Now().Before(waitUntil) && err != nil { err = s.consistentRead() } if err != nil { t.Fatalf("Expected server to be ready for consistent reads, got error %v", err) } }