Updates Serf library to get relay fixes.
https://github.com/hashicorp/serf/pull/447
This commit is contained in:
parent
07b92c52e8
commit
2df8b492c4
|
@ -2,7 +2,6 @@ package serf
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -131,12 +130,12 @@ func (q *Query) Respond(buf []byte) error {
|
|||
|
||||
// Check if we've already responded
|
||||
if q.deadline.IsZero() {
|
||||
return fmt.Errorf("Response already sent")
|
||||
return fmt.Errorf("response already sent")
|
||||
}
|
||||
|
||||
// Ensure we aren't past our response deadline
|
||||
if time.Now().After(q.deadline) {
|
||||
return fmt.Errorf("Response is past the deadline")
|
||||
return fmt.Errorf("response is past the deadline")
|
||||
}
|
||||
|
||||
// Create response
|
||||
|
@ -148,83 +147,28 @@ func (q *Query) Respond(buf []byte) error {
|
|||
}
|
||||
|
||||
// Send a direct response
|
||||
{
|
||||
raw, err := encodeMessage(messageQueryResponseType, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to format response: %v", err)
|
||||
}
|
||||
raw, err := encodeMessage(messageQueryResponseType, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to format response: %v", err)
|
||||
}
|
||||
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
}
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
}
|
||||
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
// Send the response directly to the originator
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Relay the response through up to relayFactor other nodes
|
||||
members := q.serf.Members()
|
||||
if len(members) > 2 {
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to format relayed response: %v", err)
|
||||
}
|
||||
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
}
|
||||
|
||||
relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool {
|
||||
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name
|
||||
})
|
||||
|
||||
for _, m := range relayMembers {
|
||||
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
|
||||
if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear the deadline, responses sent
|
||||
q.deadline = time.Time{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// kRandomMembers selects up to k members from a given list, optionally
|
||||
// filtering by the given filterFunc
|
||||
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
|
||||
n := len(members)
|
||||
kMembers := make([]Member, 0, k)
|
||||
OUTER:
|
||||
// Probe up to 3*n times, with large n this is not necessary
|
||||
// since k << n, but with small n we want search to be
|
||||
// exhaustive
|
||||
for i := 0; i < 3*n && len(kMembers) < k; i++ {
|
||||
// Get random member
|
||||
idx := rand.Intn(n)
|
||||
member := members[idx]
|
||||
|
||||
// Give the filter a shot at it.
|
||||
if filterFunc != nil && filterFunc(member) {
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
// Check if we have this member already
|
||||
for j := 0; j < len(kMembers); j++ {
|
||||
if member.Name == kMembers[j].Name {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// Append the member
|
||||
kMembers = append(kMembers, member)
|
||||
}
|
||||
|
||||
return kMembers
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package serf
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -218,3 +221,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// relayResponse will relay a copy of the given response to up to relayFactor
|
||||
// other members.
|
||||
func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
|
||||
if relayFactor == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Needs to be worth it; we need to have at least relayFactor *other*
|
||||
// nodes. If you have a tiny cluster then the relayFactor shouldn't
|
||||
// be needed.
|
||||
members := s.Members()
|
||||
if len(members) < int(relayFactor)+1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prep the relay message, which is a wrapped version of the original.
|
||||
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to format relayed response: %v", err)
|
||||
}
|
||||
if len(raw) > s.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
|
||||
}
|
||||
|
||||
// Relay to a random set of peers.
|
||||
localName := s.LocalMember().Name
|
||||
relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
|
||||
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
|
||||
})
|
||||
for _, m := range relayMembers {
|
||||
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
|
||||
if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
|
||||
return fmt.Errorf("failed to send relay response: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// kRandomMembers selects up to k members from a given list, optionally
|
||||
// filtering by the given filterFunc
|
||||
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
|
||||
n := len(members)
|
||||
kMembers := make([]Member, 0, k)
|
||||
OUTER:
|
||||
// Probe up to 3*n times, with large n this is not necessary
|
||||
// since k << n, but with small n we want search to be
|
||||
// exhaustive
|
||||
for i := 0; i < 3*n && len(kMembers) < k; i++ {
|
||||
// Get random member
|
||||
idx := rand.Intn(n)
|
||||
member := members[idx]
|
||||
|
||||
// Give the filter a shot at it.
|
||||
if filterFunc != nil && filterFunc(member) {
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
// Check if we have this member already
|
||||
for j := 0; j < len(kMembers); j++ {
|
||||
if member.Name == kMembers[j].Name {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// Append the member
|
||||
kMembers = append(kMembers, member)
|
||||
}
|
||||
|
||||
return kMembers
|
||||
}
|
||||
|
|
|
@ -1238,6 +1238,9 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
|
|||
if err := s.memberlist.SendTo(&addr, raw); err != nil {
|
||||
s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
|
||||
}
|
||||
if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
|
||||
s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1286,6 +1289,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
|
|||
if resp.Ack() {
|
||||
// Exit early if this is a duplicate ack
|
||||
if _, ok := query.acks[resp.From]; ok {
|
||||
metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1299,6 +1303,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
|
|||
} else {
|
||||
// Exit early if this is a duplicate response
|
||||
if _, ok := query.responses[resp.From]; ok {
|
||||
metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -625,11 +625,11 @@
|
|||
"revisionTime": "2016-08-09T01:42:04Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=",
|
||||
"checksumSHA1": "E63/tz2qjNJ6+hyGi9AoYb0sH9s=",
|
||||
"comment": "v0.7.0-66-g6c4672d",
|
||||
"path": "github.com/hashicorp/serf/serf",
|
||||
"revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b",
|
||||
"revisionTime": "2017-02-02T01:56:25Z"
|
||||
"revision": "f85661e5323286a0406cabeb0ad515962c1780b7",
|
||||
"revisionTime": "2017-02-06T16:55:42Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",
|
||||
|
|
Loading…
Reference in New Issue