Cleans up incorrect router shutdown.

Fixes #3102.
This commit is contained in:
James Phillips 2017-06-02 16:33:48 -07:00
parent 4f212570e3
commit 0cf7928222
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 65 additions and 19 deletions

View File

@ -278,7 +278,7 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
localConsuls: make(map[raft.ServerAddress]*agent.Server), localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger, logger: logger,
reconcileCh: make(chan serf.Member, 32), reconcileCh: make(chan serf.Member, 32),
router: servers.NewRouter(logger, shutdownCh, config.Datacenter), router: servers.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error), reassertLeaderCh: make(chan chan error),
@ -686,6 +686,7 @@ func (s *Server) Shutdown() error {
s.logger.Printf("[WARN] consul: error removing WAN area: %v", err) s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
} }
} }
s.router.Shutdown()
if s.raft != nil { if s.raft != nil {
s.raftTransport.Close() s.raftTransport.Close()

View File

@ -36,6 +36,10 @@ type Router struct {
// routeFn is a hook to actually do the routing. // routeFn is a hook to actually do the routing.
routeFn func(datacenter string) (*Manager, *agent.Server, bool) routeFn func(datacenter string) (*Manager, *agent.Server, bool)
// isShutdown prevents adding new routes to a router after it is shut
// down.
isShutdown bool
// This top-level lock covers all the internal state. // This top-level lock covers all the internal state.
sync.RWMutex sync.RWMutex
} }
@ -74,9 +78,8 @@ type areaInfo struct {
managers map[string]*managerInfo managers map[string]*managerInfo
} }
// NewRouter returns a new Router with the given configuration. This will also // NewRouter returns a new Router with the given configuration.
// spawn a goroutine that cleans up when the given shutdownCh is closed. func NewRouter(logger *log.Logger, localDatacenter string) *Router {
func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
router := &Router{ router := &Router{
logger: logger, logger: logger,
localDatacenter: localDatacenter, localDatacenter: localDatacenter,
@ -87,23 +90,25 @@ func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter str
// Hook the direct route lookup by default. // Hook the direct route lookup by default.
router.routeFn = router.findDirectRoute router.routeFn = router.findDirectRoute
// This will propagate a top-level shutdown to all the managers. return router
go func() { }
<-shutdownCh
router.Lock()
defer router.Unlock()
for _, area := range router.areas { // Shutdown removes all areas from the router, which stops all their respective
for _, info := range area.managers { // managers. No new areas can be added after the router is shut down.
close(info.shutdownCh) func (r *Router) Shutdown() {
} r.Lock()
defer r.Unlock()
for areaID, area := range r.areas {
for datacenter, info := range area.managers {
r.removeManagerFromIndex(datacenter, info.manager)
close(info.shutdownCh)
} }
router.areas = nil delete(r.areas, areaID)
router.managers = nil }
}()
return router r.isShutdown = true
} }
// AddArea registers a new network area with the router. // AddArea registers a new network area with the router.
@ -111,6 +116,10 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.isShutdown {
return fmt.Errorf("cannot add area, router is shut down")
}
if _, ok := r.areas[areaID]; ok { if _, ok := r.areas[areaID]; ok {
return fmt.Errorf("area ID %q already exists", areaID) return fmt.Errorf("area ID %q already exists", areaID)
} }

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"reflect" "reflect"
"sort" "sort"
"strings"
"testing" "testing"
"time" "time"
@ -95,8 +96,43 @@ func testCluster(self string) *mockCluster {
func testRouter(dc string) *Router { func testRouter(dc string) *Router {
logger := log.New(os.Stderr, "", log.LstdFlags) logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{}) return NewRouter(logger, dc)
return NewRouter(logger, shutdownCh, dc) }
func TestRouter_Shutdown(t *testing.T) {
r := testRouter("dc0")
// Create a WAN-looking area.
self := "node0.dc0"
wan := testCluster(self)
if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
t.Fatalf("err: %v", err)
}
// Add another area.
otherID := types.AreaID("other")
other := newMockCluster(self)
other.AddMember("dcY", "node1", nil)
if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
t.Fatalf("err: %v", err)
}
_, _, ok := r.FindRoute("dcY")
if !ok {
t.Fatalf("bad")
}
// Shutdown and make sure we can't see any routes from before.
r.Shutdown()
_, _, ok = r.FindRoute("dcY")
if ok {
t.Fatalf("bad")
}
// You can't add areas once the router is shut down.
err := r.AddArea(otherID, other, &fauxConnPool{})
if err == nil || !strings.Contains(err.Error(), "router is shut down") {
t.Fatalf("err: %v", err)
}
} }
func TestRouter_Routing(t *testing.T) { func TestRouter_Routing(t *testing.T) {