reuse codec.MsgpackHandle

This commit is contained in:
Andrew M Bursavich 2014-06-07 00:59:27 -07:00
parent 7fc5b7a920
commit d209517d50
8 changed files with 19 additions and 25 deletions

View File

@ -60,6 +60,8 @@ const (
monitorExists = "Monitor already exists" monitorExists = "Monitor already exists"
) )
var mh = codec.MsgpackHandle{RawToString: true, WriteExt: true}
// Request header is sent before each request // Request header is sent before each request
type requestHeader struct { type requestHeader struct {
Command string Command string
@ -249,10 +251,8 @@ func (i *AgentRPC) listen() {
reader: bufio.NewReader(conn), reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn), writer: bufio.NewWriter(conn),
} }
client.dec = codec.NewDecoder(client.reader, client.dec = codec.NewDecoder(client.reader, &mh)
&codec.MsgpackHandle{RawToString: true, WriteExt: true}) client.enc = codec.NewEncoder(client.writer, &mh)
client.enc = codec.NewEncoder(client.writer,
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
if err != nil { if err != nil {
i.logger.Printf("[ERR] agent.rpc: Failed to create decoder: %v", err) i.logger.Printf("[ERR] agent.rpc: Failed to create decoder: %v", err)
conn.Close() conn.Close()

View File

@ -94,10 +94,8 @@ func NewRPCClient(addr string) (*RPCClient, error) {
dispatch: make(map[uint64]seqHandler), dispatch: make(map[uint64]seqHandler),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
client.dec = codec.NewDecoder(client.reader, client.dec = codec.NewDecoder(client.reader, &mh)
&codec.MsgpackHandle{RawToString: true, WriteExt: true}) client.enc = codec.NewEncoder(client.writer, &mh)
client.enc = codec.NewEncoder(client.writer,
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
go client.listen() go client.listen()
// Do the initial handshake // Do the initial handshake

View File

@ -221,8 +221,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
c.state = state c.state = state
// Create a decoder // Create a decoder
var handle codec.MsgpackHandle dec := codec.NewDecoder(old, &mh)
dec := codec.NewDecoder(old, &handle)
// Read in the header // Read in the header
var header snapshotHeader var header snapshotHeader
@ -278,8 +277,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register the nodes // Register the nodes
handle := codec.MsgpackHandle{} encoder := codec.NewEncoder(sink, &mh)
encoder := codec.NewEncoder(sink, &handle)
// Write the header // Write the header
header := snapshotHeader{ header := snapshotHeader{

View File

@ -19,8 +19,7 @@ type MockData struct {
func MockEncoder(obj interface{}) []byte { func MockEncoder(obj interface{}) []byte {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
handle := codec.MsgpackHandle{} encoder := codec.NewEncoder(buf, &mh)
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(obj) err := encoder.Encode(obj)
if err != nil { if err != nil {
panic(err) panic(err)
@ -30,8 +29,7 @@ func MockEncoder(obj interface{}) []byte {
func MockDecoder(buf []byte) interface{} { func MockDecoder(buf []byte) interface{} {
out := new(MockData) out := new(MockData)
var handle codec.MsgpackHandle err := codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out)
err := codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -15,6 +15,8 @@ import (
"time" "time"
) )
var mh = codec.MsgpackHandle{}
// muxSession is used to provide an interface for either muxado or yamux // muxSession is used to provide an interface for either muxado or yamux
type muxSession interface { type muxSession interface {
Open() (net.Conn, error) Open() (net.Conn, error)
@ -79,7 +81,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
} }
// Create the RPC client // Create the RPC client
cc := codec.GoRpc.ClientCodec(stream, &codec.MsgpackHandle{}) cc := codec.GoRpc.ClientCodec(stream, &mh)
client := rpc.NewClientWithCodec(cc) client := rpc.NewClientWithCodec(cc)
// Return a new stream client // Return a new stream client

View File

@ -148,7 +148,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) {
// handleConsulConn is used to service a single Consul RPC connection // handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) { func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close() defer conn.Close()
rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) rpcCodec := codec.GoRpc.ServerCodec(conn, &mh)
for !s.shutdown { for !s.shutdown {
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") { if err != io.EOF && !strings.Contains(err.Error(), "closed") {

View File

@ -19,7 +19,7 @@ func rpcClient(t *testing.T, s *Server) *rpc.Client {
// Write the Consul RPC byte to set the mode // Write the Consul RPC byte to set the mode
conn.Write([]byte{byte(rpcConsul)}) conn.Write([]byte{byte(rpcConsul)})
cc := codec.GoRpc.ClientCodec(conn, &codec.MsgpackHandle{}) cc := codec.GoRpc.ClientCodec(conn, &mh)
return rpc.NewClientWithCodec(cc) return rpc.NewClientWithCodec(cc)
} }

View File

@ -395,19 +395,17 @@ type IndexedSessions struct {
QueryMeta QueryMeta
} }
var mh = codec.MsgpackHandle{}
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error { func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle return codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out)
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
} }
// Encode is used to encode a MsgPack object with type prefix // Encode is used to encode a MsgPack object with type prefix
func Encode(t MessageType, msg interface{}) ([]byte, error) { func Encode(t MessageType, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(t)) buf.WriteByte(uint8(t))
err := codec.NewEncoder(buf, &mh).Encode(msg)
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(msg)
return buf.Bytes(), err return buf.Bytes(), err
} }