From 0f2e189012dfcab62a5f50f4817c8bda22524bd0 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Mon, 27 Apr 2020 09:53:40 +0200 Subject: [PATCH 1/3] agent: rename local/global to src/dst --- agent/consul/flood.go | 10 ++++----- agent/router/serf_flooder.go | 40 +++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/agent/consul/flood.go b/agent/consul/flood.go index 1287d0d7a..d8de89b32 100644 --- a/agent/consul/flood.go +++ b/agent/consul/flood.go @@ -24,7 +24,7 @@ func (s *Server) FloodNotify() { // Flood is a long-running goroutine that floods servers from the LAN to the // given global Serf instance, such as the WAN. This will exit once either of // the Serf instances are shut down. -func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, global *serf.Serf) { +func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, dstSerf *serf.Serf) { s.floodLock.Lock() floodCh := make(chan struct{}) s.floodCh = append(s.floodCh, floodCh) @@ -50,17 +50,15 @@ func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, glo case <-s.serfLAN.ShutdownCh(): return - case <-global.ShutdownCh(): + case <-dstSerf.ShutdownCh(): return case <-ticker.C: - goto FLOOD + router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) case <-floodCh: - goto FLOOD + router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) } - FLOOD: - router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, global) } } diff --git a/agent/router/serf_flooder.go b/agent/router/serf_flooder.go index 35a4141e4..9a6f242f3 100644 --- a/agent/router/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -18,21 +18,21 @@ type FloodAddrFn func(*metadata.Server) (string, bool) // will return false if it doesn't have one. type FloodPortFn func(*metadata.Server) (int, bool) -// FloodJoins attempts to make sure all Consul servers in the local Serf -// instance are joined in the global Serf instance. It assumes names in the -// local area are of the form and those in the global area are of the +// FloodJoins attempts to make sure all Consul servers in the src Serf +// instance are joined in the dst Serf instance. It assumes names in the +// src area are of the form and those in the dst area are of the // form . as is done for WAN and general network areas in Consul // Enterprise. func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, - localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf) { + localDatacenter string, srcSerf *serf.Serf, dstSerf *serf.Serf) { - // Names in the global Serf have the datacenter suffixed. + // Names in the dst Serf have the datacenter suffixed. suffix := fmt.Sprintf(".%s", localDatacenter) - // Index the global side so we can do one pass through the local side + // Index the dst side so we can do one pass through the src side // with cheap lookups. index := make(map[string]*metadata.Server) - for _, m := range globalSerf.Members() { + for _, m := range dstSerf.Members() { ok, server := metadata.IsConsulServer(m) if !ok { continue @@ -42,12 +42,12 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, continue } - localName := strings.TrimSuffix(server.Name, suffix) - index[localName] = server + srcName := strings.TrimSuffix(server.Name, suffix) + index[srcName] = server } - // Now run through the local side and look for joins. - for _, m := range localSerf.Members() { + // Now run through the src side and look for joins. + for _, m := range srcSerf.Members() { if m.Status != serf.StatusAlive { continue } @@ -61,7 +61,11 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, continue } - // We can't use the port number from the local Serf, so we just + // TODO make RPC + + // TODO refactor into one function: + + // We can't use the port number from the src Serf, so we just // get the host part. addr, _, err := net.SplitHostPort(server.Addr.String()) if err != nil { @@ -83,7 +87,7 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) } else { // If we have an IPv6 address, we should add brackets, - // single globalSerf.Join expects that. + // single dstSerf.Join expects that. if ip := net.ParseIP(addr); ip != nil { if ip.To4() == nil { addr = fmt.Sprintf("[%s]", addr) @@ -93,19 +97,21 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, } } - globalServerName := fmt.Sprintf("%s.%s", server.Name, server.Datacenter) + // end refactor + + dstServerName := fmt.Sprintf("%s.%s", server.Name, server.Datacenter) // Do the join! - n, err := globalSerf.Join([]string{globalServerName + "/" + addr}, true) + n, err := dstSerf.Join([]string{dstServerName + "/" + addr}, true) if err != nil { logger.Debug("Failed to flood-join server at address", - "server", globalServerName, + "server", dstServerName, "address", addr, "error", err, ) } else if n > 0 { logger.Debug("Successfully performed flood-join for server at address", - "server", globalServerName, + "server", dstServerName, "address", addr, ) } From 854aac510f1a339555d4ae4944b2fa9944d1dbad Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Mon, 27 Apr 2020 10:21:05 +0200 Subject: [PATCH 2/3] agent: refactor to use a single addrFn --- agent/consul/flood.go | 6 +-- agent/consul/server.go | 14 ++++--- agent/router/serf_flooder.go | 73 +++++++++++++++++++----------------- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/agent/consul/flood.go b/agent/consul/flood.go index d8de89b32..ab9c54003 100644 --- a/agent/consul/flood.go +++ b/agent/consul/flood.go @@ -24,7 +24,7 @@ func (s *Server) FloodNotify() { // Flood is a long-running goroutine that floods servers from the LAN to the // given global Serf instance, such as the WAN. This will exit once either of // the Serf instances are shut down. -func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, dstSerf *serf.Serf) { +func (s *Server) Flood(addrFn router.FloodAddrFn, dstSerf *serf.Serf) { s.floodLock.Lock() floodCh := make(chan struct{}) s.floodCh = append(s.floodCh, floodCh) @@ -54,10 +54,10 @@ func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, dst return case <-ticker.C: - router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) + router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf) case <-floodCh: - router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, dstSerf) + router.FloodJoins(s.logger, addrFn, s.config.Datacenter, s.serfLAN, dstSerf) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 3b6e9886a..44026fe87 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -558,13 +558,17 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) // Fire up the LAN <-> WAN join flooder. - portFn := func(s *metadata.Server) (int, bool) { - if s.WanJoinPort > 0 { - return s.WanJoinPort, true + addrFn := func(s *metadata.Server) (string, error) { + if s.WanJoinPort == 0 { + return "", fmt.Errorf("no wan join port for server: %s", s.Addr.String()) } - return 0, false + addr, _, err := net.SplitHostPort(s.Addr.String()) + if err != nil { + return "", err + } + return fmt.Sprintf("%s:%d", addr, s.WanJoinPort), nil } - go s.Flood(nil, portFn, s.serfWAN) + go s.Flood(addrFn, s.serfWAN) } // Start enterprise specific functionality diff --git a/agent/router/serf_flooder.go b/agent/router/serf_flooder.go index 9a6f242f3..b7aa8e676 100644 --- a/agent/router/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -2,7 +2,6 @@ package router import ( "fmt" - "net" "strings" "github.com/hashicorp/consul/agent/metadata" @@ -10,20 +9,16 @@ import ( "github.com/hashicorp/serf/serf" ) -// FloodAddrFn gets the address to use for a given server when flood-joining. This -// will return false if it doesn't have one. -type FloodAddrFn func(*metadata.Server) (string, bool) - -// FloodPortFn gets the port to use for a given server when flood-joining. This -// will return false if it doesn't have one. -type FloodPortFn func(*metadata.Server) (int, bool) +// FloodAddrPortFn gets the address and port to use for a given server when +// flood-joining. This will return false if it doesn't have one. +type FloodAddrFn func(*metadata.Server) (string, error) // FloodJoins attempts to make sure all Consul servers in the src Serf // instance are joined in the dst Serf instance. It assumes names in the // src area are of the form and those in the dst area are of the // form . as is done for WAN and general network areas in Consul // Enterprise. -func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, +func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, localDatacenter string, srcSerf *serf.Serf, dstSerf *serf.Serf) { // Names in the dst Serf have the datacenter suffixed. @@ -65,37 +60,45 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, portFn FloodPortFn, // TODO refactor into one function: - // We can't use the port number from the src Serf, so we just - // get the host part. - addr, _, err := net.SplitHostPort(server.Addr.String()) + addr, err := addrFn(server) if err != nil { - logger.Debug("Failed to flood-join server (bad address)", - "server", server.Name, - "address", server.Addr.String(), + logger.Debug("Failed to flood-join server", "server", + server.Name, "address", server.Addr.String(), "error", err, ) + continue } - if addrFn != nil { - if a, ok := addrFn(server); ok { - addr = a - } - } + // // We can't use the port number from the src Serf, so we just + // // get the host part. + // addr, _, err := net.SplitHostPort(server.Addr.String()) + // if err != nil { + // logger.Debug("Failed to flood-join server (bad address)", + // "server", server.Name, + // "address", server.Addr.String(), + // "error", err, + // ) + // } + // if addrFn != nil { + // if a, ok := addrFn(server); ok { + // addr = a + // } + // } - // Let the callback see if it can get the port number, otherwise - // leave it blank to behave as if we just supplied an address. - if port, ok := portFn(server); ok { - addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) - } else { - // If we have an IPv6 address, we should add brackets, - // single dstSerf.Join expects that. - if ip := net.ParseIP(addr); ip != nil { - if ip.To4() == nil { - addr = fmt.Sprintf("[%s]", addr) - } - } else { - logger.Debug("Failed to parse IP", "ip", addr) - } - } + // // Let the callback see if it can get the port number, otherwise + // // leave it blank to behave as if we just supplied an address. + // if port, ok := portFn(server); ok { + // addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) + // } else { + // // If we have an IPv6 address, we should add brackets, + // // single dstSerf.Join expects that. + // if ip := net.ParseIP(addr); ip != nil { + // if ip.To4() == nil { + // addr = fmt.Sprintf("[%s]", addr) + // } + // } else { + // logger.Debug("Failed to parse IP", "ip", addr) + // } + // } // end refactor From e3e2b82a008ee4d5047c36ff6abe27062f3016c9 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Wed, 29 Apr 2020 11:13:03 +0200 Subject: [PATCH 3/3] network_segments: stop advertising segment tags --- agent/consul/server_serf.go | 5 ----- agent/consul/server_test.go | 2 +- agent/router/serf_flooder.go | 37 ------------------------------------ 3 files changed, 1 insertion(+), 43 deletions(-) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 821d9c317..b8820d27d 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -49,11 +49,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["role"] = "consul" conf.Tags["dc"] = s.config.Datacenter conf.Tags["segment"] = segment - if segment == "" { - for _, s := range s.config.Segments { - conf.Tags["sl_"+s.Name] = net.JoinHostPort(s.Advertise, fmt.Sprintf("%d", s.Port)) - } - } conf.Tags["id"] = string(s.config.NodeID) conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion) conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 61cb1a092..941fb7ea5 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -145,7 +145,7 @@ func testServerConfig(t *testing.T) (string, *Config) { config.ServerHealthInterval = 50 * time.Millisecond config.AutopilotInterval = 100 * time.Millisecond - config.Build = "1.4.0" + config.Build = "1.7.2" config.CoordinateUpdatePeriod = 100 * time.Millisecond config.LeaveDrainTime = 1 * time.Millisecond diff --git a/agent/router/serf_flooder.go b/agent/router/serf_flooder.go index b7aa8e676..661ddbd86 100644 --- a/agent/router/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -56,10 +56,6 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, continue } - // TODO make RPC - - // TODO refactor into one function: - addr, err := addrFn(server) if err != nil { logger.Debug("Failed to flood-join server", "server", @@ -68,39 +64,6 @@ func FloodJoins(logger hclog.Logger, addrFn FloodAddrFn, ) continue } - // // We can't use the port number from the src Serf, so we just - // // get the host part. - // addr, _, err := net.SplitHostPort(server.Addr.String()) - // if err != nil { - // logger.Debug("Failed to flood-join server (bad address)", - // "server", server.Name, - // "address", server.Addr.String(), - // "error", err, - // ) - // } - // if addrFn != nil { - // if a, ok := addrFn(server); ok { - // addr = a - // } - // } - - // // Let the callback see if it can get the port number, otherwise - // // leave it blank to behave as if we just supplied an address. - // if port, ok := portFn(server); ok { - // addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port)) - // } else { - // // If we have an IPv6 address, we should add brackets, - // // single dstSerf.Join expects that. - // if ip := net.ParseIP(addr); ip != nil { - // if ip.To4() == nil { - // addr = fmt.Sprintf("[%s]", addr) - // } - // } else { - // logger.Debug("Failed to parse IP", "ip", addr) - // } - // } - - // end refactor dstServerName := fmt.Sprintf("%s.%s", server.Name, server.Datacenter)