consul: initial pass at refactoring RPC using net-rpc-msgpackrpc
This commit is contained in:
parent
5e49ba044e
commit
33741f9156
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestACLEndpoint_Apply(t *testing.T) {
|
||||
|
@ -17,10 +18,10 @@ func TestACLEndpoint_Apply(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -32,7 +33,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -56,7 +57,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
|
|||
// Do a delete
|
||||
arg.Op = structs.ACLDelete
|
||||
arg.ACL.ID = out
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -77,10 +78,10 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -92,7 +93,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -112,7 +113,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
|
|||
// Do an update
|
||||
arg.ACL.ID = out
|
||||
arg.ACL.Rules = `{"key": {"": {"policy": "deny"}}}`
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -134,7 +135,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
|
|||
// Do a delete
|
||||
arg.Op = structs.ACLDelete
|
||||
arg.ACL.Rules = ""
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -155,10 +156,10 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -171,7 +172,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out != "foobarbaz" {
|
||||
|
@ -201,10 +202,10 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -215,7 +216,7 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
err := client.Call("ACL.Apply", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -228,10 +229,10 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -244,7 +245,7 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
err := client.Call("ACL.Apply", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "delete anonymous") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -257,10 +258,10 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -273,7 +274,7 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
err := client.Call("ACL.Apply", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "root ACL") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -286,10 +287,10 @@ func TestACLEndpoint_Get(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -301,7 +302,7 @@ func TestACLEndpoint_Get(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -310,7 +311,7 @@ func TestACLEndpoint_Get(t *testing.T) {
|
|||
ACL: out,
|
||||
}
|
||||
var acls structs.IndexedACLs
|
||||
if err := client.Call("ACL.Get", &getR, &acls); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Get", &getR, &acls); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -333,10 +334,10 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -348,7 +349,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -357,7 +358,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
|
|||
ACL: out,
|
||||
}
|
||||
var acls structs.ACLPolicy
|
||||
if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -371,7 +372,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
|
|||
// Do a conditional lookup with etag
|
||||
getR.ETag = acls.ETag
|
||||
var out2 structs.ACLPolicy
|
||||
if err := client.Call("ACL.GetPolicy", &getR, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -390,10 +391,10 @@ func TestACLEndpoint_List(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
ids := []string{}
|
||||
for i := 0; i < 5; i++ {
|
||||
|
@ -407,7 +408,7 @@ func TestACLEndpoint_List(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
ids = append(ids, out)
|
||||
|
@ -418,7 +419,7 @@ func TestACLEndpoint_List(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: "root"},
|
||||
}
|
||||
var acls structs.IndexedACLs
|
||||
if err := client.Call("ACL.List", &getR, &acls); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -450,16 +451,16 @@ func TestACLEndpoint_List_Denied(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
getR := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var acls structs.IndexedACLs
|
||||
err := client.Call("ACL.List", &getR, &acls)
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ func TestACL_Disabled(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
acl, err := s1.resolveToken("does not exist")
|
||||
if err != nil {
|
||||
|
@ -62,7 +62,7 @@ func TestACL_Authority_NotFound(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
acl, err := s1.resolveToken("does not exist")
|
||||
if err == nil || err.Error() != aclNotFound {
|
||||
|
@ -83,7 +83,7 @@ func TestACL_Authority_Found(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -97,7 +97,7 @@ func TestACL_Authority_Found(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ func TestACL_Authority_Anonymous_Found(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Resolve the token
|
||||
acl, err := s1.resolveToken("")
|
||||
|
@ -155,7 +155,7 @@ func TestACL_Authority_Master_Found(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Resolve the token
|
||||
acl, err := s1.resolveToken("foobar")
|
||||
|
@ -183,7 +183,7 @@ func TestACL_Authority_Management(t *testing.T) {
|
|||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Resolve the token
|
||||
acl, err := s1.resolveToken("foobar")
|
||||
|
@ -230,7 +230,7 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
|
|||
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// find the non-authoritative server
|
||||
var nonAuth *Server
|
||||
|
@ -279,7 +279,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -293,7 +293,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -355,7 +355,7 @@ func TestACL_NonAuthority_Management(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// find the non-authoritative server
|
||||
var nonAuth *Server
|
||||
|
@ -412,7 +412,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -426,7 +426,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -486,7 +486,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -500,7 +500,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -562,7 +562,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
|
|||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -576,7 +576,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -637,8 +637,8 @@ func TestACL_MultiDC_Found(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, client.Call, "dc2")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -652,7 +652,7 @@ func TestACL_MultiDC_Found(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,14 +11,15 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestCatalogRegister(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -32,13 +33,13 @@ func TestCatalogRegister(t *testing.T) {
|
|||
}
|
||||
var out struct{}
|
||||
|
||||
err := client.Call("Catalog.Register", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
err := client.Call("Catalog.Register", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
||||
return err == nil, err
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -53,10 +54,10 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create the ACL
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -70,7 +71,7 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -88,13 +89,13 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
}
|
||||
var outR struct{}
|
||||
|
||||
err := client.Call("Catalog.Register", &argR, &outR)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
argR.Service.Service = "foo"
|
||||
err = client.Call("Catalog.Register", &argR, &outR)
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -104,14 +105,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client1 := rpcClient(t, s1)
|
||||
defer client1.Close()
|
||||
codec1 := rpcClient(t, s1)
|
||||
defer codec1.Close()
|
||||
|
||||
dir2, s2 := testServer(t)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
client2 := rpcClient(t, s2)
|
||||
defer client2.Close()
|
||||
codec2 := rpcClient(t, s2)
|
||||
defer codec2.Close()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
|
@ -120,15 +121,15 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client1.Call, "dc1")
|
||||
testutil.WaitForLeader(t, client2.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
// Use the follower as the client
|
||||
var client *rpc.Client
|
||||
var codec rpc.ClientCodec
|
||||
if !s1.IsLeader() {
|
||||
client = client1
|
||||
codec = codec1
|
||||
} else {
|
||||
client = client2
|
||||
codec = codec2
|
||||
}
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
|
@ -142,7 +143,7 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +152,8 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
dir2, s2 := testServerDC(t, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
|
@ -165,7 +166,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc2")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc2", // Should forward through s1
|
||||
|
@ -178,7 +179,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -187,8 +188,8 @@ func TestCatalogDeregister(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -196,14 +197,14 @@ func TestCatalogDeregister(t *testing.T) {
|
|||
}
|
||||
var out struct{}
|
||||
|
||||
err := client.Call("Catalog.Deregister", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
if err := client.Call("Catalog.Deregister", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -212,8 +213,8 @@ func TestCatalogListDatacenters(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
dir2, s2 := testServerDC(t, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
|
@ -226,10 +227,10 @@ func TestCatalogListDatacenters(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var out []string
|
||||
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListDatacenters", struct{}{}, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -251,25 +252,25 @@ func TestCatalogListNodes(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var out structs.IndexedNodes
|
||||
err := client.Call("Catalog.ListNodes", &args, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
client.Call("Catalog.ListNodes", &args, &out)
|
||||
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
|
||||
return len(out.Nodes) == 2, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -291,14 +292,14 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client1 := rpcClient(t, s1)
|
||||
defer client1.Close()
|
||||
codec1 := rpcClient(t, s1)
|
||||
defer codec1.Close()
|
||||
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
client2 := rpcClient(t, s2)
|
||||
defer client2.Close()
|
||||
codec2 := rpcClient(t, s2)
|
||||
defer codec2.Close()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
|
@ -307,18 +308,18 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client1.Call, "dc1")
|
||||
testutil.WaitForLeader(t, client2.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
// Use the follower as the client
|
||||
var client *rpc.Client
|
||||
var codec rpc.ClientCodec
|
||||
if !s1.IsLeader() {
|
||||
client = client1
|
||||
codec = codec1
|
||||
|
||||
// Inject fake data on the follower!
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
} else {
|
||||
client = client2
|
||||
codec = codec2
|
||||
|
||||
// Inject fake data on the follower!
|
||||
s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
@ -329,7 +330,7 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{AllowStale: true},
|
||||
}
|
||||
var out structs.IndexedNodes
|
||||
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -355,14 +356,14 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client1 := rpcClient(t, s1)
|
||||
defer client1.Close()
|
||||
codec1 := rpcClient(t, s1)
|
||||
defer codec1.Close()
|
||||
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
client2 := rpcClient(t, s2)
|
||||
defer client2.Close()
|
||||
codec2 := rpcClient(t, s2)
|
||||
defer codec2.Close()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
|
@ -371,16 +372,16 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client1.Call, "dc1")
|
||||
testutil.WaitForLeader(t, client2.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
// Use the leader as the client, kill the follower
|
||||
var client *rpc.Client
|
||||
var codec rpc.ClientCodec
|
||||
if s1.IsLeader() {
|
||||
client = client1
|
||||
codec = codec1
|
||||
s2.Shutdown()
|
||||
} else {
|
||||
client = client2
|
||||
codec = codec2
|
||||
s1.Shutdown()
|
||||
}
|
||||
|
||||
|
@ -389,7 +390,7 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{RequireConsistent: true},
|
||||
}
|
||||
var out structs.IndexedNodes
|
||||
if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -405,14 +406,14 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client1 := rpcClient(t, s1)
|
||||
defer client1.Close()
|
||||
codec1 := rpcClient(t, s1)
|
||||
defer codec1.Close()
|
||||
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
client2 := rpcClient(t, s2)
|
||||
defer client2.Close()
|
||||
codec2 := rpcClient(t, s2)
|
||||
defer codec2.Close()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
|
@ -421,15 +422,15 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client1.Call, "dc1")
|
||||
testutil.WaitForLeader(t, client2.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
// Use the leader as the client, kill the follower
|
||||
var client *rpc.Client
|
||||
var codec rpc.ClientCodec
|
||||
if s1.IsLeader() {
|
||||
client = client1
|
||||
codec = codec1
|
||||
} else {
|
||||
client = client2
|
||||
codec = codec2
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
|
@ -437,7 +438,7 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{RequireConsistent: true},
|
||||
}
|
||||
var out structs.IndexedNodes
|
||||
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -453,8 +454,8 @@ func BenchmarkCatalogListNodes(t *testing.B) {
|
|||
dir1, s1 := testServer(nil)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(nil, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(nil, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
@ -464,7 +465,7 @@ func BenchmarkCatalogListNodes(t *testing.B) {
|
|||
}
|
||||
for i := 0; i < t.N; i++ {
|
||||
var out structs.IndexedNodes
|
||||
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -474,25 +475,25 @@ func TestCatalogListServices(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var out structs.IndexedServices
|
||||
err := client.Call("Catalog.ListServices", &args, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
|
||||
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -520,18 +521,18 @@ func TestCatalogListServices_Blocking(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var out structs.IndexedServices
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Run the query
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -549,7 +550,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
|
|||
|
||||
// Re-run the query
|
||||
out = structs.IndexedServices{}
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -573,18 +574,18 @@ func TestCatalogListServices_Timeout(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var out structs.IndexedServices
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Run the query
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -595,7 +596,7 @@ func TestCatalogListServices_Timeout(t *testing.T) {
|
|||
// Re-run the query
|
||||
start := time.Now()
|
||||
out = structs.IndexedServices{}
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -614,8 +615,8 @@ func TestCatalogListServices_Stale(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -628,7 +629,7 @@ func TestCatalogListServices_Stale(t *testing.T) {
|
|||
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
|
||||
|
||||
// Run the query, do not wait for leader!
|
||||
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -647,8 +648,8 @@ func TestCatalogListServiceNodes(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -657,18 +658,18 @@ func TestCatalogListServiceNodes(t *testing.T) {
|
|||
TagFilter: false,
|
||||
}
|
||||
var out structs.IndexedServiceNodes
|
||||
err := client.Call("Catalog.ServiceNodes", &args, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
|
||||
|
||||
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -680,7 +681,7 @@ func TestCatalogListServiceNodes(t *testing.T) {
|
|||
args.TagFilter = true
|
||||
out = structs.IndexedServiceNodes{}
|
||||
|
||||
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(out.ServiceNodes) != 0 {
|
||||
|
@ -692,27 +693,27 @@ func TestCatalogNodeServices(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
args := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
}
|
||||
var out structs.IndexedNodeServices
|
||||
err := client.Call("Catalog.NodeServices", &args, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
|
||||
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})
|
||||
|
||||
if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -736,8 +737,8 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -751,14 +752,14 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
|
|||
}
|
||||
var out struct{}
|
||||
|
||||
err := client.Call("Catalog.Register", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -768,7 +769,7 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
|
|||
ServiceName: "web",
|
||||
}
|
||||
var out2 structs.IndexedServiceNodes
|
||||
if err := client.Call("Catalog.ServiceNodes", query, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", query, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -778,15 +779,15 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *rpc.Client) {
|
||||
func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, codec rpc.ClientCodec) {
|
||||
dir, srv = testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
|
||||
client = rpcClient(t, srv)
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
codec = rpcClient(t, srv)
|
||||
testutil.WaitForLeader(t, srv.RPC, "dc1")
|
||||
|
||||
// Create a new token
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -799,7 +800,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := client.Call("ACL.Apply", &arg, &token); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -820,7 +821,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := client.Call("Catalog.Register", ®Arg, nil); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
|
@ -840,24 +841,24 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := client.Call("Catalog.Register", ®Arg, nil); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func TestCatalog_ListServices_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedServices{}
|
||||
if err := client.Call("Catalog.ListServices", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if _, ok := reply.Services["foo"]; !ok {
|
||||
|
@ -869,10 +870,10 @@ func TestCatalog_ListServices_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -880,7 +881,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedServiceNodes{}
|
||||
if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
found := false
|
||||
|
@ -901,7 +902,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply = structs.IndexedServiceNodes{}
|
||||
if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
for _, sn := range reply.ServiceNodes {
|
||||
|
@ -912,10 +913,10 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCatalog_NodeServices_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -923,7 +924,7 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedNodeServices{}
|
||||
if err := client.Call("Catalog.NodeServices", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
found := false
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
|
@ -320,13 +321,13 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||
})
|
||||
|
||||
// Fire the user event
|
||||
client := rpcClient(t, s1)
|
||||
codec := rpcClient(t, s1)
|
||||
event := structs.EventFireRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
Payload: []byte("baz"),
|
||||
}
|
||||
if err := client.Call("Internal.EventFire", &event, nil); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,8 @@ import (
|
|||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
var msgpackHandle = &codec.MsgpackHandle{}
|
||||
|
||||
// consulFSM implements a finite state machine that is used
|
||||
// along with Raft to provide strong consistency. We implement
|
||||
// this outside the Server to avoid exposing this outside the package.
|
||||
|
|
|
@ -1,20 +1,22 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestHealth_ChecksInState(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -26,7 +28,7 @@ func TestHealth_ChecksInState(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -35,7 +37,7 @@ func TestHealth_ChecksInState(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
State: structs.HealthPassing,
|
||||
}
|
||||
if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -57,10 +59,10 @@ func TestHealth_NodeChecks(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -72,7 +74,7 @@ func TestHealth_NodeChecks(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -81,7 +83,7 @@ func TestHealth_NodeChecks(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
}
|
||||
if err := client.Call("Health.NodeChecks", &node, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &node, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -98,10 +100,10 @@ func TestHealth_ServiceChecks(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -118,7 +120,7 @@ func TestHealth_ServiceChecks(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -127,7 +129,7 @@ func TestHealth_ServiceChecks(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
ServiceName: "db",
|
||||
}
|
||||
if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &node, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -144,10 +146,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -165,7 +167,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -184,7 +186,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -195,7 +197,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
ServiceTag: "master",
|
||||
TagFilter: false,
|
||||
}
|
||||
if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -224,10 +226,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHealth_NodeChecks_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -235,7 +237,7 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedHealthChecks{}
|
||||
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
found := false
|
||||
|
@ -253,10 +255,10 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -264,7 +266,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedHealthChecks{}
|
||||
if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
found := false
|
||||
|
@ -280,7 +282,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
|
|||
|
||||
opt.ServiceName = "bar"
|
||||
reply = structs.IndexedHealthChecks{}
|
||||
if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(reply.HealthChecks) != 0 {
|
||||
|
@ -289,10 +291,10 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -300,7 +302,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedCheckServiceNodes{}
|
||||
if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(reply.Nodes) != 1 {
|
||||
|
@ -309,7 +311,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
|
|||
|
||||
opt.ServiceName = "bar"
|
||||
reply = structs.IndexedCheckServiceNodes{}
|
||||
if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(reply.Nodes) != 0 {
|
||||
|
@ -318,10 +320,10 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHealth_ChecksInState_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.ChecksInStateRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -329,7 +331,7 @@ func TestHealth_ChecksInState_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedHealthChecks{}
|
||||
if err := client.Call("Health.ChecksInState", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -3,20 +3,22 @@ package consul
|
|||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestInternal_NodeInfo(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -34,7 +36,7 @@ func TestInternal_NodeInfo(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -43,7 +45,7 @@ func TestInternal_NodeInfo(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
}
|
||||
if err := client.Call("Internal.NodeInfo", &req, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -66,10 +68,10 @@ func TestInternal_NodeDump(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -87,7 +89,7 @@ func TestInternal_NodeDump(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -106,7 +108,7 @@ func TestInternal_NodeDump(t *testing.T) {
|
|||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -114,7 +116,7 @@ func TestInternal_NodeDump(t *testing.T) {
|
|||
req := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
if err := client.Call("Internal.NodeDump", &req, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -165,17 +167,17 @@ func TestInternal_KeyringOperation(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var out structs.KeyringResponses
|
||||
req := structs.KeyringRequest{
|
||||
Operation: structs.KeyringList,
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
if err := client.Call("Internal.KeyringOperation", &req, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -218,7 +220,7 @@ func TestInternal_KeyringOperation(t *testing.T) {
|
|||
req2 := structs.KeyringRequest{
|
||||
Operation: structs.KeyringList,
|
||||
}
|
||||
if err := client.Call("Internal.KeyringOperation", &req2, &out2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req2, &out2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -240,10 +242,10 @@ func TestInternal_KeyringOperation(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestInternal_NodeInfo_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -251,7 +253,7 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedNodeDump{}
|
||||
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
for _, info := range reply.Dump {
|
||||
|
@ -284,17 +286,17 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestInternal_NodeDump_FilterACL(t *testing.T) {
|
||||
dir, token, srv, client := testACLFilterServer(t)
|
||||
dir, token, srv, codec := testACLFilterServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer client.Close()
|
||||
defer codec.Close()
|
||||
|
||||
opt := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
reply := structs.IndexedNodeDump{}
|
||||
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
for _, info := range reply.Dump {
|
||||
|
@ -336,10 +338,10 @@ func TestInternal_EventFire_Token(t *testing.T) {
|
|||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
|
||||
client := rpcClient(t, srv)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, srv)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, srv.RPC, "dc1")
|
||||
|
||||
// No token is rejected
|
||||
event := structs.EventFireRequest{
|
||||
|
@ -347,14 +349,14 @@ func TestInternal_EventFire_Token(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Payload: []byte("nope"),
|
||||
}
|
||||
err := client.Call("Internal.EventFire", &event, nil)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
|
||||
if err == nil || err.Error() != permissionDenied {
|
||||
t.Fatalf("bad: %s", err)
|
||||
}
|
||||
|
||||
// Root token is allowed to fire
|
||||
event.Token = "root"
|
||||
err = client.Call("Internal.EventFire", &event, nil)
|
||||
err = msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -8,16 +8,17 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestKVS_Apply(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -29,7 +30,7 @@ func TestKVS_Apply(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -47,7 +48,7 @@ func TestKVS_Apply(t *testing.T) {
|
|||
arg.Op = structs.KVSCAS
|
||||
arg.DirEnt.ModifyIndex = d.ModifyIndex
|
||||
arg.DirEnt.Flags = 43
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -74,10 +75,10 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create the ACL
|
||||
arg := structs.ACLRequest{
|
||||
|
@ -91,7 +92,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -108,7 +109,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
var outR bool
|
||||
err := client.Call("KVS.Apply", &argR, &outR)
|
||||
err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -122,7 +123,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
err = client.Call("KVS.Apply", &argR, &outR)
|
||||
err = msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -132,10 +133,10 @@ func TestKVS_Get(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -147,7 +148,7 @@ func TestKVS_Get(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -156,7 +157,7 @@ func TestKVS_Get(t *testing.T) {
|
|||
Key: "test",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -183,10 +184,10 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -199,7 +200,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -208,7 +209,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
|
|||
Key: "zip",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -224,10 +225,10 @@ func TestKVSEndpoint_List(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
keys := []string{
|
||||
"/test/key1",
|
||||
|
@ -245,7 +246,7 @@ func TestKVSEndpoint_List(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -255,7 +256,7 @@ func TestKVSEndpoint_List(t *testing.T) {
|
|||
Key: "/test",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -283,10 +284,10 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
keys := []string{
|
||||
"/test/key1",
|
||||
|
@ -304,7 +305,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +315,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
|
|||
Key: "/test",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -326,8 +327,8 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
|
|||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSDelete,
|
||||
|
@ -336,14 +337,14 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Re-run the query
|
||||
dirent = structs.IndexedDirEntries{}
|
||||
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -382,10 +383,10 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
keys := []string{
|
||||
"abe",
|
||||
|
@ -406,7 +407,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -422,7 +423,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -433,7 +434,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: id},
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -462,10 +463,10 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
keys := []string{
|
||||
"/test/key1",
|
||||
|
@ -483,7 +484,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -494,7 +495,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
|
|||
Seperator: "/",
|
||||
}
|
||||
var dirent structs.IndexedKeyList
|
||||
if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -523,10 +524,10 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
keys := []string{
|
||||
"abe",
|
||||
|
@ -547,7 +548,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -563,7 +564,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -575,7 +576,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
|
|||
QueryOptions: structs.QueryOptions{Token: id},
|
||||
}
|
||||
var dirent structs.IndexedKeyList
|
||||
if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -597,10 +598,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create and invalidate a session with a lock
|
||||
state := s1.fsm.State()
|
||||
|
@ -643,7 +644,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out != false {
|
||||
|
@ -654,7 +655,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
|
|||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Should acquire
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out != true {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
|
@ -28,8 +29,7 @@ func TestLeader_RegisterMember(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
client := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Client should be registered
|
||||
state := s1.fsm.State()
|
||||
|
@ -77,8 +77,7 @@ func TestLeader_FailedMember(t *testing.T) {
|
|||
defer os.RemoveAll(dir2)
|
||||
defer c1.Shutdown()
|
||||
|
||||
client := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
|
@ -212,8 +211,7 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
|
|||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
client := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Register a non-existing member
|
||||
dead := structs.RegisterRequest{
|
||||
|
@ -520,9 +518,9 @@ func TestLeader_ReapTombstones(t *testing.T) {
|
|||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
client := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a KV entry
|
||||
arg := structs.KVSRequest{
|
||||
|
@ -534,13 +532,13 @@ func TestLeader_ReapTombstones(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Delete the KV entry (tombstoned)
|
||||
arg.Op = structs.KVSDelete
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,14 +11,11 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/yamux"
|
||||
"github.com/inconshreveable/muxado"
|
||||
)
|
||||
|
||||
// msgpackHandle is a shared handle for encoding/decoding of RPC messages
|
||||
var msgpackHandle = &codec.MsgpackHandle{}
|
||||
|
||||
// muxSession is used to provide an interface for either muxado or yamux
|
||||
type muxSession interface {
|
||||
Open() (net.Conn, error)
|
||||
|
@ -40,12 +37,12 @@ func (w *muxadoWrapper) Close() error {
|
|||
// streamClient is used to wrap a stream with an RPC client
|
||||
type StreamClient struct {
|
||||
stream net.Conn
|
||||
client *rpc.Client
|
||||
codec rpc.ClientCodec
|
||||
}
|
||||
|
||||
func (sc *StreamClient) Close() {
|
||||
sc.stream.Close()
|
||||
sc.client.Close()
|
||||
sc.codec.Close()
|
||||
}
|
||||
|
||||
// Conn is a pooled connection to a Consul server
|
||||
|
@ -88,13 +85,12 @@ func (c *Conn) getClient() (*StreamClient, error) {
|
|||
}
|
||||
|
||||
// Create the RPC client
|
||||
cc := codec.GoRpc.ClientCodec(stream, msgpackHandle)
|
||||
client := rpc.NewClientWithCodec(cc)
|
||||
codec := msgpackrpc.NewClientCodec(stream)
|
||||
|
||||
// Return a new stream client
|
||||
sc := &StreamClient{
|
||||
stream: stream,
|
||||
client: client,
|
||||
codec: codec,
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
|
@ -390,7 +386,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
|
|||
}
|
||||
|
||||
// Make the RPC call
|
||||
err = sc.client.Call(method, args, reply)
|
||||
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
|
||||
if err != nil {
|
||||
sc.Close()
|
||||
p.releaseConn(conn)
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/yamux"
|
||||
"github.com/inconshreveable/muxado"
|
||||
)
|
||||
|
@ -158,7 +158,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) {
|
|||
// handleConsulConn is used to service a single Consul RPC connection
|
||||
func (s *Server) handleConsulConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle)
|
||||
rpcCodec := msgpackrpc.NewServerCodec(conn)
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
|
|
|
@ -2,21 +2,23 @@ package consul
|
|||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestSessionEndpoint_Apply(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
@ -30,7 +32,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -54,7 +56,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
|
|||
// Do a delete
|
||||
arg.Op = structs.SessionDestroy
|
||||
arg.Session.ID = out
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -72,10 +74,10 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
@ -90,7 +92,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
@ -117,7 +119,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
|
|||
// Do a delete
|
||||
arg.Op = structs.SessionDestroy
|
||||
arg.Session.ID = out
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -135,10 +137,10 @@ func TestSessionEndpoint_Get(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
arg := structs.SessionRequest{
|
||||
|
@ -149,7 +151,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -158,7 +160,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
|
|||
Session: out,
|
||||
}
|
||||
var sessions structs.IndexedSessions
|
||||
if err := client.Call("Session.Get", &getR, &sessions); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -178,10 +180,10 @@ func TestSessionEndpoint_List(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
ids := []string{}
|
||||
|
@ -194,7 +196,7 @@ func TestSessionEndpoint_List(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
ids = append(ids, out)
|
||||
|
@ -204,7 +206,7 @@ func TestSessionEndpoint_List(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
}
|
||||
var sessions structs.IndexedSessions
|
||||
if err := client.Call("Session.List", &getR, &sessions); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -229,10 +231,10 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
arg := structs.SessionRequest{
|
||||
|
@ -244,7 +246,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -256,7 +258,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
|
|||
// Destroy the session
|
||||
arg.Op = structs.SessionDestroy
|
||||
arg.Session.ID = out
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -270,10 +272,10 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
TTL := "10s" // the minimum allowed ttl
|
||||
ttl := 10 * time.Second
|
||||
|
||||
|
@ -289,7 +291,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
ids = append(ids, out)
|
||||
|
@ -305,7 +307,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
}
|
||||
|
||||
var sessions structs.IndexedSessions
|
||||
if err := client.Call("Session.List", &getR, &sessions); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -339,7 +341,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
Session: ids[i],
|
||||
}
|
||||
var session structs.IndexedSessions
|
||||
if err := client.Call("Session.Renew", &renewR, &session); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -366,7 +368,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
time.Sleep((ttl * structs.SessionTTLMultiplier) * 2.0 / 3.0)
|
||||
|
||||
var sessionsL1 structs.IndexedSessions
|
||||
if err := client.Call("Session.List", &getR, &sessionsL1); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -400,7 +402,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
|
|||
time.Sleep(ttl * structs.SessionTTLMultiplier)
|
||||
|
||||
var sessionsL2 structs.IndexedSessions
|
||||
if err := client.Call("Session.List", &getR, &sessionsL2); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -430,10 +432,10 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"})
|
||||
|
@ -450,7 +452,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
|
|||
arg.Session.Node = "foo"
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if i < 5 {
|
||||
|
@ -463,7 +465,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
|
|||
Node: "foo",
|
||||
}
|
||||
var sessions structs.IndexedSessions
|
||||
if err := client.Call("Session.NodeSessions", &getR, &sessions); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &getR, &sessions); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -488,10 +490,10 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.SessionRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -506,18 +508,18 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
|
|||
arg.Session.TTL = "10z"
|
||||
|
||||
var out string
|
||||
err := client.Call("Session.Apply", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
if err.Error() != "Session TTL '10z' invalid: time: unknown unit z in duration 10z" {
|
||||
if !strings.Contains(err.Error(), "Session TTL '10z' invalid: time: unknown unit z in duration 10z") {
|
||||
t.Fatalf("incorrect error message: %s", err.Error())
|
||||
}
|
||||
|
||||
// less than SessionTTLMin
|
||||
arg.Session.TTL = "5s"
|
||||
|
||||
err = client.Call("Session.Apply", &arg, &out)
|
||||
err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
|
@ -528,7 +530,7 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
|
|||
// more than SessionTTLMax
|
||||
arg.Session.TTL = "4000s"
|
||||
|
||||
err = client.Call("Session.Apply", &arg, &out)
|
||||
err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestInitializeSessionTimers(t *testing.T) {
|
||||
|
@ -310,8 +311,8 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
|
|||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
|
||||
client := rpcClient(t, leader)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, leader)
|
||||
defer codec.Close()
|
||||
|
||||
// Register a node
|
||||
node := structs.RegisterRequest{
|
||||
|
@ -334,7 +335,7 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
|
|||
},
|
||||
}
|
||||
var id1 string
|
||||
if err := client.Call("Session.Apply", &arg, &id1); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,47 +1,47 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func rpcClient(t *testing.T, s *Server) *rpc.Client {
|
||||
func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
|
||||
addr := s.config.RPCAddr
|
||||
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Write the Consul RPC byte to set the mode
|
||||
conn.Write([]byte{byte(rpcConsul)})
|
||||
|
||||
cc := codec.GoRpc.ClientCodec(conn, msgpackHandle)
|
||||
return rpc.NewClientWithCodec(cc)
|
||||
return msgpackrpc.NewClientCodec(conn)
|
||||
}
|
||||
|
||||
func TestStatusLeader(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := struct{}{}
|
||||
var leader string
|
||||
if err := client.Call("Status.Leader", arg, &leader); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if leader != "" {
|
||||
t.Fatalf("unexpected leader: %v", leader)
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
if err := client.Call("Status.Leader", arg, &leader); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if leader == "" {
|
||||
|
@ -53,12 +53,12 @@ func TestStatusPeers(t *testing.T) {
|
|||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
arg := struct{}{}
|
||||
var peers []string
|
||||
if err := client.Call("Status.Peers", arg, &peers); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Status.Peers", arg, &peers); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(peers) != 1 {
|
||||
|
|
Loading…
Reference in New Issue