open-consul/agent/consul/controller/controller_test.go
Andrew Stucki b376fd2151
Add trigger for doing reconciliation based on watch sets (#16052)
* Add trigger for doing reconciliation based on watch sets

* update doc string

* Fix my grammar fail
2023-01-26 15:20:37 -05:00

564 lines
16 KiB
Go

package controller
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
func TestBasicController(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}))
}
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, structs.IngressGateway, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
}
func TestBasicController_Transform(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}, func(entry structs.ConfigEntry) []Request {
return []Request{{
Kind: "foo",
Name: "bar",
}}
}).Run(ctx)
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "test",
}))
select {
case request := <-reconciler.received:
require.Equal(t, "foo", request.Kind)
require.Equal(t, "bar", request.Name)
case <-ctx.Done():
t.Fatal("stopped reconciler before event received")
}
}
func TestBasicController_Retry(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(true)
defer reconciler.stop()
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
queueInitialized := make(chan *countingWorkQueue)
controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff))
queueInitialized <- queue
return queue
}).Run(ctx)
queue := <-queueInitialized
ensureCalled := func(request chan Request, name string) bool {
// give a short window for our counters to update
defer time.Sleep(10 * time.Millisecond)
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
// check to make sure we are called once
queue.reset()
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-1",
}))
require.False(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 0, queue.requeues())
// check that we requeue when an arbitrary error occurs
queue.reset()
reconciler.setResponse(errors.New("error"))
require.NoError(t, store.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-2",
}))
require.False(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// check that we requeue at a given rate when using a RequeueAfterError
queue.reset()
reconciler.setResponse(RequeueNow())
require.NoError(t, store.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-3",
}))
require.False(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
}
func TestBasicController_RunPanicAssertions(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
started := make(chan struct{})
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
close(started)
return RunWorkQueue(ctx, baseBackoff, maxBackoff)
})
subscription := &stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}
// kick off the controller
go controller.Subscribe(subscription).Run(ctx)
// wait to make sure the following assertions don't
// get run before the above goroutine is spawned
<-started
// make sure we can't call Run again
require.Panics(t, func() {
controller.Run(ctx)
})
// make sure all of our configuration methods panic
require.Panics(t, func() {
controller.Subscribe(subscription)
})
require.Panics(t, func() {
controller.WithBackoff(1, 10)
})
require.Panics(t, func() {
controller.WithWorkers(1)
})
require.Panics(t, func() {
controller.WithQueueFactory(RunWorkQueue)
})
}
func TestConfigEntrySubscriptions(t *testing.T) {
t.Parallel()
cases := map[string]struct {
configEntry func(string) structs.ConfigEntry
topic stream.Topic
kind string
}{
"Subscribe to Service Resolver Config Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: name,
}
},
topic: state.EventTopicServiceResolver,
kind: structs.ServiceResolver,
},
"Subscribe to Ingress Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}
},
topic: state.EventTopicIngressGateway,
kind: structs.IngressGateway,
},
"Subscribe to Service Intentions Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: name,
}
},
topic: state.EventTopicServiceIntentions,
kind: structs.ServiceIntentions,
},
"Subscribe to API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: name,
}
},
topic: state.EventTopicAPIGateway,
kind: structs.APIGateway,
},
"Subscribe to Inline Certificate Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.InlineCertificateConfigEntry{
Kind: structs.InlineCertificate,
Name: name,
}
},
topic: state.EventTopicInlineCertificate,
kind: structs.InlineCertificate,
},
"Subscribe to HTTP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.HTTPRouteConfigEntry{
Kind: structs.HTTPRoute,
Name: name,
}
},
topic: state.EventTopicHTTPRoute,
kind: structs.HTTPRoute,
},
"Subscribe to TCP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: name,
}
},
topic: state.EventTopicTCPRoute,
kind: structs.TCPRoute,
},
"Subscribe to Bound API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: name,
}
},
topic: state.EventTopicBoundAPIGateway,
kind: structs.BoundAPIGateway,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
}
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: tc.topic,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, tc.kind, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
})
}
}
func TestBasicController_Triggers(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
reconciler := newTestReconciler(true)
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
controller := New(publisher, reconciler)
go func() {
require.NoError(t, controller.Run(ctx))
}()
ensureCalled := func(request chan Request, name string) bool {
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
request := Request{
Kind: structs.IngressGateway,
Name: "foo-1",
}
triggerOneChan := make(chan struct{}, 3)
triggerOne := func(ctx context.Context) error {
select {
case <-triggerOneChan:
return nil
case <-ctx.Done():
return nil
}
}
controller.AddTrigger(request, triggerOne)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// do it again
require.False(t, ensureCalled(reconciler.received, "foo-1"))
controller.AddTrigger(request, triggerOne)
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// check with the overwritten trigger
controller.AddTrigger(request, triggerOne)
triggerTwoChan := make(chan struct{}, 2)
triggerTwo := func(ctx context.Context) error {
select {
case <-triggerTwoChan:
return nil
case <-ctx.Done():
return nil
}
}
controller.AddTrigger(request, triggerTwo)
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
triggerTwoChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// remove the trigger and make sure we're not called again
controller.RemoveTrigger(request)
triggerTwoChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
}
func TestDiscoveryChainController(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
controller := New(publisher, reconciler)
go controller.Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
request := Request{
Kind: structs.IngressGateway,
Name: "foo-1",
}
ensureCalled := func(request chan Request, name string) bool {
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-1",
}))
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// create the trigger and something that changes in its upstream discovery chain and ensure that we've
// fired the reconciler
ws := memdb.NewWatchSet()
ws.Add(store.AbandonCh())
_, _, err := store.ReadDiscoveryChainConfigEntries(ws, "foo-2", nil)
require.NoError(t, err)
controller.AddTrigger(request, ws.WatchCtx)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
require.NoError(t, store.EnsureConfigEntry(1, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "foo-2",
}))
require.True(t, ensureCalled(reconciler.received, "foo-1"))
}