Missing hashicorp/serf/serf dep
This commit is contained in:
parent 68891839d3
commit 96e9857f12
@@ -2,6 +2,7 @@ package serf

 import (
 	"io"
+	"log"
 	"os"
 	"time"
@@ -15,6 +16,7 @@ var ProtocolVersionMap map[uint8]uint8

 func init() {
 	ProtocolVersionMap = map[uint8]uint8{
+		5: 2,
 		4: 2,
 		3: 2,
 		2: 2,
@@ -182,6 +184,12 @@ type Config struct {
 	// logs will go to stderr.
 	LogOutput io.Writer

+	// Logger is a custom logger which you provide. If Logger is set, it will use
+	// this for the internal logger. If Logger is not set, it will fall back to the
+	// behavior for using LogOutput. You cannot specify both LogOutput and Logger
+	// at the same time.
+	Logger *log.Logger
+
 	// SnapshotPath if provided is used to snapshot live nodes as well
 	// as lamport clock values. When Serf is started with a snapshot,
 	// it will attempt to join all the previously known nodes until one
@@ -240,7 +248,7 @@ func DefaultConfig() *Config {
 		EventBuffer:         512,
 		QueryBuffer:         512,
 		LogOutput:           os.Stderr,
-		ProtocolVersion:     ProtocolVersionMax,
+		ProtocolVersion:     4,
 		ReapInterval:        15 * time.Second,
 		RecentIntentTimeout: 5 * time.Minute,
 		ReconnectInterval:   30 * time.Second,
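Note: the new Logger field and the adjusted default protocol version can be exercised as sketched below. This is an illustrative example, not part of the commit; it assumes the vendored package is imported as github.com/hashicorp/serf/serf and uses a made-up node name.

package main

import (
	"log"
	"os"

	"github.com/hashicorp/serf/serf"
)

func main() {
	conf := serf.DefaultConfig()
	conf.NodeName = "node-1" // illustrative name

	// Either hand Serf a writer and let Create build the logger...
	conf.LogOutput = os.Stderr

	// ...or supply a ready-made *log.Logger. Per the new Config comment,
	// set LogOutput or Logger, not both.
	conf.LogOutput = nil
	conf.Logger = log.New(os.Stderr, "serf: ", log.LstdFlags)

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatalf("failed to create serf node: %v", err)
	}
	defer s.Shutdown()
}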
@@ -1,9 +1,11 @@
 package serf

 import (
+	"bytes"
 	"fmt"

 	"github.com/armon/go-metrics"
+	"github.com/hashicorp/go-msgpack/codec"
 )

 // delegate is the memberlist.Delegate implementation that Serf uses.
@@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
 		d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
 		d.serf.handleQueryResponse(&resp)

+	case messageRelayType:
+		var header relayHeader
+		var handle codec.MsgpackHandle
+		reader := bytes.NewReader(buf[1:])
+		decoder := codec.NewDecoder(reader, &handle)
+		if err := decoder.Decode(&header); err != nil {
+			d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
+			break
+		}
+
+		// The remaining contents are the message itself, so forward that
+		raw := make([]byte, reader.Len())
+		reader.Read(raw)
+		d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
+		if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
+			d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
+			break
+		}
+
 	default:
 		d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
 	}
@@ -230,7 +251,8 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 	// If we are doing a join, and eventJoinIgnore is set
 	// then we set the eventMinTime to the EventLTime. This
 	// prevents any of the incoming events from being processed
-	if isJoin && d.serf.eventJoinIgnore {
+	eventJoinIgnore := d.serf.eventJoinIgnore.Load().(bool)
+	if isJoin && eventJoinIgnore {
 		d.serf.eventLock.Lock()
 		if pp.EventLTime > d.serf.eventMinTime {
 			d.serf.eventMinTime = pp.EventLTime
@@ -95,18 +95,19 @@ func (u UserEvent) String() string {
 	return fmt.Sprintf("user-event: %s", u.Name)
 }

-// Query is the struct used EventQuery type events
+// Query is the struct used by EventQuery type events
 type Query struct {
 	LTime   LamportTime
 	Name    string
 	Payload []byte

 	serf     *Serf
 	id       uint32    // ID is not exported, since it may change
 	addr     []byte    // Address to respond to
 	port     uint16    // Port to respond to
 	deadline time.Time // Must respond by this deadline
-	respLock sync.Mutex
+	relayFactor uint8 // Number of duplicate responses to relay back to sender
+	respLock    sync.Mutex
 }

 func (q *Query) EventType() EventType {
@@ -129,12 +130,12 @@ func (q *Query) Respond(buf []byte) error {

 	// Check if we've already responded
 	if q.deadline.IsZero() {
-		return fmt.Errorf("Response already sent")
+		return fmt.Errorf("response already sent")
 	}

 	// Ensure we aren't past our response deadline
 	if time.Now().After(q.deadline) {
-		return fmt.Errorf("Response is past the deadline")
+		return fmt.Errorf("response is past the deadline")
 	}

 	// Create response
@@ -145,10 +146,10 @@ func (q *Query) Respond(buf []byte) error {
 		Payload: buf,
 	}

-	// Format the response
+	// Send a direct response
 	raw, err := encodeMessage(messageQueryResponseType, &resp)
 	if err != nil {
-		return fmt.Errorf("Failed to format response: %v", err)
+		return fmt.Errorf("failed to format response: %v", err)
 	}

 	// Check the size limit
@@ -156,13 +157,18 @@ func (q *Query) Respond(buf []byte) error {
 		return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
 	}

-	// Send the response
+	// Send the response directly to the originator
 	addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
 	if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
 		return err
 	}

-	// Clera the deadline, response sent
+	// Relay the response through up to relayFactor other nodes
+	if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
+		return err
+	}
+
+	// Clear the deadline, responses sent
 	q.deadline = time.Time{}
 	return nil
 }
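With this change, a handler's reply goes directly to the originator and, when the query asked for it, is also relayed through other members. A minimal receiving-side sketch (illustrative, not part of the commit; it assumes query events arrive on the channel configured as Config.EventCh):

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// handleQueries answers every incoming query event with "pong".
// Relaying of the response is handled inside Query.Respond.
func handleQueries(eventCh <-chan serf.Event) {
	for e := range eventCh {
		q, ok := e.(*serf.Query)
		if !ok {
			continue // ignore member and user events
		}
		if err := q.Respond([]byte("pong")); err != nil {
			log.Printf("failed to respond to query %q: %v", q.Name, err)
		}
	}
}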
@@ -192,10 +192,12 @@ func (s *serfQueries) handleInstallKey(q *Query) {
 		goto SEND
 	}

-	if err := s.serf.writeKeyringFile(); err != nil {
-		response.Message = err.Error()
-		s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
-		goto SEND
+	if s.serf.config.KeyringFile != "" {
+		if err := s.serf.writeKeyringFile(); err != nil {
+			response.Message = err.Error()
+			s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
+			goto SEND
+		}
 	}

 	response.Result = true
@@ -33,6 +33,13 @@ type KeyResponse struct {
 	Keys map[string]int
 }

+// KeyRequestOptions is used to contain optional parameters for a keyring operation
+type KeyRequestOptions struct {
+	// RelayFactor is the number of duplicate query responses to send by relaying through
+	// other nodes, for redundancy
+	RelayFactor uint8
+}
+
 // streamKeyResp takes care of reading responses from a channel and composing
 // them into a KeyResponse. It will update a KeyResponse *in place* and
 // therefore has nothing to return.
@@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
 // handleKeyRequest performs query broadcasting to all members for any type of
 // key operation and manages gathering responses and packing them up into a
 // KeyResponse for uniform response handling.
-func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
+func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	resp := &KeyResponse{
 		Messages: make(map[string]string),
 		Keys:     make(map[string]int),
@@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
 	}

 	qParam := k.serf.DefaultQueryParams()
+	if opts != nil {
+		qParam.RelayFactor = opts.RelayFactor
+	}
 	queryResp, err := k.serf.Query(qName, req, qParam)
 	if err != nil {
 		return resp, err
@@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
 // responses from each of them, returning a list of messages from each node
 // and any applicable error conditions.
 func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
+	return k.InstallKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()

-	return k.handleKeyRequest(key, installKeyQuery)
+	return k.handleKeyRequest(key, installKeyQuery, opts)
 }

 // UseKey handles broadcasting a primary key change to all members in the
 // cluster, and gathering any response messages. If successful, there should
 // be an empty KeyResponse returned.
 func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
+	return k.UseKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()

-	return k.handleKeyRequest(key, useKeyQuery)
+	return k.handleKeyRequest(key, useKeyQuery, opts)
 }

 // RemoveKey handles broadcasting a key to the cluster for removal. Each member
 // will receive this event, and if they have the key in their keyring, remove
 // it. If any errors are encountered, RemoveKey will collect and relay them.
 func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
+	return k.RemoveKeyWithOptions(key, nil)
+}
+
+func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.Lock()
 	defer k.l.Unlock()

-	return k.handleKeyRequest(key, removeKeyQuery)
+	return k.handleKeyRequest(key, removeKeyQuery, opts)
 }

 // ListKeys is used to collect installed keys from members in a Serf cluster
@@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
 // Since having multiple keys installed can cause performance penalties in some
 // cases, it's important to verify this information and remove unneeded keys.
 func (k *KeyManager) ListKeys() (*KeyResponse, error) {
+	return k.ListKeysWithOptions(nil)
+}
+
+func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
 	k.l.RLock()
 	defer k.l.RUnlock()

-	return k.handleKeyRequest("", listKeysQuery)
+	return k.handleKeyRequest("", listKeysQuery, opts)
 }
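A usage sketch for the new *WithOptions variants (illustrative, not part of the commit; it assumes s is an existing *serf.Serf with encryption enabled, and the key string is a placeholder):

package example

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// listKeysRelayed asks every member for its keyring and has each response
// relayed through up to two additional nodes for redundancy.
func listKeysRelayed(s *serf.Serf) {
	km := s.KeyManager()
	opts := &serf.KeyRequestOptions{RelayFactor: 2}

	resp, err := km.ListKeysWithOptions(opts)
	if err != nil {
		log.Printf("list keys failed: %v", err)
		return
	}
	for key, count := range resp.Keys {
		log.Printf("key %s is installed on %d members", key, count)
	}

	// The existing methods keep working and simply pass nil options.
	if _, err := km.InstallKey("placeholder-base64-key=="); err != nil {
		log.Printf("install key failed: %v", err)
	}
}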
@@ -2,8 +2,10 @@ package serf

 import (
 	"bytes"
-	"github.com/hashicorp/go-msgpack/codec"
+	"net"
 	"time"
+
+	"github.com/hashicorp/go-msgpack/codec"
 )

 // messageType are the types of gossip messages Serf will send along
@@ -20,6 +22,7 @@ const (
 	messageConflictResponseType
 	messageKeyRequestType
 	messageKeyResponseType
+	messageRelayType
 )

 const (
@@ -75,15 +78,16 @@ type messageUserEvent struct {

 // messageQuery is used for query events
 type messageQuery struct {
 	LTime   LamportTime // Event lamport time
 	ID      uint32      // Query ID, randomly generated
 	Addr    []byte      // Source address, used for a direct reply
 	Port    uint16      // Source port, used for a direct reply
 	Filters [][]byte    // Potential query filters
 	Flags   uint32      // Used to provide various flags
-	Timeout time.Duration // Maximum time between delivery and response
-	Name    string        // Query name
-	Payload []byte        // Query payload
+	RelayFactor uint8         // Used to set the number of duplicate relayed responses
+	Timeout     time.Duration // Maximum time between delivery and response
+	Name        string        // Query name
+	Payload     []byte        // Query payload
 }

 // Ack checks if the ack flag is set
@@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
 	return buf.Bytes(), err
 }

+// relayHeader is used to store the end destination of a relayed message
+type relayHeader struct {
+	DestAddr net.UDPAddr
+}
+
+// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
+// address of the end recipient to the front of the message
+func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
+	buf := bytes.NewBuffer(nil)
+	handle := codec.MsgpackHandle{}
+	encoder := codec.NewEncoder(buf, &handle)
+
+	buf.WriteByte(uint8(messageRelayType))
+	if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
+		return nil, err
+	}
+
+	buf.WriteByte(uint8(t))
+	err := encoder.Encode(msg)
+	return buf.Bytes(), err
+}
+
 func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
 	buf := bytes.NewBuffer(nil)
 	buf.WriteByte(uint8(f))
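For reference, the frame produced by encodeRelayMessage is: one messageRelayType byte, the msgpack-encoded relayHeader with the final destination, then the wrapped message's own type byte and msgpack body; delegate.NotifyMsg decodes the header and forwards the remaining bytes verbatim. A hypothetical in-package test sketching that round trip (not part of the commit; it assumes the messageQueryResponse fields LTime, ID and From):

package serf

import (
	"bytes"
	"net"
	"testing"

	"github.com/hashicorp/go-msgpack/codec"
)

// TestEncodeRelayMessage_Framing checks the relay frame layout:
// [messageRelayType][msgpack relayHeader][inner type byte][msgpack body].
func TestEncodeRelayMessage_Framing(t *testing.T) {
	dest := net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 7946}
	resp := messageQueryResponse{LTime: 1, ID: 42, From: "node-a"}

	raw, err := encodeRelayMessage(messageQueryResponseType, dest, &resp)
	if err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	if messageType(raw[0]) != messageRelayType {
		t.Fatalf("expected relay type byte, got %d", raw[0])
	}

	// Decode the header the same way delegate.NotifyMsg does; the bytes
	// left in the reader are the wrapped message that gets forwarded.
	var header relayHeader
	reader := bytes.NewReader(raw[1:])
	if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&header); err != nil {
		t.Fatalf("decode header failed: %v", err)
	}
	if header.DestAddr.Port != dest.Port {
		t.Fatalf("unexpected destination: %v", header.DestAddr)
	}
}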
@@ -2,7 +2,6 @@ package serf

 import (
 	"bytes"
-	"log"
 	"time"

 	"github.com/armon/go-metrics"
@@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte {
 	// The rest of the message is the serialized coordinate.
 	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
 	if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
-		log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
+		p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
 	}
 	return buf.Bytes()
 }
@@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
 	// Verify ping version in the header.
 	version := payload[0]
 	if version != PingVersion {
-		log.Printf("[ERR] serf: Unsupported ping version: %v", version)
+		p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version)
 		return
 	}
@@ -61,29 +60,30 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
 	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
 	var coord coordinate.Coordinate
 	if err := dec.Decode(&coord); err != nil {
-		log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
+		p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
+		return
 	}

-	// Apply the update. Since this is a coordinate coming from some place
-	// else we harden this and look for dimensionality problems proactively.
+	// Apply the update.
 	before := p.serf.coordClient.GetCoordinate()
-	if before.IsCompatibleWith(&coord) {
-		after := p.serf.coordClient.Update(other.Name, &coord, rtt)
-
-		// Publish some metrics to give us an idea of how much we are
-		// adjusting each time we update.
-		d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
-		metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
-
-		// Cache the coordinate for the other node, and add our own
-		// to the cache as well since it just got updated. This lets
-		// users call GetCachedCoordinate with our node name, which is
-		// more friendly.
-		p.serf.coordCacheLock.Lock()
-		p.serf.coordCache[other.Name] = &coord
-		p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
-		p.serf.coordCacheLock.Unlock()
-	} else {
-		log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
+	after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
+	if err != nil {
+		p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
+			other.Name, err)
+		return
 	}
+
+	// Publish some metrics to give us an idea of how much we are
+	// adjusting each time we update.
+	d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
+	metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
+
+	// Cache the coordinate for the other node, and add our own
+	// to the cache as well since it just got updated. This lets
+	// users call GetCachedCoordinate with our node name, which is
+	// more friendly.
+	p.serf.coordCacheLock.Lock()
+	p.serf.coordCache[other.Name] = &coord
+	p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
+	p.serf.coordCacheLock.Unlock()
 }
@@ -1,7 +1,11 @@
 package serf

 import (
+	"errors"
+	"fmt"
 	"math"
+	"math/rand"
+	"net"
 	"regexp"
 	"sync"
 	"time"
@@ -24,6 +28,10 @@ type QueryParam struct {
 	// send an ack.
 	RequestAck bool

+	// RelayFactor controls the number of duplicate responses to relay
+	// back to the sender through other nodes for redundancy.
+	RelayFactor uint8
+
 	// The timeout limits how long the query is left open. If not provided,
 	// then a default timeout is used based on the configuration of Serf
 	Timeout time.Duration
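The new RelayFactor parameter is the caller-facing knob for the relay machinery added below. An illustrative sketch of issuing a relayed query (not part of the commit; the query name and payload are made up):

package example

import (
	"log"
	"time"

	"github.com/hashicorp/serf/serf"
)

// queryWithRelay opts into relayed responses: each responder answers the
// originator directly and also relays copies through up to two other
// members, which helps when individual UDP responses get dropped.
func queryWithRelay(s *serf.Serf) {
	params := s.DefaultQueryParams()
	params.RequestAck = true
	params.RelayFactor = 2
	params.Timeout = 5 * time.Second

	resp, err := s.Query("ping", []byte("hello"), params)
	if err != nil {
		log.Printf("query failed: %v", err)
		return
	}
	// Duplicates produced by relaying are filtered out by the new
	// acks/responses tracking in QueryResponse.
	for r := range resp.ResponseCh() {
		log.Printf("response from %s: %s", r.From, r.Payload)
	}
}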
@@ -93,6 +101,10 @@ type QueryResponse struct {
 	// respCh is used to send a response from a node
 	respCh chan NodeResponse

+	// acks/responses are used to track the nodes that have sent an ack/response
+	acks      map[string]struct{}
+	responses map[string]struct{}
+
 	closed    bool
 	closeLock sync.Mutex
 }
@@ -100,13 +112,15 @@ type QueryResponse struct {
 // newQueryResponse is used to construct a new query response
 func newQueryResponse(n int, q *messageQuery) *QueryResponse {
 	resp := &QueryResponse{
 		deadline:  time.Now().Add(q.Timeout),
 		id:        q.ID,
 		lTime:     q.LTime,
 		respCh:    make(chan NodeResponse, n),
+		responses: make(map[string]struct{}),
 	}
 	if q.Ack() {
 		resp.ackCh = make(chan string, n)
+		resp.acks = make(map[string]struct{})
 	}
 	return resp
 }
@@ -135,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {

 // Finished returns if the query is finished running
 func (r *QueryResponse) Finished() bool {
+	r.closeLock.Lock()
+	defer r.closeLock.Unlock()
 	return r.closed || time.Now().After(r.deadline)
 }
@@ -151,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
 	return r.respCh
 }

+// sendResponse sends a response on the response channel ensuring the channel is not closed.
+func (r *QueryResponse) sendResponse(nr NodeResponse) error {
+	r.closeLock.Lock()
+	defer r.closeLock.Unlock()
+	if r.closed {
+		return nil
+	}
+	select {
+	case r.respCh <- nr:
+		r.responses[nr.From] = struct{}{}
+	default:
+		return errors.New("serf: Failed to deliver query response, dropping")
+	}
+	return nil
+}
+
 // NodeResponse is used to represent a single response from a node
 type NodeResponse struct {
 	From string
@@ -208,3 +240,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
 	}
 	return true
 }
+
+// relayResponse will relay a copy of the given response to up to relayFactor
+// other members.
+func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
+	if relayFactor == 0 {
+		return nil
+	}
+
+	// Needs to be worth it; we need to have at least relayFactor *other*
+	// nodes. If you have a tiny cluster then the relayFactor shouldn't
+	// be needed.
+	members := s.Members()
+	if len(members) < int(relayFactor)+1 {
+		return nil
+	}
+
+	// Prep the relay message, which is a wrapped version of the original.
+	raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
+	if err != nil {
+		return fmt.Errorf("failed to format relayed response: %v", err)
+	}
+	if len(raw) > s.config.QueryResponseSizeLimit {
+		return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
+	}
+
+	// Relay to a random set of peers.
+	localName := s.LocalMember().Name
+	relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
+		return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
+	})
+	for _, m := range relayMembers {
+		relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
+		if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
+			return fmt.Errorf("failed to send relay response: %v", err)
+		}
+	}
+	return nil
+}
+
+// kRandomMembers selects up to k members from a given list, optionally
+// filtering by the given filterFunc
+func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
+	n := len(members)
+	kMembers := make([]Member, 0, k)
+OUTER:
+	// Probe up to 3*n times, with large n this is not necessary
+	// since k << n, but with small n we want search to be
+	// exhaustive
+	for i := 0; i < 3*n && len(kMembers) < k; i++ {
+		// Get random member
+		idx := rand.Intn(n)
+		member := members[idx]
+
+		// Give the filter a shot at it.
+		if filterFunc != nil && filterFunc(member) {
+			continue OUTER
+		}
+
+		// Check if we have this member already
+		for j := 0; j < len(kMembers); j++ {
+			if member.Name == kMembers[j].Name {
+				continue OUTER
+			}
+		}
+
+		// Append the member
+		kMembers = append(kMembers, member)
+	}
+
+	return kMembers
+}
@@ -10,8 +10,10 @@ import (
 	"log"
 	"math/rand"
 	"net"
+	"os"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/armon/go-metrics"
@@ -25,7 +27,7 @@ import (
 // version to memberlist below.
 const (
 	ProtocolVersionMin uint8 = 2
-	ProtocolVersionMax       = 4
+	ProtocolVersionMax       = 5
 )

 const (
@@ -73,7 +75,7 @@ type Serf struct {

 	eventBroadcasts *memberlist.TransmitLimitedQueue
 	eventBuffer     []*userEvents
-	eventJoinIgnore bool
+	eventJoinIgnore atomic.Value
 	eventMinTime    LamportTime
 	eventLock       sync.RWMutex
@@ -240,14 +242,24 @@ func Create(conf *Config) (*Serf, error) {
 			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 	}

+	logger := conf.Logger
+	if logger == nil {
+		logOutput := conf.LogOutput
+		if logOutput == nil {
+			logOutput = os.Stderr
+		}
+		logger = log.New(logOutput, "", log.LstdFlags)
+	}
+
 	serf := &Serf{
 		config:        conf,
-		logger:        log.New(conf.LogOutput, "", log.LstdFlags),
+		logger:        logger,
 		members:       make(map[string]*memberState),
 		queryResponse: make(map[LamportTime]*QueryResponse),
 		shutdownCh:    make(chan struct{}),
 		state:         SerfAlive,
 	}
+	serf.eventJoinIgnore.Store(false)

 	// Check that the meta data length is okay
 	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
@@ -328,21 +340,15 @@ func Create(conf *Config) (*Serf, error) {
 	// Setup the various broadcast queues, which we use to send our own
 	// custom broadcasts along the gossip channel.
 	serf.broadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
 	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
 	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			return len(serf.members)
-		},
+		NumNodes:       serf.NumNodes,
 		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
 	}
@@ -499,15 +505,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes

 	// Create a message
 	q := messageQuery{
 		LTime:   s.queryClock.Time(),
 		ID:      uint32(rand.Int31()),
 		Addr:    local.Addr,
 		Port:    local.Port,
 		Filters: filters,
 		Flags:   flags,
-		Timeout: params.Timeout,
-		Name:    name,
-		Payload: payload,
+		RelayFactor: params.RelayFactor,
+		Timeout:     params.Timeout,
+		Name:        name,
+		Payload:     payload,
 	}

 	// Encode the query
@@ -588,9 +595,9 @@ func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
 	// Ignore any events from a potential join. This is safe since we hold
 	// the joinLock and nobody else can be doing a Join
 	if ignoreOld {
-		s.eventJoinIgnore = true
+		s.eventJoinIgnore.Store(true)
 		defer func() {
-			s.eventJoinIgnore = false
+			s.eventJoinIgnore.Store(false)
 		}()
 	}
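eventJoinIgnore switches from a plain bool to a sync/atomic.Value so the delegate can read it without racing against Join. A standalone illustration of the Store/Load pattern used here (generic Go, not serf code):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var flag atomic.Value
	flag.Store(false) // must be initialized before any Load, as Create now does

	// Writer side (Join with ignoreOld=true).
	flag.Store(true)
	defer flag.Store(false)

	// Reader side (MergeRemoteState): Load returns interface{}, so the
	// concrete bool is recovered with a type assertion, as in the diff.
	ignore := flag.Load().(bool)
	fmt.Println("eventJoinIgnore:", ignore)
}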
@@ -791,13 +798,15 @@ func (s *Serf) Shutdown() error {
 		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
 	}

+	// Wait to close the shutdown channel until after we've shut down the
+	// memberlist and its associated network resources, since the shutdown
+	// channel signals that we are cleaned up outside of Serf.
 	s.state = SerfShutdown
-	close(s.shutdownCh)
-
 	err := s.memberlist.Shutdown()
 	if err != nil {
 		return err
 	}
+	close(s.shutdownCh)

 	// Wait for the snapshoter to finish if we have one
 	if s.snapshotter != nil {
@@ -1237,19 +1246,23 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
 			if err := s.memberlist.SendTo(&addr, raw); err != nil {
 				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
 			}
+			if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
+				s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
+			}
 		}
 	}

 	if s.config.EventCh != nil {
 		s.config.EventCh <- &Query{
 			LTime:    query.LTime,
 			Name:     query.Name,
 			Payload:  query.Payload,
 			serf:     s,
 			id:       query.ID,
 			addr:     query.Addr,
 			port:     query.Port,
 			deadline: time.Now().Add(query.Timeout),
+			relayFactor: query.RelayFactor,
 		}
 	}
 	return rebroadcast
@@ -1282,18 +1295,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {

 	// Process each type of response
 	if resp.Ack() {
+		// Exit early if this is a duplicate ack
+		if _, ok := query.acks[resp.From]; ok {
+			metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
+			return
+		}
+
 		metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
 		select {
 		case query.ackCh <- resp.From:
+			query.acks[resp.From] = struct{}{}
 		default:
-			s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
+			s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
 		}
 	} else {
+		// Exit early if this is a duplicate response
+		if _, ok := query.responses[resp.From]; ok {
+			metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
+			return
+		}
+
 		metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
-		select {
-		case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
-		default:
-			s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
+		err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
+		if err != nil {
+			s.logger.Printf("[WARN] %v", err)
 		}
 	}
 }
@@ -1353,7 +1378,7 @@ func (s *Serf) resolveNodeConflict() {

 		// Update the counters
 		responses++
-		if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
+		if member.Addr.Equal(local.Addr) && member.Port == local.Port {
 			matching++
 		}
 	}
@@ -1642,6 +1667,9 @@ func (s *Serf) Stats() map[string]string {
 		"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
 		"encrypted":   fmt.Sprintf("%v", s.EncryptionEnabled()),
 	}
+	if !s.config.DisableCoordinates {
+		stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
+	}
 	return stats
 }
@@ -31,6 +31,7 @@ const flushInterval = 500 * time.Millisecond
 const clockUpdateInterval = 500 * time.Millisecond
 const coordinateUpdateInterval = 60 * time.Second
 const tmpExt = ".compact"
+const snapshotErrorRecoveryInterval = 30 * time.Second

 // Snapshotter is responsible for ingesting events and persisting
 // them to disk, and providing a recovery mechanism at start time.
@@ -55,6 +56,7 @@ type Snapshotter struct {
 	rejoinAfterLeave bool
 	shutdownCh       <-chan struct{}
 	waitCh           chan struct{}
+	lastAttemptedCompaction time.Time
 }

 // PreviousNode is used to represent the previously known alive nodes
@@ -84,7 +86,7 @@ func NewSnapshotter(path string,
 	inCh := make(chan Event, 1024)

 	// Try to open the file
-	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
+	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
 	}
@@ -311,6 +313,17 @@ func (s *Snapshotter) processQuery(q *Query) {
 func (s *Snapshotter) tryAppend(l string) {
 	if err := s.appendLine(l); err != nil {
 		s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
+		now := time.Now()
+		if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
+			s.lastAttemptedCompaction = now
+			s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
+			err = s.compact()
+			if err != nil {
+				s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
+			} else {
+				s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
+			}
+		}
 	}
 }
@@ -532,7 +545,10 @@ func (s *Snapshotter) replay() error {
 				s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
 				continue
 			}
-			s.coordClient.SetCoordinate(&coord)
+			if err := s.coordClient.SetCoordinate(&coord); err != nil {
+				s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
+				continue
+			}
 		} else if line == "leave" {
 			// Ignore a leave if we plan on re-joining
 			if s.rejoinAfterLeave {
@@ -880,6 +880,13 @@
 			"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
 			"revisionTime": "2017-07-14T18:26:01Z"
 		},
+		{
+			"checksumSHA1": "pvLOzocYsZtxuJ9pREHRTxYnoa4=",
+			"origin": "github.com/hashicorp/nomad/vendor/github.com/hashicorp/serf/serf",
+			"path": "github.com/hashicorp/serf/serf",
+			"revision": "bbeddf0b3ab3072a60525afbd6b6f47d33839eee",
+			"revisionTime": "2017-07-14T18:26:01Z"
+		},
 		{
 			"checksumSHA1": "eGzvBRMFD6ZB3A6uO750np7Om/E=",
 			"path": "github.com/hashicorp/vault",