diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 0e4b972f7..1e3651cba 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,6 +29,7 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) + registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -428,3 +429,25 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } + +func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { + req := structs.ConfigEntryRequest{ + Entry: &structs.ProxyConfigEntry{}, + } + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + switch req.Op { + case structs.ConfigEntryUpsert: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), + []metrics.Label{{Name: "op", Value: "upsert"}}) + return c.state.EnsureConfigEntry(index, req.Entry) + case structs.ConfigEntryDelete: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), + []metrics.Label{{Name: "op", Value: "delete"}}) + return c.state.DeleteConfigEntry(index, req.Entry.GetKind(), req.Entry.GetName()) + default: + return fmt.Errorf("invalid config entry operation type: %v", req.Op) + } +} diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 6778a3f7e..d4e1998f1 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -18,6 +18,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func generateUUID() (ret string) { @@ -1355,3 +1356,52 @@ func TestFSM_CABuiltinProvider(t *testing.T) { assert.Equal(expected, state) } } + +func TestFSM_ConfigEntry(t *testing.T) { + t.Parallel() + + require := require.New(t) + fsm, err := New(nil, os.Stderr) + require.NoError(err) + + // Create a simple config entry + entry := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "foo": "bar", + }, + } + + // Create a new request. + req := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Entry: entry, + } + + { + buf, err := structs.Encode(structs.ConfigEntryRequestType, req) + require.NoError(err) + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + } + + // Verify it's in the state store. + { + _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + entry.RaftIndex.CreateIndex = 1 + entry.RaftIndex.ModifyIndex = 1 + + proxyConf, ok := config.(*structs.ProxyConfigEntry) + require.True(ok) + + // Read the map[string]interface{} back out. + value, _ := proxyConf.Config["foo"].([]uint8) + proxyConf.Config["foo"] = structs.Uint8ToString(value) + + require.Equal(entry, config) + } +} diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 00adfa986..0c7713753 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -27,6 +27,7 @@ func init() { registerRestorer(structs.IndexRequestType, restoreIndex) registerRestorer(structs.ACLTokenSetRequestType, restoreToken) registerRestorer(structs.ACLPolicySetRequestType, restorePolicy) + registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry) } func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { @@ -63,6 +64,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err if err := s.persistConnectCAConfig(sink, encoder); err != nil { return err } + if err := s.persistConfigEntries(sink, encoder); err != nil { + return err + } if err := s.persistIndex(sink, encoder); err != nil { return err } @@ -360,6 +364,30 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink, return nil } +func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + entries, err := s.state.ConfigEntries() + if err != nil { + return err + } + + for _, entry := range entries { + if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { + return err + } + // Encode the entry request without an operation since we don't need it for restoring. + // The request is used for its custom decoding/encoding logic around the ConfigEntry + // interface. + req := &structs.ConfigEntryRequest{ + Entry: entry, + } + if err := encoder.Encode(req); err != nil { + return err + } + } + return nil +} + func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the indexes iter, err := s.state.Indexes() @@ -565,3 +593,11 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code } return restore.ACLPolicy(&req) } + +func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.ConfigEntryRequest + if err := decoder.Decode(&req); err != nil { + return err + } + return restore.ConfigEntry(req.Entry) +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 1d80e3965..fe1f3db95 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -22,6 +22,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) @@ -46,8 +47,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { "testMeta": "testing123", }, } - assert.NoError(fsm.state.EnsureNode(1, node1)) - assert.NoError(fsm.state.EnsureNode(2, node2)) + require.NoError(fsm.state.EnsureNode(1, node1)) + require.NoError(fsm.state.EnsureNode(2, node2)) // Add a service instance with Connect config. connectConf := structs.ServiceConnect{ @@ -93,7 +94,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Syntax: acl.SyntaxCurrent, } policy.SetHash(true) - require.NoError(t, fsm.state.ACLPolicySet(1, &policy)) + require.NoError(fsm.state.ACLPolicySet(1, &policy)) token := &structs.ACLToken{ AccessorID: "30fca056-9fbb-4455-b94a-bf0e2bc575d6", @@ -109,7 +110,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // DEPRECATED (ACL-Legacy-Compat) - This is used so that the bootstrap token is still visible via the v1 acl APIs Type: structs.ACLTokenTypeManagement, } - require.NoError(t, fsm.state.ACLBootstrap(10, 0, token, false)) + require.NoError(fsm.state.ACLBootstrap(10, 0, token, false)) fsm.state.KVSSet(11, &structs.DirEntry{ Key: "/remove", @@ -168,7 +169,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { CreateIndex: 14, ModifyIndex: 14, } - assert.Nil(fsm.state.IntentionSet(14, ixn)) + require.NoError(fsm.state.IntentionSet(14, ixn)) // CA Roots roots := []*structs.CARoot{ @@ -179,7 +180,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { r.Active = false } ok, err := fsm.state.CARootSetCAS(15, 0, roots) - assert.Nil(err) + require.NoError(err) assert.True(ok) ok, err = fsm.state.CASetProviderState(16, &structs.CAConsulProviderState{ @@ -187,7 +188,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { PrivateKey: "foo", RootCert: "bar", }) - assert.Nil(err) + require.NoError(err) assert.True(ok) // CA Config @@ -200,7 +201,20 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { }, } err = fsm.state.CASetConfig(17, caConfig) - assert.Nil(err) + require.NoError(err) + + // Config entries + serviceConfig := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + } + proxyConfig := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + } + require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig)) + require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig)) // Snapshot snap, err := fsm.Snapshot() @@ -302,19 +316,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify ACL Token is restored _, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID) - require.NoError(t, err) - require.Equal(t, token.AccessorID, a.AccessorID) - require.Equal(t, token.ModifyIndex, a.ModifyIndex) + require.NoError(err) + require.Equal(token.AccessorID, a.AccessorID) + require.Equal(token.ModifyIndex, a.ModifyIndex) // Verify the acl-token-bootstrap index was restored canBootstrap, index, err := fsm2.state.CanBootstrapACLToken() - require.False(t, canBootstrap) - require.True(t, index > 0) + require.False(canBootstrap) + require.True(index > 0) // Verify ACL Policy is restored _, policy2, err := fsm2.state.ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID) - require.NoError(t, err) - require.Equal(t, policy.Name, policy2.Name) + require.NoError(err) + require.Equal(policy.Name, policy2.Name) // Verify tombstones are restored func() { @@ -368,26 +382,35 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Verify intentions are restored. _, ixns, err := fsm2.state.Intentions(nil) - assert.Nil(err) + require.NoError(err) assert.Len(ixns, 1) assert.Equal(ixn, ixns[0]) // Verify CA roots are restored. _, roots, err = fsm2.state.CARoots(nil) - assert.Nil(err) + require.NoError(err) assert.Len(roots, 2) // Verify provider state is restored. _, state, err := fsm2.state.CAProviderState("asdf") - assert.Nil(err) + require.NoError(err) assert.Equal("foo", state.PrivateKey) assert.Equal("bar", state.RootCert) // Verify CA configuration is restored. _, caConf, err := fsm2.state.CAConfig() - assert.Nil(err) + require.NoError(err) assert.Equal(caConfig, caConf) + // Verify config entries are restored + _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") + require.NoError(err) + assert.Equal(serviceConfig, serviceConfEntry) + + _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + assert.Equal(proxyConfig, proxyConfEntry) + // Snapshot snap, err = fsm2.Snapshot() if err != nil { diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go new file mode 100644 index 000000000..d4bdaea31 --- /dev/null +++ b/agent/consul/state/config_entry.go @@ -0,0 +1,240 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const ( + configTableName = "config-entries" +) + +// configTableSchema returns a new table schema used to store global +// config entries. +func configTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: configTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Name", + Lowercase: true, + }, + }, + }, + }, + "kind": &memdb.IndexSchema{ + Name: "kind", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + }, + }, + } +} + +func init() { + registerSchema(configTableSchema) +} + +// ConfigEntries is used to pull all the config entries for the snapshot. +func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { + entries, err := s.tx.Get(configTableName, "id") + if err != nil { + return nil, err + } + + var ret []structs.ConfigEntry + for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() { + ret = append(ret, wrapped.(structs.ConfigEntry)) + } + + return ret, nil +} + +// ConfigEntry is used when restoring from a snapshot. +func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { + // Insert + if err := s.tx.Insert(configTableName, c); err != nil { + return fmt.Errorf("failed restoring config entry object: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + +// ConfigEntry is called to get a given config entry. +func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Get the existing config entry. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return idx, nil, nil + } + + conf, ok := existing.(structs.ConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf) + } + + return idx, conf, nil +} + +// ConfigEntries is called to get all config entry objects. +func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { + return s.ConfigEntriesByKind("") +} + +// ConfigEntriesByKind is called to get all config entry objects with the given kind. +// If kind is empty, all config entries will be returned. +func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Lookup by kind, or all if kind is empty + var iter memdb.ResultIterator + var err error + if kind != "" { + iter, err = tx.Get(configTableName, "kind", kind) + } else { + iter, err = tx.Get(configTableName, "id") + } + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + + var results []structs.ConfigEntry + for v := iter.Next(); v != nil; v = iter.Next() { + results = append(results, v.(structs.ConfigEntry)) + } + return idx, results, nil +} + +// EnsureConfigEntry is called to do an upsert of a given config entry. +func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { + tx := s.db.Txn(true) + defer tx.Abort() + + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return err + } + + tx.Commit() + return nil +} + +// ensureConfigEntryTxn upserts a config entry inside of a transaction. +func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error { + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return fmt.Errorf("failed configuration lookup: %s", err) + } + + raftIndex := conf.GetRaftIndex() + if existing != nil { + existingIdx := existing.(structs.ConfigEntry).GetRaftIndex() + raftIndex.CreateIndex = existingIdx.CreateIndex + raftIndex.ModifyIndex = existingIdx.ModifyIndex + } else { + raftIndex.CreateIndex = idx + } + raftIndex.ModifyIndex = idx + + // Insert the config entry and update the index + if err := tx.Insert(configTableName, conf); err != nil { + return fmt.Errorf("failed inserting config entry: %s", err) + } + if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil { + return fmt.Errorf("failed updating index: %v", err) + } + + return nil +} + +// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry. +func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return false, fmt.Errorf("failed configuration lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + var existingIdx structs.RaftIndex + if existing != nil { + existingIdx = *existing.(structs.ConfigEntry).GetRaftIndex() + } + if cidx == 0 && existing != nil { + return false, nil + } + if cidx != 0 && existing == nil { + return false, nil + } + if existing != nil && cidx != 0 && cidx != existingIdx.ModifyIndex { + return false, nil + } + + if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Try to retrieve the existing health check. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return nil + } + + // Delete the config entry from the DB and update the index. + if err := tx.Delete(configTableName, existing); err != nil { + return fmt.Errorf("failed removing check: %s", err) + } + if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go new file mode 100644 index 000000000..bf74cf699 --- /dev/null +++ b/agent/consul/state/config_entry_test.go @@ -0,0 +1,138 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" +) + +func TestStore_ConfigEntry(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", + }, + } + + // Create + require.NoError(s.EnsureConfigEntry(0, expected)) + + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(0), idx) + require.Equal(expected, config) + + // Update + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", + }, + } + require.NoError(s.EnsureConfigEntry(1, updated)) + + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(updated, config) + + // Delete + require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) + + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Nil(config) +} + +func TestStore_ConfigEntryCAS(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "foo", + }, + } + + // Create + require.NoError(s.EnsureConfigEntry(1, expected)) + + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with invalid index + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "DestinationServiceName": "bar", + }, + } + ok, err := s.EnsureConfigEntryCAS(2, 99, updated) + require.False(ok) + require.NoError(err) + + // Entry should not be changed + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal(expected, config) + + // Update with a valid index + ok, err = s.EnsureConfigEntryCAS(2, 1, updated) + require.True(ok) + require.NoError(err) + + // Entry should be updated + idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + require.NoError(err) + require.Equal(uint64(2), idx) + require.Equal(updated, config) +} + +func TestStore_ConfigEntries(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + // Create some config entries. + entry1 := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "test1", + } + entry2 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "test2", + } + + require.NoError(s.EnsureConfigEntry(0, entry1)) + require.NoError(s.EnsureConfigEntry(1, entry2)) + + // Get all entries + idx, entries, err := s.ConfigEntries() + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) + + // Get all proxy entries + idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry1}, entries) + + // Get all service entries + idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) + require.NoError(err) + require.Equal(uint64(1), idx) + require.Equal([]structs.ConfigEntry{entry2}, entries) +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go new file mode 100644 index 000000000..76f508873 --- /dev/null +++ b/agent/structs/config_entry.go @@ -0,0 +1,230 @@ +package structs + +import ( + "fmt" + "strings" + + "github.com/hashicorp/go-msgpack/codec" +) + +const ( + ServiceDefaults string = "service-defaults" + ProxyDefaults string = "proxy-defaults" + + ProxyConfigGlobal string = "global" + + DefaultServiceProtocol = "tcp" +) + +// ConfigEntry is the +type ConfigEntry interface { + GetKind() string + GetName() string + + // This is called in the RPC endpoint and can apply defaults or limits. + Normalize() error + Validate() error + + GetRaftIndex() *RaftIndex +} + +// ServiceConfiguration is the top-level struct for the configuration of a service +// across the entire cluster. +type ServiceConfigEntry struct { + Kind string + Name string + Protocol string + Connect ConnectConfiguration + ServiceDefinitionDefaults ServiceDefinitionDefaults + + RaftIndex +} + +func (e *ServiceConfigEntry) GetKind() string { + return ServiceDefaults +} + +func (e *ServiceConfigEntry) GetName() string { + if e == nil { + return "" + } + + return e.Name +} + +func (e *ServiceConfigEntry) Normalize() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + e.Kind = ServiceDefaults + if e.Protocol == "" { + e.Protocol = DefaultServiceProtocol + } else { + e.Protocol = strings.ToLower(e.Protocol) + } + + return nil +} + +func (e *ServiceConfigEntry) Validate() error { + return nil +} + +func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { + if e == nil { + return &RaftIndex{} + } + + return &e.RaftIndex +} + +type ConnectConfiguration struct { + SidecarProxy bool +} + +type ServiceDefinitionDefaults struct { + EnableTagOverride bool + + // Non script/docker checks only + Check *HealthCheck + Checks HealthChecks + + // Kind is allowed to accommodate non-sidecar proxies but it will be an error + // if they also set Connect.DestinationServiceID since sidecars are + // configured via their associated service's config. + Kind ServiceKind + + // Only DestinationServiceName and Config are supported. + Proxy ConnectProxyConfig + + Connect ServiceConnect + + Weights Weights +} + +// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. +type ProxyConfigEntry struct { + Kind string + Name string + Config map[string]interface{} + + RaftIndex +} + +func (e *ProxyConfigEntry) GetKind() string { + return ProxyDefaults +} + +func (e *ProxyConfigEntry) GetName() string { + if e == nil { + return "" + } + + return e.Name +} + +func (e *ProxyConfigEntry) Normalize() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + e.Kind = ProxyDefaults + + return nil +} + +func (e *ProxyConfigEntry) Validate() error { + if e == nil { + return fmt.Errorf("config entry is nil") + } + + if e.Name != ProxyConfigGlobal { + return fmt.Errorf("invalid name (%q), only %q is supported", e.Name, ProxyConfigGlobal) + } + + return nil +} + +func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + if e == nil { + return &RaftIndex{} + } + + return &e.RaftIndex +} + +type ConfigEntryOp string + +const ( + ConfigEntryUpsert ConfigEntryOp = "upsert" + ConfigEntryDelete ConfigEntryOp = "delete" +) + +type ConfigEntryRequest struct { + Op ConfigEntryOp + Entry ConfigEntry +} + +func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { + // bs will grow if needed but allocate enough to avoid reallocation in common + // case. + bs := make([]byte, 128) + enc := codec.NewEncoderBytes(&bs, msgpackHandle) + // Encode kind first + err = enc.Encode(r.Entry.GetKind()) + if err != nil { + return nil, err + } + // Then actual value using alias trick to avoid infinite recursion + type Alias ConfigEntryRequest + err = enc.Encode(struct { + *Alias + }{ + Alias: (*Alias)(r), + }) + if err != nil { + return nil, err + } + return bs, nil +} + +func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { + // First decode the kind prefix + var kind string + dec := codec.NewDecoderBytes(data, msgpackHandle) + if err := dec.Decode(&kind); err != nil { + return err + } + + // Then decode the real thing with appropriate kind of ConfigEntry + entry, err := makeConfigEntry(kind) + if err != nil { + return err + } + r.Entry = entry + + // Alias juggling to prevent infinite recursive calls back to this decode + // method. + type Alias ConfigEntryRequest + as := struct { + *Alias + }{ + Alias: (*Alias)(r), + } + if err := dec.Decode(&as); err != nil { + return err + } + return nil +} + +func makeConfigEntry(kind string) (ConfigEntry, error) { + switch kind { + case ServiceDefaults: + return &ServiceConfigEntry{}, nil + case ProxyDefaults: + return &ProxyConfigEntry{}, nil + default: + return nil, fmt.Errorf("invalid config entry kind: %s", kind) + } +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f3c3a024b..56d86ab52 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -55,6 +55,7 @@ const ( ACLPolicySetRequestType = 19 ACLPolicyDeleteRequestType = 20 ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 ) const (