From 33741f91567e2c0759d443cb33c9c6d95d2d5d8b Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 13 Oct 2015 16:43:52 -0700 Subject: [PATCH] consul: initial pass at refactoring RPC using net-rpc-msgpackrpc --- consul/acl_endpoint_test.go | 95 ++++++------ consul/acl_test.go | 40 ++--- consul/catalog_endpoint_test.go | 245 ++++++++++++++++--------------- consul/client_test.go | 5 +- consul/fsm.go | 2 + consul/health_endpoint_test.go | 76 +++++----- consul/internal_endpoint_test.go | 60 ++++---- consul/kvs_endpoint_test.go | 115 ++++++++------- consul/leader_test.go | 18 +-- consul/pool.go | 16 +- consul/rpc.go | 4 +- consul/session_endpoint_test.go | 92 ++++++------ consul/session_ttl_test.go | 7 +- consul/status_endpoint_test.go | 28 ++-- 14 files changed, 405 insertions(+), 398 deletions(-) diff --git a/consul/acl_endpoint_test.go b/consul/acl_endpoint_test.go index 11a305a94..f162c90c0 100644 --- a/consul/acl_endpoint_test.go +++ b/consul/acl_endpoint_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestACLEndpoint_Apply(t *testing.T) { @@ -17,10 +18,10 @@ func TestACLEndpoint_Apply(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -32,7 +33,7 @@ func TestACLEndpoint_Apply(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -56,7 +57,7 @@ func TestACLEndpoint_Apply(t *testing.T) { // Do a delete arg.Op = structs.ACLDelete arg.ACL.ID = out - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -77,10 +78,10 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -92,7 +93,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -112,7 +113,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) { // Do an update arg.ACL.ID = out arg.ACL.Rules = `{"key": {"": {"policy": "deny"}}}` - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -134,7 +135,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) { // Do a delete arg.Op = structs.ACLDelete arg.ACL.Rules = "" - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -155,10 +156,10 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -171,7 +172,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } if out != "foobarbaz" { @@ -201,10 +202,10 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -215,7 +216,7 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) { }, } var out string - err := client.Call("ACL.Apply", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) } @@ -228,10 +229,10 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -244,7 +245,7 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - err := client.Call("ACL.Apply", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out) if err == nil || !strings.Contains(err.Error(), "delete anonymous") { t.Fatalf("err: %v", err) } @@ -257,10 +258,10 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -273,7 +274,7 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - err := client.Call("ACL.Apply", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out) if err == nil || !strings.Contains(err.Error(), "root ACL") { t.Fatalf("err: %v", err) } @@ -286,10 +287,10 @@ func TestACLEndpoint_Get(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -301,7 +302,7 @@ func TestACLEndpoint_Get(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -310,7 +311,7 @@ func TestACLEndpoint_Get(t *testing.T) { ACL: out, } var acls structs.IndexedACLs - if err := client.Call("ACL.Get", &getR, &acls); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Get", &getR, &acls); err != nil { t.Fatalf("err: %v", err) } @@ -333,10 +334,10 @@ func TestACLEndpoint_GetPolicy(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.ACLRequest{ Datacenter: "dc1", @@ -348,7 +349,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -357,7 +358,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) { ACL: out, } var acls structs.ACLPolicy - if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls); err != nil { t.Fatalf("err: %v", err) } @@ -371,7 +372,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) { // Do a conditional lookup with etag getR.ETag = acls.ETag var out2 structs.ACLPolicy - if err := client.Call("ACL.GetPolicy", &getR, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -390,10 +391,10 @@ func TestACLEndpoint_List(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") ids := []string{} for i := 0; i < 5; i++ { @@ -407,7 +408,7 @@ func TestACLEndpoint_List(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } ids = append(ids, out) @@ -418,7 +419,7 @@ func TestACLEndpoint_List(t *testing.T) { QueryOptions: structs.QueryOptions{Token: "root"}, } var acls structs.IndexedACLs - if err := client.Call("ACL.List", &getR, &acls); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls); err != nil { t.Fatalf("err: %v", err) } @@ -450,16 +451,16 @@ func TestACLEndpoint_List_Denied(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") getR := structs.DCSpecificRequest{ Datacenter: "dc1", } var acls structs.IndexedACLs - err := client.Call("ACL.List", &getR, &acls) + err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) } diff --git a/consul/acl_test.go b/consul/acl_test.go index dc7a1b214..898349369 100644 --- a/consul/acl_test.go +++ b/consul/acl_test.go @@ -18,7 +18,7 @@ func TestACL_Disabled(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") acl, err := s1.resolveToken("does not exist") if err != nil { @@ -62,7 +62,7 @@ func TestACL_Authority_NotFound(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") acl, err := s1.resolveToken("does not exist") if err == nil || err.Error() != aclNotFound { @@ -83,7 +83,7 @@ func TestACL_Authority_Found(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -97,7 +97,7 @@ func TestACL_Authority_Found(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } @@ -128,7 +128,7 @@ func TestACL_Authority_Anonymous_Found(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Resolve the token acl, err := s1.resolveToken("") @@ -155,7 +155,7 @@ func TestACL_Authority_Master_Found(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Resolve the token acl, err := s1.resolveToken("foobar") @@ -183,7 +183,7 @@ func TestACL_Authority_Management(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Resolve the token acl, err := s1.resolveToken("foobar") @@ -230,7 +230,7 @@ func TestACL_NonAuthority_NotFound(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // find the non-authoritative server var nonAuth *Server @@ -279,7 +279,7 @@ func TestACL_NonAuthority_Found(t *testing.T) { }, func(err error) { t.Fatalf("should have 2 peers: %v", err) }) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -293,7 +293,7 @@ func TestACL_NonAuthority_Found(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } @@ -355,7 +355,7 @@ func TestACL_NonAuthority_Management(t *testing.T) { }, func(err error) { t.Fatalf("should have 2 peers: %v", err) }) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // find the non-authoritative server var nonAuth *Server @@ -412,7 +412,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) { }, func(err error) { t.Fatalf("should have 2 peers: %v", err) }) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -426,7 +426,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } @@ -486,7 +486,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) { }, func(err error) { t.Fatalf("should have 2 peers: %v", err) }) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -500,7 +500,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } @@ -562,7 +562,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) { }, func(err error) { t.Fatalf("should have 2 peers: %v", err) }) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -576,7 +576,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } @@ -637,8 +637,8 @@ func TestACL_MultiDC_Found(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") - testutil.WaitForLeader(t, client.Call, "dc2") + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc2") // Create a new token arg := structs.ACLRequest{ @@ -652,7 +652,7 @@ func TestACL_MultiDC_Found(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var id string - if err := client.Call("ACL.Apply", &arg, &id); err != nil { + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 7fc4c8db5..d85dfc4f4 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -11,14 +11,15 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestCatalogRegister(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -32,13 +33,13 @@ func TestCatalogRegister(t *testing.T) { } var out struct{} - err := client.Call("Catalog.Register", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } testutil.WaitForResult(func() (bool, error) { - err := client.Call("Catalog.Register", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) return err == nil, err }, func(err error) { t.Fatalf("err: %v", err) @@ -53,10 +54,10 @@ func TestCatalogRegister_ACLDeny(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create the ACL arg := structs.ACLRequest{ @@ -70,7 +71,7 @@ func TestCatalogRegister_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -88,13 +89,13 @@ func TestCatalogRegister_ACLDeny(t *testing.T) { } var outR struct{} - err := client.Call("Catalog.Register", &argR, &outR) + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) } argR.Service.Service = "foo" - err = client.Call("Catalog.Register", &argR, &outR) + err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR) if err != nil { t.Fatalf("err: %v", err) } @@ -104,14 +105,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client1 := rpcClient(t, s1) - defer client1.Close() + codec1 := rpcClient(t, s1) + defer codec1.Close() dir2, s2 := testServer(t) defer os.RemoveAll(dir2) defer s2.Shutdown() - client2 := rpcClient(t, s2) - defer client2.Close() + codec2 := rpcClient(t, s2) + defer codec2.Close() // Try to join addr := fmt.Sprintf("127.0.0.1:%d", @@ -120,15 +121,15 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client1.Call, "dc1") - testutil.WaitForLeader(t, client2.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s2.RPC, "dc1") // Use the follower as the client - var client *rpc.Client + var codec rpc.ClientCodec if !s1.IsLeader() { - client = client1 + codec = codec1 } else { - client = client2 + codec = codec2 } arg := structs.RegisterRequest{ @@ -142,7 +143,7 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -151,8 +152,8 @@ func TestCatalogRegister_ForwardDC(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() dir2, s2 := testServerDC(t, "dc2") defer os.RemoveAll(dir2) @@ -165,7 +166,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc2") + testutil.WaitForLeader(t, s1.RPC, "dc2") arg := structs.RegisterRequest{ Datacenter: "dc2", // Should forward through s1 @@ -178,7 +179,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -187,8 +188,8 @@ func TestCatalogDeregister(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := structs.DeregisterRequest{ Datacenter: "dc1", @@ -196,14 +197,14 @@ func TestCatalogDeregister(t *testing.T) { } var out struct{} - err := client.Call("Catalog.Deregister", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") - if err := client.Call("Catalog.Deregister", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -212,8 +213,8 @@ func TestCatalogListDatacenters(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() dir2, s2 := testServerDC(t, "dc2") defer os.RemoveAll(dir2) @@ -226,10 +227,10 @@ func TestCatalogListDatacenters(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") var out []string - if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListDatacenters", struct{}{}, &out); err != nil { t.Fatalf("err: %v", err) } @@ -251,25 +252,25 @@ func TestCatalogListNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.DCSpecificRequest{ Datacenter: "dc1", } var out structs.IndexedNodes - err := client.Call("Catalog.ListNodes", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) testutil.WaitForResult(func() (bool, error) { - client.Call("Catalog.ListNodes", &args, &out) + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) return len(out.Nodes) == 2, nil }, func(err error) { t.Fatalf("err: %v", err) @@ -291,14 +292,14 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client1 := rpcClient(t, s1) - defer client1.Close() + codec1 := rpcClient(t, s1) + defer codec1.Close() dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() - client2 := rpcClient(t, s2) - defer client2.Close() + codec2 := rpcClient(t, s2) + defer codec2.Close() // Try to join addr := fmt.Sprintf("127.0.0.1:%d", @@ -307,18 +308,18 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client1.Call, "dc1") - testutil.WaitForLeader(t, client2.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s2.RPC, "dc1") // Use the follower as the client - var client *rpc.Client + var codec rpc.ClientCodec if !s1.IsLeader() { - client = client1 + codec = codec1 // Inject fake data on the follower! s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) } else { - client = client2 + codec = codec2 // Inject fake data on the follower! s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) @@ -329,7 +330,7 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { QueryOptions: structs.QueryOptions{AllowStale: true}, } var out structs.IndexedNodes - if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -355,14 +356,14 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client1 := rpcClient(t, s1) - defer client1.Close() + codec1 := rpcClient(t, s1) + defer codec1.Close() dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() - client2 := rpcClient(t, s2) - defer client2.Close() + codec2 := rpcClient(t, s2) + defer codec2.Close() // Try to join addr := fmt.Sprintf("127.0.0.1:%d", @@ -371,16 +372,16 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client1.Call, "dc1") - testutil.WaitForLeader(t, client2.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s2.RPC, "dc1") // Use the leader as the client, kill the follower - var client *rpc.Client + var codec rpc.ClientCodec if s1.IsLeader() { - client = client1 + codec = codec1 s2.Shutdown() } else { - client = client2 + codec = codec2 s1.Shutdown() } @@ -389,7 +390,7 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) { QueryOptions: structs.QueryOptions{RequireConsistent: true}, } var out structs.IndexedNodes - if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") { t.Fatalf("err: %v", err) } @@ -405,14 +406,14 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client1 := rpcClient(t, s1) - defer client1.Close() + codec1 := rpcClient(t, s1) + defer codec1.Close() dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() - client2 := rpcClient(t, s2) - defer client2.Close() + codec2 := rpcClient(t, s2) + defer codec2.Close() // Try to join addr := fmt.Sprintf("127.0.0.1:%d", @@ -421,15 +422,15 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client1.Call, "dc1") - testutil.WaitForLeader(t, client2.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s2.RPC, "dc1") // Use the leader as the client, kill the follower - var client *rpc.Client + var codec rpc.ClientCodec if s1.IsLeader() { - client = client1 + codec = codec1 } else { - client = client2 + codec = codec2 } args := structs.DCSpecificRequest{ @@ -437,7 +438,7 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) { QueryOptions: structs.QueryOptions{RequireConsistent: true}, } var out structs.IndexedNodes - if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -453,8 +454,8 @@ func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(nil, s1) - defer client.Close() + codec := rpcClient(nil, s1) + defer codec.Close() // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) @@ -464,7 +465,7 @@ func BenchmarkCatalogListNodes(t *testing.B) { } for i := 0; i < t.N; i++ { var out structs.IndexedNodes - if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -474,25 +475,25 @@ func TestCatalogListServices(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.DCSpecificRequest{ Datacenter: "dc1", } var out structs.IndexedServices - err := client.Call("Catalog.ListServices", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false}) - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -520,18 +521,18 @@ func TestCatalogListServices_Blocking(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.DCSpecificRequest{ Datacenter: "dc1", } var out structs.IndexedServices - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Run the query - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -549,7 +550,7 @@ func TestCatalogListServices_Blocking(t *testing.T) { // Re-run the query out = structs.IndexedServices{} - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -573,18 +574,18 @@ func TestCatalogListServices_Timeout(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.DCSpecificRequest{ Datacenter: "dc1", } var out structs.IndexedServices - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Run the query - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -595,7 +596,7 @@ func TestCatalogListServices_Timeout(t *testing.T) { // Re-run the query start := time.Now() out = structs.IndexedServices{} - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -614,8 +615,8 @@ func TestCatalogListServices_Stale(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.DCSpecificRequest{ Datacenter: "dc1", @@ -628,7 +629,7 @@ func TestCatalogListServices_Stale(t *testing.T) { s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false}) // Run the query, do not wait for leader! - if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -647,8 +648,8 @@ func TestCatalogListServiceNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.ServiceSpecificRequest{ Datacenter: "dc1", @@ -657,18 +658,18 @@ func TestCatalogListServiceNodes(t *testing.T) { TagFilter: false, } var out structs.IndexedServiceNodes - err := client.Call("Catalog.ServiceNodes", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false}) - if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -680,7 +681,7 @@ func TestCatalogListServiceNodes(t *testing.T) { args.TagFilter = true out = structs.IndexedServiceNodes{} - if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } if len(out.ServiceNodes) != 0 { @@ -692,27 +693,27 @@ func TestCatalogNodeServices(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: "foo", } var out structs.IndexedNodeServices - err := client.Call("Catalog.NodeServices", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false}) s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false}) - if err := client.Call("Catalog.NodeServices", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -736,8 +737,8 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -751,14 +752,14 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { } var out struct{} - err := client.Call("Catalog.Register", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -768,7 +769,7 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { ServiceName: "web", } var out2 structs.IndexedServiceNodes - if err := client.Call("Catalog.ServiceNodes", query, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", query, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -778,15 +779,15 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { } } -func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *rpc.Client) { +func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, codec rpc.ClientCodec) { dir, srv = testServerWithConfig(t, func(c *Config) { c.ACLDatacenter = "dc1" c.ACLMasterToken = "root" c.ACLDefaultPolicy = "deny" }) - client = rpcClient(t, srv) - testutil.WaitForLeader(t, client.Call, "dc1") + codec = rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC, "dc1") // Create a new token arg := structs.ACLRequest{ @@ -799,7 +800,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client * }, WriteRequest: structs.WriteRequest{Token: "root"}, } - if err := client.Call("ACL.Apply", &arg, &token); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &token); err != nil { t.Fatalf("err: %v", err) } @@ -820,7 +821,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client * }, WriteRequest: structs.WriteRequest{Token: "root"}, } - if err := client.Call("Catalog.Register", ®Arg, nil); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil); err != nil { t.Fatalf("err: %s", err) } @@ -840,24 +841,24 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client * }, WriteRequest: structs.WriteRequest{Token: "root"}, } - if err := client.Call("Catalog.Register", ®Arg, nil); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil); err != nil { t.Fatalf("err: %s", err) } return } func TestCatalog_ListServices_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.DCSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedServices{} - if err := client.Call("Catalog.ListServices", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } if _, ok := reply.Services["foo"]; !ok { @@ -869,10 +870,10 @@ func TestCatalog_ListServices_FilterACL(t *testing.T) { } func TestCatalog_ServiceNodes_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.ServiceSpecificRequest{ Datacenter: "dc1", @@ -880,7 +881,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedServiceNodes{} - if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } found := false @@ -901,7 +902,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply = structs.IndexedServiceNodes{} - if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } for _, sn := range reply.ServiceNodes { @@ -912,10 +913,10 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) { } func TestCatalog_NodeServices_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.NodeSpecificRequest{ Datacenter: "dc1", @@ -923,7 +924,7 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedNodeServices{} - if err := client.Call("Catalog.NodeServices", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } found := false diff --git a/consul/client_test.go b/consul/client_test.go index dad717397..02a8db0bc 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/serf" ) @@ -320,13 +321,13 @@ func TestClientServer_UserEvent(t *testing.T) { }) // Fire the user event - client := rpcClient(t, s1) + codec := rpcClient(t, s1) event := structs.EventFireRequest{ Name: "foo", Datacenter: "dc1", Payload: []byte("baz"), } - if err := client.Call("Internal.EventFire", &event, nil); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil); err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/fsm.go b/consul/fsm.go index 7324a72bc..391f3b8b5 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -14,6 +14,8 @@ import ( "github.com/hashicorp/raft" ) +var msgpackHandle = &codec.MsgpackHandle{} + // consulFSM implements a finite state machine that is used // along with Raft to provide strong consistency. We implement // this outside the Server to avoid exposing this outside the package. diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index 49e7412d1..110b1d8b0 100644 --- a/consul/health_endpoint_test.go +++ b/consul/health_endpoint_test.go @@ -1,20 +1,22 @@ package consul import ( - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/testutil" "os" "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestHealth_ChecksInState(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -26,7 +28,7 @@ func TestHealth_ChecksInState(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -35,7 +37,7 @@ func TestHealth_ChecksInState(t *testing.T) { Datacenter: "dc1", State: structs.HealthPassing, } - if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -57,10 +59,10 @@ func TestHealth_NodeChecks(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -72,7 +74,7 @@ func TestHealth_NodeChecks(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -81,7 +83,7 @@ func TestHealth_NodeChecks(t *testing.T) { Datacenter: "dc1", Node: "foo", } - if err := client.Call("Health.NodeChecks", &node, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -98,10 +100,10 @@ func TestHealth_ServiceChecks(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -118,7 +120,7 @@ func TestHealth_ServiceChecks(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -127,7 +129,7 @@ func TestHealth_ServiceChecks(t *testing.T) { Datacenter: "dc1", ServiceName: "db", } - if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -144,10 +146,10 @@ func TestHealth_ServiceNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -165,7 +167,7 @@ func TestHealth_ServiceNodes(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -184,7 +186,7 @@ func TestHealth_ServiceNodes(t *testing.T) { ServiceID: "db", }, } - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -195,7 +197,7 @@ func TestHealth_ServiceNodes(t *testing.T) { ServiceTag: "master", TagFilter: false, } - if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -224,10 +226,10 @@ func TestHealth_ServiceNodes(t *testing.T) { } func TestHealth_NodeChecks_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.NodeSpecificRequest{ Datacenter: "dc1", @@ -235,7 +237,7 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedHealthChecks{} - if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } found := false @@ -253,10 +255,10 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) { } func TestHealth_ServiceChecks_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.ServiceSpecificRequest{ Datacenter: "dc1", @@ -264,7 +266,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedHealthChecks{} - if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } found := false @@ -280,7 +282,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) { opt.ServiceName = "bar" reply = structs.IndexedHealthChecks{} - if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } if len(reply.HealthChecks) != 0 { @@ -289,10 +291,10 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) { } func TestHealth_ServiceNodes_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.ServiceSpecificRequest{ Datacenter: "dc1", @@ -300,7 +302,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedCheckServiceNodes{} - if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } if len(reply.Nodes) != 1 { @@ -309,7 +311,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) { opt.ServiceName = "bar" reply = structs.IndexedCheckServiceNodes{} - if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } if len(reply.Nodes) != 0 { @@ -318,10 +320,10 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) { } func TestHealth_ChecksInState_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.ChecksInStateRequest{ Datacenter: "dc1", @@ -329,7 +331,7 @@ func TestHealth_ChecksInState_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedHealthChecks{} - if err := client.Call("Health.ChecksInState", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/internal_endpoint_test.go b/consul/internal_endpoint_test.go index 5e29a9bc2..77138a5fe 100644 --- a/consul/internal_endpoint_test.go +++ b/consul/internal_endpoint_test.go @@ -3,20 +3,22 @@ package consul import ( "encoding/base64" "fmt" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/testutil" "os" "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestInternal_NodeInfo(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -34,7 +36,7 @@ func TestInternal_NodeInfo(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -43,7 +45,7 @@ func TestInternal_NodeInfo(t *testing.T) { Datacenter: "dc1", Node: "foo", } - if err := client.Call("Internal.NodeInfo", &req, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -66,10 +68,10 @@ func TestInternal_NodeDump(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.RegisterRequest{ Datacenter: "dc1", @@ -87,7 +89,7 @@ func TestInternal_NodeDump(t *testing.T) { }, } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -106,7 +108,7 @@ func TestInternal_NodeDump(t *testing.T) { ServiceID: "db", }, } - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -114,7 +116,7 @@ func TestInternal_NodeDump(t *testing.T) { req := structs.DCSpecificRequest{ Datacenter: "dc1", } - if err := client.Call("Internal.NodeDump", &req, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -165,17 +167,17 @@ func TestInternal_KeyringOperation(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") var out structs.KeyringResponses req := structs.KeyringRequest{ Operation: structs.KeyringList, Datacenter: "dc1", } - if err := client.Call("Internal.KeyringOperation", &req, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req, &out); err != nil { t.Fatalf("err: %v", err) } @@ -218,7 +220,7 @@ func TestInternal_KeyringOperation(t *testing.T) { req2 := structs.KeyringRequest{ Operation: structs.KeyringList, } - if err := client.Call("Internal.KeyringOperation", &req2, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req2, &out2); err != nil { t.Fatalf("err: %v", err) } @@ -240,10 +242,10 @@ func TestInternal_KeyringOperation(t *testing.T) { } func TestInternal_NodeInfo_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.NodeSpecificRequest{ Datacenter: "dc1", @@ -251,7 +253,7 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) { QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedNodeDump{} - if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } for _, info := range reply.Dump { @@ -284,17 +286,17 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) { } func TestInternal_NodeDump_FilterACL(t *testing.T) { - dir, token, srv, client := testACLFilterServer(t) + dir, token, srv, codec := testACLFilterServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() - defer client.Close() + defer codec.Close() opt := structs.DCSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: token}, } reply := structs.IndexedNodeDump{} - if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil { t.Fatalf("err: %s", err) } for _, info := range reply.Dump { @@ -336,10 +338,10 @@ func TestInternal_EventFire_Token(t *testing.T) { defer os.RemoveAll(dir) defer srv.Shutdown() - client := rpcClient(t, srv) - defer client.Close() + codec := rpcClient(t, srv) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, srv.RPC, "dc1") // No token is rejected event := structs.EventFireRequest{ @@ -347,14 +349,14 @@ func TestInternal_EventFire_Token(t *testing.T) { Datacenter: "dc1", Payload: []byte("nope"), } - err := client.Call("Internal.EventFire", &event, nil) + err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil) if err == nil || err.Error() != permissionDenied { t.Fatalf("bad: %s", err) } // Root token is allowed to fire event.Token = "root" - err = client.Call("Internal.EventFire", &event, nil) + err = msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index fef4508e7..cfaee046e 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -8,16 +8,17 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestKVS_Apply(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.KVSRequest{ Datacenter: "dc1", @@ -29,7 +30,7 @@ func TestKVS_Apply(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -47,7 +48,7 @@ func TestKVS_Apply(t *testing.T) { arg.Op = structs.KVSCAS arg.DirEnt.ModifyIndex = d.ModifyIndex arg.DirEnt.Flags = 43 - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -74,10 +75,10 @@ func TestKVS_Apply_ACLDeny(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create the ACL arg := structs.ACLRequest{ @@ -91,7 +92,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -108,7 +109,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: id}, } var outR bool - err := client.Call("KVS.Apply", &argR, &outR) + err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) } @@ -122,7 +123,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) { }, WriteRequest: structs.WriteRequest{Token: id}, } - err = client.Call("KVS.Apply", &argR, &outR) + err = msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) } @@ -132,10 +133,10 @@ func TestKVS_Get(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.KVSRequest{ Datacenter: "dc1", @@ -147,7 +148,7 @@ func TestKVS_Get(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -156,7 +157,7 @@ func TestKVS_Get(t *testing.T) { Key: "test", } var dirent structs.IndexedDirEntries - if err := client.Call("KVS.Get", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -183,10 +184,10 @@ func TestKVS_Get_ACLDeny(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.KVSRequest{ Datacenter: "dc1", @@ -199,7 +200,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -208,7 +209,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) { Key: "zip", } var dirent structs.IndexedDirEntries - if err := client.Call("KVS.Get", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -224,10 +225,10 @@ func TestKVSEndpoint_List(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") keys := []string{ "/test/key1", @@ -245,7 +246,7 @@ func TestKVSEndpoint_List(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -255,7 +256,7 @@ func TestKVSEndpoint_List(t *testing.T) { Key: "/test", } var dirent structs.IndexedDirEntries - if err := client.Call("KVS.List", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -283,10 +284,10 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") keys := []string{ "/test/key1", @@ -304,7 +305,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -314,7 +315,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) { Key: "/test", } var dirent structs.IndexedDirEntries - if err := client.Call("KVS.List", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -326,8 +327,8 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) { start := time.Now() go func() { time.Sleep(100 * time.Millisecond) - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := structs.KVSRequest{ Datacenter: "dc1", Op: structs.KVSDelete, @@ -336,14 +337,14 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } }() // Re-run the query dirent = structs.IndexedDirEntries{} - if err := client.Call("KVS.List", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -382,10 +383,10 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") keys := []string{ "abe", @@ -406,7 +407,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -422,7 +423,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -433,7 +434,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) { QueryOptions: structs.QueryOptions{Token: id}, } var dirent structs.IndexedDirEntries - if err := client.Call("KVS.List", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -462,10 +463,10 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") keys := []string{ "/test/key1", @@ -483,7 +484,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -494,7 +495,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { Seperator: "/", } var dirent structs.IndexedKeyList - if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -523,10 +524,10 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") keys := []string{ "abe", @@ -547,7 +548,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -563,7 +564,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) { WriteRequest: structs.WriteRequest{Token: "root"}, } var out string - if err := client.Call("ACL.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -575,7 +576,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) { QueryOptions: structs.QueryOptions{Token: id}, } var dirent structs.IndexedKeyList - if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil { t.Fatalf("err: %v", err) } @@ -597,10 +598,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create and invalidate a session with a lock state := s1.fsm.State() @@ -643,7 +644,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } if out != false { @@ -654,7 +655,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) { time.Sleep(50 * time.Millisecond) // Should acquire - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } if out != true { diff --git a/consul/leader_test.go b/consul/leader_test.go index 20015cb13..4155caaf6 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/serf" ) @@ -28,8 +29,7 @@ func TestLeader_RegisterMember(t *testing.T) { t.Fatalf("err: %v", err) } - client := rpcClient(t, s1) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Client should be registered state := s1.fsm.State() @@ -77,8 +77,7 @@ func TestLeader_FailedMember(t *testing.T) { defer os.RemoveAll(dir2) defer c1.Shutdown() - client := rpcClient(t, s1) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Try to join addr := fmt.Sprintf("127.0.0.1:%d", @@ -212,8 +211,7 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Register a non-existing member dead := structs.RegisterRequest{ @@ -520,9 +518,9 @@ func TestLeader_ReapTombstones(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + codec := rpcClient(t, s1) - client := rpcClient(t, s1) - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Create a KV entry arg := structs.KVSRequest{ @@ -534,13 +532,13 @@ func TestLeader_ReapTombstones(t *testing.T) { }, } var out bool - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } // Delete the KV entry (tombstoned) arg.Op = structs.KVSDelete - if err := client.Call("KVS.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/pool.go b/consul/pool.go index b7711aad6..7254ded15 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -11,14 +11,11 @@ import ( "time" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" "github.com/inconshreveable/muxado" ) -// msgpackHandle is a shared handle for encoding/decoding of RPC messages -var msgpackHandle = &codec.MsgpackHandle{} - // muxSession is used to provide an interface for either muxado or yamux type muxSession interface { Open() (net.Conn, error) @@ -40,12 +37,12 @@ func (w *muxadoWrapper) Close() error { // streamClient is used to wrap a stream with an RPC client type StreamClient struct { stream net.Conn - client *rpc.Client + codec rpc.ClientCodec } func (sc *StreamClient) Close() { sc.stream.Close() - sc.client.Close() + sc.codec.Close() } // Conn is a pooled connection to a Consul server @@ -88,13 +85,12 @@ func (c *Conn) getClient() (*StreamClient, error) { } // Create the RPC client - cc := codec.GoRpc.ClientCodec(stream, msgpackHandle) - client := rpc.NewClientWithCodec(cc) + codec := msgpackrpc.NewClientCodec(stream) // Return a new stream client sc := &StreamClient{ stream: stream, - client: client, + codec: codec, } return sc, nil } @@ -390,7 +386,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg } // Make the RPC call - err = sc.client.Call(method, args, reply) + err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply) if err != nil { sc.Close() p.releaseConn(conn) diff --git a/consul/rpc.go b/consul/rpc.go index 37559d263..292f71949 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -11,7 +11,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" "github.com/inconshreveable/muxado" ) @@ -158,7 +158,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) { // handleConsulConn is used to service a single Consul RPC connection func (s *Server) handleConsulConn(conn net.Conn) { defer conn.Close() - rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle) + rpcCodec := msgpackrpc.NewServerCodec(conn) for { select { case <-s.shutdownCh: diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index 267d6f194..7355ad7dc 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -2,21 +2,23 @@ package consul import ( "os" + "strings" "testing" "time" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestSessionEndpoint_Apply(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) @@ -30,7 +32,7 @@ func TestSessionEndpoint_Apply(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -54,7 +56,7 @@ func TestSessionEndpoint_Apply(t *testing.T) { // Do a delete arg.Op = structs.SessionDestroy arg.Session.ID = out - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -72,10 +74,10 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) @@ -90,7 +92,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } id := out @@ -117,7 +119,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { // Do a delete arg.Op = structs.SessionDestroy arg.Session.ID = out - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -135,10 +137,10 @@ func TestSessionEndpoint_Get(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) arg := structs.SessionRequest{ @@ -149,7 +151,7 @@ func TestSessionEndpoint_Get(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -158,7 +160,7 @@ func TestSessionEndpoint_Get(t *testing.T) { Session: out, } var sessions structs.IndexedSessions - if err := client.Call("Session.Get", &getR, &sessions); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil { t.Fatalf("err: %v", err) } @@ -178,10 +180,10 @@ func TestSessionEndpoint_List(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) ids := []string{} @@ -194,7 +196,7 @@ func TestSessionEndpoint_List(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } ids = append(ids, out) @@ -204,7 +206,7 @@ func TestSessionEndpoint_List(t *testing.T) { Datacenter: "dc1", } var sessions structs.IndexedSessions - if err := client.Call("Session.List", &getR, &sessions); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil { t.Fatalf("err: %v", err) } @@ -229,10 +231,10 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) arg := structs.SessionRequest{ @@ -244,7 +246,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -256,7 +258,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) { // Destroy the session arg.Op = structs.SessionDestroy arg.Session.ID = out - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -270,10 +272,10 @@ func TestSessionEndpoint_Renew(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") TTL := "10s" // the minimum allowed ttl ttl := 10 * time.Second @@ -289,7 +291,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { }, } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } ids = append(ids, out) @@ -305,7 +307,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { } var sessions structs.IndexedSessions - if err := client.Call("Session.List", &getR, &sessions); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil { t.Fatalf("err: %v", err) } @@ -339,7 +341,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { Session: ids[i], } var session structs.IndexedSessions - if err := client.Call("Session.Renew", &renewR, &session); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil { t.Fatalf("err: %v", err) } @@ -366,7 +368,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { time.Sleep((ttl * structs.SessionTTLMultiplier) * 2.0 / 3.0) var sessionsL1 structs.IndexedSessions - if err := client.Call("Session.List", &getR, &sessionsL1); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL1); err != nil { t.Fatalf("err: %v", err) } @@ -400,7 +402,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { time.Sleep(ttl * structs.SessionTTLMultiplier) var sessionsL2 structs.IndexedSessions - if err := client.Call("Session.List", &getR, &sessionsL2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL2); err != nil { t.Fatalf("err: %v", err) } @@ -430,10 +432,10 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"}) @@ -450,7 +452,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) { arg.Session.Node = "foo" } var out string - if err := client.Call("Session.Apply", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } if i < 5 { @@ -463,7 +465,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) { Node: "foo", } var sessions structs.IndexedSessions - if err := client.Call("Session.NodeSessions", &getR, &sessions); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &getR, &sessions); err != nil { t.Fatalf("err: %v", err) } @@ -488,10 +490,10 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.SessionRequest{ Datacenter: "dc1", @@ -506,18 +508,18 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) { arg.Session.TTL = "10z" var out string - err := client.Call("Session.Apply", &arg, &out) + err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out) if err == nil { t.Fatal("expected error") } - if err.Error() != "Session TTL '10z' invalid: time: unknown unit z in duration 10z" { + if !strings.Contains(err.Error(), "Session TTL '10z' invalid: time: unknown unit z in duration 10z") { t.Fatalf("incorrect error message: %s", err.Error()) } // less than SessionTTLMin arg.Session.TTL = "5s" - err = client.Call("Session.Apply", &arg, &out) + err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out) if err == nil { t.Fatal("expected error") } @@ -528,7 +530,7 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) { // more than SessionTTLMax arg.Session.TTL = "4000s" - err = client.Call("Session.Apply", &arg, &out) + err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out) if err == nil { t.Fatal("expected error") } diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go index 870e789c5..e732b5d01 100644 --- a/consul/session_ttl_test.go +++ b/consul/session_ttl_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) func TestInitializeSessionTimers(t *testing.T) { @@ -310,8 +311,8 @@ func TestServer_SessionTTL_Failover(t *testing.T) { t.Fatalf("Should have a leader") } - client := rpcClient(t, leader) - defer client.Close() + codec := rpcClient(t, leader) + defer codec.Close() // Register a node node := structs.RegisterRequest{ @@ -334,7 +335,7 @@ func TestServer_SessionTTL_Failover(t *testing.T) { }, } var id1 string - if err := client.Call("Session.Apply", &arg, &id1); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id1); err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/status_endpoint_test.go b/consul/status_endpoint_test.go index 419c174bc..4c2f695ee 100644 --- a/consul/status_endpoint_test.go +++ b/consul/status_endpoint_test.go @@ -1,47 +1,47 @@ package consul import ( - "github.com/hashicorp/consul/testutil" - "github.com/hashicorp/go-msgpack/codec" "net" "net/rpc" "os" "testing" "time" + + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" ) -func rpcClient(t *testing.T, s *Server) *rpc.Client { +func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { addr := s.config.RPCAddr conn, err := net.DialTimeout("tcp", addr.String(), time.Second) if err != nil { t.Fatalf("err: %v", err) } + // Write the Consul RPC byte to set the mode conn.Write([]byte{byte(rpcConsul)}) - - cc := codec.GoRpc.ClientCodec(conn, msgpackHandle) - return rpc.NewClientWithCodec(cc) + return msgpackrpc.NewClientCodec(conn) } func TestStatusLeader(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := struct{}{} var leader string - if err := client.Call("Status.Leader", arg, &leader); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil { t.Fatalf("err: %v", err) } if leader != "" { t.Fatalf("unexpected leader: %v", leader) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") - if err := client.Call("Status.Leader", arg, &leader); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil { t.Fatalf("err: %v", err) } if leader == "" { @@ -53,12 +53,12 @@ func TestStatusPeers(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() arg := struct{}{} var peers []string - if err := client.Call("Status.Peers", arg, &peers); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Status.Peers", arg, &peers); err != nil { t.Fatalf("err: %v", err) } if len(peers) != 1 {