From 88396b966abb66e44f675a5b3501af2e13dc7112 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 19 Dec 2013 12:03:57 -0800 Subject: [PATCH] Move rpc structs into sub-package --- consul/catalog_endpoint.go | 24 +++++++++--------- consul/catalog_endpoint_test.go | 26 +++++++++---------- consul/fsm.go | 30 +++++++++++----------- consul/fsm_test.go | 26 +++++++++---------- consul/rpc.go | 10 ++++---- consul/state_store.go | 40 +++++++++++++++--------------- {rpc => consul/structs}/structs.go | 2 +- 7 files changed, 79 insertions(+), 79 deletions(-) rename {rpc => consul/structs}/structs.go (99%) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 727e2d10f..f538b5b34 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -1,7 +1,7 @@ package consul import ( - "github.com/hashicorp/consul/rpc" + "github.com/hashicorp/consul/consul/structs" ) // Catalog endpoint is used to manipulate the service catalog @@ -10,12 +10,12 @@ type Catalog struct { } // Register is used register that a node is providing a given service. -func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error { +func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done { return err } - _, err := c.srv.raftApply(rpc.RegisterRequestType, args) + _, err := c.srv.raftApply(structs.RegisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] Register failed: %v", err) return err @@ -24,12 +24,12 @@ func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error { } // Deregister is used to remove a service registration for a given node. -func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *struct{}) error { +func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done { return err } - _, err := c.srv.raftApply(rpc.DeregisterRequestType, args) + _, err := c.srv.raftApply(structs.DeregisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] Deregister failed: %v", err) return err @@ -54,7 +54,7 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { } // ListNodes is used to query the nodes in a DC -func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error { +func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error { if done, err := c.srv.forward("Catalog.ListNodes", dc, dc, reply); done { return err } @@ -64,9 +64,9 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error { rawNodes := state.Nodes() // Format the response - nodes := rpc.Nodes(make([]rpc.Node, len(rawNodes)/2)) + nodes := structs.Nodes(make([]structs.Node, len(rawNodes)/2)) for i := 0; i < len(rawNodes); i += 2 { - nodes[i] = rpc.Node{rawNodes[i], rawNodes[i+1]} + nodes[i] = structs.Node{rawNodes[i], rawNodes[i+1]} } *reply = nodes @@ -74,7 +74,7 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error { } // ListServices is used to query the services in a DC -func (c *Catalog) ListServices(dc string, reply *rpc.Services) error { +func (c *Catalog) ListServices(dc string, reply *structs.Services) error { if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done { return err } @@ -88,14 +88,14 @@ func (c *Catalog) ListServices(dc string, reply *rpc.Services) error { } // ServiceNodes returns all the nodes registered as part of a service -func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.ServiceNodes) error { +func (c *Catalog) ServiceNodes(args *structs.ServiceNodesRequest, reply *structs.ServiceNodes) error { if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { return err } // Get the nodes state := c.srv.fsm.State() - var nodes rpc.ServiceNodes + var nodes structs.ServiceNodes if args.TagFilter { nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { @@ -107,7 +107,7 @@ func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.Service } // NodeServices returns all the services registered as part of a node -func (c *Catalog) NodeServices(args *rpc.NodeServicesRequest, reply *rpc.NodeServices) error { +func (c *Catalog) NodeServices(args *structs.NodeServicesRequest, reply *structs.NodeServices) error { if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index c6160aa68..116716781 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -2,8 +2,8 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/rpc" - nrpc "net/rpc" + "github.com/hashicorp/consul/consul/structs" + "net/rpc" "os" "sort" "testing" @@ -17,7 +17,7 @@ func TestCatalogRegister(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - arg := rpc.RegisterRequest{ + arg := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", @@ -64,14 +64,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) { time.Sleep(100 * time.Millisecond) // Use the follower as the client - var client *nrpc.Client + var client *rpc.Client if !s1.IsLeader() { client = client1 } else { client = client2 } - arg := rpc.RegisterRequest{ + arg := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", @@ -106,7 +106,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) { // Wait for the leaders time.Sleep(100 * time.Millisecond) - arg := rpc.RegisterRequest{ + arg := structs.RegisterRequest{ Datacenter: "dc2", // SHould forward through s1 Node: "foo", Address: "127.0.0.1", @@ -127,7 +127,7 @@ func TestCatalogDeregister(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - arg := rpc.DeregisterRequest{ + arg := structs.DeregisterRequest{ Datacenter: "dc1", Node: "foo", } @@ -191,7 +191,7 @@ func TestCatalogListNodes(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - var out rpc.Nodes + var out structs.Nodes err := client.Call("Catalog.ListNodes", "dc1", &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) @@ -225,7 +225,7 @@ func TestCatalogListServices(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - var out rpc.Services + var out structs.Services err := client.Call("Catalog.ListServices", "dc1", &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) @@ -260,13 +260,13 @@ func TestCatalogListServiceNodes(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - args := rpc.ServiceNodesRequest{ + args := structs.ServiceNodesRequest{ Datacenter: "dc1", ServiceName: "db", ServiceTag: "slave", TagFilter: false, } - var out rpc.ServiceNodes + var out structs.ServiceNodes err := client.Call("Catalog.ServiceNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) @@ -305,11 +305,11 @@ func TestCatalogNodeServices(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - args := rpc.NodeServicesRequest{ + args := structs.NodeServicesRequest{ Datacenter: "dc1", Node: "foo", } - var out rpc.NodeServices + var out structs.NodeServices err := client.Call("Catalog.NodeServices", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) diff --git a/consul/fsm.go b/consul/fsm.go index bc98726f3..5d56af527 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -2,7 +2,7 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/rpc" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/ugorji/go/codec" "io" @@ -43,10 +43,10 @@ func (c *consulFSM) State() *StateStore { } func (c *consulFSM) Apply(buf []byte) interface{} { - switch rpc.MessageType(buf[0]) { - case rpc.RegisterRequestType: + switch structs.MessageType(buf[0]) { + case structs.RegisterRequestType: return c.applyRegister(buf[1:]) - case rpc.DeregisterRequestType: + case structs.DeregisterRequestType: return c.applyDeregister(buf[1:]) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) @@ -54,8 +54,8 @@ func (c *consulFSM) Apply(buf []byte) interface{} { } func (c *consulFSM) applyRegister(buf []byte) interface{} { - var req rpc.RegisterRequest - if err := rpc.Decode(buf, &req); err != nil { + var req structs.RegisterRequest + if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } @@ -70,8 +70,8 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} { } func (c *consulFSM) applyDeregister(buf []byte) interface{} { - var req rpc.DeregisterRequest - if err := rpc.Decode(buf, &req); err != nil { + var req structs.DeregisterRequest + if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } @@ -122,9 +122,9 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } // Decode - switch rpc.MessageType(msgType[0]) { - case rpc.RegisterRequestType: - var req rpc.RegisterRequest + switch structs.MessageType(msgType[0]) { + case structs.RegisterRequestType: + var req structs.RegisterRequest if err := dec.Decode(&req); err != nil { return err } @@ -156,15 +156,15 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { encoder := codec.NewEncoder(sink, &handle) // Register each node - var req rpc.RegisterRequest + var req structs.RegisterRequest for i := 0; i < len(nodes); i += 2 { - req = rpc.RegisterRequest{ + req = structs.RegisterRequest{ Node: nodes[i], Address: nodes[i+1], } // Register the node itself - sink.Write([]byte{byte(rpc.RegisterRequestType)}) + sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { sink.Cancel() return err @@ -177,7 +177,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { req.ServiceTag = props.Tag req.ServicePort = props.Port - sink.Write([]byte{byte(rpc.RegisterRequestType)}) + sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { sink.Cancel() return err diff --git a/consul/fsm_test.go b/consul/fsm_test.go index bce056bc8..1765dbaf9 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -2,7 +2,7 @@ package consul import ( "bytes" - "github.com/hashicorp/consul/rpc" + "github.com/hashicorp/consul/consul/structs" "testing" ) @@ -30,12 +30,12 @@ func TestFSM_RegisterNode(t *testing.T) { t.Fatalf("err: %v", err) } - req := rpc.RegisterRequest{ + req := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } - buf, err := rpc.Encode(rpc.RegisterRequestType, req) + buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -63,7 +63,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { t.Fatalf("err: %v", err) } - req := rpc.RegisterRequest{ + req := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", @@ -71,7 +71,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { ServiceTag: "master", ServicePort: 8000, } - buf, err := rpc.Encode(rpc.RegisterRequestType, req) + buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -99,7 +99,7 @@ func TestFSM_DeregisterService(t *testing.T) { t.Fatalf("err: %v", err) } - req := rpc.RegisterRequest{ + req := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", @@ -107,7 +107,7 @@ func TestFSM_DeregisterService(t *testing.T) { ServiceTag: "master", ServicePort: 8000, } - buf, err := rpc.Encode(rpc.RegisterRequestType, req) + buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -117,12 +117,12 @@ func TestFSM_DeregisterService(t *testing.T) { t.Fatalf("resp: %v", resp) } - dereg := rpc.DeregisterRequest{ + dereg := structs.DeregisterRequest{ Datacenter: "dc1", Node: "foo", ServiceName: "db", } - buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg) + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) if err != nil { t.Fatalf("err: %v", err) } @@ -150,7 +150,7 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("err: %v", err) } - req := rpc.RegisterRequest{ + req := structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", @@ -158,7 +158,7 @@ func TestFSM_DeregisterNode(t *testing.T) { ServiceTag: "master", ServicePort: 8000, } - buf, err := rpc.Encode(rpc.RegisterRequestType, req) + buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -168,11 +168,11 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("resp: %v", resp) } - dereg := rpc.DeregisterRequest{ + dereg := structs.DeregisterRequest{ Datacenter: "dc1", Node: "foo", } - buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg) + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/rpc.go b/consul/rpc.go index 2ff6e5959..acfbd5db0 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -2,7 +2,7 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/rpc" + "github.com/hashicorp/consul/consul/structs" "github.com/ugorji/go/codec" "io" "math/rand" @@ -105,7 +105,7 @@ func (s *Server) forward(method, dc string, args interface{}, reply interface{}) func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error { leader := s.raft.Leader() if leader == nil { - return rpc.ErrNoLeader + return structs.ErrNoLeader } return s.connPool.RPC(leader, method, args, reply) } @@ -117,7 +117,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ servers := s.remoteConsuls[dc] if len(servers) == 0 { s.remoteLock.RUnlock() - return rpc.ErrNoDCPath + return structs.ErrNoDCPath } // Select a random addr @@ -131,8 +131,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ // raftApply is used to encode a message, run it through raft, and return // the FSM response along with any errors -func (s *Server) raftApply(t rpc.MessageType, msg interface{}) (interface{}, error) { - buf, err := rpc.Encode(t, msg) +func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { + buf, err := structs.Encode(t, msg) if err != nil { return nil, fmt.Errorf("Failed to encode request: %v", err) } diff --git a/consul/state_store.go b/consul/state_store.go index f1225958b..7003845d1 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -4,15 +4,15 @@ import ( "bytes" "fmt" "github.com/armon/gomdb" - "github.com/hashicorp/consul/rpc" + "github.com/hashicorp/consul/consul/structs" "io/ioutil" "os" ) const ( dbNodes = "nodes" // Maps node -> addr - dbServices = "services" // Maps node||serv -> rpc.NodeService - dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> rpc.ServiceNode + dbServices = "services" // Maps node||serv -> structs.NodeService + dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> structs.ServiceNode ) // The StateStore is responsible for maintaining all the Consul @@ -207,11 +207,11 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error { // Update the service entry key := []byte(fmt.Sprintf("%s||%s", name, service)) - nService := rpc.NodeService{ + nService := structs.NodeService{ Tag: tag, Port: port, } - val, err := rpc.Encode(255, &nService) + val, err := structs.Encode(255, &nService) if err != nil { tx.Abort() return err @@ -232,13 +232,13 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error { // Update the index entry key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name)) - node := rpc.ServiceNode{ + node := structs.ServiceNode{ Node: name, Address: string(addr), ServiceTag: tag, ServicePort: port, } - val, err = rpc.Encode(255, &node) + val, err = structs.Encode(255, &node) if err != nil { tx.Abort() return err @@ -252,7 +252,7 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error { } // NodeServices is used to return all the services of a given node -func (s *StateStore) NodeServices(name string) rpc.NodeServices { +func (s *StateStore) NodeServices(name string) structs.NodeServices { tx, dbis, err := s.startTxn(true, dbServices) if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) @@ -262,22 +262,22 @@ func (s *StateStore) NodeServices(name string) rpc.NodeServices { } // filterNodeServices is used to filter the services to a specific node -func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) rpc.NodeServices { +func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) structs.NodeServices { keyPrefix := []byte(fmt.Sprintf("%s||", name)) return parseNodeServices(tx, services, keyPrefix) } // parseNodeServices is used to parse the results of a queryNodeServices -func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices { +func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) structs.NodeServices { // Create the cursor cursor, err := tx.CursorOpen(dbi) if err != nil { panic(fmt.Errorf("Failed to get nodes: %v", err)) } - services := rpc.NodeServices(make(map[string]rpc.NodeService)) + services := structs.NodeServices(make(map[string]structs.NodeService)) var service string - var entry rpc.NodeService + var entry structs.NodeService var key, val []byte first := true @@ -307,7 +307,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices if val[0] != 255 { panic(fmt.Errorf("Bad service value: %v", val)) } - if err := rpc.Decode(val[1:], &entry); err != nil { + if err := structs.Decode(val[1:], &entry); err != nil { panic(fmt.Errorf("Failed to get node services: %v", err)) } @@ -430,7 +430,7 @@ func (s *StateStore) Services() map[string][]string { } // ServiceNodes returns the nodes associated with a given service -func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes { +func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes { tx, dbis, err := s.startTxn(false, dbServiceIndex) if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) @@ -441,7 +441,7 @@ func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes { } // ServiceTagNodes returns the nodes associated with a given service matching a tag -func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes { +func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes { tx, dbis, err := s.startTxn(false, dbServiceIndex) if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) @@ -452,14 +452,14 @@ func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes { } // parseServiceNodes parses results ServiceNodes and ServiceTagNodes -func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNodes { +func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes { cursor, err := tx.CursorOpen(index) if err != nil { panic(fmt.Errorf("Failed to get node services: %v", err)) } - var nodes rpc.ServiceNodes - var node rpc.ServiceNode + var nodes structs.ServiceNodes + var node structs.ServiceNode for { key, val, err := cursor.Get(nil, mdb.NEXT) if err == mdb.NotFound { @@ -477,7 +477,7 @@ func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNod if val[0] != 255 { panic(fmt.Errorf("Bad service value: %v", val)) } - if err := rpc.Decode(val[1:], &node); err != nil { + if err := structs.Decode(val[1:], &node); err != nil { panic(fmt.Errorf("Failed to get node services: %v", err)) } @@ -525,7 +525,7 @@ func (s *StateSnapshot) Nodes() []string { } // NodeServices is used to return all the services of a given node -func (s *StateSnapshot) NodeServices(name string) rpc.NodeServices { +func (s *StateSnapshot) NodeServices(name string) structs.NodeServices { return filterNodeServices(s.tx, s.dbis[1], name) } diff --git a/rpc/structs.go b/consul/structs/structs.go similarity index 99% rename from rpc/structs.go rename to consul/structs/structs.go index 3bf27ed17..7839be231 100644 --- a/rpc/structs.go +++ b/consul/structs/structs.go @@ -1,4 +1,4 @@ -package rpc +package structs import ( "bytes"