Merge pull request #580 from hashicorp/f-merge
Adding merge delegate to prevent mixing clusters
This commit is contained in:
commit
0f4f55c91d
|
@ -3,8 +3,6 @@ package consul
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
@ -13,6 +11,9 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -138,6 +139,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
|
||||||
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
|
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
|
||||||
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
|
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
|
||||||
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
|
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
|
||||||
|
conf.Merge = &lanMergeDelegate{logger: c.logger, dc: c.config.Datacenter}
|
||||||
if err := ensurePath(conf.SnapshotPath, false); err != nil {
|
if err := ensurePath(conf.SnapshotPath, false); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,56 @@ func TestClient_JoinLAN(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClient_JoinLAN_Invalid(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
|
||||||
|
dir2, c1 := testClientDC(t, "other")
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer c1.Shutdown()
|
||||||
|
|
||||||
|
// Try to join
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := c1.JoinLAN([]string{addr}); err == nil {
|
||||||
|
t.Fatalf("should error")
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
if len(s1.LANMembers()) != 1 {
|
||||||
|
t.Fatalf("should not join")
|
||||||
|
}
|
||||||
|
if len(c1.LANMembers()) != 1 {
|
||||||
|
t.Fatalf("should not join")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_JoinWAN_Invalid(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
|
||||||
|
dir2, c1 := testClientDC(t, "dc2")
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer c1.Shutdown()
|
||||||
|
|
||||||
|
// Try to join
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := c1.JoinLAN([]string{addr}); err == nil {
|
||||||
|
t.Fatalf("should error")
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
if len(s1.WANMembers()) != 1 {
|
||||||
|
t.Fatalf("should not join")
|
||||||
|
}
|
||||||
|
if len(c1.LANMembers()) != 1 {
|
||||||
|
t.Fatalf("should not join")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClient_RPC(t *testing.T) {
|
func TestClient_RPC(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
)
|
||||||
|
|
||||||
|
// lanMergeDelegate is used to handle a cluster merge on the LAN gossip
|
||||||
|
// ring. We check that the peers are in the same datacenter and abort the
|
||||||
|
// merge if there is a mis-match.
|
||||||
|
type lanMergeDelegate struct {
|
||||||
|
logger *log.Logger
|
||||||
|
dc string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) {
|
||||||
|
for _, m := range members {
|
||||||
|
ok, dc := isConsulNode(*m)
|
||||||
|
if ok {
|
||||||
|
if dc != md.dc {
|
||||||
|
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'",
|
||||||
|
m.Name, dc)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, parts := isConsulServer(*m)
|
||||||
|
if ok && parts.Datacenter != md.dc {
|
||||||
|
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'",
|
||||||
|
m.Name, parts.Datacenter)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// wanMergeDelegate is used to handle a cluster merge on the WAN gossip
|
||||||
|
// ring. We check that the peers are server nodes and abort the merge
|
||||||
|
// otherwise.
|
||||||
|
type wanMergeDelegate struct {
|
||||||
|
logger *log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) {
|
||||||
|
for _, m := range members {
|
||||||
|
ok, _ := isConsulServer(*m)
|
||||||
|
if !ok {
|
||||||
|
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' is not a server",
|
||||||
|
m.Name)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
|
@ -302,6 +302,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
||||||
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
|
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
|
||||||
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
|
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
|
||||||
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
|
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
|
||||||
|
if wan {
|
||||||
|
conf.Merge = &wanMergeDelegate{logger: s.logger}
|
||||||
|
} else {
|
||||||
|
conf.Merge = &lanMergeDelegate{logger: s.logger, dc: s.config.Datacenter}
|
||||||
|
}
|
||||||
|
|
||||||
// Until Consul supports this fully, we disable automatic resolution.
|
// Until Consul supports this fully, we disable automatic resolution.
|
||||||
// When enabled, the Serf gossip may just turn off if we are the minority
|
// When enabled, the Serf gossip may just turn off if we are the minority
|
||||||
|
|
Loading…
Reference in New Issue