updated serf along with raft

Chris Baker 2019-01-04 16:56:26 +00:00
parent e2e83feb5d
commit 4109306fff
9 changed files with 748 additions and 33 deletions

423
vendor/github.com/hashicorp/serf/CHANGELOG.md generated vendored Normal file

@@ -0,0 +1,423 @@
## 0.8.2 (UNRELEASED)
FEATURES:
IMPROVEMENTS:
* agent: Fixed a missing case where gossip would stop flowing to dead nodes for a short while. [GH-451]
* agent: Uses the go-sockaddr library to look for private IP addresses, which prefers non-loopback private addresses over loopback ones when trying to automatically determine the advertise address. [GH-451]
* agent: Properly seeds Go's random number generator using the seed library. [GH-451]
* agent: Serf is now built with Go 1.10.x. [GH-520]
* agent: Improved address comparison during conflict resolution. [GH-433]
* agent: Updated memberlist to latest to pull several cleanups and fixes. [GH-491]
* agent: Improved handling of leave intent messages to make sure they propagate and are processed correctly. [GH-510]
* agent: Added CLI option to disable compression for debugging messages. [GH-529]
* library: Moved close of shutdown channel until after network resources are released. [GH-453]
* library: Fixed several race conditions with QueryResponse [GH-460]
* library: Made snapshot writing asynchronous and compaction less aggressive on large clusters to avoid blocking the message handler on disk IO [GH-524][GH-525]
BUG FIXES:
* agent: Added defenses against invalid network coordinates with NaN and Inf values. [GH-468]
* agent: Fixed an issue on Windows where "wsarecv" errors were logged when clients accessed the RPC interface. [GH-479]
* agent: Fixed an issue where calling the serf Stats function could result in a deadlock. [[Consul Issue 4011](https://github.com/hashicorp/consul/issues/4011)]
## 0.8.1 (February 6, 2017)
IMPROVEMENTS:
* agent: Added support for relaying query responses through N other nodes for redundancy. [GH-439]
* agent: Added the ability to tune the broadcast timeout, which might be necessary in very large clusters that experience very large, simultaneous changes to the cluster. [GH-412]
* agent: Added a checksum to UDP gossip messages to guard against packet corruption. [GH-432]
* agent: Added a short window where gossip will still flow to dead nodes so that they can more quickly refute. [GH-440]
* build: Serf now builds with Go 1.7.5. [GH-443]
## 0.8 (September 14, 2016)
FEATURES:
* **Lifeguard Updates:** Implemented a new set of feedback controls for the gossip layer that help prevent degraded nodes that can't meet the soft real-time requirements from erroneously causing flapping in other, healthy nodes. This feature tunes itself automatically and requires no configuration. [GH-394]
IMPROVEMENTS:
* Modified management of intents to be per-node to avoid intent queue overflow errors in large clusters. [GH-402]
* Joins based on a DNS lookup will use TCP and attempt to join with the full list of returned addresses. [GH-387]
* Serf's Go dependencies are now vendored using govendor. [GH-383]
* Updated all of Serf's dependencies. [GH-387] [GH-401]
* Moved dist build into a Docker container. [GH-409]
BUG FIXES:
* Updated memberlist to pull in a fix for leaking goroutines when performing TCP fallback pings. This affected users with frequent UDP connectivity problems. [GH-381]
## 0.7 (December 21, 2015)
FEATURES:
* Added new network tomography subsystem that computes network coordinates for
nodes in the cluster which can be used to estimate network round trip times
between any two nodes; exposes a new `GetCoordinate` API as well as a new
`serf rtt` command to query RTT interactively
IMPROVEMENTS:
* Added support for configuring query request size and query response size [GH-346]
* Syslog messages are now filtered by the configured log-level
* New `statsd_addr` for sending metrics via UDP to statsd
* Added support for sending telemetry to statsite
* `serf info` command now displays event handlers [GH-312]
* Added a `MemberLeave` message to the `EventCh` for a force-leave so higher-
level applications can handle the leave event
* Lots of documentation updates
BUG FIXES:
* Fixed updating cached protocol version of a node when an update event
fires [GH-335]
* Fixed a bug where an empty remote state message would cause a crash in
`MergeRemoteState`
## 0.6.4 (February 12, 2015)
IMPROVEMENTS:
* Added merge delegate to Serf library to support application
specific logic in cluster merging.
* `SERF_RPC_AUTH` environment variable can be used in place of CLI flags.
* Display if encryption is enabled in Serf stats
* Improved `join` behavior when using DNS resolution
BUG FIXES:
* Fixed snapshot file compaction on Windows
* Fixed device binding on Windows
* Fixed bug with empty keyring
* Fixed parsing of ports in some cases
* Fixing stability issues under high churn
MISC:
* Increased user event size limit to 512 bytes (previously 256)
## 0.6.3 (July 10, 2014)
IMPROVEMENTS:
* Added `statsite_addr` configuration to stream to statsite
BUG FIXES:
* Fixed issue with mDNS flooding when using IPv4 and IPv6
* Fixed issue with reloading event handlers
MISC:
* Improved failure detection reliability under load
* Reduced fsync() use in snapshot file
* Improved snapshot file performance
* Additional logging to help debug flapping
## 0.6.2 (June 16, 2014)
IMPROVEMENTS:
* Added `syslog_facility` configuration to set facility
BUG FIXES:
* Fixed memory leak in in-memory stats system
* Fixed issue that would cause syslog to deadlock
MISC:
* Fixed missing prefixes on some log messages
* Docs fixes
## 0.6.1 (May 29, 2014)
BUG FIXES:
* On Windows, a "failed to decode request header" error will no
longer be shown on every RPC request.
* Avoiding holding a lock which can cause monitor/stream commands to
fail when an event handler is blocking
* Fixing conflict response decoding errors
IMPROVEMENTS:
* Improved agent CLI usage documentation
* Warn if an event handler is slow, potentially blocking other events
## 0.6.0 (May 8, 2014)
FEATURES:
* Support for key rotation when using encryption. This adds a new
`serf keys` command, and a `-keyring-file` configuration. Thanks
to @ryanuber.
* New `-tags-file` can be specified to persist changes to tags made
via the RPC interface. Thanks to @ryanuber.
* New `serf info` command to provide operator debugging information,
and to get info about the local node.
* Adding `-retry-join` flag to agent which enables retrying the join
until success or `-retry-max` attempts have been made.
IMPROVEMENTS:
* New `-rejoin` flag can be used along with a snapshot file to
automatically rejoin a cluster.
* Agent uses circular buffer to invoke handlers, guards against unbounded
output lengths.
* Adding support for logging to syslog
* The SERF_RPC_ADDR environment variable can be used instead of the
`-rpc-addr` flags. Thanks to @lalyos [GH-209].
* `serf query` can now output the results in a JSON format.
* Unknown configuration directives generate an error [GH-186].
Thanks to @vincentbernat.
BUG FIXES:
* Fixing environmental variables with invalid characters. [GH-200].
Thanks to @arschles.
* Fixing issue with tag changes with hard restart before
failure detection.
* Fixing issue with reconnect when using dynamic ports.
MISC:
* Improved logging of various error messages
* Improved debian packaging. Thanks to @vincentbernat.
## 0.5.0 (March 12, 2014)
FEATURES:
* New `query` command provides a request/response mechanism to do realtime
queries across the cluster. [GH-139]
* Automatic conflict resolution. Serf will detect name conflicts, and use an
internal query to determine which node is in the minority and perform a shutdown.
[GH-167] [GH-119]
* New `reachability` command can be used to help diagnose network and configuration
issues.
* Added `member-reap` event to get notified of when Serf removes a failed or left
node from the cluster. The reap interval is controlled by `reconnect_timeout` and
`tombstone_timeout` respectively. [GH-172]
IMPROVEMENTS:
* New Recipes section on the site to share Serf tips. Thanks to @ryanuber. [GH-177]
* `members` command has new `-name` filter flag. Thanks to @ryanuber [GH-164]
* New RPC command "members-filtered" to move filtering logic to the agent.
Thanks to @ryanuber. [GH-149]
* `reconnect_interval` and `reconnect_timeout` can be provided to configure
agent behavior for attempting to reconnect to failed nodes. [GH-155]
* `tombstone_interval` can be provided to configure the reap time for nodes
that have gracefully left. [GH-172]
* Agent can be provided `rpc_auth` config to require that RPC is authenticated.
All commands can take a `-rpc-auth` flag now. [GH-148]
BUG FIXES:
* Fixed config folder in Upstart script. Thanks to @llchen223. [GH-174]
* Event handlers are correctly invoked when BusyBox is the shell. [GH-156]
* Event handlers were not being invoked with the correct SERF_TAG_* values
if tags were changed using the `tags` command. [GH-169]
MISC:
* Support for protocol version 1 (Serf 0.2) has been removed. Serf 0.5 cannot
join a cluster that has members running version 0.2.
## 0.4.5 (February 25, 2014)
FEATURES:
* New `tags` command is available to dynamically update tags without
reloading the agent. Thanks to @ryanuber. [GH-126]
IMPROVEMENTS:
* Upstart recipe logs output thanks to @breerly [GH-128]
* `members` can filter on any tag thanks to @hmrm [GH-124]
* Added vagrant demo to make a simple cluster
* `members` now columnizes the output thanks to @ryanuber [GH-138]
* Agent passes its own environment variables through thanks to @mcroydon [GH-142]
* `-iface` flag can be used to bind to interfaces [GH-145]
BUG FIXES:
* -config-dir would cause protocol to be set to 0 if there are no
configuration files in the directory [GH-129]
* Event handlers can filter on 'member-update'
* User event handler appends new line, this was being omitted
## 0.4.1 (February 3, 2014)
IMPROVEMENTS:
* mDNS service uses the advertise address instead of bind address
## 0.4.0 (January 31, 2014)
FEATURES:
* Static `role` has been replaced with dynamic tags. Each agent can have
multiple key/value tags associated using `-tag`. Tags can be updated using
a SIGHUP and are advertised to the cluster, causing the `member-update` event
to be triggered. [GH-111] [GH-98]
* Serf can automatically discover peers using mDNS when provided the `-discover`
flag. In network environments supporting multicast, no explicit join is needed
to find peers. [GH-53]
* Serf collects telemetry information and simple runtime profiling. Stats can
be dumped to stderr by sending a `USR1` signal to Serf. Windows users must use
the `BREAK` signal instead. [GH-103]
* `advertise` flag can be used to set an advertise address different
from the bind address. Used for NAT traversal. Thanks to @benagricola [GH-93]
* `members` command now takes `-format` flag to specify either text or JSON
output. Fixed by @ryanuber [GH-97]
IMPROVEMENTS:
* User payload always appends a newline when invoking a shell script
* Severity of "Potential blocking operation" reduced to debug to prevent
spurious messages on slow or busy machines.
BUG FIXES:
* If an agent is restarted with the same bind address but new name, it
will not respond to the old name, causing the old name to enter the
`failed` state, instead of having duplicate entries in the `alive` state.
* `leave_on_interrupt` set to false when not specified, if
any config file is provided. This flag is deprecated for
`skip_leave_on_interrupt` instead. [GH-94]
MISC:
* `-role` configuration has been deprecated in favor of `-tag role=foo`.
The flag is still supported but will generate warnings.
* Support for protocol version 0 (Serf 0.1) has been removed. Serf 0.4 cannot
join a cluster that has members running version 0.1.
## 0.3.0 (December 5, 2013)
FEATURES:
* Dynamic port support, cluster wide consistent config not necessary
* Snapshots to automatically rejoin cluster after failure and prevent replays [GH-84] [GH-71]
* Adding `profile` config to agent, to support WAN, LAN, and Local modes
* MsgPack over TCP RPC protocol which can be used to control Serf, send events, and
receive events with low latency.
* New `leave` CLI command and RPC endpoint to control graceful leaves
* Signal handling is controllable; graceful leave behavior on SIGINT/SIGTERM
can be specified
* SIGHUP can be used to reload configuration
IMPROVEMENTS:
* Event handler provides lamport time of user events via SERF_USER_LTIME [GH-68]
* Memberlist encryption overhead has been reduced
* Filter output of `members` using regular expressions on role and status
* `replay_on_join` parameter to control replay with `start_join`
* `monitor` works even if the client is behind a NAT
* Serf generates warning if binding to public IP without encryption
BUG FIXES:
* Prevent unbounded transmit queues [GH-78]
* IPv6 addresses can be bound to [GH-72]
* Serf join won't hang on a slow/dead node [GH-70]
* Serf Leave won't block Shutdown [GH-1]
## 0.2.1 (November 6, 2013)
BUG FIXES:
* Member role and address not updated on re-join [GH-58]
## 0.2.0 (November 1, 2013)
FEATURES:
* Protocol versioning features so that upgrades can be done safely.
See the website on upgrading Serf for more info.
* Can now configure Serf with files or directories of files by specifying
the `-config-file` and/or `-config-dir` flags to the agent.
* New command `serf force-leave` can be used to force a "failed" node
to the "left" state.
* Serf now supports message encryption and verification so that it can
be used on untrusted networks [GH-25]
* The `-join` flag on `serf agent` can be used to join a cluster when
starting an agent. [GH-42]
IMPROVEMENTS:
* Random staggering of periodic routines to avoid cluster-wide
synchronization
* Push/Pull timer automatically slows down as cluster grows to avoid
congestion
* Messages are compressed to reduce bandwidth utilization
* `serf members` now provides node roles in output
* Joining a cluster will no longer replay all the old events by default,
but it can using the `-replay` flag.
* User events are coalesced by default, meaning duplicate events (by name)
within a short period of time are merged. [GH-8]
BUG FIXES:
* Event handlers work on Windows now by executing commands through
`cmd /C` [GH-37]
* Nodes that previously left and rejoin won't get stuck in 'leaving' state.
[GH-18]
* Fixing alignment issues on i386 for atomic operations [GH-20]
* "trace" log level works [GH-31]
## 0.1.1 (October 23, 2013)
BUG FIXES:
* Default node name is output when "serf agent" is called with no args.
* Remove node from reap list after join so a fast re-join doesn't lose the
member.
## 0.1.0 (October 23, 2013)
* Initial release

64
vendor/github.com/hashicorp/serf/GNUmakefile generated vendored Normal file

@@ -0,0 +1,64 @@
GOTOOLS = github.com/mitchellh/gox github.com/kardianos/govendor
VERSION = $(shell awk -F\" '/^const Version/ { print $$2; exit }' cmd/serf/version.go)
GITSHA:=$(shell git rev-parse HEAD)
GITBRANCH:=$(shell git symbolic-ref --short HEAD 2>/dev/null)

default:: test

# bin generates the releasable binaries
bin:: tools
	@sh -c "'$(CURDIR)/scripts/build.sh'"

# cov generates the coverage output
cov:: tools
	gocov test ./... | gocov-html > /tmp/coverage.html
	open /tmp/coverage.html

# dev creates binaries for testing locally - these are put into ./bin and
# $GOPATH
dev::
	@SERF_DEV=1 sh -c "'$(CURDIR)/scripts/build.sh'"

# dist creates the binaries for distribution
dist::
	@sh -c "'$(CURDIR)/scripts/dist.sh' $(VERSION)"

get-tools::
	go get -u -v $(GOTOOLS)

# subnet sets up the required subnet for testing on darwin (osx) - you must run
# this before running other tests if you are on osx.
subnet::
	@sh -c "'$(CURDIR)/scripts/setup_test_subnet.sh'"

# test runs the test suite
test:: subnet tools
	@go list ./... | grep -v -E '^github.com/hashicorp/serf/(vendor|cmd/serf/vendor)' | xargs -n1 go test $(TESTARGS)

# testrace runs the race checker
testrace:: subnet
	go test -race `govendor list -no-status +local` $(TESTARGS)

tools::
	@which gox 2>/dev/null ; if [ $$? -eq 1 ]; then \
		$(MAKE) get-tools; \
	fi

# updatedeps installs all the dependencies needed to test, build, and run
updatedeps:: tools
	govendor list -no-status +vendor | xargs -n1 go get -u
	govendor update +vendor

vet:: tools
	@echo "--> Running go tool vet $(VETARGS) ."
	@govendor list -no-status +local \
		| cut -d '/' -f 4- \
		| xargs -n1 \
			go tool vet $(VETARGS) ;\
	if [ $$? -ne 0 ]; then \
		echo ""; \
		echo "Vet found suspicious constructs. Please check the reported constructs"; \
		echo "and fix them if necessary before submitting the code for review."; \
	fi

.PHONY: default bin cov dev dist get-tools subnet test testrace tools updatedeps vet

121
vendor/github.com/hashicorp/serf/README.md generated vendored Normal file

@@ -0,0 +1,121 @@
# Serf [![Build Status](https://travis-ci.org/hashicorp/serf.png)](https://travis-ci.org/hashicorp/serf) [![Join the chat at https://gitter.im/hashicorp-serf/Lobby](https://badges.gitter.im/hashicorp-serf/Lobby.svg)](https://gitter.im/hashicorp-serf/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
* Website: https://www.serf.io
* Chat: [Gitter](https://gitter.im/hashicorp-serf/Lobby)
* Mailing list: [Google Groups](https://groups.google.com/group/serfdom/)
Serf is a decentralized solution for service discovery and orchestration
that is lightweight, highly available, and fault tolerant.
Serf runs on Linux, Mac OS X, and Windows. An efficient and lightweight gossip
protocol is used to communicate with other nodes. Serf can detect node failures
and notify the rest of the cluster. An event system is built on top of
Serf, letting you use Serf's gossip protocol to propagate events such
as deploys, configuration changes, etc. Serf is completely masterless
with no single point of failure.
Here are some example use cases of Serf, though there are many others:
* Discovering web servers and automatically adding them to a load balancer
* Organizing many memcached or redis nodes into a cluster, perhaps with
something like [twemproxy](https://github.com/twitter/twemproxy) or
maybe just configuring an application with the address of all the
nodes
* Triggering web deploys using the event system built on top of Serf
* Propagating changes to configuration to relevant nodes.
* Updating DNS records to reflect cluster changes as they occur.
* Much, much more.
## Quick Start
First, [download a pre-built Serf binary](https://www.serf.io/downloads.html)
for your operating system, [compile Serf yourself](#developing-serf), or install
using `go get -u github.com/hashicorp/serf/cmd/serf`.
Next, let's start a couple Serf agents. Agents run until they're told to quit
and handle the communication of maintenance tasks of Serf. In a real Serf
setup, each node in your system will run one or more Serf agents (it can
run multiple agents if you're running multiple cluster types, e.g. web
servers vs. memcached servers).
Start each Serf agent in a separate terminal session so that we can see
the output of each. Start the first agent:
```
$ serf agent -node=foo -bind=127.0.0.1:5000 -rpc-addr=127.0.0.1:7373
...
```
Start the second agent in another terminal session (while the first is still
running):
```
$ serf agent -node=bar -bind=127.0.0.1:5001 -rpc-addr=127.0.0.1:7374
...
```
At this point two Serf agents are running independently but are still
unaware of each other. Let's now tell the first agent to join an existing
cluster (the second agent). When starting a Serf agent, you must join an
existing cluster by specifying at least one existing member. After this,
Serf gossips and the remainder of the cluster becomes aware of the join.
Run the following command in a third terminal session.
```
$ serf join 127.0.0.1:5001
...
```
If you're watching your terminals, you should see both Serf agents
become aware of the join. You can prove it by running `serf members`
to see the members of the Serf cluster:
```
$ serf members
foo 127.0.0.1:5000 alive
bar 127.0.0.1:5001 alive
...
```
At this point, you can ctrl-C or force kill either Serf agent, and they'll
update their membership lists appropriately. If you ctrl-C a Serf agent,
it will gracefully leave by notifying the cluster of its intent to leave.
If you force kill an agent, it will eventually (usually within seconds)
be detected by another member of the cluster which will notify the
cluster of the node failure.
## Documentation
Full, comprehensive documentation is viewable on the Serf website:
https://www.serf.io/docs
## Developing Serf
If you wish to work on Serf itself, you'll first need [Go](https://golang.org)
installed (version 1.8+ is _required_). Make sure you have Go properly
[installed](https://golang.org/doc/install),
including setting up your [GOPATH](https://golang.org/doc/code.html#GOPATH).
Next, clone this repository into `$GOPATH/src/github.com/hashicorp/serf` and
then just type `make`. In a few moments, you'll have a working `serf` executable:
```
$ make
...
$ bin/serf
...
```
*NOTE: `make` will also place a copy of the executable under `$GOPATH/bin/`*
Serf is first and foremost a library with a command-line interface, `serf`. The
Serf library is independent of the command line agent, `serf`. The `serf`
binary is located under `cmd/serf` and can be installed stand alone by issuing
the command `go get -u github.com/hashicorp/serf/cmd/serf`. Applications using
the Serf library should only need to include `github.com/hashicorp/serf`.
Tests can be run by typing `make test`.
If you make any changes to the code, run `make format` in order to automatically
format the code according to Go [standards](https://golang.org/doc/effective_go.html#formatting).


@@ -16,6 +16,9 @@ func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}
// implements memberlist.UniqueBroadcast
func (b *broadcast) UniqueBroadcast() {}
func (b *broadcast) Message() []byte {
return b.msg
}


@@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)
// delegate is the memberlist.Delegate implementation that Serf uses.
@@ -13,6 +14,8 @@ type delegate struct {
serf *Serf
}
var _ memberlist.Delegate = &delegate{}
func (d *delegate) NodeMeta(limit int) []byte {
roleBytes := d.serf.encodeTags(d.serf.config.Tags)
if len(roleBytes) > limit {


@@ -189,4 +189,4 @@ func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse,
defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery, opts)
}
}


@@ -1331,7 +1331,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified
// will reject the "new" node mapping, but we can still be notified.
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
// Log a basic warning if the node is not us...
if existing.Name != s.config.NodeName {


@@ -25,10 +25,34 @@ nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second
const (
// flushInterval is how often we force a flush of the snapshot file
flushInterval = 500 * time.Millisecond
// clockUpdateInterval is how often we fetch the current lamport time of the cluster and write to the snapshot file
clockUpdateInterval = 500 * time.Millisecond
// tmpExt is the extension we use for the temporary file during compaction
tmpExt = ".compact"
// snapshotErrorRecoveryInterval is how often we attempt to recover from
// errors writing to the snapshot file.
snapshotErrorRecoveryInterval = 30 * time.Second
// eventChSize is the size of the event buffers between Serf and the
// consuming application. If this is exhausted we will block Serf and Memberlist.
eventChSize = 2048
// shutdownFlushTimeout is the time limit to write pending events to the snapshot during a shutdown
shutdownFlushTimeout = 250 * time.Millisecond
// snapshotBytesPerNode is an estimated bytes per node to snapshot
snapshotBytesPerNode = 128
// snapshotCompactionThreshold is the threshold we apply to
// the snapshot size estimate (nodes * bytes per node) before compacting.
snapshotCompactionThreshold = 2
)
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
@@ -38,6 +62,7 @@ type Snapshotter struct {
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
streamCh chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
@@ -45,7 +70,7 @@ type Snapshotter struct {
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
minCompactSize int64
path string
offset int64
outCh chan<- Event
@@ -72,13 +97,14 @@ func (p PreviousNode) String() string {
// Setting rejoinAfterLeave makes leave not clear the state, and can be used
// if you intend to rejoin the same cluster after a leave.
func NewSnapshotter(path string,
maxSize int,
minCompactSize int,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
inCh := make(chan Event, eventChSize)
streamCh := make(chan Event, eventChSize)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
@@ -101,12 +127,13 @@ func NewSnapshotter(path string,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
streamCh: streamCh,
lastClock: 0,
lastEventClock: 0,
lastQueryClock: 0,
leaveCh: make(chan struct{}),
logger: logger,
maxSize: int64(maxSize),
minCompactSize: int64(minCompactSize),
path: path,
offset: offset,
outCh: outCh,
@@ -122,6 +149,7 @@ func NewSnapshotter(path string,
}
// Start handling new commands
go snap.teeStream()
go snap.stream()
return inCh, snap, nil
}
@@ -171,11 +199,69 @@ func (s *Snapshotter) Leave() {
}
}
// teeStream is a long running routine that is used to copy events
// to the output channel and the internal event handler.
func (s *Snapshotter) teeStream() {
flushEvent := func(e Event) {
// Forward to the internal stream, do not block
select {
case s.streamCh <- e:
default:
}
// Forward the event immediately, do not block
if s.outCh != nil {
select {
case s.outCh <- e:
default:
}
}
}
OUTER:
for {
select {
case e := <-s.inCh:
flushEvent(e)
case <-s.shutdownCh:
break OUTER
}
}
// Drain any remaining events before exiting
for {
select {
case e := <-s.inCh:
flushEvent(e)
default:
return
}
}
}
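The non-blocking fan-out that `teeStream` performs can be sketched in isolation with stdlib Go only (hypothetical names): a full sink drops the event via `select`/`default` instead of stalling the producer.

```go
package main

import "fmt"

// tee forwards every event from in to each sink without blocking:
// if a sink's buffer is full, the event is dropped for that sink, so
// a slow consumer can never stall the producer (the same
// select/default pattern teeStream uses above).
func tee(in <-chan string, sinks ...chan string) {
	for e := range in {
		for _, s := range sinks {
			select {
			case s <- e:
			default: // sink full: drop rather than block
			}
		}
	}
}

func main() {
	in := make(chan string, 2)
	fast := make(chan string, 2) // keeps up: receives both events
	stuck := make(chan string)   // unbuffered, no reader: always drops

	in <- "member-join"
	in <- "member-leave"
	close(in)
	tee(in, fast, stuck)
	fmt.Println(len(fast)) // prints: 2
}
```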
// stream is a long running routine that is used to handle events
func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
// flushEvent is used to handle writing out an event
flushEvent := func(e Event) {
// Stop recording events after a leave is issued
if s.leaving {
return
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
}
for {
select {
case <-s.leaveCh:
@@ -193,31 +279,32 @@ func (s *Snapshotter) stream() {
s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
}
case e := <-s.inCh:
// Forward the event immediately
if s.outCh != nil {
s.outCh <- e
}
// Stop recording events after a leave is issued
if s.leaving {
continue
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
case e := <-s.streamCh:
flushEvent(e)
case <-clockTicker.C:
s.updateClock()
case <-s.shutdownCh:
// Setup a timeout
flushTimeout := time.After(shutdownFlushTimeout)
// Snapshot the clock
s.updateClock()
// Clear out the buffers
FLUSH:
for {
select {
case e := <-s.streamCh:
flushEvent(e)
case <-flushTimeout:
break FLUSH
default:
break FLUSH
}
}
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
}
@@ -321,12 +408,25 @@ func (s *Snapshotter) appendLine(l string) error {
// Check if a compaction is necessary
s.offset += int64(n)
if s.offset > s.maxSize {
if s.offset > s.snapshotMaxSize() {
return s.compact()
}
return nil
}
// snapshotMaxSize computes the maximum size and is used to force periodic compaction.
func (s *Snapshotter) snapshotMaxSize() int64 {
nodes := int64(len(s.aliveNodes))
estSize := nodes * snapshotBytesPerNode
threshold := estSize * snapshotCompactionThreshold
// Apply a minimum threshold to avoid frequent compaction
if threshold < s.minCompactSize {
threshold = s.minCompactSize
}
return threshold
}
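The sizing rule above can be checked with a standalone sketch (constants copied from the diff; the function name `maxSnapshotSize` is hypothetical and mirrors `snapshotMaxSize`):

```go
package main

import "fmt"

const (
	snapshotBytesPerNode        = 128 // estimated bytes per node (from the diff)
	snapshotCompactionThreshold = 2   // multiplier on the size estimate (from the diff)
)

// maxSnapshotSize mirrors snapshotMaxSize above: estimate the snapshot
// as nodes * bytes-per-node, double it, and never go below the
// configured minimum so small clusters do not compact constantly.
func maxSnapshotSize(aliveNodes int, minCompactSize int64) int64 {
	threshold := int64(aliveNodes) * snapshotBytesPerNode * snapshotCompactionThreshold
	if threshold < minCompactSize {
		threshold = minCompactSize
	}
	return threshold
}

func main() {
	// 10 nodes: 10*128*2 = 2560 bytes, below a 128 KiB floor.
	fmt.Println(maxSnapshotSize(10, 128*1024)) // prints: 131072
	// 10000 nodes: 10000*128*2 = 2560000 bytes, the floor no longer applies.
	fmt.Println(maxSnapshotSize(10000, 128*1024)) // prints: 2560000
}
```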
// compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())

5
vendor/vendor.json vendored

@@ -221,8 +221,9 @@
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"ujL3Sc5iqc28/En2ndmc2R7oUQM=","revision":"9c733b2b7f53115c5ef261a90ce912a1bb49e970","revisionTime":"2019-01-04T13:37:20Z"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"80ab48778deee28e4ea2dc4ef1ebb2c5f4063996","revisionTime":"2018-05-07T23:19:28Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QrT+nzyXsD/MmhTjjhcPdnALZ1I=","revision":"80ab48778deee28e4ea2dc4ef1ebb2c5f4063996","revisionTime":"2018-05-07T23:19:28Z"},
{"path":"github.com/hashicorp/serf","checksumSHA1":"9omt7lEuhBNSdgT32ThEzXn/aTU=","revision":"c7f3bc96b40972e67dfbe007c1fa825cf59ac8c2","revisionTime":"2019-01-04T15:39:47Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"c7f3bc96b40972e67dfbe007c1fa825cf59ac8c2","revisionTime":"2019-01-04T15:39:47Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"siLn7zwVHQk070rpd99BTktGfTs=","revision":"c7f3bc96b40972e67dfbe007c1fa825cf59ac8c2","revisionTime":"2019-01-04T15:39:47Z"},
{"path":"github.com/hashicorp/vault/api","checksumSHA1":"DP7dd8OErZVF0q+XfPo0RGkDcLk=","revision":"6e8d91a59c34bd9f323397c30be9651422295c65","revisionTime":"2018-09-19T17:09:49Z"},
{"path":"github.com/hashicorp/vault/helper/compressutil","checksumSHA1":"bSdPFOHaTwEvM4PIvn0PZfn75jM=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
{"path":"github.com/hashicorp/vault/helper/consts","checksumSHA1":"QNGGvSYtwk6VCkj4laZPjM2301E=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},