diff --git a/vendor/github.com/hashicorp/raft-boltdb/Makefile b/vendor/github.com/hashicorp/raft-boltdb/Makefile new file mode 100644 index 000000000..bc5c6cc01 --- /dev/null +++ b/vendor/github.com/hashicorp/raft-boltdb/Makefile @@ -0,0 +1,11 @@ +DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) + +.PHONY: test deps + +test: + go test -timeout=30s ./... + +deps: + go get -d -v ./... + echo $(DEPS) | xargs -n1 go get -d + diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 147cde295..73f057c98 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -492,6 +492,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna } r.processConfigurationLogEntry(&entry) } + r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v", r.configurations.latestIndex, r.configurations.latest.Servers) diff --git a/vendor/github.com/hashicorp/raft/configuration.go b/vendor/github.com/hashicorp/raft/configuration.go index 74508c5e5..8afc38bd9 100644 --- a/vendor/github.com/hashicorp/raft/configuration.go +++ b/vendor/github.com/hashicorp/raft/configuration.go @@ -283,7 +283,7 @@ func encodePeers(configuration Configuration, trans Transport) []byte { var encPeers [][]byte for _, server := range configuration.Servers { if server.Suffrage == Voter { - encPeers = append(encPeers, trans.EncodePeer(server.Address)) + encPeers = append(encPeers, trans.EncodePeer(server.ID, server.Address)) } } diff --git a/vendor/github.com/hashicorp/raft/inmem_transport.go b/vendor/github.com/hashicorp/raft/inmem_transport.go index 3693cd5ad..ce37f63aa 100644 --- a/vendor/github.com/hashicorp/raft/inmem_transport.go +++ b/vendor/github.com/hashicorp/raft/inmem_transport.go @@ -75,7 +75,7 @@ func (i *InmemTransport) LocalAddr() ServerAddress { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { +func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) { i.RLock() peer, ok := i.peers[target] i.RUnlock() @@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipe } // AppendEntries implements the Transport interface. -func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { +func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntries } // RequestVote implements the Transport interface. -func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { +func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequ } // InstallSnapshot implements the Transport interface. -func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout) if err != nil { return err @@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re } // EncodePeer implements the Transport interface. -func (i *InmemTransport) EncodePeer(p ServerAddress) []byte { +func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte { return []byte(p) } diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index 7c55ac537..9555a0eae 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -68,6 +68,8 @@ type NetworkTransport struct { maxPool int + serverAddressProvider ServerAddressProvider + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -78,6 +80,28 @@ type NetworkTransport struct { TimeoutScale int } +// NetworkTransportConfig encapsulates configuration for the network transport layer. +type NetworkTransportConfig struct { + // ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC + ServerAddressProvider ServerAddressProvider + + Logger *log.Logger + + // Dialer + Stream StreamLayer + + // MaxPool controls how many connections we will pool + MaxPool int + + // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply + // the timeout by (SnapshotSize / TimeoutScale). + Timeout time.Duration +} + +type ServerAddressProvider interface { + ServerAddr(id ServerID) (ServerAddress, error) +} + // StreamLayer is used with the NetworkTransport to provide // the low level stream abstraction. type StreamLayer interface { @@ -112,6 +136,28 @@ type netPipeline struct { shutdownLock sync.Mutex } +// NewNetworkTransportWithConfig creates a new network transport with the given config struct +func NewNetworkTransportWithConfig( + config *NetworkTransportConfig, +) *NetworkTransport { + if config.Logger == nil { + config.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + trans := &NetworkTransport{ + connPool: make(map[ServerAddress][]*netConn), + consumeCh: make(chan RPC), + logger: config.Logger, + maxPool: config.MaxPool, + shutdownCh: make(chan struct{}), + stream: config.Stream, + timeout: config.Timeout, + TimeoutScale: DefaultTimeoutScale, + serverAddressProvider: config.ServerAddressProvider, + } + go trans.listen() + return trans +} + // NewNetworkTransport creates a new network transport with the given dialer // and listener. The maxPool controls how many connections we will pool. The // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply @@ -125,10 +171,12 @@ func NewNetworkTransport( if logOutput == nil { logOutput = os.Stderr } - return NewNetworkTransportWithLogger(stream, maxPool, timeout, log.New(logOutput, "", log.LstdFlags)) + logger := log.New(logOutput, "", log.LstdFlags) + config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} + return NewNetworkTransportWithConfig(config) } -// NewNetworkTransportWithLogger creates a new network transport with the given dialer +// NewNetworkTransportWithLogger creates a new network transport with the given logger, dialer // and listener. The maxPool controls how many connections we will pool. The // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply // the timeout by (SnapshotSize / TimeoutScale). @@ -138,21 +186,8 @@ func NewNetworkTransportWithLogger( timeout time.Duration, logger *log.Logger, ) *NetworkTransport { - if logger == nil { - logger = log.New(os.Stderr, "", log.LstdFlags) - } - trans := &NetworkTransport{ - connPool: make(map[ServerAddress][]*netConn), - consumeCh: make(chan RPC), - logger: logger, - maxPool: maxPool, - shutdownCh: make(chan struct{}), - stream: stream, - timeout: timeout, - TimeoutScale: DefaultTimeoutScale, - } - go trans.listen() - return trans + config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} + return NewNetworkTransportWithConfig(config) } // SetHeartbeatHandler is used to setup a heartbeat handler @@ -214,6 +249,24 @@ func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn { return conn } +// getConnFromAddressProvider returns a connection from the server address provider if available, or defaults to a connection using the target server address +func (n *NetworkTransport) getConnFromAddressProvider(id ServerID, target ServerAddress) (*netConn, error) { + address := n.getProviderAddressOrFallback(id, target) + return n.getConn(address) +} + +func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target ServerAddress) ServerAddress { + if n.serverAddressProvider != nil { + serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) + if err != nil { + n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err) + } else { + return serverAddressOverride + } + } + return target +} + // getConn is used to get a connection from the pool. func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) { // Check for a pooled conn @@ -260,9 +313,9 @@ func (n *NetworkTransport) returnConn(conn *netConn) { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { +func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) { // Get a connection - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return nil, err } @@ -272,19 +325,19 @@ func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPi } // AppendEntries implements the Transport interface. -func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { - return n.genericRPC(target, rpcAppendEntries, args, resp) +func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { + return n.genericRPC(id, target, rpcAppendEntries, args, resp) } // RequestVote implements the Transport interface. -func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { - return n.genericRPC(target, rpcRequestVote, args, resp) +func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { + return n.genericRPC(id, target, rpcRequestVote, args, resp) } // genericRPC handles a simple request/response RPC. -func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { +func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { // Get a conn - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -308,9 +361,9 @@ func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args } // InstallSnapshot implements the Transport interface. -func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { // Get a conn, always close for InstallSnapshot - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -346,8 +399,9 @@ func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSn } // EncodePeer implements the Transport interface. -func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte { - return []byte(p) +func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte { + address := n.getProviderAddressOrFallback(id, p) + return []byte(address) } // DecodePeer implements the Transport interface. diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index 0f89ccaa4..50ae6e916 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -1379,7 +1379,7 @@ func (r *Raft) electSelf() <-chan *voteResult { req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(r.localAddr), + Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, } @@ -1389,7 +1389,7 @@ func (r *Raft) electSelf() <-chan *voteResult { r.goFunc(func() { defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now()) resp := &voteResult{voterID: peer.ID} - err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse) + err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err) resp.Term = req.Term diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 683927343..e631b5a09 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -157,7 +157,7 @@ PIPELINE: goto RPC } -// replicateTo is a hepler to replicate(), used to replicate the logs up to a +// replicateTo is a helper to replicate(), used to replicate the logs up to a // given last index. // If the follower log is behind, we take care to bring them up to date. func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) { @@ -183,7 +183,7 @@ START: // Make the RPC call start = time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err) s.failures++ return @@ -278,7 +278,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { RPCHeader: r.getRPCHeader(), SnapshotVersion: meta.Version, Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -290,7 +290,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Make the call start := time.Now() var resp InstallSnapshotResponse - if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil { + if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err) s.failures++ return false, err @@ -332,7 +332,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), } var resp AppendEntriesResponse for { @@ -345,7 +345,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { } start := time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err) failures++ select { @@ -367,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { // back to the standard replication which can handle more complex situations. func (r *Raft) pipelineReplicate(s *followerReplication) error { // Create a new pipeline - pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address) + pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address) if err != nil { return err } @@ -476,7 +476,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - req.Leader = r.trans.EncodePeer(r.localAddr) + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 9281508a0..29b2740f6 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -28,7 +28,7 @@ func NewTCPTransport( timeout time.Duration, logOutput io.Writer, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransport(stream, maxPool, timeout, logOutput) }) } @@ -42,15 +42,26 @@ func NewTCPTransportWithLogger( timeout time.Duration, logger *log.Logger, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) }) } +// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of +// a TCP streaming transport layer, using a default logger and the address provider +func NewTCPTransportWithConfig( + bindAddr string, + advertise net.Addr, + config *NetworkTransportConfig, +) (*NetworkTransport, error) { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { + config.Stream = stream + return NewNetworkTransportWithConfig(config) + }) +} + func newTCPTransport(bindAddr string, advertise net.Addr, - maxPool int, - timeout time.Duration, transportCreator func(stream StreamLayer) *NetworkTransport) (*NetworkTransport, error) { // Try to bind list, err := net.Listen("tcp", bindAddr) diff --git a/vendor/github.com/hashicorp/raft/transport.go b/vendor/github.com/hashicorp/raft/transport.go index 633f97a8c..85459b221 100644 --- a/vendor/github.com/hashicorp/raft/transport.go +++ b/vendor/github.com/hashicorp/raft/transport.go @@ -35,20 +35,20 @@ type Transport interface { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. - AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) + AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) // AppendEntries sends the appropriate RPC to the target node. - AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error // RequestVote sends the appropriate RPC to the target node. - RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error + RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error // InstallSnapshot is used to push a snapshot down to a follower. The data is read from // the ReadCloser and streamed to the client. - InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error + InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error // EncodePeer is used to serialize a peer's address. - EncodePeer(ServerAddress) []byte + EncodePeer(id ServerID, addr ServerAddress) []byte // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress diff --git a/vendor/vendor.json b/vendor/vendor.json index b8d7f744c..36a923218 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -57,7 +57,7 @@ {"checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","path":"github.com/hashicorp/logutils","revision":"0dc08b1671f34c4250ce212759ebd880f743d883","revisionTime":"2015-06-09T07:04:31Z"}, {"checksumSHA1":"ml0MTqOsKTrsqv/mZhy78Vz4SfA=","path":"github.com/hashicorp/memberlist","revision":"d6c1fb0b99c33d0a8e22acea9da9709b369b5d39","revisionTime":"2017-08-15T22:46:17Z"}, {"checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"}, - {"checksumSHA1":"RVDP6/BNLtrGbyoiGU2GjTun9Kk=","path":"github.com/hashicorp/raft","revision":"2356637a1c1ffe894b753680363ad970480215aa","revisionTime":"2017-08-24T21:39:20Z","version":"library-v2-stage-one","versionExact":"library-v2-stage-one"}, + {"checksumSHA1":"f2QYddVWZ2eWxdlCEhearTH4XOs=","path":"github.com/hashicorp/raft","revision":"c837e57a6077e74a4a3749959fb6cfefc26d7705","revisionTime":"2017-08-30T14:31:53Z","version":"library-v2-stage-one","versionExact":"library-v2-stage-one"}, {"checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","path":"github.com/hashicorp/raft-boltdb","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","comment":"v0.7.0-66-g6c4672d","path":"github.com/hashicorp/serf/coordinate","revision":"c2e4be24cdc9031eb0ad869c5d160775efdf7d7a","revisionTime":"2017-05-25T23:15:04Z"}, {"checksumSHA1":"3WPnGSL9ZK6EmkAE6tEW5SCxrd8=","comment":"v0.7.0-66-g6c4672d","path":"github.com/hashicorp/serf/serf","revision":"b84a66cc5575994cb672940d244a2404141688c0","revisionTime":"2017-08-17T21:22:02Z"},