diff --git a/consul/rpc.go b/consul/rpc.go new file mode 100644 index 000000000..09697d1ac --- /dev/null +++ b/consul/rpc.go @@ -0,0 +1,46 @@ +package consul + +import ( + "github.com/ugorji/go/codec" + "net" +) + +// listen is used to listen for incoming RPC connections +func (s *Server) listen() { + for { + // Accept a connection + conn, err := s.rpcListener.Accept() + if err != nil { + if s.shutdown { + return + } + s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err) + continue + } + + // Track this client + s.rpcClientLock.Lock() + s.rpcClients[conn] = struct{}{} + s.rpcClientLock.Unlock() + + go s.handleConn(conn) + } +} + +// handleConn is used to service a single RPC connection +func (s *Server) handleConn(conn net.Conn) { + defer func() { + conn.Close() + s.rpcClientLock.Lock() + delete(s.rpcClients, conn) + s.rpcClientLock.Unlock() + }() + + rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) + for !s.shutdown { + if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { + s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn) + return + } + } +} diff --git a/consul/serf.go b/consul/serf.go new file mode 100644 index 000000000..6ef6a14ad --- /dev/null +++ b/consul/serf.go @@ -0,0 +1,25 @@ +package connsul + +// lanEventHandler is used to handle events from the lan Serf cluster +func (s *Server) lanEventHandler() { + for { + select { + case e := <-s.eventChLAN: + s.logger.Printf("[INFO] LAN Event: %v", e) + case <-s.shutdownCh: + return + } + } +} + +// wanEventHandler is used to handle events from the wan Serf cluster +func (s *Server) wanEventHandler() { + for { + select { + case e := <-s.eventChWAN: + s.logger.Printf("[INFO] WAN Event: %v", e) + case <-s.shutdownCh: + return + } + } +} diff --git a/consul/server.go b/consul/server.go index a0943d7e7..69e8262ce 100644 --- a/consul/server.go +++ b/consul/server.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" - "github.com/ugorji/go/codec" "log" "net" "net/rpc" @@ -252,67 +251,3 @@ func (s *Server) Shutdown() error { return nil } - -// lanEventHandler is used to handle events from the lan Serf cluster -func (s *Server) lanEventHandler() { - for { - select { - case e := <-s.eventChLAN: - s.logger.Printf("[INFO] LAN Event: %v", e) - case <-s.shutdownCh: - return - } - } -} - -// wanEventHandler is used to handle events from the wan Serf cluster -func (s *Server) wanEventHandler() { - for { - select { - case e := <-s.eventChWAN: - s.logger.Printf("[INFO] WAN Event: %v", e) - case <-s.shutdownCh: - return - } - } -} - -// listen is used to listen for incoming RPC connections -func (s *Server) listen() { - for { - // Accept a connection - conn, err := s.rpcListener.Accept() - if err != nil { - if s.shutdown { - return - } - s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err) - continue - } - - // Track this client - s.rpcClientLock.Lock() - s.rpcClients[conn] = struct{}{} - s.rpcClientLock.Unlock() - - go s.handleConn(conn) - } -} - -// handleConn is used to service a single RPC connection -func (s *Server) handleConn(conn net.Conn) { - defer func() { - conn.Close() - s.rpcClientLock.Lock() - delete(s.rpcClients, conn) - s.rpcClientLock.Unlock() - }() - - rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) - for !s.shutdown { - if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { - s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn) - return - } - } -}