server: skip deleted and deleting namespaces when migrating intentions to config entries (#9186)
This commit is contained in:
parent
a343365da7
commit
6300abed18
|
@ -0,0 +1,18 @@
|
|||
```release-note:bug
|
||||
server: skip deleted and deleting namespaces when migrating intentions to config entries
|
||||
```
|
||||
|
||||
```release-note:breaking-change
|
||||
server: **(Enterprise only)** Pre-existing intentions defined with
|
||||
non-existent destination namespaces were non-functional and are erased during
|
||||
the upgrade process. This should not matter as these intentions had nothing to
|
||||
enforce.
|
||||
```
|
||||
|
||||
```release-note:breaking-change
|
||||
server: **(OSS only)** Pre-existing intentions defined with either a source or
|
||||
destination namespace value that is not "default" are rewritten or deleted
|
||||
during the upgrade process. Wildcards first attempt to downgrade to "default"
|
||||
unless an intention already exists, otherwise these non-functional intentions
|
||||
are deleted.
|
||||
```
|
|
@ -100,6 +100,11 @@ func (s *Server) legacyIntentionMigration(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
entries, err = s.filterMigratedLegacyIntentions(entries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Totally cheat and repurpose one part of config entry replication
|
||||
// here so we automatically get our writes rate limited.
|
||||
_, err = s.reconcileLocalConfig(ctx, entries, structs.ConfigEntryUpsert)
|
||||
|
|
|
@ -84,3 +84,7 @@ func migrateIntentionsToConfigEntries(ixns structs.Intentions) []*structs.Servic
|
|||
|
||||
return structs.MigrateIntentions(output)
|
||||
}
|
||||
|
||||
func (s *Server) filterMigratedLegacyIntentions(entries []structs.ConfigEntry) ([]structs.ConfigEntry, error) {
|
||||
return entries, nil
|
||||
}
|
||||
|
|
|
@ -11,6 +11,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func testLeader_LegacyIntentionMigrationHookEnterprise(_ *testing.T, _ *Server, _ bool) {
|
||||
}
|
||||
|
||||
func appendLegacyIntentionsForMigrationTestEnterprise(_ *testing.T, _ *Server, ixns []*structs.Intention) []*structs.Intention {
|
||||
return ixns
|
||||
}
|
||||
|
||||
func TestMigrateIntentionsToConfigEntries(t *testing.T) {
|
||||
compare := func(t *testing.T, got structs.Intentions, expect [][]string) {
|
||||
t.Helper()
|
||||
|
|
|
@ -425,6 +425,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
makeIxn("contractor", "*", false),
|
||||
makeIxn("*", "*", true),
|
||||
}
|
||||
ixns = appendLegacyIntentionsForMigrationTestEnterprise(t, s1pre, ixns)
|
||||
|
||||
testLeader_LegacyIntentionMigrationHookEnterprise(t, s1pre, true)
|
||||
|
||||
var retained []*structs.Intention
|
||||
for _, ixn := range ixns {
|
||||
ixn2 := *ixn
|
||||
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
|
||||
|
@ -435,6 +440,10 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
if respErr, ok := resp.(error); ok {
|
||||
t.Fatalf("respErr: %v", respErr)
|
||||
}
|
||||
|
||||
if _, present := ixn.Meta["unit-test-discarded"]; !present {
|
||||
retained = append(retained, ixn)
|
||||
}
|
||||
}
|
||||
|
||||
mapify := func(ixns []*structs.Intention) map[string]*structs.Intention {
|
||||
|
@ -465,7 +474,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
for k, expectV := range expect {
|
||||
gotV, ok := gotM[k]
|
||||
if !ok {
|
||||
r.Errorf("results are missing key %q", k)
|
||||
r.Errorf("results are missing key %q: %v", k, expectV)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -483,8 +492,14 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
}
|
||||
|
||||
expectM := mapify(ixns)
|
||||
expectRetainedM := mapify(retained)
|
||||
|
||||
require.True(t, t.Run("check initial intentions", func(t *testing.T) {
|
||||
checkIntentions(t, s1pre, false, expectM)
|
||||
}))
|
||||
require.True(t, t.Run("check initial legacy intentions", func(t *testing.T) {
|
||||
checkIntentions(t, s1pre, true, expectM)
|
||||
}))
|
||||
|
||||
// Shutdown s1pre and restart it to trigger migration.
|
||||
s1pre.Shutdown()
|
||||
|
@ -500,8 +515,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// check that all 7 intentions are present before migration
|
||||
checkIntentions(t, s1, false, expectM)
|
||||
testLeader_LegacyIntentionMigrationHookEnterprise(t, s1, false)
|
||||
|
||||
// Wait until the migration routine is complete.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -513,9 +527,13 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
})
|
||||
|
||||
// check that all 7 intentions are present the general way after migration
|
||||
checkIntentions(t, s1, false, expectM)
|
||||
require.True(t, t.Run("check migrated intentions", func(t *testing.T) {
|
||||
checkIntentions(t, s1, false, expectRetainedM)
|
||||
}))
|
||||
require.True(t, t.Run("check migrated legacy intentions", func(t *testing.T) {
|
||||
// check that no intentions exist in the legacy table
|
||||
checkIntentions(t, s1, true, map[string]*structs.Intention{})
|
||||
}))
|
||||
|
||||
mapifyConfigs := func(entries interface{}) map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry {
|
||||
m := make(map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry)
|
||||
|
@ -541,7 +559,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
gotConfigsM := mapifyConfigs(gotConfigs)
|
||||
|
||||
expectConfigs := structs.MigrateIntentions(ixns)
|
||||
expectConfigs := structs.MigrateIntentions(retained)
|
||||
for _, entry := range expectConfigs {
|
||||
require.NoError(t, entry.LegacyNormalize()) // tidy them up the same way the write would
|
||||
}
|
||||
|
|
|
@ -12,8 +12,17 @@ import (
|
|||
type LeaderRoutine func(ctx context.Context) error
|
||||
|
||||
type leaderRoutine struct {
|
||||
running bool
|
||||
cancel context.CancelFunc
|
||||
stoppedCh chan struct{} // closed when no longer running
|
||||
}
|
||||
|
||||
func (r *leaderRoutine) running() bool {
|
||||
select {
|
||||
case <-r.stoppedCh:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
type LeaderRoutineManager struct {
|
||||
|
@ -41,7 +50,7 @@ func (m *LeaderRoutineManager) IsRunning(name string) bool {
|
|||
defer m.lock.Unlock()
|
||||
|
||||
if routine, ok := m.routines[name]; ok {
|
||||
return routine.running
|
||||
return routine.running()
|
||||
}
|
||||
|
||||
return false
|
||||
|
@ -55,7 +64,7 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
|||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
if instance, ok := m.routines[name]; ok && instance.running {
|
||||
if instance, ok := m.routines[name]; ok && instance.running() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -65,11 +74,15 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
|||
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
instance := &leaderRoutine{
|
||||
running: true,
|
||||
cancel: cancel,
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
close(instance.stoppedCh)
|
||||
}()
|
||||
|
||||
err := routine(ctx)
|
||||
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||
m.logger.Error("routine exited with error",
|
||||
|
@ -79,10 +92,6 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
|||
} else {
|
||||
m.logger.Debug("stopped routine", "routine", name)
|
||||
}
|
||||
|
||||
m.lock.Lock()
|
||||
instance.running = false
|
||||
m.lock.Unlock()
|
||||
}()
|
||||
|
||||
m.routines[name] = instance
|
||||
|
@ -90,7 +99,19 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) Stop(name string) error {
|
||||
func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
|
||||
instance := m.stopInstance(name)
|
||||
if instance == nil {
|
||||
// Fabricate a closed channel so it won't block forever.
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
return instance.stoppedCh
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -100,15 +121,16 @@ func (m *LeaderRoutineManager) Stop(name string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if !instance.running {
|
||||
return nil
|
||||
if !instance.running() {
|
||||
return instance
|
||||
}
|
||||
|
||||
m.logger.Debug("stopping routine", "routine", name)
|
||||
instance.cancel()
|
||||
|
||||
delete(m.routines, name)
|
||||
return nil
|
||||
|
||||
return instance
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) StopAll() {
|
||||
|
@ -116,7 +138,7 @@ func (m *LeaderRoutineManager) StopAll() {
|
|||
defer m.lock.Unlock()
|
||||
|
||||
for name, routine := range m.routines {
|
||||
if !routine.running {
|
||||
if !routine.running() {
|
||||
continue
|
||||
}
|
||||
m.logger.Debug("stopping routine", "routine", name)
|
||||
|
|
|
@ -36,7 +36,9 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
require.NoError(t, mgr.Stop("run"))
|
||||
doneCh := mgr.Stop("run")
|
||||
require.NotNil(t, doneCh)
|
||||
<-doneCh
|
||||
|
||||
// ensure the background go routine was actually cancelled
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -51,7 +53,10 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
require.NoError(t, mgr.Stop("run"))
|
||||
doneCh = mgr.Stop("run")
|
||||
require.NotNil(t, doneCh)
|
||||
<-doneCh
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue