light refactors to support making partitions and serf-based wan federation are mutually exclusive (#11755)
This commit is contained in:
parent
8c8443390d
commit
5ea4b82940
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
partitions: **(Enterprise only)** Ensure partitions and serf-based WAN federation are mutually exclusive.
|
||||
```
|
|
@ -88,7 +88,7 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error {
|
|||
func (s *Server) setupSerfLAN(config *Config) error {
|
||||
var err error
|
||||
// Initialize the LAN Serf for the default network segment.
|
||||
s.serfLAN, err = s.setupSerf(setupSerfOptions{
|
||||
s.serfLAN, _, err = s.setupSerf(setupSerfOptions{
|
||||
Config: config.SerfLANConfig,
|
||||
EventCh: s.eventChLAN,
|
||||
SnapshotPath: serfLANSnapshot,
|
||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
@ -86,14 +87,41 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
|
|||
// ring. We check that the peers are server nodes and abort the merge
|
||||
// otherwise.
|
||||
type wanMergeDelegate struct {
|
||||
localDatacenter string
|
||||
|
||||
federationDisabledLock sync.Mutex
|
||||
federationDisabled bool
|
||||
}
|
||||
|
||||
// SetWANFederationDisabled selectively disables the wan pool from accepting
|
||||
// non-local members. If the toggle changed the current value it returns true.
|
||||
func (md *wanMergeDelegate) SetWANFederationDisabled(disabled bool) bool {
|
||||
md.federationDisabledLock.Lock()
|
||||
prior := md.federationDisabled
|
||||
md.federationDisabled = disabled
|
||||
md.federationDisabledLock.Unlock()
|
||||
|
||||
return prior != disabled
|
||||
}
|
||||
|
||||
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
|
||||
// Deliberately hold this lock during the entire merge so calls to
|
||||
// SetWANFederationDisabled returning immediately imply that the flag takes
|
||||
// effect for all future merges.
|
||||
md.federationDisabledLock.Lock()
|
||||
defer md.federationDisabledLock.Unlock()
|
||||
|
||||
for _, m := range members {
|
||||
ok, _ := metadata.IsConsulServer(*m)
|
||||
ok, srv := metadata.IsConsulServer(*m)
|
||||
if !ok {
|
||||
return fmt.Errorf("Member '%s' is not a server", m.Name)
|
||||
}
|
||||
|
||||
if md.federationDisabled {
|
||||
if srv.Datacenter != md.localDatacenter {
|
||||
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'; WAN federation is disabled", m.Name, srv.Datacenter)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -138,10 +138,16 @@ func TestMerge_WAN(t *testing.T) {
|
|||
type testcase struct {
|
||||
members []*serf.Member
|
||||
expect string
|
||||
setupFn func(t *testing.T, delegate *wanMergeDelegate)
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
delegate := &wanMergeDelegate{}
|
||||
delegate := &wanMergeDelegate{
|
||||
localDatacenter: "dc1",
|
||||
}
|
||||
if tc.setupFn != nil {
|
||||
tc.setupFn(t, delegate)
|
||||
}
|
||||
err := delegate.NotifyMerge(tc.members)
|
||||
if tc.expect == "" {
|
||||
require.NoError(t, err)
|
||||
|
@ -177,7 +183,33 @@ func TestMerge_WAN(t *testing.T) {
|
|||
build: "0.7.5",
|
||||
}),
|
||||
},
|
||||
expect: "",
|
||||
},
|
||||
"federation disabled and local join allowed": {
|
||||
setupFn: func(t *testing.T, delegate *wanMergeDelegate) {
|
||||
delegate.SetWANFederationDisabled(true)
|
||||
},
|
||||
members: []*serf.Member{
|
||||
makeTestNode(t, testMember{
|
||||
dc: "dc1",
|
||||
name: "node1",
|
||||
server: true,
|
||||
build: "0.7.5",
|
||||
}),
|
||||
},
|
||||
},
|
||||
"federation disabled and remote join blocked": {
|
||||
setupFn: func(t *testing.T, delegate *wanMergeDelegate) {
|
||||
delegate.SetWANFederationDisabled(true)
|
||||
},
|
||||
members: []*serf.Member{
|
||||
makeTestNode(t, testMember{
|
||||
dc: "dc2",
|
||||
name: "node1",
|
||||
server: true,
|
||||
build: "0.7.5",
|
||||
}),
|
||||
},
|
||||
expect: `WAN federation is disabled`,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -189,6 +189,12 @@ type Server struct {
|
|||
// serf cluster that spans datacenters
|
||||
eventChWAN chan serf.Event
|
||||
|
||||
// wanMembershipNotifyCh is used to receive notifications that the the
|
||||
// serfWAN wan pool may have changed.
|
||||
//
|
||||
// If this is nil, notification is skipped.
|
||||
wanMembershipNotifyCh chan struct{}
|
||||
|
||||
// fsm is the state machine used with Raft to provide
|
||||
// strong consistency.
|
||||
fsm *fsm.FSM
|
||||
|
@ -266,6 +272,7 @@ type Server struct {
|
|||
// serfWAN is the Serf cluster maintained between DC's
|
||||
// which SHOULD only consist of Consul servers
|
||||
serfWAN *serf.Serf
|
||||
serfWANConfig *serf.Config
|
||||
memberlistTransportWAN wanfed.IngestionAwareTransport
|
||||
gatewayLocator *GatewayLocator
|
||||
|
||||
|
@ -493,7 +500,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
|
||||
// Initialize the WAN Serf if enabled
|
||||
if config.SerfWANConfig != nil {
|
||||
s.serfWAN, err = s.setupSerf(setupSerfOptions{
|
||||
s.serfWAN, s.serfWANConfig, err = s.setupSerf(setupSerfOptions{
|
||||
Config: config.SerfWANConfig,
|
||||
EventCh: s.eventChWAN,
|
||||
SnapshotPath: serfWANSnapshot,
|
||||
|
@ -548,7 +555,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
|
||||
}
|
||||
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
||||
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN, s.wanMembershipNotifyCh)
|
||||
|
||||
// Fire up the LAN <-> WAN join flooder.
|
||||
addrFn := func(s *metadata.Server) (string, error) {
|
||||
|
@ -1124,6 +1131,11 @@ func (s *Server) JoinWAN(addrs []string) (int, error) {
|
|||
if s.serfWAN == nil {
|
||||
return 0, ErrWANFederationDisabled
|
||||
}
|
||||
|
||||
if err := s.enterpriseValidateJoinWAN(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return s.serfWAN.Join(addrs, true)
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,10 @@ import (
|
|||
|
||||
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
|
||||
|
||||
func (s *Server) enterpriseValidateJoinWAN() error {
|
||||
return nil // no-op
|
||||
}
|
||||
|
||||
// JoinLAN is used to have Consul join the inner-DC pool The target address
|
||||
// should be another node inside the DC listening on the Serf LAN address
|
||||
func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
|
||||
|
|
|
@ -48,12 +48,18 @@ type setupSerfOptions struct {
|
|||
}
|
||||
|
||||
// setupSerf is used to setup and initialize a Serf
|
||||
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, error) {
|
||||
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, *serf.Config, error) {
|
||||
conf, err := s.setupSerfConfig(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return serf.Create(conf)
|
||||
|
||||
cluster, err := serf.Create(conf)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return cluster, conf, nil
|
||||
}
|
||||
|
||||
func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
|
||||
|
@ -152,7 +158,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
|
|||
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
|
||||
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
|
||||
if opts.WAN {
|
||||
conf.Merge = &wanMergeDelegate{}
|
||||
conf.Merge = &wanMergeDelegate{
|
||||
localDatacenter: s.config.Datacenter,
|
||||
}
|
||||
} else {
|
||||
conf.Merge = &lanMergeDelegate{
|
||||
dc: s.config.Datacenter,
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
// routerFn selects one of the router operations to map to incoming Serf events.
|
||||
|
@ -50,7 +51,18 @@ func handleMemberEvent(logger hclog.Logger, fn routerFn, areaID types.AreaID, e
|
|||
// HandleSerfEvents is a long-running goroutine that pushes incoming events from
|
||||
// a Serf manager's channel into the given router. This will return when the
|
||||
// shutdown channel is closed.
|
||||
func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
|
||||
//
|
||||
// If membershipNotifyCh is non-nil, it must be a buffered channel of size one
|
||||
// with one consumer. That consumer will be notified when
|
||||
// Join/Leave/Failed/Update occur on this serf pool.
|
||||
func HandleSerfEvents(
|
||||
logger hclog.Logger,
|
||||
router *Router,
|
||||
areaID types.AreaID,
|
||||
shutdownCh <-chan struct{},
|
||||
eventCh <-chan serf.Event,
|
||||
membershipNotifyCh chan<- struct{},
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case <-shutdownCh:
|
||||
|
@ -60,15 +72,19 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID,
|
|||
switch e.EventType() {
|
||||
case serf.EventMemberJoin:
|
||||
handleMemberEvent(logger, router.AddServer, areaID, e)
|
||||
notifyMembershipPossibleChange(membershipNotifyCh)
|
||||
|
||||
case serf.EventMemberLeave, serf.EventMemberReap:
|
||||
handleMemberEvent(logger, router.RemoveServer, areaID, e)
|
||||
notifyMembershipPossibleChange(membershipNotifyCh)
|
||||
|
||||
case serf.EventMemberFailed:
|
||||
handleMemberEvent(logger, router.FailServer, areaID, e)
|
||||
notifyMembershipPossibleChange(membershipNotifyCh)
|
||||
|
||||
case serf.EventMemberUpdate:
|
||||
handleMemberEvent(logger, router.AddServer, areaID, e)
|
||||
notifyMembershipPossibleChange(membershipNotifyCh)
|
||||
|
||||
// All of these event types are ignored.
|
||||
case serf.EventUser:
|
||||
|
@ -80,3 +96,15 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func notifyMembershipPossibleChange(membershipNotifyCh chan<- struct{}) {
|
||||
if membershipNotifyCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Notify if not already notified.
|
||||
select {
|
||||
case membershipNotifyCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue