Move rpc structs into sub-package

This commit is contained in:
Armon Dadgar 2013-12-19 12:03:57 -08:00
parent 5aa94a7989
commit 88396b966a
7 changed files with 79 additions and 79 deletions

View File

@ -1,7 +1,7 @@
package consul
import (
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/consul/consul/structs"
)
// Catalog endpoint is used to manipulate the service catalog
@ -10,12 +10,12 @@ type Catalog struct {
}
// Register is used register that a node is providing a given service.
func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error {
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done {
return err
}
_, err := c.srv.raftApply(rpc.RegisterRequestType, args)
_, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil {
c.srv.logger.Printf("[ERR] Register failed: %v", err)
return err
@ -24,12 +24,12 @@ func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error {
}
// Deregister is used to remove a service registration for a given node.
func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *struct{}) error {
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done {
return err
}
_, err := c.srv.raftApply(rpc.DeregisterRequestType, args)
_, err := c.srv.raftApply(structs.DeregisterRequestType, args)
if err != nil {
c.srv.logger.Printf("[ERR] Deregister failed: %v", err)
return err
@ -54,7 +54,7 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
}
// ListNodes is used to query the nodes in a DC
func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error {
if done, err := c.srv.forward("Catalog.ListNodes", dc, dc, reply); done {
return err
}
@ -64,9 +64,9 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
rawNodes := state.Nodes()
// Format the response
nodes := rpc.Nodes(make([]rpc.Node, len(rawNodes)/2))
nodes := structs.Nodes(make([]structs.Node, len(rawNodes)/2))
for i := 0; i < len(rawNodes); i += 2 {
nodes[i] = rpc.Node{rawNodes[i], rawNodes[i+1]}
nodes[i] = structs.Node{rawNodes[i], rawNodes[i+1]}
}
*reply = nodes
@ -74,7 +74,7 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
}
// ListServices is used to query the services in a DC
func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
func (c *Catalog) ListServices(dc string, reply *structs.Services) error {
if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done {
return err
}
@ -88,14 +88,14 @@ func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
}
// ServiceNodes returns all the nodes registered as part of a service
func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.ServiceNodes) error {
func (c *Catalog) ServiceNodes(args *structs.ServiceNodesRequest, reply *structs.ServiceNodes) error {
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done {
return err
}
// Get the nodes
state := c.srv.fsm.State()
var nodes rpc.ServiceNodes
var nodes structs.ServiceNodes
if args.TagFilter {
nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
@ -107,7 +107,7 @@ func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.Service
}
// NodeServices returns all the services registered as part of a node
func (c *Catalog) NodeServices(args *rpc.NodeServicesRequest, reply *rpc.NodeServices) error {
func (c *Catalog) NodeServices(args *structs.NodeServicesRequest, reply *structs.NodeServices) error {
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done {
return err
}

View File

@ -2,8 +2,8 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/rpc"
nrpc "net/rpc"
"github.com/hashicorp/consul/consul/structs"
"net/rpc"
"os"
"sort"
"testing"
@ -17,7 +17,7 @@ func TestCatalogRegister(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
arg := rpc.RegisterRequest{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
@ -64,14 +64,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Use the follower as the client
var client *nrpc.Client
var client *rpc.Client
if !s1.IsLeader() {
client = client1
} else {
client = client2
}
arg := rpc.RegisterRequest{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
@ -106,7 +106,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
// Wait for the leaders
time.Sleep(100 * time.Millisecond)
arg := rpc.RegisterRequest{
arg := structs.RegisterRequest{
Datacenter: "dc2", // SHould forward through s1
Node: "foo",
Address: "127.0.0.1",
@ -127,7 +127,7 @@ func TestCatalogDeregister(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
arg := rpc.DeregisterRequest{
arg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
}
@ -191,7 +191,7 @@ func TestCatalogListNodes(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
var out rpc.Nodes
var out structs.Nodes
err := client.Call("Catalog.ListNodes", "dc1", &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
@ -225,7 +225,7 @@ func TestCatalogListServices(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
var out rpc.Services
var out structs.Services
err := client.Call("Catalog.ListServices", "dc1", &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
@ -260,13 +260,13 @@ func TestCatalogListServiceNodes(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
args := rpc.ServiceNodesRequest{
args := structs.ServiceNodesRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "slave",
TagFilter: false,
}
var out rpc.ServiceNodes
var out structs.ServiceNodes
err := client.Call("Catalog.ServiceNodes", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
@ -305,11 +305,11 @@ func TestCatalogNodeServices(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
args := rpc.NodeServicesRequest{
args := structs.NodeServicesRequest{
Datacenter: "dc1",
Node: "foo",
}
var out rpc.NodeServices
var out structs.NodeServices
err := client.Call("Catalog.NodeServices", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)

View File

@ -2,7 +2,7 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
"io"
@ -43,10 +43,10 @@ func (c *consulFSM) State() *StateStore {
}
func (c *consulFSM) Apply(buf []byte) interface{} {
switch rpc.MessageType(buf[0]) {
case rpc.RegisterRequestType:
switch structs.MessageType(buf[0]) {
case structs.RegisterRequestType:
return c.applyRegister(buf[1:])
case rpc.DeregisterRequestType:
case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:])
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
@ -54,8 +54,8 @@ func (c *consulFSM) Apply(buf []byte) interface{} {
}
func (c *consulFSM) applyRegister(buf []byte) interface{} {
var req rpc.RegisterRequest
if err := rpc.Decode(buf, &req); err != nil {
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -70,8 +70,8 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} {
}
func (c *consulFSM) applyDeregister(buf []byte) interface{} {
var req rpc.DeregisterRequest
if err := rpc.Decode(buf, &req); err != nil {
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
@ -122,9 +122,9 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
// Decode
switch rpc.MessageType(msgType[0]) {
case rpc.RegisterRequestType:
var req rpc.RegisterRequest
switch structs.MessageType(msgType[0]) {
case structs.RegisterRequestType:
var req structs.RegisterRequest
if err := dec.Decode(&req); err != nil {
return err
}
@ -156,15 +156,15 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
encoder := codec.NewEncoder(sink, &handle)
// Register each node
var req rpc.RegisterRequest
var req structs.RegisterRequest
for i := 0; i < len(nodes); i += 2 {
req = rpc.RegisterRequest{
req = structs.RegisterRequest{
Node: nodes[i],
Address: nodes[i+1],
}
// Register the node itself
sink.Write([]byte{byte(rpc.RegisterRequestType)})
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
@ -177,7 +177,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
req.ServiceTag = props.Tag
req.ServicePort = props.Port
sink.Write([]byte{byte(rpc.RegisterRequestType)})
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err

View File

@ -2,7 +2,7 @@ package consul
import (
"bytes"
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/consul/consul/structs"
"testing"
)
@ -30,12 +30,12 @@ func TestFSM_RegisterNode(t *testing.T) {
t.Fatalf("err: %v", err)
}
req := rpc.RegisterRequest{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -63,7 +63,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
t.Fatalf("err: %v", err)
}
req := rpc.RegisterRequest{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
@ -71,7 +71,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
ServiceTag: "master",
ServicePort: 8000,
}
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -99,7 +99,7 @@ func TestFSM_DeregisterService(t *testing.T) {
t.Fatalf("err: %v", err)
}
req := rpc.RegisterRequest{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
@ -107,7 +107,7 @@ func TestFSM_DeregisterService(t *testing.T) {
ServiceTag: "master",
ServicePort: 8000,
}
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -117,12 +117,12 @@ func TestFSM_DeregisterService(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
dereg := rpc.DeregisterRequest{
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
ServiceName: "db",
}
buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg)
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -150,7 +150,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("err: %v", err)
}
req := rpc.RegisterRequest{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
@ -158,7 +158,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
ServiceTag: "master",
ServicePort: 8000,
}
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -168,11 +168,11 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
dereg := rpc.DeregisterRequest{
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
}
buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg)
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -2,7 +2,7 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/consul/consul/structs"
"github.com/ugorji/go/codec"
"io"
"math/rand"
@ -105,7 +105,7 @@ func (s *Server) forward(method, dc string, args interface{}, reply interface{})
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
leader := s.raft.Leader()
if leader == nil {
return rpc.ErrNoLeader
return structs.ErrNoLeader
}
return s.connPool.RPC(leader, method, args, reply)
}
@ -117,7 +117,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
servers := s.remoteConsuls[dc]
if len(servers) == 0 {
s.remoteLock.RUnlock()
return rpc.ErrNoDCPath
return structs.ErrNoDCPath
}
// Select a random addr
@ -131,8 +131,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t rpc.MessageType, msg interface{}) (interface{}, error) {
buf, err := rpc.Encode(t, msg)
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
buf, err := structs.Encode(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}

View File

@ -4,15 +4,15 @@ import (
"bytes"
"fmt"
"github.com/armon/gomdb"
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/consul/consul/structs"
"io/ioutil"
"os"
)
const (
dbNodes = "nodes" // Maps node -> addr
dbServices = "services" // Maps node||serv -> rpc.NodeService
dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> rpc.ServiceNode
dbServices = "services" // Maps node||serv -> structs.NodeService
dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> structs.ServiceNode
)
// The StateStore is responsible for maintaining all the Consul
@ -207,11 +207,11 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
// Update the service entry
key := []byte(fmt.Sprintf("%s||%s", name, service))
nService := rpc.NodeService{
nService := structs.NodeService{
Tag: tag,
Port: port,
}
val, err := rpc.Encode(255, &nService)
val, err := structs.Encode(255, &nService)
if err != nil {
tx.Abort()
return err
@ -232,13 +232,13 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
// Update the index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name))
node := rpc.ServiceNode{
node := structs.ServiceNode{
Node: name,
Address: string(addr),
ServiceTag: tag,
ServicePort: port,
}
val, err = rpc.Encode(255, &node)
val, err = structs.Encode(255, &node)
if err != nil {
tx.Abort()
return err
@ -252,7 +252,7 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
}
// NodeServices is used to return all the services of a given node
func (s *StateStore) NodeServices(name string) rpc.NodeServices {
func (s *StateStore) NodeServices(name string) structs.NodeServices {
tx, dbis, err := s.startTxn(true, dbServices)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
@ -262,22 +262,22 @@ func (s *StateStore) NodeServices(name string) rpc.NodeServices {
}
// filterNodeServices is used to filter the services to a specific node
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) rpc.NodeServices {
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) structs.NodeServices {
keyPrefix := []byte(fmt.Sprintf("%s||", name))
return parseNodeServices(tx, services, keyPrefix)
}
// parseNodeServices is used to parse the results of a queryNodeServices
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices {
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) structs.NodeServices {
// Create the cursor
cursor, err := tx.CursorOpen(dbi)
if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
services := rpc.NodeServices(make(map[string]rpc.NodeService))
services := structs.NodeServices(make(map[string]structs.NodeService))
var service string
var entry rpc.NodeService
var entry structs.NodeService
var key, val []byte
first := true
@ -307,7 +307,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices
if val[0] != 255 {
panic(fmt.Errorf("Bad service value: %v", val))
}
if err := rpc.Decode(val[1:], &entry); err != nil {
if err := structs.Decode(val[1:], &entry); err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
@ -430,7 +430,7 @@ func (s *StateStore) Services() map[string][]string {
}
// ServiceNodes returns the nodes associated with a given service
func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
tx, dbis, err := s.startTxn(false, dbServiceIndex)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
@ -441,7 +441,7 @@ func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
}
// ServiceTagNodes returns the nodes associated with a given service matching a tag
func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
tx, dbis, err := s.startTxn(false, dbServiceIndex)
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
@ -452,14 +452,14 @@ func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
}
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNodes {
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes {
cursor, err := tx.CursorOpen(index)
if err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
var nodes rpc.ServiceNodes
var node rpc.ServiceNode
var nodes structs.ServiceNodes
var node structs.ServiceNode
for {
key, val, err := cursor.Get(nil, mdb.NEXT)
if err == mdb.NotFound {
@ -477,7 +477,7 @@ func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNod
if val[0] != 255 {
panic(fmt.Errorf("Bad service value: %v", val))
}
if err := rpc.Decode(val[1:], &node); err != nil {
if err := structs.Decode(val[1:], &node); err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
@ -525,7 +525,7 @@ func (s *StateSnapshot) Nodes() []string {
}
// NodeServices is used to return all the services of a given node
func (s *StateSnapshot) NodeServices(name string) rpc.NodeServices {
func (s *StateSnapshot) NodeServices(name string) structs.NodeServices {
return filterNodeServices(s.tx, s.dbis[1], name)
}

View File

@ -1,4 +1,4 @@
package rpc
package structs
import (
"bytes"