573 lines
16 KiB
Go
573 lines
16 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/fsm"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
func TestBasicController(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(false)
|
|
|
|
publisher := stream.NewEventPublisher(1 * time.Millisecond)
|
|
go publisher.Run(ctx)
|
|
|
|
// get the store through the FSM since the publisher handlers get registered through it
|
|
store := fsm.NewFromDeps(fsm.Deps{
|
|
Logger: hclog.New(nil),
|
|
NewStateStore: func() *state.Store {
|
|
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
|
},
|
|
Publisher: publisher,
|
|
StorageBackend: fsm.NullStorageBackend,
|
|
}).State()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
entryIndex := uint64(i + 1)
|
|
name := fmt.Sprintf("foo-%d", i)
|
|
require.NoError(t, store.EnsureConfigEntry(entryIndex, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: name,
|
|
}))
|
|
}
|
|
|
|
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
|
|
Topic: state.EventTopicIngressGateway,
|
|
Subject: stream.SubjectWildcard,
|
|
}).WithWorkers(10).Run(ctx)
|
|
|
|
received := []string{}
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case request := <-reconciler.received:
|
|
require.Equal(t, structs.IngressGateway, request.Kind)
|
|
received = append(received, request.Name)
|
|
if len(received) == 200 {
|
|
break LOOP
|
|
}
|
|
case <-ctx.Done():
|
|
break LOOP
|
|
}
|
|
}
|
|
|
|
// since we only modified each entry once, we should have exactly 200 reconciliation calls
|
|
require.Len(t, received, 200)
|
|
for i := 0; i < 200; i++ {
|
|
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
|
|
}
|
|
}
|
|
|
|
func TestBasicController_Transform(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(false)
|
|
|
|
publisher := stream.NewEventPublisher(0)
|
|
go publisher.Run(ctx)
|
|
|
|
// get the store through the FSM since the publisher handlers get registered through it
|
|
store := fsm.NewFromDeps(fsm.Deps{
|
|
Logger: hclog.New(nil),
|
|
NewStateStore: func() *state.Store {
|
|
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
|
},
|
|
Publisher: publisher,
|
|
StorageBackend: fsm.NullStorageBackend,
|
|
}).State()
|
|
|
|
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
|
|
Topic: state.EventTopicIngressGateway,
|
|
Subject: stream.SubjectWildcard,
|
|
}, func(entry structs.ConfigEntry) []Request {
|
|
return []Request{{
|
|
Kind: "foo",
|
|
Name: "bar",
|
|
}}
|
|
}).Run(ctx)
|
|
|
|
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: "test",
|
|
}))
|
|
|
|
select {
|
|
case request := <-reconciler.received:
|
|
require.Equal(t, "foo", request.Kind)
|
|
require.Equal(t, "bar", request.Name)
|
|
case <-ctx.Done():
|
|
t.Fatal("stopped reconciler before event received")
|
|
}
|
|
}
|
|
|
|
func TestBasicController_Retry(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(true)
|
|
defer reconciler.stop()
|
|
|
|
publisher := stream.NewEventPublisher(0)
|
|
go publisher.Run(ctx)
|
|
|
|
// get the store through the FSM since the publisher handlers get registered through it
|
|
store := fsm.NewFromDeps(fsm.Deps{
|
|
Logger: hclog.New(nil),
|
|
NewStateStore: func() *state.Store {
|
|
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
|
},
|
|
Publisher: publisher,
|
|
StorageBackend: fsm.NullStorageBackend,
|
|
}).State()
|
|
|
|
queueInitialized := make(chan *countingWorkQueue)
|
|
controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
|
|
Topic: state.EventTopicIngressGateway,
|
|
Subject: stream.SubjectWildcard,
|
|
}).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
|
|
go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
|
|
queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff))
|
|
queueInitialized <- queue
|
|
return queue
|
|
}).Run(ctx)
|
|
|
|
queue := <-queueInitialized
|
|
|
|
ensureCalled := func(request chan Request, name string) bool {
|
|
// give a short window for our counters to update
|
|
defer time.Sleep(10 * time.Millisecond)
|
|
select {
|
|
case req := <-request:
|
|
require.Equal(t, structs.IngressGateway, req.Kind)
|
|
require.Equal(t, name, req.Name)
|
|
return true
|
|
case <-time.After(10 * time.Millisecond):
|
|
return false
|
|
}
|
|
}
|
|
|
|
// check to make sure we are called once
|
|
queue.reset()
|
|
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-1",
|
|
}))
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
require.EqualValues(t, 0, queue.dones())
|
|
require.EqualValues(t, 0, queue.requeues())
|
|
reconciler.step()
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
require.EqualValues(t, 1, queue.dones())
|
|
require.EqualValues(t, 0, queue.requeues())
|
|
|
|
// check that we requeue when an arbitrary error occurs
|
|
queue.reset()
|
|
reconciler.setResponse(errors.New("error"))
|
|
require.NoError(t, store.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-2",
|
|
}))
|
|
require.False(t, ensureCalled(reconciler.received, "foo-2"))
|
|
require.EqualValues(t, 0, queue.dones())
|
|
require.EqualValues(t, 0, queue.requeues())
|
|
require.EqualValues(t, 0, queue.addRateLimiteds())
|
|
reconciler.step()
|
|
// check we're processed the first time and re-queued
|
|
require.True(t, ensureCalled(reconciler.received, "foo-2"))
|
|
require.EqualValues(t, 1, queue.dones())
|
|
require.EqualValues(t, 1, queue.requeues())
|
|
require.EqualValues(t, 1, queue.addRateLimiteds())
|
|
// now make sure we succeed
|
|
reconciler.setResponse(nil)
|
|
reconciler.step()
|
|
require.True(t, ensureCalled(reconciler.received, "foo-2"))
|
|
require.EqualValues(t, 2, queue.dones())
|
|
require.EqualValues(t, 1, queue.requeues())
|
|
require.EqualValues(t, 1, queue.addRateLimiteds())
|
|
|
|
// check that we requeue at a given rate when using a RequeueAfterError
|
|
queue.reset()
|
|
reconciler.setResponse(RequeueNow())
|
|
require.NoError(t, store.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-3",
|
|
}))
|
|
require.False(t, ensureCalled(reconciler.received, "foo-3"))
|
|
require.EqualValues(t, 0, queue.dones())
|
|
require.EqualValues(t, 0, queue.requeues())
|
|
require.EqualValues(t, 0, queue.addRateLimiteds())
|
|
reconciler.step()
|
|
// check we're processed the first time and re-queued
|
|
require.True(t, ensureCalled(reconciler.received, "foo-3"))
|
|
require.EqualValues(t, 1, queue.dones())
|
|
require.EqualValues(t, 1, queue.requeues())
|
|
require.EqualValues(t, 1, queue.addAfters())
|
|
// now make sure we succeed
|
|
reconciler.setResponse(nil)
|
|
reconciler.step()
|
|
require.True(t, ensureCalled(reconciler.received, "foo-3"))
|
|
require.EqualValues(t, 2, queue.dones())
|
|
require.EqualValues(t, 1, queue.requeues())
|
|
require.EqualValues(t, 1, queue.addAfters())
|
|
}
|
|
|
|
func TestBasicController_RunPanicAssertions(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
started := make(chan struct{})
|
|
reconciler := newTestReconciler(false)
|
|
publisher := stream.NewEventPublisher(0)
|
|
controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
|
|
close(started)
|
|
return RunWorkQueue(ctx, baseBackoff, maxBackoff)
|
|
})
|
|
subscription := &stream.SubscribeRequest{
|
|
Topic: state.EventTopicIngressGateway,
|
|
Subject: stream.SubjectWildcard,
|
|
}
|
|
|
|
// kick off the controller
|
|
go controller.Subscribe(subscription).Run(ctx)
|
|
|
|
// wait to make sure the following assertions don't
|
|
// get run before the above goroutine is spawned
|
|
<-started
|
|
|
|
// make sure we can't call Run again
|
|
require.Panics(t, func() {
|
|
controller.Run(ctx)
|
|
})
|
|
|
|
// make sure all of our configuration methods panic
|
|
require.Panics(t, func() {
|
|
controller.Subscribe(subscription)
|
|
})
|
|
require.Panics(t, func() {
|
|
controller.WithBackoff(1, 10)
|
|
})
|
|
require.Panics(t, func() {
|
|
controller.WithWorkers(1)
|
|
})
|
|
require.Panics(t, func() {
|
|
controller.WithQueueFactory(RunWorkQueue)
|
|
})
|
|
}
|
|
|
|
func TestConfigEntrySubscriptions(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
cases := map[string]struct {
|
|
configEntry func(string) structs.ConfigEntry
|
|
topic stream.Topic
|
|
kind string
|
|
}{
|
|
"Subscribe to Service Resolver Config Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicServiceResolver,
|
|
kind: structs.ServiceResolver,
|
|
},
|
|
"Subscribe to Ingress Gateway Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicIngressGateway,
|
|
kind: structs.IngressGateway,
|
|
},
|
|
"Subscribe to Service Intentions Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.ServiceIntentionsConfigEntry{
|
|
Kind: structs.ServiceIntentions,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicServiceIntentions,
|
|
kind: structs.ServiceIntentions,
|
|
},
|
|
"Subscribe to API Gateway Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.APIGatewayConfigEntry{
|
|
Kind: structs.APIGateway,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicAPIGateway,
|
|
kind: structs.APIGateway,
|
|
},
|
|
"Subscribe to Inline Certificate Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.InlineCertificateConfigEntry{
|
|
Kind: structs.InlineCertificate,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicInlineCertificate,
|
|
kind: structs.InlineCertificate,
|
|
},
|
|
"Subscribe to HTTP Route Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.HTTPRouteConfigEntry{
|
|
Kind: structs.HTTPRoute,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicHTTPRoute,
|
|
kind: structs.HTTPRoute,
|
|
},
|
|
"Subscribe to TCP Route Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.TCPRouteConfigEntry{
|
|
Kind: structs.TCPRoute,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicTCPRoute,
|
|
kind: structs.TCPRoute,
|
|
},
|
|
"Subscribe to Bound API Gateway Changes": {
|
|
configEntry: func(name string) structs.ConfigEntry {
|
|
return &structs.BoundAPIGatewayConfigEntry{
|
|
Kind: structs.BoundAPIGateway,
|
|
Name: name,
|
|
}
|
|
},
|
|
topic: state.EventTopicBoundAPIGateway,
|
|
kind: structs.BoundAPIGateway,
|
|
},
|
|
}
|
|
|
|
for name, tc := range cases {
|
|
t.Run(name, func(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(false)
|
|
|
|
publisher := stream.NewEventPublisher(1 * time.Millisecond)
|
|
go publisher.Run(ctx)
|
|
|
|
// get the store through the FSM since the publisher handlers get registered through it
|
|
store := fsm.NewFromDeps(fsm.Deps{
|
|
Logger: hclog.New(nil),
|
|
NewStateStore: func() *state.Store {
|
|
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
|
},
|
|
Publisher: publisher,
|
|
StorageBackend: fsm.NullStorageBackend,
|
|
}).State()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
entryIndex := uint64(i + 1)
|
|
name := fmt.Sprintf("foo-%d", i)
|
|
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
|
|
}
|
|
|
|
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
|
|
Topic: tc.topic,
|
|
Subject: stream.SubjectWildcard,
|
|
}).WithWorkers(10).Run(ctx)
|
|
|
|
received := []string{}
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case request := <-reconciler.received:
|
|
require.Equal(t, tc.kind, request.Kind)
|
|
received = append(received, request.Name)
|
|
if len(received) == 200 {
|
|
break LOOP
|
|
}
|
|
case <-ctx.Done():
|
|
break LOOP
|
|
}
|
|
}
|
|
|
|
// since we only modified each entry once, we should have exactly 200 reconciliation calls
|
|
require.Len(t, received, 200)
|
|
for i := 0; i < 200; i++ {
|
|
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestBasicController_Triggers(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(true)
|
|
|
|
publisher := stream.NewEventPublisher(0)
|
|
go publisher.Run(ctx)
|
|
|
|
controller := New(publisher, reconciler)
|
|
|
|
go func() {
|
|
require.NoError(t, controller.Run(ctx))
|
|
}()
|
|
|
|
ensureCalled := func(request chan Request, name string) bool {
|
|
select {
|
|
case req := <-request:
|
|
require.Equal(t, structs.IngressGateway, req.Kind)
|
|
require.Equal(t, name, req.Name)
|
|
return true
|
|
case <-time.After(10 * time.Millisecond):
|
|
return false
|
|
}
|
|
}
|
|
|
|
request := Request{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-1",
|
|
}
|
|
|
|
triggerOneChan := make(chan struct{}, 3)
|
|
triggerOne := func(ctx context.Context) error {
|
|
select {
|
|
case <-triggerOneChan:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
controller.AddTrigger(request, triggerOne)
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
triggerOneChan <- struct{}{}
|
|
reconciler.stepFor(10 * time.Millisecond)
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
|
|
// do it again
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
controller.AddTrigger(request, triggerOne)
|
|
triggerOneChan <- struct{}{}
|
|
reconciler.stepFor(10 * time.Millisecond)
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
|
|
// check with the overwritten trigger
|
|
controller.AddTrigger(request, triggerOne)
|
|
triggerTwoChan := make(chan struct{}, 2)
|
|
triggerTwo := func(ctx context.Context) error {
|
|
select {
|
|
case <-triggerTwoChan:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
controller.AddTrigger(request, triggerTwo)
|
|
triggerOneChan <- struct{}{}
|
|
reconciler.stepFor(10 * time.Millisecond)
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
triggerTwoChan <- struct{}{}
|
|
reconciler.stepFor(10 * time.Millisecond)
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
|
|
// remove the trigger and make sure we're not called again
|
|
controller.RemoveTrigger(request)
|
|
triggerTwoChan <- struct{}{}
|
|
reconciler.stepFor(10 * time.Millisecond)
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
}
|
|
|
|
func TestDiscoveryChainController(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
reconciler := newTestReconciler(false)
|
|
|
|
publisher := stream.NewEventPublisher(1 * time.Millisecond)
|
|
go publisher.Run(ctx)
|
|
|
|
// get the store through the FSM since the publisher handlers get registered through it
|
|
store := fsm.NewFromDeps(fsm.Deps{
|
|
Logger: hclog.New(nil),
|
|
NewStateStore: func() *state.Store {
|
|
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
|
},
|
|
Publisher: publisher,
|
|
StorageBackend: fsm.NullStorageBackend,
|
|
}).State()
|
|
|
|
controller := New(publisher, reconciler)
|
|
go controller.Subscribe(&stream.SubscribeRequest{
|
|
Topic: state.EventTopicIngressGateway,
|
|
Subject: stream.SubjectWildcard,
|
|
}).WithWorkers(10).Run(ctx)
|
|
|
|
request := Request{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-1",
|
|
}
|
|
|
|
ensureCalled := func(request chan Request, name string) bool {
|
|
select {
|
|
case req := <-request:
|
|
require.Equal(t, structs.IngressGateway, req.Kind)
|
|
require.Equal(t, name, req.Name)
|
|
return true
|
|
case <-time.After(10 * time.Millisecond):
|
|
return false
|
|
}
|
|
}
|
|
|
|
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
|
|
Kind: structs.IngressGateway,
|
|
Name: "foo-1",
|
|
}))
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
|
|
// create the trigger and something that changes in its upstream discovery chain and ensure that we've
|
|
// fired the reconciler
|
|
ws := memdb.NewWatchSet()
|
|
ws.Add(store.AbandonCh())
|
|
_, _, err := store.ReadDiscoveryChainConfigEntries(ws, "foo-2", nil)
|
|
require.NoError(t, err)
|
|
controller.AddTrigger(request, ws.WatchCtx)
|
|
|
|
require.False(t, ensureCalled(reconciler.received, "foo-1"))
|
|
require.NoError(t, store.EnsureConfigEntry(1, &structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: "foo-2",
|
|
}))
|
|
require.True(t, ensureCalled(reconciler.received, "foo-1"))
|
|
}
|