From 53913461db2af1818e47c901604fb1a754eb4457 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 19 Mar 2019 10:06:46 -0700 Subject: [PATCH 1/7] Add config types and state store table --- agent/consul/state/service_config.go | 127 +++++++++++++++++++++++++++ agent/structs/service_config.go | 75 ++++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 agent/consul/state/service_config.go create mode 100644 agent/structs/service_config.go diff --git a/agent/consul/state/service_config.go b/agent/consul/state/service_config.go new file mode 100644 index 000000000..7dd56bf57 --- /dev/null +++ b/agent/consul/state/service_config.go @@ -0,0 +1,127 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const ( + configTableName = "configurations" +) + +// configTableSchema returns a new table schema used to store global service +// and proxy configurations. +func configTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: configTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Name", + Lowercase: true, + }, + }, + }, + }, + }, + } +} + +func init() { + registerSchema(configTableSchema) +} + +// Configurations is used to pull all the configurations for the snapshot. +func (s *Snapshot) Configurations() ([]structs.Configuration, error) { + ixns, err := s.tx.Get(configTableName, "id") + if err != nil { + return nil, err + } + + var ret []structs.Configuration + for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + ret = append(ret, wrapped.(structs.Configuration)) + } + + return ret, nil +} + +// Configuration is used when restoring from a snapshot. +func (s *Restore) Configuration(c structs.Configuration) error { + // Insert + if err := s.tx.Insert(configTableName, c); err != nil { + return fmt.Errorf("failed restoring configuration object: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, c.ModifyIndex, configTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + +// EnsureConfiguration is called to upsert creation of a given configuration. +func (s *Store) EnsureConfiguration(idx uint64, conf structs.Configuration) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Does it make sense to validate here? We do this for service meta in the state store + // but could also do this in RPC endpoint. More version compatibility that way? + if err := conf.Validate(); err != nil { + return fmt.Errorf("failed validating config: %v", err) + } + + // Check for existing configuration. + existing, err := tx.First("configurations", "id", conf.GetKind(), conf.GetName()) + if err != nil { + return fmt.Errorf("failed configuration lookup: %s", err) + } + + if existing != nil { + conf.CreateIndex = serviceNode.CreateIndex + conf.ModifyIndex = serviceNode.ModifyIndex + } else { + conf.CreateIndex = idx + } + conf.ModifyIndex = idx + + // Insert the configuration and update the index + if err := tx.Insert("configurations", conf); err != nil { + return fmt.Errorf("failed inserting service: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"configurations", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} + +// Configuration is called to get a given configuration. +func (s *Store) Configuration(idx uint64, kind structs.ConfigurationKind, name string) (structs.Configuration, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the existing configuration. + existing, err := tx.First("configurations", "id", kind, name) + if err != nil { + return nil, fmt.Errorf("failed configuration lookup: %s", err) + } + + conf, ok := existing.(structs.Configuration) + if !ok { + return nil, fmt.Errorf("configuration %q (%s) is an invalid type: %T", name, kind, conf) + } + + return conf, nil +} diff --git a/agent/structs/service_config.go b/agent/structs/service_config.go new file mode 100644 index 000000000..1a6510818 --- /dev/null +++ b/agent/structs/service_config.go @@ -0,0 +1,75 @@ +package structs + +type ConfigurationKind string + +const ( + ServiceDefaults ConfigurationKind = "service-defaults" + ProxyDefaults ConfigurationKind = "proxy-defaults" +) + +// Should this be an interface or a switch on the existing config types? +type Configuration interface { + GetKind() ConfigurationKind + GetName() string + Validate() error +} + +// ServiceConfiguration is the top-level struct for the configuration of a service +// across the entire cluster. +type ServiceConfiguration struct { + Kind ConfigurationKind + Name string + Protocol string + Connect ConnectConfiguration + ServiceDefinitionDefaults ServiceDefinitionDefaults + + RaftIndex +} + +func (s *ServiceConfiguration) GetKind() ConfigurationKind { + return ServiceDefaults +} + +type ConnectConfiguration struct { + SidecarProxy bool +} + +type ServiceDefinitionDefaults struct { + EnableTagOverride bool + + // Non script/docker checks only + Check *HealthCheck + Checks HealthChecks + + // Kind is allowed to accommodate non-sidecar proxies but it will be an error + // if they also set Connect.DestinationServiceID since sidecars are + // configured via their associated service's config. + Kind ServiceKind + + // Only DestinationServiceName and Config are supported. + Proxy ConnectProxyConfig + + Connect ServiceConnect + + Weights Weights + + // DisableDirectDiscovery is a field that marks the service instance as + // not discoverable. This is useful in two cases: + // 1. Truly headless services like job workers that still need Connect + // sidecars to connect to upstreams. + // 2. Connect applications that expose services only through their sidecar + // and so discovery of their IP/port is meaningless since they can't be + // connected to by that means. + DisableDirectDiscovery bool +} + +// ProxyConfiguration is the top-level struct for global proxy configuration defaults. +type ProxyConfiguration struct { + Kind ConfigurationKind + Name string + ProxyConfig ConnectProxyConfig +} + +func (p *ProxyConfiguration) GetKind() ConfigurationKind { + return ProxyDefaults +} From 9df597b2573c94e32d04333edb44c3f0cf5794e4 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 19 Mar 2019 15:56:17 -0700 Subject: [PATCH 2/7] Fill out state store/FSM functions and add tests --- agent/consul/fsm/commands_oss.go | 20 +++ agent/consul/fsm/commands_oss_test.go | 36 ++++ agent/consul/state/config_entry.go | 157 ++++++++++++++++++ agent/consul/state/config_entry_test.go | 76 +++++++++ agent/consul/state/service_config.go | 127 -------------- .../{service_config.go => config_entry.go} | 61 +++++-- agent/structs/structs.go | 1 + 7 files changed, 337 insertions(+), 141 deletions(-) create mode 100644 agent/consul/state/config_entry.go create mode 100644 agent/consul/state/config_entry_test.go delete mode 100644 agent/consul/state/service_config.go rename agent/structs/{service_config.go => config_entry.go} (58%) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 0e4b972f7..ac503fd5b 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,6 +29,7 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) + registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -428,3 +429,22 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } + +func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { + var req structs.ConfigEntryRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.ConfigEntryUpsert: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + []metrics.Label{{Name: "op", Value: "upsert"}}) + return c.state.EnsureConfigEntry(index, req.Entry) + case structs.ConfigEntryDelete: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + []metrics.Label{{Name: "op", Value: "delete"}}) + return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) + default: + return fmt.Errorf("invalid config entry operation type: %v", req.Op) + } +} diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 6778a3f7e..d8a98644e 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1355,3 +1355,39 @@ func TestFSM_CABuiltinProvider(t *testing.T) { assert.Equal(expected, state) } } + +func TestFSM_ConfigEntry(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + fsm, err := New(nil, os.Stderr) + assert.Nil(err) + + // Roots + entry := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + }, + } + + // Create a new request. + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Entry: entry, + } + + { + buf, err := structs.Encode(structs.ConfigEntryRequestType, req) + assert.Nil(err) + assert.True(fsm.Apply(makeLog(buf)).(bool)) + } + + // Verify it's in the state store. + { + _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") + assert.Nil(err) + assert.Equal(entry, config) + } +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go new file mode 100644 index 000000000..0a1e2a70a --- /dev/null +++ b/agent/consul/state/config_entry.go @@ -0,0 +1,157 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const ( + configTableName = "config-entries" +) + +// configTableSchema returns a new table schema used to store global service +// and proxy configurations. +func configTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: configTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Name", + Lowercase: true, + }, + }, + }, + }, + }, + } +} + +func init() { + registerSchema(configTableSchema) +} + +// ConfigEntries is used to pull all the config entries for the snapshot. +func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { + ixns, err := s.tx.Get(configTableName, "id") + if err != nil { + return nil, err + } + + var ret []structs.ConfigEntry + for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + ret = append(ret, wrapped.(structs.ConfigEntry)) + } + + return ret, nil +} + +// Configuration is used when restoring from a snapshot. +func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { + // Insert + if err := s.tx.Insert(configTableName, c); err != nil { + return fmt.Errorf("failed restoring config entry object: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + +// Configuration is called to get a given config entry. +func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Get the existing config entry. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return 0, nil, nil + } + + conf, ok := existing.(structs.ConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf) + } + + return idx, conf, nil +} + +// EnsureConfigEntry is called to upsert creation of a given config entry. +func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return fmt.Errorf("failed configuration lookup: %s", err) + } + + raftIndex := conf.GetRaftIndex() + if existing != nil { + existingIdx := existing.(structs.ConfigEntry).GetRaftIndex() + raftIndex.CreateIndex = existingIdx.CreateIndex + raftIndex.ModifyIndex = existingIdx.ModifyIndex + } else { + raftIndex.CreateIndex = idx + } + raftIndex.ModifyIndex = idx + + // Insert the config entry and update the index + if err := tx.Insert(configTableName, conf); err != nil { + return fmt.Errorf("failed inserting config entry: %s", err) + } + if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} + +func (s *Store) DeleteConfigEntry(kind, name string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Try to retrieve the existing health check. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return nil + } + + // Delete the config entry from the DB and update the index. + if err := tx.Delete(configTableName, existing); err != nil { + return fmt.Errorf("failed removing check: %s", err) + } + if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go new file mode 100644 index 000000000..7c8a1d0b3 --- /dev/null +++ b/agent/consul/state/config_entry_test.go @@ -0,0 +1,76 @@ +package state + +import ( + "reflect" + "testing" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestStore_ConfigEntry(t *testing.T) { + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + }, + } + + // Create + if err := s.EnsureConfigEntry(0, expected); err != nil { + t.Fatal(err) + } + + { + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if !reflect.DeepEqual(expected, config) { + t.Fatalf("bad: %#v, %#v", expected, config) + } + } + + // Update + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "bar", + }, + } + if err := s.EnsureConfigEntry(1, updated); err != nil { + t.Fatal(err) + } + + { + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if idx != 1 { + t.Fatalf("bad: %d", idx) + } + if !reflect.DeepEqual(updated, config) { + t.Fatalf("bad: %#v, %#v", updated, config) + } + } + + // Delete + if err := s.DeleteConfigEntry(structs.ProxyDefaults, "global"); err != nil { + t.Fatal(err) + } + + _, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if config != nil { + t.Fatalf("config should be deleted: %v", config) + } +} diff --git a/agent/consul/state/service_config.go b/agent/consul/state/service_config.go deleted file mode 100644 index 7dd56bf57..000000000 --- a/agent/consul/state/service_config.go +++ /dev/null @@ -1,127 +0,0 @@ -package state - -import ( - "fmt" - - "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" -) - -const ( - configTableName = "configurations" -) - -// configTableSchema returns a new table schema used to store global service -// and proxy configurations. -func configTableSchema() *memdb.TableSchema { - return &memdb.TableSchema{ - Name: configTableName, - Indexes: map[string]*memdb.IndexSchema{ - "id": &memdb.IndexSchema{ - Name: "id", - AllowMissing: false, - Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Kind", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "Name", - Lowercase: true, - }, - }, - }, - }, - }, - } -} - -func init() { - registerSchema(configTableSchema) -} - -// Configurations is used to pull all the configurations for the snapshot. -func (s *Snapshot) Configurations() ([]structs.Configuration, error) { - ixns, err := s.tx.Get(configTableName, "id") - if err != nil { - return nil, err - } - - var ret []structs.Configuration - for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { - ret = append(ret, wrapped.(structs.Configuration)) - } - - return ret, nil -} - -// Configuration is used when restoring from a snapshot. -func (s *Restore) Configuration(c structs.Configuration) error { - // Insert - if err := s.tx.Insert(configTableName, c); err != nil { - return fmt.Errorf("failed restoring configuration object: %s", err) - } - if err := indexUpdateMaxTxn(s.tx, c.ModifyIndex, configTableName); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - return nil -} - -// EnsureConfiguration is called to upsert creation of a given configuration. -func (s *Store) EnsureConfiguration(idx uint64, conf structs.Configuration) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Does it make sense to validate here? We do this for service meta in the state store - // but could also do this in RPC endpoint. More version compatibility that way? - if err := conf.Validate(); err != nil { - return fmt.Errorf("failed validating config: %v", err) - } - - // Check for existing configuration. - existing, err := tx.First("configurations", "id", conf.GetKind(), conf.GetName()) - if err != nil { - return fmt.Errorf("failed configuration lookup: %s", err) - } - - if existing != nil { - conf.CreateIndex = serviceNode.CreateIndex - conf.ModifyIndex = serviceNode.ModifyIndex - } else { - conf.CreateIndex = idx - } - conf.ModifyIndex = idx - - // Insert the configuration and update the index - if err := tx.Insert("configurations", conf); err != nil { - return fmt.Errorf("failed inserting service: %s", err) - } - if err := tx.Insert("index", &IndexEntry{"configurations", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Commit() - return nil -} - -// Configuration is called to get a given configuration. -func (s *Store) Configuration(idx uint64, kind structs.ConfigurationKind, name string) (structs.Configuration, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Get the existing configuration. - existing, err := tx.First("configurations", "id", kind, name) - if err != nil { - return nil, fmt.Errorf("failed configuration lookup: %s", err) - } - - conf, ok := existing.(structs.Configuration) - if !ok { - return nil, fmt.Errorf("configuration %q (%s) is an invalid type: %T", name, kind, conf) - } - - return conf, nil -} diff --git a/agent/structs/service_config.go b/agent/structs/config_entry.go similarity index 58% rename from agent/structs/service_config.go rename to agent/structs/config_entry.go index 1a6510818..0d070e119 100644 --- a/agent/structs/service_config.go +++ b/agent/structs/config_entry.go @@ -1,23 +1,26 @@ package structs -type ConfigurationKind string - const ( - ServiceDefaults ConfigurationKind = "service-defaults" - ProxyDefaults ConfigurationKind = "proxy-defaults" + ServiceDefaults string = "service-defaults" + ProxyDefaults string = "proxy-defaults" ) -// Should this be an interface or a switch on the existing config types? -type Configuration interface { - GetKind() ConfigurationKind +// ConfigEntry is the +type ConfigEntry interface { + GetKind() string GetName() string + + // This is called in the RPC endpoint and can apply defaults + Normalize() error Validate() error + + GetRaftIndex() *RaftIndex } // ServiceConfiguration is the top-level struct for the configuration of a service // across the entire cluster. -type ServiceConfiguration struct { - Kind ConfigurationKind +type ServiceConfigEntry struct { + Kind string Name string Protocol string Connect ConnectConfiguration @@ -26,7 +29,7 @@ type ServiceConfiguration struct { RaftIndex } -func (s *ServiceConfiguration) GetKind() ConfigurationKind { +func (s *ServiceConfigEntry) GetKind() string { return ServiceDefaults } @@ -63,13 +66,43 @@ type ServiceDefinitionDefaults struct { DisableDirectDiscovery bool } -// ProxyConfiguration is the top-level struct for global proxy configuration defaults. -type ProxyConfiguration struct { - Kind ConfigurationKind +// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. +type ProxyConfigEntry struct { + Kind string Name string ProxyConfig ConnectProxyConfig + + RaftIndex } -func (p *ProxyConfiguration) GetKind() ConfigurationKind { +func (p *ProxyConfigEntry) GetKind() string { return ProxyDefaults } + +func (p *ProxyConfigEntry) GetName() string { + return p.Name +} + +func (p *ProxyConfigEntry) Normalize() error { + return nil +} + +func (p *ProxyConfigEntry) Validate() error { + return nil +} + +func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + return &p.RaftIndex +} + +type ConfigEntryOp string + +const ( + ConfigEntryUpsert ConfigEntryOp = "upsert" + ConfigEntryDelete ConfigEntryOp = "delete" +) + +type ConfigEntryRequest struct { + Op ConfigEntryOp + Entry ConfigEntry +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f3c3a024b..56d86ab52 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -55,6 +55,7 @@ const ( ACLPolicySetRequestType = 19 ACLPolicyDeleteRequestType = 20 ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 ) const ( From c2cba6804288c0b30939d091b74f7c91bd847e56 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 20 Mar 2019 16:13:13 -0700 Subject: [PATCH 3/7] Fix fsm serialization and add snapshot/restore --- agent/consul/fsm/commands_oss.go | 30 +++++++++++++--- agent/consul/fsm/commands_oss_test.go | 4 ++- agent/consul/fsm/snapshot_oss.go | 49 +++++++++++++++++++++++++++ agent/consul/fsm/snapshot_oss_test.go | 22 ++++++++++++ agent/consul/state/config_entry.go | 25 ++++++++++++-- agent/structs/config_entry.go | 32 ++++++++++++----- agent/structs/structs.go | 48 +++++++++++++------------- 7 files changed, 171 insertions(+), 39 deletions(-) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index ac503fd5b..9061cc696 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,7 +29,8 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) - registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) + registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation) + registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -430,18 +431,37 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } -func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { - var req structs.ConfigEntryRequest +func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} { + req := structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{}, + } if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + return c.applyConfigEntryOperation(index, req) +} + +func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} { + req := structs.ConfigEntryRequest{ + Entry: &structs.ProxyConfigEntry{}, + } + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + if err := c.applyConfigEntryOperation(index, req); err != nil { + panic(err) + } + return true +} + +func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error { switch req.Op { case structs.ConfigEntryUpsert: - defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "upsert"}}) return c.state.EnsureConfigEntry(index, req.Entry) case structs.ConfigEntryDelete: - defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "delete"}}) return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) default: diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index d8a98644e..67b70aa05 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1379,7 +1379,7 @@ func TestFSM_ConfigEntry(t *testing.T) { } { - buf, err := structs.Encode(structs.ConfigEntryRequestType, req) + buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) assert.Nil(err) assert.True(fsm.Apply(makeLog(buf)).(bool)) } @@ -1388,6 +1388,8 @@ func TestFSM_ConfigEntry(t *testing.T) { { _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") assert.Nil(err) + entry.RaftIndex.CreateIndex = 1 + entry.RaftIndex.ModifyIndex = 1 assert.Equal(entry, config) } } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 00adfa986..02be4c17e 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -1,6 +1,8 @@ package fsm import ( + "fmt" + "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -27,6 +29,7 @@ func init() { registerRestorer(structs.IndexRequestType, restoreIndex) registerRestorer(structs.ACLTokenSetRequestType, restoreToken) registerRestorer(structs.ACLPolicySetRequestType, restorePolicy) + registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry) } func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { @@ -63,6 +66,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err if err := s.persistConnectCAConfig(sink, encoder); err != nil { return err } + if err := s.persistConfigEntries(sink, encoder); err != nil { + return err + } if err := s.persistIndex(sink, encoder); err != nil { return err } @@ -360,6 +366,27 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink, return nil } +func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + entries, err := s.state.ConfigEntries() + if err != nil { + return err + } + + for _, entry := range entries { + if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { + return err + } + if err := encoder.Encode(entry.GetKind()); err != nil { + return err + } + if err := encoder.Encode(entry); err != nil { + return err + } + } + return nil +} + func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the indexes iter, err := s.state.Indexes() @@ -565,3 +592,25 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code } return restore.ACLPolicy(&req) } + +func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.ConfigEntry + var kind string + if err := decoder.Decode(&kind); err != nil { + return err + } + + switch kind { + case structs.ServiceDefaults: + req = &structs.ServiceConfigEntry{} + case structs.ProxyDefaults: + req = &structs.ProxyConfigEntry{} + default: + return fmt.Errorf("invalid config type: %s", kind) + } + + if err := decoder.Decode(&req); err != nil { + return err + } + return restore.ConfigEntry(req) +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 1d80e3965..685c10374 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -202,6 +202,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { err = fsm.state.CASetConfig(17, caConfig) assert.Nil(err) + // Config entries + serviceConfig := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + } + proxyConfig := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + } + assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig)) + assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig)) + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -388,6 +401,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { assert.Nil(err) assert.Equal(caConfig, caConf) + // Verify config entries are restored + _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") + assert.Nil(err) + assert.Equal(serviceConfig, serviceConfEntry) + + _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") + assert.Nil(err) + assert.Equal(proxyConfig, proxyConfEntry) + // Snapshot snap, err = fsm2.Snapshot() if err != nil { diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 0a1e2a70a..c3cc96adb 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -57,7 +57,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { return ret, nil } -// Configuration is used when restoring from a snapshot. +// ConfigEntry is used when restoring from a snapshot. func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { // Insert if err := s.tx.Insert(configTableName, c); err != nil { @@ -70,7 +70,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { return nil } -// Configuration is called to get a given config entry. +// ConfigEntry is called to get a given config entry. func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -95,6 +95,27 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err return idx, conf, nil } +// ConfigEntries is called to get all config entry objects. +func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Get all + iter, err := tx.Get(configTableName, "id") + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + + var results []structs.ConfigEntry + for v := iter.Next(); v != nil; v = iter.Next() { + results = append(results, v.(structs.ConfigEntry)) + } + return idx, results, nil +} + // EnsureConfigEntry is called to upsert creation of a given config entry. func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { tx := s.db.Txn(true) diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 0d070e119..21b12ce78 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -29,10 +29,26 @@ type ServiceConfigEntry struct { RaftIndex } -func (s *ServiceConfigEntry) GetKind() string { +func (e *ServiceConfigEntry) GetKind() string { return ServiceDefaults } +func (e *ServiceConfigEntry) GetName() string { + return e.Name +} + +func (e *ServiceConfigEntry) Normalize() error { + return nil +} + +func (e *ServiceConfigEntry) Validate() error { + return nil +} + +func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { + return &e.RaftIndex +} + type ConnectConfiguration struct { SidecarProxy bool } @@ -75,24 +91,24 @@ type ProxyConfigEntry struct { RaftIndex } -func (p *ProxyConfigEntry) GetKind() string { +func (e *ProxyConfigEntry) GetKind() string { return ProxyDefaults } -func (p *ProxyConfigEntry) GetName() string { - return p.Name +func (e *ProxyConfigEntry) GetName() string { + return e.Name } -func (p *ProxyConfigEntry) Normalize() error { +func (e *ProxyConfigEntry) Normalize() error { return nil } -func (p *ProxyConfigEntry) Validate() error { +func (e *ProxyConfigEntry) Validate() error { return nil } -func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex { - return &p.RaftIndex +func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + return &e.RaftIndex } type ConfigEntryOp string diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 56d86ab52..fd6532633 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -33,29 +33,31 @@ type RaftIndex struct { // These are serialized between Consul servers and stored in Consul snapshots, // so entries must only ever be added. const ( - RegisterRequestType MessageType = 0 - DeregisterRequestType = 1 - KVSRequestType = 2 - SessionRequestType = 3 - ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) - TombstoneRequestType = 5 - CoordinateBatchUpdateType = 6 - PreparedQueryRequestType = 7 - TxnRequestType = 8 - AutopilotRequestType = 9 - AreaRequestType = 10 - ACLBootstrapRequestType = 11 - IntentionRequestType = 12 - ConnectCARequestType = 13 - ConnectCAProviderStateType = 14 - ConnectCAConfigType = 15 // FSM snapshots only. - IndexRequestType = 16 // FSM snapshots only. - ACLTokenSetRequestType = 17 - ACLTokenDeleteRequestType = 18 - ACLPolicySetRequestType = 19 - ACLPolicyDeleteRequestType = 20 - ConnectCALeafRequestType = 21 - ConfigEntryRequestType = 22 + RegisterRequestType MessageType = 0 + DeregisterRequestType = 1 + KVSRequestType = 2 + SessionRequestType = 3 + ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) + TombstoneRequestType = 5 + CoordinateBatchUpdateType = 6 + PreparedQueryRequestType = 7 + TxnRequestType = 8 + AutopilotRequestType = 9 + AreaRequestType = 10 + ACLBootstrapRequestType = 11 + IntentionRequestType = 12 + ConnectCARequestType = 13 + ConnectCAProviderStateType = 14 + ConnectCAConfigType = 15 // FSM snapshots only. + IndexRequestType = 16 // FSM snapshots only. + ACLTokenSetRequestType = 17 + ACLTokenDeleteRequestType = 18 + ACLPolicySetRequestType = 19 + ACLPolicyDeleteRequestType = 20 + ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 // FSM snapshots only. + ServiceConfigEntryRequestType = 23 + ProxyConfigEntryRequestType = 24 ) const ( From 7aa1e14b18c3f6bedf23e3ca565ba36d311fd7e7 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 22 Mar 2019 09:25:37 -0700 Subject: [PATCH 4/7] Add some basic normalize/validation logic for config entries --- agent/structs/config_entry.go | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 21b12ce78..58a7a325a 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -1,8 +1,12 @@ package structs +import "fmt" + const ( ServiceDefaults string = "service-defaults" ProxyDefaults string = "proxy-defaults" + + ProxyConfigGlobal string = "global" ) // ConfigEntry is the @@ -34,10 +38,20 @@ func (e *ServiceConfigEntry) GetKind() string { } func (e *ServiceConfigEntry) GetName() string { + if e == nil { + return "" + } + return e.Name } func (e *ServiceConfigEntry) Normalize() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + e.Kind = ServiceDefaults + return nil } @@ -46,6 +60,10 @@ func (e *ServiceConfigEntry) Validate() error { } func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { + if e == nil { + return &RaftIndex{} + } + return &e.RaftIndex } @@ -96,18 +114,40 @@ func (e *ProxyConfigEntry) GetKind() string { } func (e *ProxyConfigEntry) GetName() string { + if e == nil { + return "" + } + return e.Name } func (e *ProxyConfigEntry) Normalize() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + e.Kind = ProxyDefaults + return nil } func (e *ProxyConfigEntry) Validate() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + if e.Name != ProxyConfigGlobal { + return fmt.Errorf("invalid name (%q), only %q is supported", e.Name, ProxyConfigGlobal) + } + return nil } func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + if e == nil { + return &RaftIndex{} + } + return &e.RaftIndex } From 96a460c0cfad0bb60fd2d89275ea8979440561f7 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 27 Mar 2019 16:52:38 -0700 Subject: [PATCH 5/7] Clean up service config state store methods --- agent/consul/fsm/commands_oss.go | 2 +- agent/consul/fsm/commands_oss_test.go | 15 +-- agent/consul/fsm/snapshot_oss_test.go | 43 +++---- agent/consul/state/config_entry.go | 94 ++++++++++++--- agent/consul/state/config_entry_test.go | 148 +++++++++++++++++------- agent/structs/config_entry.go | 29 ++--- 6 files changed, 229 insertions(+), 102 deletions(-) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 9061cc696..492c3ccc5 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -463,7 +463,7 @@ func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryReq case structs.ConfigEntryDelete: defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "delete"}}) - return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) + return c.state.DeleteConfigEntry(index, req.Entry.GetKind(), req.Entry.GetName()) default: return fmt.Errorf("invalid config entry operation type: %v", req.Op) } diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 67b70aa05..438fc8172 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -18,6 +18,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func generateUUID() (ret string) { @@ -1359,11 +1360,11 @@ func TestFSM_CABuiltinProvider(t *testing.T) { func TestFSM_ConfigEntry(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) fsm, err := New(nil, os.Stderr) - assert.Nil(err) + require.NoError(err) - // Roots + // Create a simple config entry entry := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", @@ -1380,16 +1381,16 @@ func TestFSM_ConfigEntry(t *testing.T) { { buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) - assert.Nil(err) - assert.True(fsm.Apply(makeLog(buf)).(bool)) + require.NoError(err) + require.True(fsm.Apply(makeLog(buf)).(bool)) } // Verify it's in the state store. { _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") - assert.Nil(err) + require.NoError(err) entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.ModifyIndex = 1 - assert.Equal(entry, config) + require.Equal(entry, config) } } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 685c10374..9ac5cf0ba 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -22,6 +22,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) @@ -46,8 +47,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { "testMeta": "testing123", }, } - assert.NoError(fsm.state.EnsureNode(1, node1)) - assert.NoError(fsm.state.EnsureNode(2, node2)) + require.NoError(fsm.state.EnsureNode(1, node1)) + require.NoError(fsm.state.EnsureNode(2, node2)) // Add a service instance with Connect config. connectConf := structs.ServiceConnect{ @@ -93,7 +94,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Syntax: acl.SyntaxCurrent, } policy.SetHash(true) - require.NoError(t, fsm.state.ACLPolicySet(1, &policy)) + require.NoError(fsm.state.ACLPolicySet(1, &policy)) token := &structs.ACLToken{ AccessorID: "30fca056-9fbb-4455-b94a-bf0e2bc575d6", @@ -109,7 +110,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // DEPRECATED (ACL-Legacy-Compat) - This is used so that the bootstrap token is still visible via the v1 acl APIs Type: structs.ACLTokenTypeManagement, } - require.NoError(t, fsm.state.ACLBootstrap(10, 0, token, false)) + require.NoError(fsm.state.ACLBootstrap(10, 0, token, false)) fsm.state.KVSSet(11, &structs.DirEntry{ Key: "/remove", @@ -168,7 +169,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { CreateIndex: 14, ModifyIndex: 14, } - assert.Nil(fsm.state.IntentionSet(14, ixn)) + require.NoError(fsm.state.IntentionSet(14, ixn)) // CA Roots roots := []*structs.CARoot{ @@ -179,7 +180,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { r.Active = false } ok, err := fsm.state.CARootSetCAS(15, 0, roots) - assert.Nil(err) + require.NoError(err) assert.True(ok) ok, err = fsm.state.CASetProviderState(16, &structs.CAConsulProviderState{ @@ -187,7 +188,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { PrivateKey: "foo", RootCert: "bar", }) - assert.Nil(err) + require.NoError(err) assert.True(ok) // CA Config @@ -200,7 +201,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { }, } err = fsm.state.CASetConfig(17, caConfig) - assert.Nil(err) + require.NoError(err) // Config entries serviceConfig := &structs.ServiceConfigEntry{ @@ -212,8 +213,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Kind: structs.ProxyDefaults, Name: "global", } - assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig)) - assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig)) + require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig)) + require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig)) // Snapshot snap, err := fsm.Snapshot() @@ -315,19 +316,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify ACL Token is restored _, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID) - require.NoError(t, err) + require.NoError(err) require.Equal(t, token.AccessorID, a.AccessorID) require.Equal(t, token.ModifyIndex, a.ModifyIndex) // Verify the acl-token-bootstrap index was restored canBootstrap, index, err := fsm2.state.CanBootstrapACLToken() - require.False(t, canBootstrap) - require.True(t, index > 0) + require.False(canBootstrap) + require.True(index > 0) // Verify ACL Policy is restored _, policy2, err := fsm2.state.ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID) - require.NoError(t, err) - require.Equal(t, policy.Name, policy2.Name) + require.NoError(err) + require.Equal(policy.Name, policy2.Name) // Verify tombstones are restored func() { @@ -381,33 +382,33 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify intentions are restored. _, ixns, err := fsm2.state.Intentions(nil) - assert.Nil(err) + require.NoError(err) assert.Len(ixns, 1) assert.Equal(ixn, ixns[0]) // Verify CA roots are restored. _, roots, err = fsm2.state.CARoots(nil) - assert.Nil(err) + require.NoError(err) assert.Len(roots, 2) // Verify provider state is restored. _, state, err := fsm2.state.CAProviderState("asdf") - assert.Nil(err) + require.NoError(err) assert.Equal("foo", state.PrivateKey) assert.Equal("bar", state.RootCert) // Verify CA configuration is restored. _, caConf, err := fsm2.state.CAConfig() - assert.Nil(err) + require.NoError(err) assert.Equal(caConfig, caConf) // Verify config entries are restored _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") - assert.Nil(err) + require.NoError(err) assert.Equal(serviceConfig, serviceConfEntry) _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") - assert.Nil(err) + require.NoError(err) assert.Equal(proxyConfig, proxyConfEntry) // Snapshot diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index c3cc96adb..d4bdaea31 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -11,8 +11,8 @@ const ( configTableName = "config-entries" ) -// configTableSchema returns a new table schema used to store global service -// and proxy configurations. +// configTableSchema returns a new table schema used to store global +// config entries. func configTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: configTableName, @@ -34,6 +34,15 @@ func configTableSchema() *memdb.TableSchema { }, }, }, + "kind": &memdb.IndexSchema{ + Name: "kind", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + }, }, } } @@ -44,13 +53,13 @@ func init() { // ConfigEntries is used to pull all the config entries for the snapshot. func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { - ixns, err := s.tx.Get(configTableName, "id") + entries, err := s.tx.Get(configTableName, "id") if err != nil { return nil, err } var ret []structs.ConfigEntry - for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() { ret = append(ret, wrapped.(structs.ConfigEntry)) } @@ -72,7 +81,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { // ConfigEntry is called to get a given config entry. func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { - tx := s.db.Txn(true) + tx := s.db.Txn(false) defer tx.Abort() // Get the index @@ -84,7 +93,7 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } if existing == nil { - return 0, nil, nil + return idx, nil, nil } conf, ok := existing.(structs.ConfigEntry) @@ -97,14 +106,26 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err // ConfigEntries is called to get all config entry objects. func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { - tx := s.db.Txn(true) + return s.ConfigEntriesByKind("") +} + +// ConfigEntriesByKind is called to get all config entry objects with the given kind. +// If kind is empty, all config entries will be returned. +func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { + tx := s.db.Txn(false) defer tx.Abort() // Get the index idx := maxIndexTxn(tx, configTableName) - // Get all - iter, err := tx.Get(configTableName, "id") + // Lookup by kind, or all if kind is empty + var iter memdb.ResultIterator + var err error + if kind != "" { + iter, err = tx.Get(configTableName, "kind", kind) + } else { + iter, err = tx.Get(configTableName, "id") + } if err != nil { return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } @@ -116,11 +137,21 @@ func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { return idx, results, nil } -// EnsureConfigEntry is called to upsert creation of a given config entry. +// EnsureConfigEntry is called to do an upsert of a given config entry. func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { tx := s.db.Txn(true) defer tx.Abort() + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return err + } + + tx.Commit() + return nil +} + +// ensureConfigEntryTxn upserts a config entry inside of a transaction. +func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error { // Check for existing configuration. existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) if err != nil { @@ -141,20 +172,51 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { if err := tx.Insert(configTableName, conf); err != nil { return fmt.Errorf("failed inserting config entry: %s", err) } - if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) + if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil { + return fmt.Errorf("failed updating index: %v", err) } - tx.Commit() return nil } -func (s *Store) DeleteConfigEntry(kind, name string) error { +// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry. +func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() - // Get the index - idx := maxIndexTxn(tx, configTableName) + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return false, fmt.Errorf("failed configuration lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + var existingIdx structs.RaftIndex + if existing != nil { + existingIdx = *existing.(structs.ConfigEntry).GetRaftIndex() + } + if cidx == 0 && existing != nil { + return false, nil + } + if cidx != 0 && existing == nil { + return false, nil + } + if existing != nil && cidx != 0 && cidx != existingIdx.ModifyIndex { + return false, nil + } + + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error { + tx := s.db.Txn(true) + defer tx.Abort() // Try to retrieve the existing health check. existing, err := tx.First(configTableName, "id", kind, name) diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 7c8a1d0b3..bf74cf699 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1,76 +1,138 @@ package state import ( - "reflect" "testing" "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" ) func TestStore_ConfigEntry(t *testing.T) { + require := require.New(t) s := testStateStore(t) expected := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", - ProxyConfig: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", }, } // Create - if err := s.EnsureConfigEntry(0, expected); err != nil { - t.Fatal(err) - } + require.NoError(s.EnsureConfigEntry(0, expected)) - { - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if idx != 0 { - t.Fatalf("bad: %d", idx) - } - if !reflect.DeepEqual(expected, config) { - t.Fatalf("bad: %#v, %#v", expected, config) - } - } + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(0), idx) + require.Equal(expected, config) // Update updated := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", - ProxyConfig: structs.ConnectProxyConfig{ - DestinationServiceName: "bar", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", }, } - if err := s.EnsureConfigEntry(1, updated); err != nil { - t.Fatal(err) - } + require.NoError(s.EnsureConfigEntry(1, updated)) - { - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if idx != 1 { - t.Fatalf("bad: %d", idx) - } - if !reflect.DeepEqual(updated, config) { - t.Fatalf("bad: %#v, %#v", updated, config) - } - } + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(updated, config) // Delete - if err := s.DeleteConfigEntry(structs.ProxyDefaults, "global"); err != nil { - t.Fatal(err) + require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) + + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Nil(config) +} + +func TestStore_ConfigEntryCAS(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", + }, } - _, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if config != nil { - t.Fatalf("config should be deleted: %v", config) + // Create + require.NoError(s.EnsureConfigEntry(1, expected)) + + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with invalid index + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", + }, } + ok, err := s.EnsureConfigEntryCAS(2, 99, updated) + require.False(ok) + require.NoError(err) + + // Entry should not be changed + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with a valid index + ok, err = s.EnsureConfigEntryCAS(2, 1, updated) + require.True(ok) + require.NoError(err) + + // Entry should be updated + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Equal(updated, config) +} + +func TestStore_ConfigEntries(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + // Create some config entries. + entry1 := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "test1", + } + entry2 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "test2", + } + + require.NoError(s.EnsureConfigEntry(0, entry1)) + require.NoError(s.EnsureConfigEntry(1, entry2)) + + // Get all entries + idx, entries, err := s.ConfigEntries() + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) + + // Get all proxy entries + idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1}, entries) + + // Get all service entries + idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry2}, entries) } diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 58a7a325a..ca5d430be 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -1,12 +1,17 @@ package structs -import "fmt" +import ( + "fmt" + "strings" +) const ( ServiceDefaults string = "service-defaults" ProxyDefaults string = "proxy-defaults" ProxyConfigGlobal string = "global" + + DefaultServiceProtocol = "tcp" ) // ConfigEntry is the @@ -14,7 +19,7 @@ type ConfigEntry interface { GetKind() string GetName() string - // This is called in the RPC endpoint and can apply defaults + // This is called in the RPC endpoint and can apply defaults or limits. Normalize() error Validate() error @@ -51,6 +56,11 @@ func (e *ServiceConfigEntry) Normalize() error { } e.Kind = ServiceDefaults + if e.Protocol == "" { + e.Protocol = DefaultServiceProtocol + } else { + e.Protocol = strings.ToLower(e.Protocol) + } return nil } @@ -89,22 +99,13 @@ type ServiceDefinitionDefaults struct { Connect ServiceConnect Weights Weights - - // DisableDirectDiscovery is a field that marks the service instance as - // not discoverable. This is useful in two cases: - // 1. Truly headless services like job workers that still need Connect - // sidecars to connect to upstreams. - // 2. Connect applications that expose services only through their sidecar - // and so discovery of their IP/port is meaningless since they can't be - // connected to by that means. - DisableDirectDiscovery bool } // ProxyConfigEntry is the top-level struct for global proxy configuration defaults. type ProxyConfigEntry struct { - Kind string - Name string - ProxyConfig ConnectProxyConfig + Kind string + Name string + Config map[string]interface{} RaftIndex } From ace5c7a1cb04f09e83662790dcdb9da4c00d8262 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 27 Mar 2019 23:56:35 -0700 Subject: [PATCH 6/7] Encode config entry FSM messages in a generic type --- agent/consul/fsm/commands_oss.go | 21 +-------- agent/consul/fsm/commands_oss_test.go | 21 ++++++--- agent/consul/fsm/snapshot_oss.go | 26 +++--------- agent/consul/fsm/snapshot_oss_test.go | 4 +- agent/structs/config_entry.go | 61 +++++++++++++++++++++++++++ agent/structs/structs.go | 48 ++++++++++----------- 6 files changed, 109 insertions(+), 72 deletions(-) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 492c3ccc5..1e3651cba 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,8 +29,7 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) - registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation) - registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation) + registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -431,30 +430,14 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } -func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} { - req := structs.ConfigEntryRequest{ - Entry: &structs.ServiceConfigEntry{}, - } - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - return c.applyConfigEntryOperation(index, req) -} - -func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { req := structs.ConfigEntryRequest{ Entry: &structs.ProxyConfigEntry{}, } if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := c.applyConfigEntryOperation(index, req); err != nil { - panic(err) - } - return true -} -func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error { switch req.Op { case structs.ConfigEntryUpsert: defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 438fc8172..373d75acf 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1368,21 +1368,24 @@ func TestFSM_ConfigEntry(t *testing.T) { entry := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", - ProxyConfig: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", }, } // Create a new request. - req := structs.ConfigEntryRequest{ + req := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, Entry: entry, } { - buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) + buf, err := structs.Encode(structs.ConfigEntryRequestType, req) require.NoError(err) - require.True(fsm.Apply(makeLog(buf)).(bool)) + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } } // Verify it's in the state store. @@ -1391,6 +1394,14 @@ func TestFSM_ConfigEntry(t *testing.T) { require.NoError(err) entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.ModifyIndex = 1 + + proxyConf, ok := config.(*structs.ProxyConfigEntry) + require.True(ok) + + // Read the map[string]interface{} back out. + value, _ := proxyConf.Config["DestinationServiceName"].([]uint8) + proxyConf.Config["DestinationServiceName"] = structs.Uint8ToString(value) + require.Equal(entry, config) } } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 02be4c17e..02049f6b3 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -1,8 +1,6 @@ package fsm import ( - "fmt" - "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -377,10 +375,10 @@ func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink, if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { return err } - if err := encoder.Encode(entry.GetKind()); err != nil { - return err + req := &structs.ConfigEntryRequest{ + Entry: entry, } - if err := encoder.Encode(entry); err != nil { + if err := encoder.Encode(req); err != nil { return err } } @@ -594,23 +592,9 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code } func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { - var req structs.ConfigEntry - var kind string - if err := decoder.Decode(&kind); err != nil { - return err - } - - switch kind { - case structs.ServiceDefaults: - req = &structs.ServiceConfigEntry{} - case structs.ProxyDefaults: - req = &structs.ProxyConfigEntry{} - default: - return fmt.Errorf("invalid config type: %s", kind) - } - + var req structs.ConfigEntryRequest if err := decoder.Decode(&req); err != nil { return err } - return restore.ConfigEntry(req) + return restore.ConfigEntry(req.Entry) } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 9ac5cf0ba..fe1f3db95 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -317,8 +317,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify ACL Token is restored _, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID) require.NoError(err) - require.Equal(t, token.AccessorID, a.AccessorID) - require.Equal(t, token.ModifyIndex, a.ModifyIndex) + require.Equal(token.AccessorID, a.AccessorID) + require.Equal(token.ModifyIndex, a.ModifyIndex) // Verify the acl-token-bootstrap index was restored canBootstrap, index, err := fsm2.state.CanBootstrapACLToken() diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index ca5d430be..a5c31dee7 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -3,6 +3,8 @@ package structs import ( "fmt" "strings" + + "github.com/hashicorp/go-msgpack/codec" ) const ( @@ -163,3 +165,62 @@ type ConfigEntryRequest struct { Op ConfigEntryOp Entry ConfigEntry } + +func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { + // bs will grow if needed but allocate enough to avoid reallocation in common + // case. + bs := make([]byte, 128) + enc := codec.NewEncoderBytes(&bs, msgpackHandle) + // Encode kind first + err = enc.Encode(r.Entry.GetKind()) + if err != nil { + return nil, err + } + // Then actual value using alias trick to avoid infinite recursion + type Alias ConfigEntryRequest + err = enc.Encode(struct { + *Alias + }{ + Alias: (*Alias)(r), + }) + if err != nil { + return nil, err + } + return bs, nil +} + +func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { + // First decode the kind prefix + var kind string + dec := codec.NewDecoderBytes(data, msgpackHandle) + if err := dec.Decode(&kind); err != nil { + return err + } + + // Then decode the real thing with appropriate kind of ConfigEntry + r.Entry = makeConfigEntry(kind) + + // Alias juggling to prevent infinite recursive calls back to this decode + // method. + type Alias ConfigEntryRequest + as := struct { + *Alias + }{ + Alias: (*Alias)(r), + } + if err := dec.Decode(&as); err != nil { + return err + } + return nil +} + +func makeConfigEntry(kind string) ConfigEntry { + switch kind { + case ServiceDefaults: + return &ServiceConfigEntry{} + case ProxyDefaults: + return &ProxyConfigEntry{} + default: + panic("invalid kind") + } +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index fd6532633..ace4da34f 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -33,31 +33,29 @@ type RaftIndex struct { // These are serialized between Consul servers and stored in Consul snapshots, // so entries must only ever be added. const ( - RegisterRequestType MessageType = 0 - DeregisterRequestType = 1 - KVSRequestType = 2 - SessionRequestType = 3 - ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) - TombstoneRequestType = 5 - CoordinateBatchUpdateType = 6 - PreparedQueryRequestType = 7 - TxnRequestType = 8 - AutopilotRequestType = 9 - AreaRequestType = 10 - ACLBootstrapRequestType = 11 - IntentionRequestType = 12 - ConnectCARequestType = 13 - ConnectCAProviderStateType = 14 - ConnectCAConfigType = 15 // FSM snapshots only. - IndexRequestType = 16 // FSM snapshots only. - ACLTokenSetRequestType = 17 - ACLTokenDeleteRequestType = 18 - ACLPolicySetRequestType = 19 - ACLPolicyDeleteRequestType = 20 - ConnectCALeafRequestType = 21 - ConfigEntryRequestType = 22 // FSM snapshots only. - ServiceConfigEntryRequestType = 23 - ProxyConfigEntryRequestType = 24 + RegisterRequestType MessageType = 0 + DeregisterRequestType = 1 + KVSRequestType = 2 + SessionRequestType = 3 + ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) + TombstoneRequestType = 5 + CoordinateBatchUpdateType = 6 + PreparedQueryRequestType = 7 + TxnRequestType = 8 + AutopilotRequestType = 9 + AreaRequestType = 10 + ACLBootstrapRequestType = 11 + IntentionRequestType = 12 + ConnectCARequestType = 13 + ConnectCAProviderStateType = 14 + ConnectCAConfigType = 15 // FSM snapshots only. + IndexRequestType = 16 // FSM snapshots only. + ACLTokenSetRequestType = 17 + ACLTokenDeleteRequestType = 18 + ACLPolicySetRequestType = 19 + ACLPolicyDeleteRequestType = 20 + ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 // FSM snapshots only. ) const ( From 63c943477943f10e438b2df8731e035f4c1fcd92 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 2 Apr 2019 15:42:12 -0700 Subject: [PATCH 7/7] Cleaned up some error handling/comments around config entries --- agent/consul/fsm/commands_oss_test.go | 6 +++--- agent/consul/fsm/snapshot_oss.go | 3 +++ agent/structs/config_entry.go | 14 +++++++++----- agent/structs/structs.go | 2 +- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 373d75acf..d4e1998f1 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1369,7 +1369,7 @@ func TestFSM_ConfigEntry(t *testing.T) { Kind: structs.ProxyDefaults, Name: "global", Config: map[string]interface{}{ - "DestinationServiceName": "foo", + "foo": "bar", }, } @@ -1399,8 +1399,8 @@ func TestFSM_ConfigEntry(t *testing.T) { require.True(ok) // Read the map[string]interface{} back out. - value, _ := proxyConf.Config["DestinationServiceName"].([]uint8) - proxyConf.Config["DestinationServiceName"] = structs.Uint8ToString(value) + value, _ := proxyConf.Config["foo"].([]uint8) + proxyConf.Config["foo"] = structs.Uint8ToString(value) require.Equal(entry, config) } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 02049f6b3..0c7713753 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -375,6 +375,9 @@ func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink, if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { return err } + // Encode the entry request without an operation since we don't need it for restoring. + // The request is used for its custom decoding/encoding logic around the ConfigEntry + // interface. req := &structs.ConfigEntryRequest{ Entry: entry, } diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index a5c31dee7..76f508873 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -198,7 +198,11 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { } // Then decode the real thing with appropriate kind of ConfigEntry - r.Entry = makeConfigEntry(kind) + entry, err := makeConfigEntry(kind) + if err != nil { + return err + } + r.Entry = entry // Alias juggling to prevent infinite recursive calls back to this decode // method. @@ -214,13 +218,13 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { return nil } -func makeConfigEntry(kind string) ConfigEntry { +func makeConfigEntry(kind string) (ConfigEntry, error) { switch kind { case ServiceDefaults: - return &ServiceConfigEntry{} + return &ServiceConfigEntry{}, nil case ProxyDefaults: - return &ProxyConfigEntry{} + return &ProxyConfigEntry{}, nil default: - panic("invalid kind") + return nil, fmt.Errorf("invalid config entry kind: %s", kind) } } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index ace4da34f..56d86ab52 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -55,7 +55,7 @@ const ( ACLPolicySetRequestType = 19 ACLPolicyDeleteRequestType = 20 ConnectCALeafRequestType = 21 - ConfigEntryRequestType = 22 // FSM snapshots only. + ConfigEntryRequestType = 22 ) const (