From d209517d5064aa6e77f79cdb785620eae52712b5 Mon Sep 17 00:00:00 2001 From: Andrew M Bursavich Date: Sat, 7 Jun 2014 00:59:27 -0700 Subject: [PATCH] reuse codec.MsgpackHandle --- command/agent/rpc.go | 8 ++++---- command/agent/rpc_client.go | 6 ++---- consul/fsm.go | 6 ++---- consul/mdb_table_test.go | 6 ++---- consul/pool.go | 4 +++- consul/rpc.go | 2 +- consul/status_endpoint_test.go | 2 +- consul/structs/structs.go | 10 ++++------ 8 files changed, 19 insertions(+), 25 deletions(-) diff --git a/command/agent/rpc.go b/command/agent/rpc.go index e56196f51..e4d17ff38 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -60,6 +60,8 @@ const ( monitorExists = "Monitor already exists" ) +var mh = codec.MsgpackHandle{RawToString: true, WriteExt: true} + // Request header is sent before each request type requestHeader struct { Command string @@ -249,10 +251,8 @@ func (i *AgentRPC) listen() { reader: bufio.NewReader(conn), writer: bufio.NewWriter(conn), } - client.dec = codec.NewDecoder(client.reader, - &codec.MsgpackHandle{RawToString: true, WriteExt: true}) - client.enc = codec.NewEncoder(client.writer, - &codec.MsgpackHandle{RawToString: true, WriteExt: true}) + client.dec = codec.NewDecoder(client.reader, &mh) + client.enc = codec.NewEncoder(client.writer, &mh) if err != nil { i.logger.Printf("[ERR] agent.rpc: Failed to create decoder: %v", err) conn.Close() diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 8f22a8fb3..2b6d98ca8 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -94,10 +94,8 @@ func NewRPCClient(addr string) (*RPCClient, error) { dispatch: make(map[uint64]seqHandler), shutdownCh: make(chan struct{}), } - client.dec = codec.NewDecoder(client.reader, - &codec.MsgpackHandle{RawToString: true, WriteExt: true}) - client.enc = codec.NewEncoder(client.writer, - &codec.MsgpackHandle{RawToString: true, WriteExt: true}) + client.dec = codec.NewDecoder(client.reader, &mh) + client.enc = codec.NewEncoder(client.writer, &mh) go client.listen() // Do the initial handshake diff --git a/consul/fsm.go b/consul/fsm.go index 41a911a56..e26dffa56 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -221,8 +221,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { c.state = state // Create a decoder - var handle codec.MsgpackHandle - dec := codec.NewDecoder(old, &handle) + dec := codec.NewDecoder(old, &mh) // Read in the header var header snapshotHeader @@ -278,8 +277,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register the nodes - handle := codec.MsgpackHandle{} - encoder := codec.NewEncoder(sink, &handle) + encoder := codec.NewEncoder(sink, &mh) // Write the header header := snapshotHeader{ diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 2800c2a48..11bb10e79 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -19,8 +19,7 @@ type MockData struct { func MockEncoder(obj interface{}) []byte { buf := bytes.NewBuffer(nil) - handle := codec.MsgpackHandle{} - encoder := codec.NewEncoder(buf, &handle) + encoder := codec.NewEncoder(buf, &mh) err := encoder.Encode(obj) if err != nil { panic(err) @@ -30,8 +29,7 @@ func MockEncoder(obj interface{}) []byte { func MockDecoder(buf []byte) interface{} { out := new(MockData) - var handle codec.MsgpackHandle - err := codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out) + err := codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out) if err != nil { panic(err) } diff --git a/consul/pool.go b/consul/pool.go index 2323b36b3..d6cfb9035 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -15,6 +15,8 @@ import ( "time" ) +var mh = codec.MsgpackHandle{} + // muxSession is used to provide an interface for either muxado or yamux type muxSession interface { Open() (net.Conn, error) @@ -79,7 +81,7 @@ func (c *Conn) getClient() (*StreamClient, error) { } // Create the RPC client - cc := codec.GoRpc.ClientCodec(stream, &codec.MsgpackHandle{}) + cc := codec.GoRpc.ClientCodec(stream, &mh) client := rpc.NewClientWithCodec(cc) // Return a new stream client diff --git a/consul/rpc.go b/consul/rpc.go index 20ffb3c6d..788f09f83 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -148,7 +148,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) { // handleConsulConn is used to service a single Consul RPC connection func (s *Server) handleConsulConn(conn net.Conn) { defer conn.Close() - rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) + rpcCodec := codec.GoRpc.ServerCodec(conn, &mh) for !s.shutdown { if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { if err != io.EOF && !strings.Contains(err.Error(), "closed") { diff --git a/consul/status_endpoint_test.go b/consul/status_endpoint_test.go index 75ff51947..b5759befe 100644 --- a/consul/status_endpoint_test.go +++ b/consul/status_endpoint_test.go @@ -19,7 +19,7 @@ func rpcClient(t *testing.T, s *Server) *rpc.Client { // Write the Consul RPC byte to set the mode conn.Write([]byte{byte(rpcConsul)}) - cc := codec.GoRpc.ClientCodec(conn, &codec.MsgpackHandle{}) + cc := codec.GoRpc.ClientCodec(conn, &mh) return rpc.NewClientWithCodec(cc) } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 6eda48d59..9a305eae5 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -395,19 +395,17 @@ type IndexedSessions struct { QueryMeta } +var mh = codec.MsgpackHandle{} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { - var handle codec.MsgpackHandle - return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out) + return codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out) } // Encode is used to encode a MsgPack object with type prefix func Encode(t MessageType, msg interface{}) ([]byte, error) { buf := bytes.NewBuffer(nil) buf.WriteByte(uint8(t)) - - handle := codec.MsgpackHandle{} - encoder := codec.NewEncoder(buf, &handle) - err := encoder.Encode(msg) + err := codec.NewEncoder(buf, &mh).Encode(msg) return buf.Bytes(), err }