Merge pull request #5539 from hashicorp/service-config
Service config state model
This commit is contained in:
commit
d6c25a13a5
|
@ -29,6 +29,7 @@ func init() {
|
||||||
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
|
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
|
||||||
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
|
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
|
||||||
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
|
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
|
||||||
|
registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||||
|
@ -428,3 +429,25 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
|
||||||
|
|
||||||
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
|
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
|
||||||
|
req := structs.ConfigEntryRequest{
|
||||||
|
Entry: &structs.ProxyConfigEntry{},
|
||||||
|
}
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
switch req.Op {
|
||||||
|
case structs.ConfigEntryUpsert:
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: "upsert"}})
|
||||||
|
return c.state.EnsureConfigEntry(index, req.Entry)
|
||||||
|
case structs.ConfigEntryDelete:
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: "delete"}})
|
||||||
|
return c.state.DeleteConfigEntry(index, req.Entry.GetKind(), req.Entry.GetName())
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid config entry operation type: %v", req.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
"github.com/pascaldekloe/goe/verify"
|
"github.com/pascaldekloe/goe/verify"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func generateUUID() (ret string) {
|
func generateUUID() (ret string) {
|
||||||
|
@ -1355,3 +1356,52 @@ func TestFSM_CABuiltinProvider(t *testing.T) {
|
||||||
assert.Equal(expected, state)
|
assert.Equal(expected, state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFSM_ConfigEntry(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
fsm, err := New(nil, os.Stderr)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Create a simple config entry
|
||||||
|
entry := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new request.
|
||||||
|
req := &structs.ConfigEntryRequest{
|
||||||
|
Op: structs.ConfigEntryUpsert,
|
||||||
|
Entry: entry,
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
|
||||||
|
require.NoError(err)
|
||||||
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if _, ok := resp.(error); ok {
|
||||||
|
t.Fatalf("bad: %v", resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify it's in the state store.
|
||||||
|
{
|
||||||
|
_, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
entry.RaftIndex.CreateIndex = 1
|
||||||
|
entry.RaftIndex.ModifyIndex = 1
|
||||||
|
|
||||||
|
proxyConf, ok := config.(*structs.ProxyConfigEntry)
|
||||||
|
require.True(ok)
|
||||||
|
|
||||||
|
// Read the map[string]interface{} back out.
|
||||||
|
value, _ := proxyConf.Config["foo"].([]uint8)
|
||||||
|
proxyConf.Config["foo"] = structs.Uint8ToString(value)
|
||||||
|
|
||||||
|
require.Equal(entry, config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ func init() {
|
||||||
registerRestorer(structs.IndexRequestType, restoreIndex)
|
registerRestorer(structs.IndexRequestType, restoreIndex)
|
||||||
registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
|
registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
|
||||||
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
|
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
|
||||||
|
registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry)
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
|
@ -63,6 +64,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
|
||||||
if err := s.persistConnectCAConfig(sink, encoder); err != nil {
|
if err := s.persistConnectCAConfig(sink, encoder); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := s.persistConfigEntries(sink, encoder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := s.persistIndex(sink, encoder); err != nil {
|
if err := s.persistIndex(sink, encoder); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -360,6 +364,30 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
entries, err := s.state.ConfigEntries()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range entries {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Encode the entry request without an operation since we don't need it for restoring.
|
||||||
|
// The request is used for its custom decoding/encoding logic around the ConfigEntry
|
||||||
|
// interface.
|
||||||
|
req := &structs.ConfigEntryRequest{
|
||||||
|
Entry: entry,
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
// Get all the indexes
|
// Get all the indexes
|
||||||
iter, err := s.state.Indexes()
|
iter, err := s.state.Indexes()
|
||||||
|
@ -565,3 +593,11 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
|
||||||
}
|
}
|
||||||
return restore.ACLPolicy(&req)
|
return restore.ACLPolicy(&req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
|
var req structs.ConfigEntryRequest
|
||||||
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return restore.ConfigEntry(req.Entry)
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
require := require.New(t)
|
||||||
fsm, err := New(nil, os.Stderr)
|
fsm, err := New(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -46,8 +47,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
"testMeta": "testing123",
|
"testMeta": "testing123",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.NoError(fsm.state.EnsureNode(1, node1))
|
require.NoError(fsm.state.EnsureNode(1, node1))
|
||||||
assert.NoError(fsm.state.EnsureNode(2, node2))
|
require.NoError(fsm.state.EnsureNode(2, node2))
|
||||||
|
|
||||||
// Add a service instance with Connect config.
|
// Add a service instance with Connect config.
|
||||||
connectConf := structs.ServiceConnect{
|
connectConf := structs.ServiceConnect{
|
||||||
|
@ -93,7 +94,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
Syntax: acl.SyntaxCurrent,
|
Syntax: acl.SyntaxCurrent,
|
||||||
}
|
}
|
||||||
policy.SetHash(true)
|
policy.SetHash(true)
|
||||||
require.NoError(t, fsm.state.ACLPolicySet(1, &policy))
|
require.NoError(fsm.state.ACLPolicySet(1, &policy))
|
||||||
|
|
||||||
token := &structs.ACLToken{
|
token := &structs.ACLToken{
|
||||||
AccessorID: "30fca056-9fbb-4455-b94a-bf0e2bc575d6",
|
AccessorID: "30fca056-9fbb-4455-b94a-bf0e2bc575d6",
|
||||||
|
@ -109,7 +110,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
// DEPRECATED (ACL-Legacy-Compat) - This is used so that the bootstrap token is still visible via the v1 acl APIs
|
// DEPRECATED (ACL-Legacy-Compat) - This is used so that the bootstrap token is still visible via the v1 acl APIs
|
||||||
Type: structs.ACLTokenTypeManagement,
|
Type: structs.ACLTokenTypeManagement,
|
||||||
}
|
}
|
||||||
require.NoError(t, fsm.state.ACLBootstrap(10, 0, token, false))
|
require.NoError(fsm.state.ACLBootstrap(10, 0, token, false))
|
||||||
|
|
||||||
fsm.state.KVSSet(11, &structs.DirEntry{
|
fsm.state.KVSSet(11, &structs.DirEntry{
|
||||||
Key: "/remove",
|
Key: "/remove",
|
||||||
|
@ -168,7 +169,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
CreateIndex: 14,
|
CreateIndex: 14,
|
||||||
ModifyIndex: 14,
|
ModifyIndex: 14,
|
||||||
}
|
}
|
||||||
assert.Nil(fsm.state.IntentionSet(14, ixn))
|
require.NoError(fsm.state.IntentionSet(14, ixn))
|
||||||
|
|
||||||
// CA Roots
|
// CA Roots
|
||||||
roots := []*structs.CARoot{
|
roots := []*structs.CARoot{
|
||||||
|
@ -179,7 +180,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
r.Active = false
|
r.Active = false
|
||||||
}
|
}
|
||||||
ok, err := fsm.state.CARootSetCAS(15, 0, roots)
|
ok, err := fsm.state.CARootSetCAS(15, 0, roots)
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.True(ok)
|
assert.True(ok)
|
||||||
|
|
||||||
ok, err = fsm.state.CASetProviderState(16, &structs.CAConsulProviderState{
|
ok, err = fsm.state.CASetProviderState(16, &structs.CAConsulProviderState{
|
||||||
|
@ -187,7 +188,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
PrivateKey: "foo",
|
PrivateKey: "foo",
|
||||||
RootCert: "bar",
|
RootCert: "bar",
|
||||||
})
|
})
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.True(ok)
|
assert.True(ok)
|
||||||
|
|
||||||
// CA Config
|
// CA Config
|
||||||
|
@ -200,7 +201,20 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = fsm.state.CASetConfig(17, caConfig)
|
err = fsm.state.CASetConfig(17, caConfig)
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Config entries
|
||||||
|
serviceConfig := &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "foo",
|
||||||
|
Protocol: "http",
|
||||||
|
}
|
||||||
|
proxyConfig := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
}
|
||||||
|
require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig))
|
||||||
|
require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig))
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err := fsm.Snapshot()
|
snap, err := fsm.Snapshot()
|
||||||
|
@ -302,19 +316,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
|
|
||||||
// Verify ACL Token is restored
|
// Verify ACL Token is restored
|
||||||
_, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID)
|
_, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID)
|
||||||
require.NoError(t, err)
|
require.NoError(err)
|
||||||
require.Equal(t, token.AccessorID, a.AccessorID)
|
require.Equal(token.AccessorID, a.AccessorID)
|
||||||
require.Equal(t, token.ModifyIndex, a.ModifyIndex)
|
require.Equal(token.ModifyIndex, a.ModifyIndex)
|
||||||
|
|
||||||
// Verify the acl-token-bootstrap index was restored
|
// Verify the acl-token-bootstrap index was restored
|
||||||
canBootstrap, index, err := fsm2.state.CanBootstrapACLToken()
|
canBootstrap, index, err := fsm2.state.CanBootstrapACLToken()
|
||||||
require.False(t, canBootstrap)
|
require.False(canBootstrap)
|
||||||
require.True(t, index > 0)
|
require.True(index > 0)
|
||||||
|
|
||||||
// Verify ACL Policy is restored
|
// Verify ACL Policy is restored
|
||||||
_, policy2, err := fsm2.state.ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID)
|
_, policy2, err := fsm2.state.ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID)
|
||||||
require.NoError(t, err)
|
require.NoError(err)
|
||||||
require.Equal(t, policy.Name, policy2.Name)
|
require.Equal(policy.Name, policy2.Name)
|
||||||
|
|
||||||
// Verify tombstones are restored
|
// Verify tombstones are restored
|
||||||
func() {
|
func() {
|
||||||
|
@ -368,26 +382,35 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
|
|
||||||
// Verify intentions are restored.
|
// Verify intentions are restored.
|
||||||
_, ixns, err := fsm2.state.Intentions(nil)
|
_, ixns, err := fsm2.state.Intentions(nil)
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.Len(ixns, 1)
|
assert.Len(ixns, 1)
|
||||||
assert.Equal(ixn, ixns[0])
|
assert.Equal(ixn, ixns[0])
|
||||||
|
|
||||||
// Verify CA roots are restored.
|
// Verify CA roots are restored.
|
||||||
_, roots, err = fsm2.state.CARoots(nil)
|
_, roots, err = fsm2.state.CARoots(nil)
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.Len(roots, 2)
|
assert.Len(roots, 2)
|
||||||
|
|
||||||
// Verify provider state is restored.
|
// Verify provider state is restored.
|
||||||
_, state, err := fsm2.state.CAProviderState("asdf")
|
_, state, err := fsm2.state.CAProviderState("asdf")
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.Equal("foo", state.PrivateKey)
|
assert.Equal("foo", state.PrivateKey)
|
||||||
assert.Equal("bar", state.RootCert)
|
assert.Equal("bar", state.RootCert)
|
||||||
|
|
||||||
// Verify CA configuration is restored.
|
// Verify CA configuration is restored.
|
||||||
_, caConf, err := fsm2.state.CAConfig()
|
_, caConf, err := fsm2.state.CAConfig()
|
||||||
assert.Nil(err)
|
require.NoError(err)
|
||||||
assert.Equal(caConfig, caConf)
|
assert.Equal(caConfig, caConf)
|
||||||
|
|
||||||
|
// Verify config entries are restored
|
||||||
|
_, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo")
|
||||||
|
require.NoError(err)
|
||||||
|
assert.Equal(serviceConfig, serviceConfEntry)
|
||||||
|
|
||||||
|
_, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
assert.Equal(proxyConfig, proxyConfEntry)
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err = fsm2.Snapshot()
|
snap, err = fsm2.Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -0,0 +1,240 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
configTableName = "config-entries"
|
||||||
|
)
|
||||||
|
|
||||||
|
// configTableSchema returns a new table schema used to store global
|
||||||
|
// config entries.
|
||||||
|
func configTableSchema() *memdb.TableSchema {
|
||||||
|
return &memdb.TableSchema{
|
||||||
|
Name: configTableName,
|
||||||
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
|
"id": &memdb.IndexSchema{
|
||||||
|
Name: "id",
|
||||||
|
AllowMissing: false,
|
||||||
|
Unique: true,
|
||||||
|
Indexer: &memdb.CompoundIndex{
|
||||||
|
Indexes: []memdb.Indexer{
|
||||||
|
&memdb.StringFieldIndex{
|
||||||
|
Field: "Kind",
|
||||||
|
Lowercase: true,
|
||||||
|
},
|
||||||
|
&memdb.StringFieldIndex{
|
||||||
|
Field: "Name",
|
||||||
|
Lowercase: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"kind": &memdb.IndexSchema{
|
||||||
|
Name: "kind",
|
||||||
|
AllowMissing: false,
|
||||||
|
Unique: true,
|
||||||
|
Indexer: &memdb.StringFieldIndex{
|
||||||
|
Field: "Kind",
|
||||||
|
Lowercase: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerSchema(configTableSchema)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfigEntries is used to pull all the config entries for the snapshot.
|
||||||
|
func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
|
||||||
|
entries, err := s.tx.Get(configTableName, "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ret []structs.ConfigEntry
|
||||||
|
for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() {
|
||||||
|
ret = append(ret, wrapped.(structs.ConfigEntry))
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfigEntry is used when restoring from a snapshot.
|
||||||
|
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
|
||||||
|
// Insert
|
||||||
|
if err := s.tx.Insert(configTableName, c); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring config entry object: %s", err)
|
||||||
|
}
|
||||||
|
if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfigEntry is called to get a given config entry.
|
||||||
|
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the index
|
||||||
|
idx := maxIndexTxn(tx, configTableName)
|
||||||
|
|
||||||
|
// Get the existing config entry.
|
||||||
|
existing, err := tx.First(configTableName, "id", kind, name)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||||
|
}
|
||||||
|
if existing == nil {
|
||||||
|
return idx, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conf, ok := existing.(structs.ConfigEntry)
|
||||||
|
if !ok {
|
||||||
|
return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx, conf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfigEntries is called to get all config entry objects.
|
||||||
|
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) {
|
||||||
|
return s.ConfigEntriesByKind("")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfigEntriesByKind is called to get all config entry objects with the given kind.
|
||||||
|
// If kind is empty, all config entries will be returned.
|
||||||
|
func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the index
|
||||||
|
idx := maxIndexTxn(tx, configTableName)
|
||||||
|
|
||||||
|
// Lookup by kind, or all if kind is empty
|
||||||
|
var iter memdb.ResultIterator
|
||||||
|
var err error
|
||||||
|
if kind != "" {
|
||||||
|
iter, err = tx.Get(configTableName, "kind", kind)
|
||||||
|
} else {
|
||||||
|
iter, err = tx.Get(configTableName, "id")
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []structs.ConfigEntry
|
||||||
|
for v := iter.Next(); v != nil; v = iter.Next() {
|
||||||
|
results = append(results, v.(structs.ConfigEntry))
|
||||||
|
}
|
||||||
|
return idx, results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnsureConfigEntry is called to do an upsert of a given config entry.
|
||||||
|
func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureConfigEntryTxn upserts a config entry inside of a transaction.
|
||||||
|
func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error {
|
||||||
|
// Check for existing configuration.
|
||||||
|
existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed configuration lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
raftIndex := conf.GetRaftIndex()
|
||||||
|
if existing != nil {
|
||||||
|
existingIdx := existing.(structs.ConfigEntry).GetRaftIndex()
|
||||||
|
raftIndex.CreateIndex = existingIdx.CreateIndex
|
||||||
|
raftIndex.ModifyIndex = existingIdx.ModifyIndex
|
||||||
|
} else {
|
||||||
|
raftIndex.CreateIndex = idx
|
||||||
|
}
|
||||||
|
raftIndex.ModifyIndex = idx
|
||||||
|
|
||||||
|
// Insert the config entry and update the index
|
||||||
|
if err := tx.Insert(configTableName, conf); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting config entry: %s", err)
|
||||||
|
}
|
||||||
|
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry.
|
||||||
|
func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry) (bool, error) {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Check for existing configuration.
|
||||||
|
existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName())
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed configuration lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||||
|
// we are doing a set-if-not-exists.
|
||||||
|
var existingIdx structs.RaftIndex
|
||||||
|
if existing != nil {
|
||||||
|
existingIdx = *existing.(structs.ConfigEntry).GetRaftIndex()
|
||||||
|
}
|
||||||
|
if cidx == 0 && existing != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if cidx != 0 && existing == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if existing != nil && cidx != 0 && cidx != existingIdx.ModifyIndex {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Try to retrieve the existing health check.
|
||||||
|
existing, err := tx.First(configTableName, "id", kind, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed config entry lookup: %s", err)
|
||||||
|
}
|
||||||
|
if existing == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the config entry from the DB and update the index.
|
||||||
|
if err := tx.Delete(configTableName, existing); err != nil {
|
||||||
|
return fmt.Errorf("failed removing check: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,138 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStore_ConfigEntry(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
expected := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"DestinationServiceName": "foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create
|
||||||
|
require.NoError(s.EnsureConfigEntry(0, expected))
|
||||||
|
|
||||||
|
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(0), idx)
|
||||||
|
require.Equal(expected, config)
|
||||||
|
|
||||||
|
// Update
|
||||||
|
updated := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"DestinationServiceName": "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(s.EnsureConfigEntry(1, updated))
|
||||||
|
|
||||||
|
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal(updated, config)
|
||||||
|
|
||||||
|
// Delete
|
||||||
|
require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global"))
|
||||||
|
|
||||||
|
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(2), idx)
|
||||||
|
require.Nil(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStore_ConfigEntryCAS(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
expected := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"DestinationServiceName": "foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create
|
||||||
|
require.NoError(s.EnsureConfigEntry(1, expected))
|
||||||
|
|
||||||
|
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal(expected, config)
|
||||||
|
|
||||||
|
// Update with invalid index
|
||||||
|
updated := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"DestinationServiceName": "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err := s.EnsureConfigEntryCAS(2, 99, updated)
|
||||||
|
require.False(ok)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Entry should not be changed
|
||||||
|
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal(expected, config)
|
||||||
|
|
||||||
|
// Update with a valid index
|
||||||
|
ok, err = s.EnsureConfigEntryCAS(2, 1, updated)
|
||||||
|
require.True(ok)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Entry should be updated
|
||||||
|
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(2), idx)
|
||||||
|
require.Equal(updated, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStore_ConfigEntries(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Create some config entries.
|
||||||
|
entry1 := &structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: "test1",
|
||||||
|
}
|
||||||
|
entry2 := &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "test2",
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(s.EnsureConfigEntry(0, entry1))
|
||||||
|
require.NoError(s.EnsureConfigEntry(1, entry2))
|
||||||
|
|
||||||
|
// Get all entries
|
||||||
|
idx, entries, err := s.ConfigEntries()
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal([]structs.ConfigEntry{entry1, entry2}, entries)
|
||||||
|
|
||||||
|
// Get all proxy entries
|
||||||
|
idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal([]structs.ConfigEntry{entry1}, entries)
|
||||||
|
|
||||||
|
// Get all service entries
|
||||||
|
idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(uint64(1), idx)
|
||||||
|
require.Equal([]structs.ConfigEntry{entry2}, entries)
|
||||||
|
}
|
|
@ -0,0 +1,230 @@
|
||||||
|
package structs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ServiceDefaults string = "service-defaults"
|
||||||
|
ProxyDefaults string = "proxy-defaults"
|
||||||
|
|
||||||
|
ProxyConfigGlobal string = "global"
|
||||||
|
|
||||||
|
DefaultServiceProtocol = "tcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConfigEntry is the
|
||||||
|
type ConfigEntry interface {
|
||||||
|
GetKind() string
|
||||||
|
GetName() string
|
||||||
|
|
||||||
|
// This is called in the RPC endpoint and can apply defaults or limits.
|
||||||
|
Normalize() error
|
||||||
|
Validate() error
|
||||||
|
|
||||||
|
GetRaftIndex() *RaftIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceConfiguration is the top-level struct for the configuration of a service
|
||||||
|
// across the entire cluster.
|
||||||
|
type ServiceConfigEntry struct {
|
||||||
|
Kind string
|
||||||
|
Name string
|
||||||
|
Protocol string
|
||||||
|
Connect ConnectConfiguration
|
||||||
|
ServiceDefinitionDefaults ServiceDefinitionDefaults
|
||||||
|
|
||||||
|
RaftIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ServiceConfigEntry) GetKind() string {
|
||||||
|
return ServiceDefaults
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ServiceConfigEntry) GetName() string {
|
||||||
|
if e == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ServiceConfigEntry) Normalize() error {
|
||||||
|
if e == nil {
|
||||||
|
return fmt.Errorf("config entry is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
e.Kind = ServiceDefaults
|
||||||
|
if e.Protocol == "" {
|
||||||
|
e.Protocol = DefaultServiceProtocol
|
||||||
|
} else {
|
||||||
|
e.Protocol = strings.ToLower(e.Protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ServiceConfigEntry) Validate() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex {
|
||||||
|
if e == nil {
|
||||||
|
return &RaftIndex{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &e.RaftIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnectConfiguration struct {
|
||||||
|
SidecarProxy bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServiceDefinitionDefaults struct {
|
||||||
|
EnableTagOverride bool
|
||||||
|
|
||||||
|
// Non script/docker checks only
|
||||||
|
Check *HealthCheck
|
||||||
|
Checks HealthChecks
|
||||||
|
|
||||||
|
// Kind is allowed to accommodate non-sidecar proxies but it will be an error
|
||||||
|
// if they also set Connect.DestinationServiceID since sidecars are
|
||||||
|
// configured via their associated service's config.
|
||||||
|
Kind ServiceKind
|
||||||
|
|
||||||
|
// Only DestinationServiceName and Config are supported.
|
||||||
|
Proxy ConnectProxyConfig
|
||||||
|
|
||||||
|
Connect ServiceConnect
|
||||||
|
|
||||||
|
Weights Weights
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
|
||||||
|
type ProxyConfigEntry struct {
|
||||||
|
Kind string
|
||||||
|
Name string
|
||||||
|
Config map[string]interface{}
|
||||||
|
|
||||||
|
RaftIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ProxyConfigEntry) GetKind() string {
|
||||||
|
return ProxyDefaults
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ProxyConfigEntry) GetName() string {
|
||||||
|
if e == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ProxyConfigEntry) Normalize() error {
|
||||||
|
if e == nil {
|
||||||
|
return fmt.Errorf("config entry is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
e.Kind = ProxyDefaults
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ProxyConfigEntry) Validate() error {
|
||||||
|
if e == nil {
|
||||||
|
return fmt.Errorf("config entry is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.Name != ProxyConfigGlobal {
|
||||||
|
return fmt.Errorf("invalid name (%q), only %q is supported", e.Name, ProxyConfigGlobal)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex {
|
||||||
|
if e == nil {
|
||||||
|
return &RaftIndex{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &e.RaftIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConfigEntryOp string
|
||||||
|
|
||||||
|
const (
|
||||||
|
ConfigEntryUpsert ConfigEntryOp = "upsert"
|
||||||
|
ConfigEntryDelete ConfigEntryOp = "delete"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConfigEntryRequest struct {
|
||||||
|
Op ConfigEntryOp
|
||||||
|
Entry ConfigEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||||
|
// bs will grow if needed but allocate enough to avoid reallocation in common
|
||||||
|
// case.
|
||||||
|
bs := make([]byte, 128)
|
||||||
|
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
|
||||||
|
// Encode kind first
|
||||||
|
err = enc.Encode(r.Entry.GetKind())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Then actual value using alias trick to avoid infinite recursion
|
||||||
|
type Alias ConfigEntryRequest
|
||||||
|
err = enc.Encode(struct {
|
||||||
|
*Alias
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(r),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return bs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||||
|
// First decode the kind prefix
|
||||||
|
var kind string
|
||||||
|
dec := codec.NewDecoderBytes(data, msgpackHandle)
|
||||||
|
if err := dec.Decode(&kind); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then decode the real thing with appropriate kind of ConfigEntry
|
||||||
|
entry, err := makeConfigEntry(kind)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Entry = entry
|
||||||
|
|
||||||
|
// Alias juggling to prevent infinite recursive calls back to this decode
|
||||||
|
// method.
|
||||||
|
type Alias ConfigEntryRequest
|
||||||
|
as := struct {
|
||||||
|
*Alias
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(r),
|
||||||
|
}
|
||||||
|
if err := dec.Decode(&as); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeConfigEntry(kind string) (ConfigEntry, error) {
|
||||||
|
switch kind {
|
||||||
|
case ServiceDefaults:
|
||||||
|
return &ServiceConfigEntry{}, nil
|
||||||
|
case ProxyDefaults:
|
||||||
|
return &ProxyConfigEntry{}, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid config entry kind: %s", kind)
|
||||||
|
}
|
||||||
|
}
|
|
@ -55,6 +55,7 @@ const (
|
||||||
ACLPolicySetRequestType = 19
|
ACLPolicySetRequestType = 19
|
||||||
ACLPolicyDeleteRequestType = 20
|
ACLPolicyDeleteRequestType = 20
|
||||||
ConnectCALeafRequestType = 21
|
ConnectCALeafRequestType = 21
|
||||||
|
ConfigEntryRequestType = 22
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
Loading…
Reference in New Issue