consul: sharing the RPC layer between Consul/Raft

This commit is contained in:
Armon Dadgar 2013-12-09 13:13:40 -08:00
parent 9dadd99bcf
commit 7f4adceae8
3 changed files with 146 additions and 18 deletions

95
consul/raft_rpc.go Normal file
View File

@ -0,0 +1,95 @@
package consul
import (
"fmt"
"net"
"sync"
"time"
)
// RaftLayer implements the raft.StreamLayer interface,
// so that we can use a single RPC layer for Raft and Consul
type RaftLayer struct {
// Addr is the listener address to return
addr net.Addr
// connCh is used to accept connections
connCh chan net.Conn
// ConnPool is used to make outbound connections
connPool *ConnPool
// Tracks if we are closed
closed bool
closeCh chan struct{}
closeLock sync.Mutex
}
// NewRaftLayer is used to initialize a new RaftLayer which can
// be used as a StreamLayer for Raft
func NewRaftLayer(addr net.Addr, pool *ConnPool) *RaftLayer {
layer := &RaftLayer{
addr: addr,
connCh: make(chan net.Conn),
connPool: pool,
closeCh: make(chan struct{}),
}
return layer
}
// Handoff is used to hand off a connection to the
// RaftLayer. This allows it to be Accept()'ed
func (l *RaftLayer) Handoff(c net.Conn) error {
select {
case l.connCh <- c:
return nil
case <-l.closeCh:
return fmt.Errorf("Raft RPC layer closed")
}
}
// Accept is used to return connection which are
// dialed to be used with the Raft layer
func (l *RaftLayer) Accept() (net.Conn, error) {
select {
case conn := <-l.connCh:
return conn, nil
case <-l.closeCh:
return nil, fmt.Errorf("Raft RPC layer closed")
}
}
// Close is used to stop listening for Raft connections
func (l *RaftLayer) Close() error {
l.closeLock.Lock()
defer l.closeLock.Unlock()
if !l.closed {
l.closed = true
close(l.closeCh)
}
return nil
}
// Addr is used to return the address of the listener
func (l *RaftLayer) Addr() net.Addr {
return l.addr
}
// Dial is used to create a new outgoing connection
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
// Get a net.Addr
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
// Use the conn pool
conn, err := l.connPool.Acquire(addr)
if err != nil {
return nil, err
}
// Discard the Conn wrapper
return conn.conn, nil
}

View File

@ -5,6 +5,13 @@ import (
"net"
)
type RPCType byte
const (
rpcConsul RPCType = iota
rpcRaft
)
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
for {
@ -27,8 +34,34 @@ func (s *Server) listen() {
}
}
// handleConn is used to service a single RPC connection
// handleConn is used to determine if this is a Raft or
// Consul type RPC connection and invoke the correct handler
func (s *Server) handleConn(conn net.Conn) {
// Read a single byte
buf := make([]byte, 1)
if _, err := conn.Read(buf); err != nil {
s.logger.Printf("[ERR] Failed to read byte: %v", err)
conn.Close()
return
}
// Switch on the byte
switch RPCType(buf[0]) {
case rpcConsul:
s.handleConsulConn(conn)
case rpcRaft:
s.raftLayer.Handoff(conn)
default:
s.logger.Printf("[ERR] Unrecognized RPC byte: %v", buf[0])
conn.Close()
return
}
}
// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
defer func() {
conn.Close()
s.rpcClientLock.Lock()

View File

@ -44,9 +44,9 @@ type Server struct {
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
raft *raft.Raft
raftStore *raft.SQLiteStore
raftTransport *raft.NetworkTransport
raft *raft.Raft
raftLayer *RaftLayer
raftStore *raft.SQLiteStore
// rpcClients is used to track active clients
rpcClients map[net.Conn]struct{}
@ -88,7 +88,7 @@ func NewServer(config *Config) (*Server, error) {
// Create server
s := &Server{
config: config,
connPool: NewPool(3),
connPool: NewPool(5),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
@ -97,6 +97,12 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}),
}
// Initialize the RPC layer
if err := s.setupRPC(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
}
// Start the Serf listeners to prevent a deadlock
go s.lanEventHandler()
go s.wanEventHandler()
@ -162,6 +168,7 @@ func (s *Server) setupRaft() error {
if err != nil {
return err
}
s.raftStore = store
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, 3)
@ -171,11 +178,7 @@ func (s *Server) setupRaft() error {
}
// Create a transport layer
trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second)
if err != nil {
store.Close()
return err
}
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second)
// Setup the peer store
peers := raft.NewJSONPeers(path, trans)
@ -184,17 +187,13 @@ func (s *Server) setupRaft() error {
s.fsm = &consulFSM{server: s}
// Setup the Raft store
raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
peers, trans)
if err != nil {
store.Close()
trans.Close()
return err
}
s.raft = raft
s.raftStore = store
s.raftTransport = trans
return nil
}
@ -205,6 +204,7 @@ func (s *Server) setupRPC() error {
return err
}
s.rpcListener = list
s.raftLayer = NewRaftLayer(s.rpcListener.Addr(), s.connPool)
go s.listen()
return nil
}
@ -234,11 +234,11 @@ func (s *Server) Shutdown() error {
if s.raft != nil {
s.raft.Shutdown()
s.raftLayer.Close()
s.raftStore.Close()
s.raftTransport.Close()
s.raft = nil
s.raftLayer = nil
s.raftStore = nil
s.raftTransport = nil
}
if s.rpcListener != nil {
@ -288,7 +288,7 @@ func (s *Server) Leave() error {
// Request that we are removed
// TODO: Properly forward to leader
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
future := s.raft.RemovePeer(s.rpcListener.Addr())
// Wait for the future
ch := make(chan error, 1)