diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 9061cc696..492c3ccc5 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -463,7 +463,7 @@ func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryReq case structs.ConfigEntryDelete: defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), []metrics.Label{{Name: "op", Value: "delete"}}) - return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) + return c.state.DeleteConfigEntry(index, req.Entry.GetKind(), req.Entry.GetName()) default: return fmt.Errorf("invalid config entry operation type: %v", req.Op) } diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 67b70aa05..438fc8172 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -18,6 +18,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func generateUUID() (ret string) { @@ -1359,11 +1360,11 @@ func TestFSM_CABuiltinProvider(t *testing.T) { func TestFSM_ConfigEntry(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) fsm, err := New(nil, os.Stderr) - assert.Nil(err) + require.NoError(err) - // Roots + // Create a simple config entry entry := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", @@ -1380,16 +1381,16 @@ func TestFSM_ConfigEntry(t *testing.T) { { buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) - assert.Nil(err) - assert.True(fsm.Apply(makeLog(buf)).(bool)) + require.NoError(err) + require.True(fsm.Apply(makeLog(buf)).(bool)) } // Verify it's in the state store. { _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") - assert.Nil(err) + require.NoError(err) entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.ModifyIndex = 1 - assert.Equal(entry, config) + require.Equal(entry, config) } } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 685c10374..9ac5cf0ba 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -22,6 +22,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) @@ -46,8 +47,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { "testMeta": "testing123", }, } - assert.NoError(fsm.state.EnsureNode(1, node1)) - assert.NoError(fsm.state.EnsureNode(2, node2)) + require.NoError(fsm.state.EnsureNode(1, node1)) + require.NoError(fsm.state.EnsureNode(2, node2)) // Add a service instance with Connect config. connectConf := structs.ServiceConnect{ @@ -93,7 +94,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Syntax: acl.SyntaxCurrent, } policy.SetHash(true) - require.NoError(t, fsm.state.ACLPolicySet(1, &policy)) + require.NoError(fsm.state.ACLPolicySet(1, &policy)) token := &structs.ACLToken{ AccessorID: "30fca056-9fbb-4455-b94a-bf0e2bc575d6", @@ -109,7 +110,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // DEPRECATED (ACL-Legacy-Compat) - This is used so that the bootstrap token is still visible via the v1 acl APIs Type: structs.ACLTokenTypeManagement, } - require.NoError(t, fsm.state.ACLBootstrap(10, 0, token, false)) + require.NoError(fsm.state.ACLBootstrap(10, 0, token, false)) fsm.state.KVSSet(11, &structs.DirEntry{ Key: "/remove", @@ -168,7 +169,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { CreateIndex: 14, ModifyIndex: 14, } - assert.Nil(fsm.state.IntentionSet(14, ixn)) + require.NoError(fsm.state.IntentionSet(14, ixn)) // CA Roots roots := []*structs.CARoot{ @@ -179,7 +180,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { r.Active = false } ok, err := fsm.state.CARootSetCAS(15, 0, roots) - assert.Nil(err) + require.NoError(err) assert.True(ok) ok, err = fsm.state.CASetProviderState(16, &structs.CAConsulProviderState{ @@ -187,7 +188,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { PrivateKey: "foo", RootCert: "bar", }) - assert.Nil(err) + require.NoError(err) assert.True(ok) // CA Config @@ -200,7 +201,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { }, } err = fsm.state.CASetConfig(17, caConfig) - assert.Nil(err) + require.NoError(err) // Config entries serviceConfig := &structs.ServiceConfigEntry{ @@ -212,8 +213,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Kind: structs.ProxyDefaults, Name: "global", } - assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig)) - assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig)) + require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig)) + require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig)) // Snapshot snap, err := fsm.Snapshot() @@ -315,19 +316,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify ACL Token is restored _, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID) - require.NoError(t, err) + require.NoError(err) require.Equal(t, token.AccessorID, a.AccessorID) require.Equal(t, token.ModifyIndex, a.ModifyIndex) // Verify the acl-token-bootstrap index was restored canBootstrap, index, err := fsm2.state.CanBootstrapACLToken() - require.False(t, canBootstrap) - require.True(t, index > 0) + require.False(canBootstrap) + require.True(index > 0) // Verify ACL Policy is restored _, policy2, err := fsm2.state.ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID) - require.NoError(t, err) - require.Equal(t, policy.Name, policy2.Name) + require.NoError(err) + require.Equal(policy.Name, policy2.Name) // Verify tombstones are restored func() { @@ -381,33 +382,33 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify intentions are restored. _, ixns, err := fsm2.state.Intentions(nil) - assert.Nil(err) + require.NoError(err) assert.Len(ixns, 1) assert.Equal(ixn, ixns[0]) // Verify CA roots are restored. _, roots, err = fsm2.state.CARoots(nil) - assert.Nil(err) + require.NoError(err) assert.Len(roots, 2) // Verify provider state is restored. _, state, err := fsm2.state.CAProviderState("asdf") - assert.Nil(err) + require.NoError(err) assert.Equal("foo", state.PrivateKey) assert.Equal("bar", state.RootCert) // Verify CA configuration is restored. _, caConf, err := fsm2.state.CAConfig() - assert.Nil(err) + require.NoError(err) assert.Equal(caConfig, caConf) // Verify config entries are restored _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") - assert.Nil(err) + require.NoError(err) assert.Equal(serviceConfig, serviceConfEntry) _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") - assert.Nil(err) + require.NoError(err) assert.Equal(proxyConfig, proxyConfEntry) // Snapshot diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index c3cc96adb..d4bdaea31 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -11,8 +11,8 @@ const ( configTableName = "config-entries" ) -// configTableSchema returns a new table schema used to store global service -// and proxy configurations. +// configTableSchema returns a new table schema used to store global +// config entries. func configTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: configTableName, @@ -34,6 +34,15 @@ func configTableSchema() *memdb.TableSchema { }, }, }, + "kind": &memdb.IndexSchema{ + Name: "kind", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + }, }, } } @@ -44,13 +53,13 @@ func init() { // ConfigEntries is used to pull all the config entries for the snapshot. func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { - ixns, err := s.tx.Get(configTableName, "id") + entries, err := s.tx.Get(configTableName, "id") if err != nil { return nil, err } var ret []structs.ConfigEntry - for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() { ret = append(ret, wrapped.(structs.ConfigEntry)) } @@ -72,7 +81,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { // ConfigEntry is called to get a given config entry. func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { - tx := s.db.Txn(true) + tx := s.db.Txn(false) defer tx.Abort() // Get the index @@ -84,7 +93,7 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } if existing == nil { - return 0, nil, nil + return idx, nil, nil } conf, ok := existing.(structs.ConfigEntry) @@ -97,14 +106,26 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err // ConfigEntries is called to get all config entry objects. func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { - tx := s.db.Txn(true) + return s.ConfigEntriesByKind("") +} + +// ConfigEntriesByKind is called to get all config entry objects with the given kind. +// If kind is empty, all config entries will be returned. +func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { + tx := s.db.Txn(false) defer tx.Abort() // Get the index idx := maxIndexTxn(tx, configTableName) - // Get all - iter, err := tx.Get(configTableName, "id") + // Lookup by kind, or all if kind is empty + var iter memdb.ResultIterator + var err error + if kind != "" { + iter, err = tx.Get(configTableName, "kind", kind) + } else { + iter, err = tx.Get(configTableName, "id") + } if err != nil { return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } @@ -116,11 +137,21 @@ func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { return idx, results, nil } -// EnsureConfigEntry is called to upsert creation of a given config entry. +// EnsureConfigEntry is called to do an upsert of a given config entry. func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { tx := s.db.Txn(true) defer tx.Abort() + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return err + } + + tx.Commit() + return nil +} + +// ensureConfigEntryTxn upserts a config entry inside of a transaction. +func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error { // Check for existing configuration. existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) if err != nil { @@ -141,20 +172,51 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { if err := tx.Insert(configTableName, conf); err != nil { return fmt.Errorf("failed inserting config entry: %s", err) } - if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) + if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil { + return fmt.Errorf("failed updating index: %v", err) } - tx.Commit() return nil } -func (s *Store) DeleteConfigEntry(kind, name string) error { +// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry. +func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() - // Get the index - idx := maxIndexTxn(tx, configTableName) + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return false, fmt.Errorf("failed configuration lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + var existingIdx structs.RaftIndex + if existing != nil { + existingIdx = *existing.(structs.ConfigEntry).GetRaftIndex() + } + if cidx == 0 && existing != nil { + return false, nil + } + if cidx != 0 && existing == nil { + return false, nil + } + if existing != nil && cidx != 0 && cidx != existingIdx.ModifyIndex { + return false, nil + } + + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error { + tx := s.db.Txn(true) + defer tx.Abort() // Try to retrieve the existing health check. existing, err := tx.First(configTableName, "id", kind, name) diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 7c8a1d0b3..bf74cf699 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1,76 +1,138 @@ package state import ( - "reflect" "testing" "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" ) func TestStore_ConfigEntry(t *testing.T) { + require := require.New(t) s := testStateStore(t) expected := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", - ProxyConfig: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", }, } // Create - if err := s.EnsureConfigEntry(0, expected); err != nil { - t.Fatal(err) - } + require.NoError(s.EnsureConfigEntry(0, expected)) - { - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if idx != 0 { - t.Fatalf("bad: %d", idx) - } - if !reflect.DeepEqual(expected, config) { - t.Fatalf("bad: %#v, %#v", expected, config) - } - } + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(0), idx) + require.Equal(expected, config) // Update updated := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", - ProxyConfig: structs.ConnectProxyConfig{ - DestinationServiceName: "bar", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", }, } - if err := s.EnsureConfigEntry(1, updated); err != nil { - t.Fatal(err) - } + require.NoError(s.EnsureConfigEntry(1, updated)) - { - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if idx != 1 { - t.Fatalf("bad: %d", idx) - } - if !reflect.DeepEqual(updated, config) { - t.Fatalf("bad: %#v, %#v", updated, config) - } - } + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(updated, config) // Delete - if err := s.DeleteConfigEntry(structs.ProxyDefaults, "global"); err != nil { - t.Fatal(err) + require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) + + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Nil(config) +} + +func TestStore_ConfigEntryCAS(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", + }, } - _, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") - if err != nil { - t.Fatal(err) - } - if config != nil { - t.Fatalf("config should be deleted: %v", config) + // Create + require.NoError(s.EnsureConfigEntry(1, expected)) + + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with invalid index + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", + }, } + ok, err := s.EnsureConfigEntryCAS(2, 99, updated) + require.False(ok) + require.NoError(err) + + // Entry should not be changed + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with a valid index + ok, err = s.EnsureConfigEntryCAS(2, 1, updated) + require.True(ok) + require.NoError(err) + + // Entry should be updated + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Equal(updated, config) +} + +func TestStore_ConfigEntries(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + // Create some config entries. + entry1 := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "test1", + } + entry2 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "test2", + } + + require.NoError(s.EnsureConfigEntry(0, entry1)) + require.NoError(s.EnsureConfigEntry(1, entry2)) + + // Get all entries + idx, entries, err := s.ConfigEntries() + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) + + // Get all proxy entries + idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1}, entries) + + // Get all service entries + idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry2}, entries) } diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 58a7a325a..ca5d430be 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -1,12 +1,17 @@ package structs -import "fmt" +import ( + "fmt" + "strings" +) const ( ServiceDefaults string = "service-defaults" ProxyDefaults string = "proxy-defaults" ProxyConfigGlobal string = "global" + + DefaultServiceProtocol = "tcp" ) // ConfigEntry is the @@ -14,7 +19,7 @@ type ConfigEntry interface { GetKind() string GetName() string - // This is called in the RPC endpoint and can apply defaults + // This is called in the RPC endpoint and can apply defaults or limits. Normalize() error Validate() error @@ -51,6 +56,11 @@ func (e *ServiceConfigEntry) Normalize() error { } e.Kind = ServiceDefaults + if e.Protocol == "" { + e.Protocol = DefaultServiceProtocol + } else { + e.Protocol = strings.ToLower(e.Protocol) + } return nil } @@ -89,22 +99,13 @@ type ServiceDefinitionDefaults struct { Connect ServiceConnect Weights Weights - - // DisableDirectDiscovery is a field that marks the service instance as - // not discoverable. This is useful in two cases: - // 1. Truly headless services like job workers that still need Connect - // sidecars to connect to upstreams. - // 2. Connect applications that expose services only through their sidecar - // and so discovery of their IP/port is meaningless since they can't be - // connected to by that means. - DisableDirectDiscovery bool } // ProxyConfigEntry is the top-level struct for global proxy configuration defaults. type ProxyConfigEntry struct { - Kind string - Name string - ProxyConfig ConnectProxyConfig + Kind string + Name string + Config map[string]interface{} RaftIndex }