diff --git a/consul/client.go b/consul/client.go index d10db8550..c41cc78a2 100644 --- a/consul/client.go +++ b/consul/client.go @@ -3,8 +3,6 @@ package consul import ( "crypto/tls" "fmt" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/serf/serf" "log" "math/rand" "os" @@ -13,6 +11,9 @@ import ( "strings" "sync" "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/serf" ) const ( @@ -138,6 +139,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.SnapshotPath = filepath.Join(c.config.DataDir, path) conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.RejoinAfterLeave = c.config.RejoinAfterLeave + conf.Merge = &lanMergeDelegate{logger: c.logger, dc: c.config.Datacenter} if err := ensurePath(conf.SnapshotPath, false); err != nil { return nil, err } diff --git a/consul/client_test.go b/consul/client_test.go index 33425cdf7..c799696ab 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -99,6 +99,56 @@ func TestClient_JoinLAN(t *testing.T) { }) } +func TestClient_JoinLAN_Invalid(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClientDC(t, "other") + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err == nil { + t.Fatalf("should error") + } + + time.Sleep(50 * time.Millisecond) + if len(s1.LANMembers()) != 1 { + t.Fatalf("should not join") + } + if len(c1.LANMembers()) != 1 { + t.Fatalf("should not join") + } +} + +func TestClient_JoinWAN_Invalid(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClientDC(t, "dc2") + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err == nil { + t.Fatalf("should error") + } + + time.Sleep(50 * time.Millisecond) + if len(s1.WANMembers()) != 1 { + t.Fatalf("should not join") + } + if len(c1.LANMembers()) != 1 { + t.Fatalf("should not join") + } +} + func TestClient_RPC(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/merge.go b/consul/merge.go new file mode 100644 index 000000000..0efcb0cd3 --- /dev/null +++ b/consul/merge.go @@ -0,0 +1,56 @@ +package consul + +import ( + "log" + + "github.com/hashicorp/serf/serf" +) + +// lanMergeDelegate is used to handle a cluster merge on the LAN gossip +// ring. We check that the peers are in the same datacenter and abort the +// merge if there is a mis-match. +type lanMergeDelegate struct { + logger *log.Logger + dc string +} + +func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) { + for _, m := range members { + ok, dc := isConsulNode(*m) + if ok { + if dc != md.dc { + md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'", + m.Name, dc) + return true + } + continue + } + + ok, parts := isConsulServer(*m) + if ok && parts.Datacenter != md.dc { + md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'", + m.Name, parts.Datacenter) + return true + } + } + return false +} + +// wanMergeDelegate is used to handle a cluster merge on the WAN gossip +// ring. We check that the peers are server nodes and abort the merge +// otherwise. +type wanMergeDelegate struct { + logger *log.Logger +} + +func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) { + for _, m := range members { + ok, _ := isConsulServer(*m) + if !ok { + md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' is not a server", + m.Name) + return true + } + } + return false +} diff --git a/consul/server.go b/consul/server.go index 88096a175..6a267025e 100644 --- a/consul/server.go +++ b/consul/server.go @@ -302,6 +302,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.SnapshotPath = filepath.Join(s.config.DataDir, path) conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.RejoinAfterLeave = s.config.RejoinAfterLeave + if wan { + conf.Merge = &wanMergeDelegate{logger: s.logger} + } else { + conf.Merge = &lanMergeDelegate{logger: s.logger, dc: s.config.Datacenter} + } // Until Consul supports this fully, we disable automatic resolution. // When enabled, the Serf gossip may just turn off if we are the minority