open-consul/agent/consul/controller/controller_test.go

574 lines
16 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package controller
import (
"context"
"errors"
"fmt"
"testing"
"time"
2023-04-04 16:30:06 +00:00
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
)
func TestBasicController(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
2023-04-04 16:30:06 +00:00
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}))
}
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, structs.IngressGateway, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
Native API Gateway Config Entries (#15897) * Stub Config Entries for Consul Native API Gateway (#15644) * Add empty InlineCertificate struct and protobuf * apigateway stubs * Stub HTTPRoute in api pkg * Stub HTTPRoute in structs pkg * Simplify api.APIGatewayConfigEntry to be consistent w/ other entries * Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry * Add TCPRoute to MakeConfigEntry, return unique Kind * Stub BoundAPIGatewayConfigEntry in agent * Add RaftIndex to APIGatewayConfigEntry stub * Add new config entry kinds to validation allow-list * Add RaftIndex to other added config entry stubs * Update usage metrics assertions to include new cfg entries * Add Meta and acl.EnterpriseMeta to all new ConfigEntry types * Remove unnecessary Services field from added config entry types * Implement GetMeta(), GetEnterpriseMeta() for added config entry types * Add meta field to proto, name consistently w/ existing config entries * Format config_entry.proto * Add initial implementation of CanRead + CanWrite for new config entry types * Add unit tests for decoding of new config entry types * Add unit tests for parsing of new config entry types * Add unit tests for API Gateway config entry ACLs * Return typed PermissionDeniedError on BoundAPIGateway CanWrite * Add unit tests for added config entry ACLs * Add BoundAPIGateway type to AllConfigEntryKinds * Return proper kind from BoundAPIGateway * Add docstrings for new config entry types * Add missing config entry kinds to proto def * Update usagemetrics_oss_test.go * Use utility func for returning PermissionDeniedError * EventPublisher subscriptions for Consul Native API Gateway (#15757) * Create new event topics in subscribe proto * Add tests for PBSubscribe func * Make configs singular, add all configs to PBToStreamSubscribeRequest * Add snapshot methods * Add config_entry_events tests * Add config entry kind to topic for new configs * Add unit tests for snapshot methods * Start adding integration test * Test using the new controller code * Update agent/consul/state/config_entry_events.go * Check value of error * Add controller stubs for API Gateway (#15837) * update initial stub implementation * move files, clean up mutex references * Remove embed, use idiomatic names for constructors * Remove stray file introduced in merge * Add APIGateway validation (#15847) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * APIGateway InlineCertificate validation (#15856) * Add APIGateway validation * Add additional validations * Add protobuf definitions * Tabs to spaces * Add API structs * Move struct fields around a bit * Add validation for InlineCertificate * Fix ACL test * APIGateway BoundAPIGateway validation (#15858) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * Add validation for BoundAPIGateway * APIGateway TCPRoute validation (#15855) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Add TCPRoute normalization and validation * Add forgotten Status * Add some more field docs in api package * Fix test * Format imports * Rename snapshot test variable names * Add plumbing for Native API GW Subscriptions (#16003) Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com> Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
}
func TestBasicController_Transform(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
2023-04-04 16:30:06 +00:00
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}, func(entry structs.ConfigEntry) []Request {
return []Request{{
Kind: "foo",
Name: "bar",
}}
}).Run(ctx)
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "test",
}))
select {
case request := <-reconciler.received:
require.Equal(t, "foo", request.Kind)
require.Equal(t, "bar", request.Name)
case <-ctx.Done():
t.Fatal("stopped reconciler before event received")
}
}
func TestBasicController_Retry(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(true)
defer reconciler.stop()
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
2023-04-04 16:30:06 +00:00
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
queueInitialized := make(chan *countingWorkQueue[Request])
controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
queue := newCountingWorkQueue(queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff))
queueInitialized <- queue
return queue
}).Run(ctx)
queue := <-queueInitialized
ensureCalled := func(request chan Request, name string) bool {
// give a short window for our counters to update
defer time.Sleep(10 * time.Millisecond)
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
// check to make sure we are called once
queue.reset()
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-1",
}))
require.False(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 0, queue.requeues())
// check that we requeue when an arbitrary error occurs
queue.reset()
reconciler.setResponse(errors.New("error"))
require.NoError(t, store.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-2",
}))
require.False(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// check that we requeue at a given rate when using a RequeueAfterError
queue.reset()
reconciler.setResponse(RequeueNow())
require.NoError(t, store.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-3",
}))
require.False(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
}
func TestBasicController_RunPanicAssertions(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
started := make(chan struct{})
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
close(started)
return queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff)
})
subscription := &stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}
// kick off the controller
go controller.Subscribe(subscription).Run(ctx)
// wait to make sure the following assertions don't
// get run before the above goroutine is spawned
<-started
// make sure we can't call Run again
require.Panics(t, func() {
controller.Run(ctx)
})
// make sure all of our configuration methods panic
require.Panics(t, func() {
controller.Subscribe(subscription)
})
require.Panics(t, func() {
controller.WithBackoff(1, 10)
})
require.Panics(t, func() {
controller.WithWorkers(1)
})
require.Panics(t, func() {
controller.WithQueueFactory(queue.RunWorkQueue[Request])
})
}
Native API Gateway Config Entries (#15897) * Stub Config Entries for Consul Native API Gateway (#15644) * Add empty InlineCertificate struct and protobuf * apigateway stubs * Stub HTTPRoute in api pkg * Stub HTTPRoute in structs pkg * Simplify api.APIGatewayConfigEntry to be consistent w/ other entries * Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry * Add TCPRoute to MakeConfigEntry, return unique Kind * Stub BoundAPIGatewayConfigEntry in agent * Add RaftIndex to APIGatewayConfigEntry stub * Add new config entry kinds to validation allow-list * Add RaftIndex to other added config entry stubs * Update usage metrics assertions to include new cfg entries * Add Meta and acl.EnterpriseMeta to all new ConfigEntry types * Remove unnecessary Services field from added config entry types * Implement GetMeta(), GetEnterpriseMeta() for added config entry types * Add meta field to proto, name consistently w/ existing config entries * Format config_entry.proto * Add initial implementation of CanRead + CanWrite for new config entry types * Add unit tests for decoding of new config entry types * Add unit tests for parsing of new config entry types * Add unit tests for API Gateway config entry ACLs * Return typed PermissionDeniedError on BoundAPIGateway CanWrite * Add unit tests for added config entry ACLs * Add BoundAPIGateway type to AllConfigEntryKinds * Return proper kind from BoundAPIGateway * Add docstrings for new config entry types * Add missing config entry kinds to proto def * Update usagemetrics_oss_test.go * Use utility func for returning PermissionDeniedError * EventPublisher subscriptions for Consul Native API Gateway (#15757) * Create new event topics in subscribe proto * Add tests for PBSubscribe func * Make configs singular, add all configs to PBToStreamSubscribeRequest * Add snapshot methods * Add config_entry_events tests * Add config entry kind to topic for new configs * Add unit tests for snapshot methods * Start adding integration test * Test using the new controller code * Update agent/consul/state/config_entry_events.go * Check value of error * Add controller stubs for API Gateway (#15837) * update initial stub implementation * move files, clean up mutex references * Remove embed, use idiomatic names for constructors * Remove stray file introduced in merge * Add APIGateway validation (#15847) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * APIGateway InlineCertificate validation (#15856) * Add APIGateway validation * Add additional validations * Add protobuf definitions * Tabs to spaces * Add API structs * Move struct fields around a bit * Add validation for InlineCertificate * Fix ACL test * APIGateway BoundAPIGateway validation (#15858) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * Add validation for BoundAPIGateway * APIGateway TCPRoute validation (#15855) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Add TCPRoute normalization and validation * Add forgotten Status * Add some more field docs in api package * Fix test * Format imports * Rename snapshot test variable names * Add plumbing for Native API GW Subscriptions (#16003) Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com> Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00
func TestConfigEntrySubscriptions(t *testing.T) {
t.Parallel()
cases := map[string]struct {
configEntry func(string) structs.ConfigEntry
topic stream.Topic
kind string
}{
"Subscribe to Service Resolver Config Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: name,
}
},
topic: state.EventTopicServiceResolver,
kind: structs.ServiceResolver,
},
"Subscribe to Ingress Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}
},
topic: state.EventTopicIngressGateway,
kind: structs.IngressGateway,
},
"Subscribe to Service Intentions Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: name,
}
},
topic: state.EventTopicServiceIntentions,
kind: structs.ServiceIntentions,
},
"Subscribe to API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: name,
}
},
topic: state.EventTopicAPIGateway,
kind: structs.APIGateway,
},
"Subscribe to Inline Certificate Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.InlineCertificateConfigEntry{
Kind: structs.InlineCertificate,
Name: name,
}
},
topic: state.EventTopicInlineCertificate,
kind: structs.InlineCertificate,
},
"Subscribe to HTTP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.HTTPRouteConfigEntry{
Kind: structs.HTTPRoute,
Name: name,
}
},
topic: state.EventTopicHTTPRoute,
kind: structs.HTTPRoute,
},
"Subscribe to TCP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: name,
}
},
topic: state.EventTopicTCPRoute,
kind: structs.TCPRoute,
},
"Subscribe to Bound API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: name,
}
},
topic: state.EventTopicBoundAPIGateway,
kind: structs.BoundAPIGateway,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
2023-04-04 16:30:06 +00:00
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
Native API Gateway Config Entries (#15897) * Stub Config Entries for Consul Native API Gateway (#15644) * Add empty InlineCertificate struct and protobuf * apigateway stubs * Stub HTTPRoute in api pkg * Stub HTTPRoute in structs pkg * Simplify api.APIGatewayConfigEntry to be consistent w/ other entries * Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry * Add TCPRoute to MakeConfigEntry, return unique Kind * Stub BoundAPIGatewayConfigEntry in agent * Add RaftIndex to APIGatewayConfigEntry stub * Add new config entry kinds to validation allow-list * Add RaftIndex to other added config entry stubs * Update usage metrics assertions to include new cfg entries * Add Meta and acl.EnterpriseMeta to all new ConfigEntry types * Remove unnecessary Services field from added config entry types * Implement GetMeta(), GetEnterpriseMeta() for added config entry types * Add meta field to proto, name consistently w/ existing config entries * Format config_entry.proto * Add initial implementation of CanRead + CanWrite for new config entry types * Add unit tests for decoding of new config entry types * Add unit tests for parsing of new config entry types * Add unit tests for API Gateway config entry ACLs * Return typed PermissionDeniedError on BoundAPIGateway CanWrite * Add unit tests for added config entry ACLs * Add BoundAPIGateway type to AllConfigEntryKinds * Return proper kind from BoundAPIGateway * Add docstrings for new config entry types * Add missing config entry kinds to proto def * Update usagemetrics_oss_test.go * Use utility func for returning PermissionDeniedError * EventPublisher subscriptions for Consul Native API Gateway (#15757) * Create new event topics in subscribe proto * Add tests for PBSubscribe func * Make configs singular, add all configs to PBToStreamSubscribeRequest * Add snapshot methods * Add config_entry_events tests * Add config entry kind to topic for new configs * Add unit tests for snapshot methods * Start adding integration test * Test using the new controller code * Update agent/consul/state/config_entry_events.go * Check value of error * Add controller stubs for API Gateway (#15837) * update initial stub implementation * move files, clean up mutex references * Remove embed, use idiomatic names for constructors * Remove stray file introduced in merge * Add APIGateway validation (#15847) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * APIGateway InlineCertificate validation (#15856) * Add APIGateway validation * Add additional validations * Add protobuf definitions * Tabs to spaces * Add API structs * Move struct fields around a bit * Add validation for InlineCertificate * Fix ACL test * APIGateway BoundAPIGateway validation (#15858) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * Add validation for BoundAPIGateway * APIGateway TCPRoute validation (#15855) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Add TCPRoute normalization and validation * Add forgotten Status * Add some more field docs in api package * Fix test * Format imports * Rename snapshot test variable names * Add plumbing for Native API GW Subscriptions (#16003) Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com> Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00
}).State()
for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
}
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: tc.topic,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, tc.kind, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
})
}
}
func TestBasicController_Triggers(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
reconciler := newTestReconciler(true)
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
controller := New(publisher, reconciler)
go func() {
require.NoError(t, controller.Run(ctx))
}()
ensureCalled := func(request chan Request, name string) bool {
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
request := Request{
Kind: structs.IngressGateway,
Name: "foo-1",
}
triggerOneChan := make(chan struct{}, 3)
triggerOne := func(ctx context.Context) error {
select {
case <-triggerOneChan:
return nil
case <-ctx.Done():
return nil
}
}
controller.AddTrigger(request, triggerOne)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// do it again
require.False(t, ensureCalled(reconciler.received, "foo-1"))
controller.AddTrigger(request, triggerOne)
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// check with the overwritten trigger
controller.AddTrigger(request, triggerOne)
triggerTwoChan := make(chan struct{}, 2)
triggerTwo := func(ctx context.Context) error {
select {
case <-triggerTwoChan:
return nil
case <-ctx.Done():
return nil
}
}
controller.AddTrigger(request, triggerTwo)
triggerOneChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
triggerTwoChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// remove the trigger and make sure we're not called again
controller.RemoveTrigger(request)
triggerTwoChan <- struct{}{}
reconciler.stepFor(10 * time.Millisecond)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
}
func TestDiscoveryChainController(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
2023-04-04 16:30:06 +00:00
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
controller := New(publisher, reconciler)
go controller.Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
request := Request{
Kind: structs.IngressGateway,
Name: "foo-1",
}
ensureCalled := func(request chan Request, name string) bool {
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-1",
}))
require.True(t, ensureCalled(reconciler.received, "foo-1"))
// create the trigger and something that changes in its upstream discovery chain and ensure that we've
// fired the reconciler
ws := memdb.NewWatchSet()
ws.Add(store.AbandonCh())
_, _, err := store.ReadDiscoveryChainConfigEntries(ws, "foo-2", nil)
require.NoError(t, err)
controller.AddTrigger(request, ws.WatchCtx)
require.False(t, ensureCalled(reconciler.received, "foo-1"))
require.NoError(t, store.EnsureConfigEntry(1, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "foo-2",
}))
require.True(t, ensureCalled(reconciler.received, "foo-1"))
}