From 7f4adceae86ce542ffaea6ab7c3aee3f691f54b1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Dec 2013 13:13:40 -0800 Subject: [PATCH] consul: sharing the RPC layer between Consul/Raft --- consul/raft_rpc.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++ consul/rpc.go | 35 ++++++++++++++++- consul/server.go | 34 ++++++++--------- 3 files changed, 146 insertions(+), 18 deletions(-) create mode 100644 consul/raft_rpc.go diff --git a/consul/raft_rpc.go b/consul/raft_rpc.go new file mode 100644 index 000000000..597a3f5eb --- /dev/null +++ b/consul/raft_rpc.go @@ -0,0 +1,95 @@ +package consul + +import ( + "fmt" + "net" + "sync" + "time" +) + +// RaftLayer implements the raft.StreamLayer interface, +// so that we can use a single RPC layer for Raft and Consul +type RaftLayer struct { + // Addr is the listener address to return + addr net.Addr + + // connCh is used to accept connections + connCh chan net.Conn + + // ConnPool is used to make outbound connections + connPool *ConnPool + + // Tracks if we are closed + closed bool + closeCh chan struct{} + closeLock sync.Mutex +} + +// NewRaftLayer is used to initialize a new RaftLayer which can +// be used as a StreamLayer for Raft +func NewRaftLayer(addr net.Addr, pool *ConnPool) *RaftLayer { + layer := &RaftLayer{ + addr: addr, + connCh: make(chan net.Conn), + connPool: pool, + closeCh: make(chan struct{}), + } + return layer +} + +// Handoff is used to hand off a connection to the +// RaftLayer. This allows it to be Accept()'ed +func (l *RaftLayer) Handoff(c net.Conn) error { + select { + case l.connCh <- c: + return nil + case <-l.closeCh: + return fmt.Errorf("Raft RPC layer closed") + } +} + +// Accept is used to return connection which are +// dialed to be used with the Raft layer +func (l *RaftLayer) Accept() (net.Conn, error) { + select { + case conn := <-l.connCh: + return conn, nil + case <-l.closeCh: + return nil, fmt.Errorf("Raft RPC layer closed") + } +} + +// Close is used to stop listening for Raft connections +func (l *RaftLayer) Close() error { + l.closeLock.Lock() + defer l.closeLock.Unlock() + + if !l.closed { + l.closed = true + close(l.closeCh) + } + return nil +} + +// Addr is used to return the address of the listener +func (l *RaftLayer) Addr() net.Addr { + return l.addr +} + +// Dial is used to create a new outgoing connection +func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) { + // Get a net.Addr + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + + // Use the conn pool + conn, err := l.connPool.Acquire(addr) + if err != nil { + return nil, err + } + + // Discard the Conn wrapper + return conn.conn, nil +} diff --git a/consul/rpc.go b/consul/rpc.go index 09697d1ac..fff6cf2b5 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -5,6 +5,13 @@ import ( "net" ) +type RPCType byte + +const ( + rpcConsul RPCType = iota + rpcRaft +) + // listen is used to listen for incoming RPC connections func (s *Server) listen() { for { @@ -27,8 +34,34 @@ func (s *Server) listen() { } } -// handleConn is used to service a single RPC connection +// handleConn is used to determine if this is a Raft or +// Consul type RPC connection and invoke the correct handler func (s *Server) handleConn(conn net.Conn) { + // Read a single byte + buf := make([]byte, 1) + if _, err := conn.Read(buf); err != nil { + s.logger.Printf("[ERR] Failed to read byte: %v", err) + conn.Close() + return + } + + // Switch on the byte + switch RPCType(buf[0]) { + case rpcConsul: + s.handleConsulConn(conn) + + case rpcRaft: + s.raftLayer.Handoff(conn) + + default: + s.logger.Printf("[ERR] Unrecognized RPC byte: %v", buf[0]) + conn.Close() + return + } +} + +// handleConsulConn is used to service a single Consul RPC connection +func (s *Server) handleConsulConn(conn net.Conn) { defer func() { conn.Close() s.rpcClientLock.Lock() diff --git a/consul/server.go b/consul/server.go index 36f982e55..adf361db5 100644 --- a/consul/server.go +++ b/consul/server.go @@ -44,9 +44,9 @@ type Server struct { // The raft instance is used among Consul nodes within the // DC to protect operations that require strong consistency - raft *raft.Raft - raftStore *raft.SQLiteStore - raftTransport *raft.NetworkTransport + raft *raft.Raft + raftLayer *RaftLayer + raftStore *raft.SQLiteStore // rpcClients is used to track active clients rpcClients map[net.Conn]struct{} @@ -88,7 +88,7 @@ func NewServer(config *Config) (*Server, error) { // Create server s := &Server{ config: config, - connPool: NewPool(3), + connPool: NewPool(5), eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), logger: logger, @@ -97,6 +97,12 @@ func NewServer(config *Config) (*Server, error) { shutdownCh: make(chan struct{}), } + // Initialize the RPC layer + if err := s.setupRPC(); err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start RPC layer: %v", err) + } + // Start the Serf listeners to prevent a deadlock go s.lanEventHandler() go s.wanEventHandler() @@ -162,6 +168,7 @@ func (s *Server) setupRaft() error { if err != nil { return err } + s.raftStore = store // Create the snapshot store snapshots, err := raft.NewFileSnapshotStore(path, 3) @@ -171,11 +178,7 @@ func (s *Server) setupRaft() error { } // Create a transport layer - trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second) - if err != nil { - store.Close() - return err - } + trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second) // Setup the peer store peers := raft.NewJSONPeers(path, trans) @@ -184,17 +187,13 @@ func (s *Server) setupRaft() error { s.fsm = &consulFSM{server: s} // Setup the Raft store - raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots, + s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots, peers, trans) if err != nil { store.Close() trans.Close() return err } - - s.raft = raft - s.raftStore = store - s.raftTransport = trans return nil } @@ -205,6 +204,7 @@ func (s *Server) setupRPC() error { return err } s.rpcListener = list + s.raftLayer = NewRaftLayer(s.rpcListener.Addr(), s.connPool) go s.listen() return nil } @@ -234,11 +234,11 @@ func (s *Server) Shutdown() error { if s.raft != nil { s.raft.Shutdown() + s.raftLayer.Close() s.raftStore.Close() - s.raftTransport.Close() s.raft = nil + s.raftLayer = nil s.raftStore = nil - s.raftTransport = nil } if s.rpcListener != nil { @@ -288,7 +288,7 @@ func (s *Server) Leave() error { // Request that we are removed // TODO: Properly forward to leader - future := s.raft.RemovePeer(s.raftTransport.LocalAddr()) + future := s.raft.RemovePeer(s.rpcListener.Addr()) // Wait for the future ch := make(chan error, 1)