Call RemoveServer for reap events (#5317)
This ensures that servers are removed from RPC routing when they are reaped.
This commit is contained in:
parent
23f824fb60
commit
416a6543a6
|
@ -70,12 +70,11 @@ func (c *Client) lanEventHandler() {
|
|||
switch e.EventType() {
|
||||
case serf.EventMemberJoin:
|
||||
c.nodeJoin(e.(serf.MemberEvent))
|
||||
case serf.EventMemberLeave, serf.EventMemberFailed:
|
||||
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
|
||||
c.nodeFail(e.(serf.MemberEvent))
|
||||
case serf.EventUser:
|
||||
c.localEvent(e.(serf.UserEvent))
|
||||
case serf.EventMemberUpdate: // Ignore
|
||||
case serf.EventMemberReap: // Ignore
|
||||
case serf.EventQuery: // Ignore
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
||||
|
|
|
@ -99,6 +99,46 @@ func TestClient_JoinLAN(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestClient_LANReap(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
||||
dir2, c1 := testClientWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.SerfFloodInterval = 100 * time.Millisecond
|
||||
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
|
||||
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer c1.Shutdown()
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, c1, s1)
|
||||
testrpc.WaitForLeader(t, c1.RPC, "dc1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.LANMembers(), 2)
|
||||
require.Len(r, c1.LANMembers(), 2)
|
||||
})
|
||||
|
||||
// Check the router has both
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
server := c1.routers.FindServer()
|
||||
require.NotNil(t, server)
|
||||
require.Equal(t, s1.config.NodeName, server.Name)
|
||||
})
|
||||
|
||||
// shutdown the second dc
|
||||
s1.Shutdown()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, c1.LANMembers(), 1)
|
||||
server := c1.routers.FindServer()
|
||||
require.Nil(t, server)
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_JoinLAN_Invalid(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
|
|
|
@ -137,12 +137,10 @@ func (s *Server) lanEventHandler() {
|
|||
s.lanNodeJoin(e.(serf.MemberEvent))
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
|
||||
case serf.EventMemberLeave, serf.EventMemberFailed:
|
||||
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
|
||||
s.lanNodeFailed(e.(serf.MemberEvent))
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
|
||||
case serf.EventMemberReap:
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
case serf.EventUser:
|
||||
s.localEvent(e.(serf.UserEvent))
|
||||
case serf.EventMemberUpdate:
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func configureTLS(config *Config) {
|
||||
|
@ -236,6 +238,67 @@ func TestServer_JoinLAN(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestServer_LANReap(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.SerfFloodInterval = 100 * time.Millisecond
|
||||
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
|
||||
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
c.SerfFloodInterval = 100 * time.Millisecond
|
||||
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
|
||||
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
c.SerfFloodInterval = 100 * time.Millisecond
|
||||
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
|
||||
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s2, s1)
|
||||
joinLAN(t, s3, s1)
|
||||
|
||||
testrpc.WaitForLeader(t, s3.RPC, "dc1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.LANMembers(), 3)
|
||||
require.Len(r, s2.LANMembers(), 3)
|
||||
require.Len(r, s3.LANMembers(), 3)
|
||||
})
|
||||
|
||||
// Check the router has both
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.serverLookup.Servers(), 3)
|
||||
require.Len(r, s2.serverLookup.Servers(), 3)
|
||||
require.Len(r, s3.serverLookup.Servers(), 3)
|
||||
})
|
||||
|
||||
// shutdown the second dc
|
||||
s2.Shutdown()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.LANMembers(), 2)
|
||||
servers := s1.serverLookup.Servers()
|
||||
require.Len(r, servers, 2)
|
||||
// require.Equal(r, s1.config.NodeName, servers[0].Name)
|
||||
})
|
||||
}
|
||||
|
||||
func TestServer_JoinWAN(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
|
@ -268,6 +331,46 @@ func TestServer_JoinWAN(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestServer_WANReap(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.SerfFloodInterval = 100 * time.Millisecond
|
||||
c.SerfWANConfig.ReconnectTimeout = 250 * time.Millisecond
|
||||
c.SerfWANConfig.ReapInterval = 500 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServerDC(t, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
|
||||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.WANMembers(), 2)
|
||||
require.Len(r, s2.WANMembers(), 2)
|
||||
})
|
||||
|
||||
// Check the router has both
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.router.GetDatacenters(), 2)
|
||||
require.Len(r, s2.router.GetDatacenters(), 2)
|
||||
})
|
||||
|
||||
// shutdown the second dc
|
||||
s2.Shutdown()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Len(r, s1.WANMembers(), 1)
|
||||
datacenters := s1.router.GetDatacenters()
|
||||
require.Len(r, datacenters, 1)
|
||||
require.Equal(r, "dc1", datacenters[0])
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestServer_JoinWAN_Flood(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Set up two servers in a WAN.
|
||||
|
|
|
@ -53,7 +53,7 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s
|
|||
case serf.EventMemberJoin:
|
||||
handleMemberEvent(logger, router.AddServer, areaID, e)
|
||||
|
||||
case serf.EventMemberLeave:
|
||||
case serf.EventMemberLeave, serf.EventMemberReap:
|
||||
handleMemberEvent(logger, router.RemoveServer, areaID, e)
|
||||
|
||||
case serf.EventMemberFailed:
|
||||
|
@ -61,7 +61,6 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s
|
|||
|
||||
// All of these event types are ignored.
|
||||
case serf.EventMemberUpdate:
|
||||
case serf.EventMemberReap:
|
||||
case serf.EventUser:
|
||||
case serf.EventQuery:
|
||||
|
||||
|
|
Loading…
Reference in New Issue