Update serf library to pick up coordinate persistence fix

This commit is contained in:
Preetha Appan 2017-10-21 21:19:43 -05:00
parent 669c109d27
commit ad8b9171d6
3 changed files with 3 additions and 62 deletions

View File

@ -314,7 +314,6 @@ func Create(conf *Config) (*Serf, error) {
conf.RejoinAfterLeave, conf.RejoinAfterLeave,
serf.logger, serf.logger,
&serf.clock, &serf.clock,
serf.coordClient,
conf.EventCh, conf.EventCh,
serf.shutdownCh) serf.shutdownCh)
if err != nil { if err != nil {

View File

@ -2,7 +2,6 @@ package serf
import ( import (
"bufio" "bufio"
"encoding/json"
"fmt" "fmt"
"log" "log"
"math/rand" "math/rand"
@ -13,7 +12,6 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/serf/coordinate"
) )
/* /*
@ -29,7 +27,6 @@ old events.
const flushInterval = 500 * time.Millisecond const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact" const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second const snapshotErrorRecoveryInterval = 30 * time.Second
@ -38,7 +35,6 @@ const snapshotErrorRecoveryInterval = 30 * time.Second
type Snapshotter struct { type Snapshotter struct {
aliveNodes map[string]string aliveNodes map[string]string
clock *LamportClock clock *LamportClock
coordClient *coordinate.Client
fh *os.File fh *os.File
buffered *bufio.Writer buffered *bufio.Writer
inCh <-chan Event inCh <-chan Event
@ -80,7 +76,6 @@ func NewSnapshotter(path string,
rejoinAfterLeave bool, rejoinAfterLeave bool,
logger *log.Logger, logger *log.Logger,
clock *LamportClock, clock *LamportClock,
coordClient *coordinate.Client,
outCh chan<- Event, outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) { shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024) inCh := make(chan Event, 1024)
@ -103,7 +98,6 @@ func NewSnapshotter(path string,
snap := &Snapshotter{ snap := &Snapshotter{
aliveNodes: make(map[string]string), aliveNodes: make(map[string]string),
clock: clock, clock: clock,
coordClient: coordClient,
fh: fh, fh: fh,
buffered: bufio.NewWriter(fh), buffered: bufio.NewWriter(fh),
inCh: inCh, inCh: inCh,
@ -182,9 +176,6 @@ func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval) clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop() defer clockTicker.Stop()
coordinateTicker := time.NewTicker(coordinateUpdateInterval)
defer coordinateTicker.Stop()
for { for {
select { select {
case <-s.leaveCh: case <-s.leaveCh:
@ -226,9 +217,6 @@ func (s *Snapshotter) stream() {
case <-clockTicker.C: case <-clockTicker.C:
s.updateClock() s.updateClock()
case <-coordinateTicker.C:
s.updateCoordinate()
case <-s.shutdownCh: case <-s.shutdownCh:
if err := s.buffered.Flush(); err != nil { if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err) s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
@ -275,20 +263,6 @@ func (s *Snapshotter) updateClock() {
} }
} }
// updateCoordinate is called periodically to write out the current local
// coordinate. It's safe to call this if coordinates aren't enabled (nil
// client) and it will be a no-op.
func (s *Snapshotter) updateCoordinate() {
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
} else {
s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
}
}
}
// processUserEvent is used to handle a single user event // processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) { func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks // Ignore old clocks
@ -404,23 +378,6 @@ func (s *Snapshotter) compact() error {
} }
offset += int64(n) offset += int64(n)
// Write out the coordinate.
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
fh.Close()
return err
}
line = fmt.Sprintf("coordinate: %s\n", encoded)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Flush the new snapshot // Flush the new snapshot
err = buf.Flush() err = buf.Flush()
@ -542,22 +499,7 @@ func (s *Snapshotter) replay() error {
s.lastQueryClock = LamportTime(timeInt) s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") { } else if strings.HasPrefix(line, "coordinate: ") {
if s.coordClient == nil { continue // Ignores any coordinate persistence from old snapshots, serf should re-converge
s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
continue
}
coordStr := strings.TrimPrefix(line, "coordinate: ")
var coord coordinate.Coordinate
err := json.Unmarshal([]byte(coordStr), &coord)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
if err := s.coordClient.SetCoordinate(&coord); err != nil {
s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
continue
}
} else if line == "leave" { } else if line == "leave" {
// Ignore a leave if we plan on re-joining // Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave { if s.rejoinAfterLeave {

4
vendor/vendor.json vendored
View File

@ -59,8 +59,8 @@
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"}, {"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"mS15CkImPzXYsgNwl3Mt9Gh3Vb0=","comment":"v0.7.0-66-g6c4672d","revision":"d7edef7830f6ef57fde9d8774489d3db8d91ae98","revisionTime":"2017-10-20T15:31:31Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"mS15CkImPzXYsgNwl3Mt9Gh3Vb0=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"LPqwFCD4mIRfbsQez0vgZMfpdW8=","comment":"v0.7.0-66-g6c4672d","revision":"d7edef7830f6ef57fde9d8774489d3db8d91ae98","revisionTime":"2017-10-20T15:31:31Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"iYhCWgAAUcQjU0JocsKgak5C8tY=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"},
{"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"}, {"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"},
{"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"}, {"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"},
{"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"}, {"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"},