open-consul/agent/consul/stream/subscription_test.go
Matt Keeler 3badd4c35c
Add event generation for autopilot state updates (#12626)
Whenever autopilot updates its state it notifies Consul. That notification will then trigger Consul to extract out the ready server information. If the ready servers have changed, then an event will be published to notify any subscribers of the full set of ready servers.

All these ready server event things are contained within an autopilotevents package instead of the consul package to make importing them into the grpc related packages possible
2022-04-19 13:03:03 -04:00

171 lines
4.5 KiB
Go

package stream
import (
"context"
"testing"
time "time"
"github.com/stretchr/testify/require"
)
func noopUnSub() {}
func TestSubscription(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
eb := newEventBuffer()
index := uint64(100)
startHead := eb.Head()
// Start with an event in the buffer
publishTestEvent(index, eb, "test")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := SubscribeRequest{
Topic: testTopic,
Subject: StringSubject("test"),
}
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Equal(t, index, got.Index)
// Schedule an event publish in a while
index++
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
publishTestEvent(index, eb, "test")
})
// Next call should block until event is delivered
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed > 200*time.Millisecond,
"Event should have been delivered after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Event should have been delivered after short time, took %s", elapsed)
require.Equal(t, index, got.Index)
index++
publishTestEvent(index, eb, "test")
start = time.Now()
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Equal(t, index, got.Index)
require.Equal(t, "test", got.Payload.(simplePayload).key)
// Cancelling the subscription context should unblock Next
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
cancel()
})
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.True(t, elapsed > 200*time.Millisecond,
"Event should have been delivered after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Event should have been delivered after short time, took %s", elapsed)
}
func TestSubscription_Close(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
eb := newEventBuffer()
index := uint64(100)
startHead := eb.Head()
// Start with an event in the buffer
publishTestEvent(index, eb, "test")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := SubscribeRequest{
Topic: testTopic,
Subject: StringSubject("test"),
}
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Equal(t, index, got.Index)
// Schedule a Close simulating the server deciding this subscroption
// needs to reset (e.g. on ACL perm change).
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
sub.forceClose()
})
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.Equal(t, ErrSubForceClosed, err)
require.True(t, elapsed > 200*time.Millisecond,
"Reload should have happened after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Reload should have been delivered after short time, took %s", elapsed)
}
func publishTestEvent(index uint64, b *eventBuffer, key string) {
e := Event{
Index: index,
Topic: testTopic,
Payload: simplePayload{key: key},
}
b.Append([]Event{e})
}
func TestNewEventsFromBatch(t *testing.T) {
t.Run("single item", func(t *testing.T) {
first := Event{
Topic: testTopic,
Index: 1234,
Payload: simplePayload{key: "key"},
}
e := newEventFromBatch(SubscribeRequest{}, []Event{first})
require.Equal(t, first, e)
})
t.Run("many items", func(t *testing.T) {
events := []Event{
newSimpleEvent("foo", 9999),
newSimpleEvent("foo", 9999),
newSimpleEvent("zee", 9999),
}
req := SubscribeRequest{Topic: testTopic}
e := newEventFromBatch(req, events)
expected := Event{
Topic: testTopic,
Index: 9999,
Payload: newPayloadEvents(events...),
}
require.Equal(t, expected, e)
})
}