Restoring config entries updates the gateway-services table (#7811)
- Adds a new validateConfigEntryEnterprise function - Also fixes some state store tests that were failing in enterprise
This commit is contained in:
parent
a37d7a42c9
commit
a635e23f86
|
@ -244,6 +244,25 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig, structs.DefaultEnterpriseMeta()))
|
||||
require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig, structs.DefaultEnterpriseMeta()))
|
||||
|
||||
ingress := &structs.IngressGatewayConfigEntry{
|
||||
Kind: structs.IngressGateway,
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 8080,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(fsm.state.EnsureConfigEntry(20, ingress, structs.DefaultEnterpriseMeta()))
|
||||
_, gatewayServices, err := fsm.state.GatewayServices(nil, "ingress", structs.DefaultEnterpriseMeta())
|
||||
require.NoError(err)
|
||||
|
||||
// Raft Chunking
|
||||
chunkState := &raftchunking.State{
|
||||
ChunkMap: make(raftchunking.ChunkMap),
|
||||
|
@ -593,6 +612,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(err)
|
||||
assert.Equal(proxyConfig, proxyConfEntry)
|
||||
|
||||
_, ingressRestored, err := fsm2.state.ConfigEntry(nil, structs.IngressGateway, "ingress", structs.DefaultEnterpriseMeta())
|
||||
require.NoError(err)
|
||||
assert.Equal(ingress, ingressRestored)
|
||||
|
||||
_, restoredGatewayServices, err := fsm2.state.GatewayServices(nil, "ingress", structs.DefaultEnterpriseMeta())
|
||||
require.NoError(err)
|
||||
require.Equal(gatewayServices, restoredGatewayServices)
|
||||
|
||||
newChunkState, err := fsm2.chunker.CurrentState()
|
||||
require.NoError(err)
|
||||
assert.Equal(newChunkState, chunkState)
|
||||
|
|
|
@ -96,15 +96,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
|
|||
|
||||
// ConfigEntry is used when restoring from a snapshot.
|
||||
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
|
||||
// Insert
|
||||
if err := s.tx.Insert(configTableName, c); err != nil {
|
||||
return fmt.Errorf("failed restoring config entry object: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.store.insertConfigEntryWithTxn(s.tx, c.GetRaftIndex().ModifyIndex, c)
|
||||
}
|
||||
|
||||
// ConfigEntry is called to get a given config entry.
|
||||
|
@ -216,24 +208,11 @@ func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.Con
|
|||
return err // Err is already sufficiently decorated.
|
||||
}
|
||||
|
||||
// If the config entry is for a terminating or ingress gateway we update the memdb table
|
||||
// that associates gateways <-> services.
|
||||
if conf.GetKind() == structs.TerminatingGateway || conf.GetKind() == structs.IngressGateway {
|
||||
err = s.updateGatewayServices(tx, idx, conf, entMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to associate services to gateway: %v", err)
|
||||
}
|
||||
if err := s.validateConfigEntryEnterprise(tx, conf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Insert the config entry and update the index
|
||||
if err := s.insertConfigEntryWithTxn(tx, conf); err != nil {
|
||||
return fmt.Errorf("failed inserting config entry: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.insertConfigEntryWithTxn(tx, idx, conf)
|
||||
}
|
||||
|
||||
// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry.
|
||||
|
@ -319,6 +298,30 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) insertConfigEntryWithTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error {
|
||||
if conf == nil {
|
||||
return fmt.Errorf("cannot insert nil config entry")
|
||||
}
|
||||
// If the config entry is for a terminating or ingress gateway we update the memdb table
|
||||
// that associates gateways <-> services.
|
||||
if conf.GetKind() == structs.TerminatingGateway || conf.GetKind() == structs.IngressGateway {
|
||||
err := s.updateGatewayServices(tx, idx, conf, conf.GetEnterpriseMeta())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to associate services to gateway: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the config entry and update the index
|
||||
if err := tx.Insert(configTableName, conf); err != nil {
|
||||
return fmt.Errorf("failed inserting config entry: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateProposedConfigEntryInGraph can be used to verify graph integrity for
|
||||
// a proposed graph create/update/delete.
|
||||
//
|
||||
|
|
|
@ -59,8 +59,8 @@ func (s *Store) firstWatchConfigEntryWithTxn(tx *memdb.Txn,
|
|||
return tx.FirstWatch(configTableName, "id", kind, name)
|
||||
}
|
||||
|
||||
func (s *Store) insertConfigEntryWithTxn(tx *memdb.Txn, conf structs.ConfigEntry) error {
|
||||
return tx.Insert(configTableName, conf)
|
||||
func (s *Store) validateConfigEntryEnterprise(tx *memdb.Txn, conf structs.ConfigEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getAllConfigEntriesWithTxn(tx *memdb.Txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
|
|
|
@ -1304,7 +1304,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
|
|||
t.Run("default to tcp", func(t *testing.T) {
|
||||
err := s.EnsureConfigEntry(0, ingress, nil)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), `service "web" has protocol "tcp"`)
|
||||
require.Contains(t, err.Error(), `has protocol "tcp"`)
|
||||
})
|
||||
|
||||
t.Run("with proxy-default", func(t *testing.T) {
|
||||
|
@ -1319,7 +1319,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
|
|||
|
||||
err := s.EnsureConfigEntry(1, ingress, nil)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), `service "web" has protocol "http2"`)
|
||||
require.Contains(t, err.Error(), `has protocol "http2"`)
|
||||
})
|
||||
|
||||
t.Run("with service-defaults override", func(t *testing.T) {
|
||||
|
@ -1331,7 +1331,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
|
|||
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
|
||||
err := s.EnsureConfigEntry(2, ingress, nil)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), `service "web" has protocol "grpc"`)
|
||||
require.Contains(t, err.Error(), `has protocol "grpc"`)
|
||||
})
|
||||
|
||||
t.Run("with service-defaults correct protocol", func(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue