package nomad

import (
	"net"
	"reflect"
	"testing"
	"time"

	version "github.com/hashicorp/go-version"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/serf/serf"
	"github.com/stretchr/testify/require"
)

func TestIsNomadServer(t *testing.T) {
	t.Parallel()
	m := serf.Member{
		Name:   "foo",
		Addr:   net.IP([]byte{127, 0, 0, 1}),
		Status: serf.StatusAlive,
		Tags: map[string]string{
			"role":     "nomad",
			"region":   "aws",
			"dc":       "east-aws",
			"rpc_addr": "1.1.1.1",
			"port":     "10000",
			"vsn":      "1",
			"raft_vsn": "2",
			"build":    "0.7.0+ent",
			"nonvoter": "1",
		},
	}
	valid, parts := isNomadServer(m)
	if !valid || parts.Region != "aws" ||
		parts.Datacenter != "east-aws" || parts.Port != 10000 {
		t.Fatalf("bad: %v %v", valid, parts)
	}
	if parts.Name != "foo" {
		t.Fatalf("bad: %v", parts)
	}
	if parts.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if parts.Expect != 0 {
		t.Fatalf("bad: %v", parts.Expect)
	}
	if parts.Status != serf.StatusAlive {
		t.Fatalf("bad: %v", parts.Status)
	}
	if parts.RaftVersion != 2 {
		t.Fatalf("bad: %v", parts.RaftVersion)
	}
	if parts.RPCAddr.String() != "1.1.1.1:10000" {
		t.Fatalf("bad: %v", parts.RPCAddr.String())
	}
	if seg := parts.Build.Segments(); len(seg) != 3 {
		t.Fatalf("bad: %v", parts.Build)
	} else if seg[0] != 0 || seg[1] != 7 || seg[2] != 0 {
		t.Fatalf("bad: %v", parts.Build)
	}
	if !parts.NonVoter {
		t.Fatalf("should be nonvoter")
	}

	m.Tags["bootstrap"] = "1"
	valid, parts = isNomadServer(m)
	if !valid || !parts.Bootstrap {
		t.Fatalf("expected bootstrap")
	}
	if parts.Addr.String() != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", parts.Addr)
	}
	if parts.MajorVersion != 1 {
		t.Fatalf("bad: %v", parts)
	}

	m.Tags["expect"] = "3"
	delete(m.Tags, "bootstrap")
	valid, parts = isNomadServer(m)
	if !valid || parts.Expect != 3 {
		t.Fatalf("bad: %v", parts.Expect)
	}

	delete(m.Tags, "nonvoter")
	valid, parts = isNomadServer(m)
	if !valid || parts.NonVoter {
		t.Fatalf("should be a voter")
	}
}
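
// TestServersMeetMinimumVersionExcludingFailed exercises
// ServersMeetMinimumVersion with its final argument set to false, in which
// case members that are not alive (e.g. failed servers) are ignored by the
// minimum-version check.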
func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
	t.Parallel()

	cases := []struct {
		members  []serf.Member
		ver      *version.Version
		expected bool
	}{
		// One server, meets reqs
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: true,
		},
		// One server in dev, meets reqs
		{
			members: []serf.Member{
				makeMember("0.8.5-dev", serf.StatusAlive),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: true,
		},
		// One server with meta, meets reqs
		{
			members: []serf.Member{
				makeMember("0.7.5+ent", serf.StatusAlive),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: true,
		},
		// One server, doesn't meet reqs
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
			},
			ver:      version.Must(version.NewVersion("0.8.0")),
			expected: false,
		},
		// Multiple servers, meet req version, including a failed one that doesn't
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
				makeMember("0.8.0", serf.StatusAlive),
				makeMember("0.7.0", serf.StatusFailed),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: true,
		},
		// Multiple servers, don't meet req version
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
				makeMember("0.8.0", serf.StatusAlive),
			},
			ver:      version.Must(version.NewVersion("0.8.0")),
			expected: false,
		},
	}

	for _, tc := range cases {
		result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
		if result != tc.expected {
			t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
		}
	}
}
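
// TestServersMeetMinimumVersionIncludingFailed exercises
// ServersMeetMinimumVersion with its final argument set to true, in which
// case failed members are also included in the minimum-version check.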
func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
	t.Parallel()

	cases := []struct {
		members  []serf.Member
		ver      *version.Version
		expected bool
	}{
		// Multiple servers, meet req version
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
				makeMember("0.8.0", serf.StatusAlive),
				makeMember("0.7.5", serf.StatusFailed),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: true,
		},
		// Multiple servers, don't meet req version
		{
			members: []serf.Member{
				makeMember("0.7.5", serf.StatusAlive),
				makeMember("0.8.0", serf.StatusAlive),
				makeMember("0.7.0", serf.StatusFailed),
			},
			ver:      version.Must(version.NewVersion("0.7.5")),
			expected: false,
		},
	}

	for _, tc := range cases {
		result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
		if result != tc.expected {
			t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
		}
	}
}
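
// makeMember returns a serf.Member tagged as a Nomad server in the test
// region/datacenter, with the given build version and member status.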
func makeMember(version string, status serf.MemberStatus) serf.Member {
	return serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: map[string]string{
			"role":   "nomad",
			"region": "aws",
			"dc":     "east-aws",
			"port":   "10000",
			"build":  version,
			"vsn":    "1",
		},
		Status: status,
	}
}
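
// TestShuffleStrings asserts that shuffleStrings reorders its input in place;
// with ten random UUIDs the chance of the shuffled order matching the
// original is negligible.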
func TestShuffleStrings(t *testing.T) {
	t.Parallel()
	// Generate input
	inp := make([]string, 10)
	for idx := range inp {
		inp[idx] = uuid.Generate()
	}

	// Copy the input
	orig := make([]string, len(inp))
	copy(orig, inp)

	// Shuffle
	shuffleStrings(inp)

	// Ensure order is not the same
	if reflect.DeepEqual(inp, orig) {
		t.Fatalf("shuffle failed")
	}
}

func Test_partitionAll(t *testing.T) {
	xs := []string{"a", "b", "c", "d", "e", "f"}
	// evenly divisible
	require.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e", "f"}}, partitionAll(2, xs))
	require.Equal(t, [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, partitionAll(3, xs))
	// whole thing fits in the last part
	require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(7, xs))
	// odd remainder
	require.Equal(t, [][]string{{"a", "b", "c", "d"}, {"e", "f"}}, partitionAll(4, xs))
	// zero size
	require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(0, xs))
	// one size
	require.Equal(t, [][]string{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}, {"f"}}, partitionAll(1, xs))
}

func TestMaxUint64(t *testing.T) {
	t.Parallel()
	if maxUint64(1, 2) != 2 {
		t.Fatalf("bad")
	}
	if maxUint64(2, 2) != 2 {
		t.Fatalf("bad")
	}
	if maxUint64(2, 1) != 2 {
		t.Fatalf("bad")
	}
}

// Handling of Nomad leadership flapping (context for the dropButLastChannel
// tests below): a deadlock could occur in leadership handling if leadership
// flapped.
//
// Raft propagates leadership transitions to Nomad through a NotifyCh channel.
// Raft blocks when writing to this channel, so the channel must be buffered or
// aggressively consumed [1]. Otherwise, Raft blocks indefinitely in
// `raft.runLeader` until the channel is consumed [2] and does not move on to
// executing the follower-related logic (in `raft.runFollower`).
//
// While the Raft `runLeader` defer function blocks, Raft cannot process any
// other operations. For example, the `run{Leader|Follower}` methods consume
// `raft.applyCh`, so while the runLeader defer is blocked, all Raft log
// applications and configuration lookups block indefinitely.
//
// Sadly, `leaderLoop` and `establishLeadership` make a few Raft calls:
// `establishLeadership` attempts to auto-create the autopilot/scheduler
// config [3], and `leaderLoop` checks the Raft configuration [4]. All of
// these calls occur without a timeout.
//
// Thus, if leadership flapped quickly while `leaderLoop`/`establishLeadership`
// was running and hit any of these Raft calls, the Raft handler deadlocked
// forever. Depending on how many times leadership flapped and where exactly it
// got stuck, the following could happen:
//
//   - Agent metrics/stats HTTP and RPC calls hang as they check
//     raft.Configurations.
//   - raft.State remains Leader, so the server keeps accepting RPC calls
//     (e.g. node/alloc updates), and these hang as well.
//
// Since a goroutine is created per RPC call, the number of goroutines grows
// over time and may trigger out-of-memory errors in addition to missed
// updates.
//
// [1] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/config.go#L190-L193
// [2] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/raft.go#L425-L436
// [3] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L198-L202
// [4] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L877
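
// The helper below is a minimal illustrative sketch of the drop-but-last
// pattern that the following tests exercise; the name
// dropButLastChannelSketch is hypothetical, and this is not the real
// dropButLastChannel implementation the tests call. It keeps only the most
// recent value seen on sourceCh and delivers it whenever the consumer is
// ready, dropping older undelivered values, and stops on shutdown or when the
// source is closed.
func dropButLastChannelSketch(sourceCh <-chan bool, shutdownCh <-chan struct{}) <-chan bool {
	dstCh := make(chan bool)
	go func() {
		defer close(dstCh)
		var last bool    // most recent value observed on sourceCh
		pending := false // true while `last` has not been delivered yet
		for {
			if pending {
				select {
				case v, ok := <-sourceCh:
					if !ok {
						// Source closed: try to deliver the final value, then stop.
						select {
						case dstCh <- last:
						case <-shutdownCh:
						}
						return
					}
					// A newer value replaces the undelivered one.
					last = v
				case dstCh <- last:
					pending = false
				case <-shutdownCh:
					return
				}
			} else {
				select {
				case v, ok := <-sourceCh:
					if !ok {
						return
					}
					last, pending = v, true
				case <-shutdownCh:
					return
				}
			}
		}
	}()
	return dstCh
}
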
func TestDropButLastChannelDropsValues(t *testing.T) {
	sourceCh := make(chan bool)
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	dstCh := dropButLastChannel(sourceCh, shutdownCh)

	// timeout duration for any channel propagation delay
	timeoutDuration := 5 * time.Millisecond

	// test that dstCh doesn't emit anything initially
	select {
	case <-dstCh:
		require.Fail(t, "received a message unexpectedly")
	case <-time.After(timeoutDuration):
		// no message, as expected; a timeout is used rather than default:
		// to give the implementation goroutine a chance to run
	}

	sourceCh <- false
	select {
	case v := <-dstCh:
		require.False(t, v, "unexpected value from dstCh")
	case <-time.After(timeoutDuration):
		require.Fail(t, "timed out waiting for source->dstCh propagation")
	}

	// channel is drained now
	select {
	case v := <-dstCh:
		require.Failf(t, "received a message unexpectedly", "value: %v", v)
	case <-time.After(timeoutDuration):
		// no message, as expected; a timeout is used rather than default:
		// to give the implementation goroutine a chance to run
	}

	// now enqueue many messages and ensure only the last one is received;
	// enqueueing should be fast!
	sourceCh <- false
	sourceCh <- false
	sourceCh <- false
	sourceCh <- false
	sourceCh <- true

	// dstCh may still contain a stale (i.e. `false`) value if Go executes this
	// select before the implementation goroutine dequeues the last value.
	//
	// However, this has never failed in testing, so it is left as is to see if
	// it ever does; if/when the test fails, we can learn how much of an issue
	// it is and adjust.
	select {
	case v := <-dstCh:
		require.True(t, v, "unexpected value from dstCh")
	case <-time.After(timeoutDuration):
		require.Fail(t, "timed out waiting for source->dstCh propagation")
	}

	sourceCh <- true
	sourceCh <- true
	sourceCh <- true
	sourceCh <- true
	sourceCh <- true
	sourceCh <- false
	select {
	case v := <-dstCh:
		require.False(t, v, "unexpected value from dstCh")
	case <-time.After(timeoutDuration):
		require.Fail(t, "timed out waiting for source->dstCh propagation")
	}
}

// TestDropButLastChannel_DeliversMessages asserts that the last message is
// always delivered: some messages may be dropped, but new messages are never
// introduced.
// In a tight loop, receivers may still observe some intermediate messages.
func TestDropButLastChannel_DeliversMessages(t *testing.T) {
	sourceCh := make(chan bool)
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	dstCh := dropButLastChannel(sourceCh, shutdownCh)

	// timeout duration for any channel propagation delay
	timeoutDuration := 5 * time.Millisecond

	sentMessages := 100
	go func() {
		for i := 0; i < sentMessages-1; i++ {
			sourceCh <- true
		}
		sourceCh <- false
	}()

	receivedTrue, receivedFalse := 0, 0
	var lastReceived *bool

RECEIVE_LOOP:
	for {
		select {
		case v := <-dstCh:
			lastReceived = &v
			if v {
				receivedTrue++
			} else {
				receivedFalse++
			}

		case <-time.After(timeoutDuration):
			break RECEIVE_LOOP
		}
	}

	t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
		receivedTrue, sentMessages-1, receivedFalse, 1)

	require.NotNil(t, lastReceived)
	require.False(t, *lastReceived)
	require.Equal(t, 1, receivedFalse)
	require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

// TestDropButLastChannel_DeliversMessages_Close asserts that the last message
// is always delivered: some messages may be dropped, but new messages are
// never introduced, even when the source channel is closed.
func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
	sourceCh := make(chan bool)
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	dstCh := dropButLastChannel(sourceCh, shutdownCh)

	// timeout duration for any channel propagation delay
	timeoutDuration := 5 * time.Millisecond

	sentMessages := 100
	go func() {
		for i := 0; i < sentMessages-1; i++ {
			sourceCh <- true
		}
		sourceCh <- false
		close(sourceCh)
	}()

	receivedTrue, receivedFalse := 0, 0
	var lastReceived *bool

RECEIVE_LOOP:
	for {
		select {
		case v, ok := <-dstCh:
			if !ok {
				break RECEIVE_LOOP
			}
			lastReceived = &v
			if v {
				receivedTrue++
			} else {
				receivedFalse++
			}

		case <-time.After(timeoutDuration):
			require.Fail(t, "timed out while waiting for messages")
		}
	}

	t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
		receivedTrue, sentMessages-1, receivedFalse, 1)

	require.NotNil(t, lastReceived)
	require.False(t, *lastReceived)
	require.Equal(t, 1, receivedFalse)
	require.LessOrEqual(t, receivedTrue, sentMessages-1)
}