Fix fsm serialization and add snapshot/restore

This commit is contained in:
Kyle Havlovitz 2019-03-20 16:13:13 -07:00
parent 9df597b257
commit c2cba68042
7 changed files with 171 additions and 39 deletions

View File

@ -29,7 +29,8 @@ func init() {
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation)
registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation)
} }
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -430,18 +431,37 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
} }
func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} {
var req structs.ConfigEntryRequest req := structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
return c.applyConfigEntryOperation(index, req)
}
func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := c.applyConfigEntryOperation(index, req); err != nil {
panic(err)
}
return true
}
func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error {
switch req.Op { switch req.Op {
case structs.ConfigEntryUpsert: case structs.ConfigEntryUpsert:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
[]metrics.Label{{Name: "op", Value: "upsert"}}) []metrics.Label{{Name: "op", Value: "upsert"}})
return c.state.EnsureConfigEntry(index, req.Entry) return c.state.EnsureConfigEntry(index, req.Entry)
case structs.ConfigEntryDelete: case structs.ConfigEntryDelete:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
[]metrics.Label{{Name: "op", Value: "delete"}}) []metrics.Label{{Name: "op", Value: "delete"}})
return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName())
default: default:

View File

@ -1379,7 +1379,7 @@ func TestFSM_ConfigEntry(t *testing.T) {
} }
{ {
buf, err := structs.Encode(structs.ConfigEntryRequestType, req) buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req)
assert.Nil(err) assert.Nil(err)
assert.True(fsm.Apply(makeLog(buf)).(bool)) assert.True(fsm.Apply(makeLog(buf)).(bool))
} }
@ -1388,6 +1388,8 @@ func TestFSM_ConfigEntry(t *testing.T) {
{ {
_, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global")
assert.Nil(err) assert.Nil(err)
entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1
assert.Equal(entry, config) assert.Equal(entry, config)
} }
} }

View File

@ -1,6 +1,8 @@
package fsm package fsm
import ( import (
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -27,6 +29,7 @@ func init() {
registerRestorer(structs.IndexRequestType, restoreIndex) registerRestorer(structs.IndexRequestType, restoreIndex)
registerRestorer(structs.ACLTokenSetRequestType, restoreToken) registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy) registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry)
} }
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
@ -63,6 +66,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
if err := s.persistConnectCAConfig(sink, encoder); err != nil { if err := s.persistConnectCAConfig(sink, encoder); err != nil {
return err return err
} }
if err := s.persistConfigEntries(sink, encoder); err != nil {
return err
}
if err := s.persistIndex(sink, encoder); err != nil { if err := s.persistIndex(sink, encoder); err != nil {
return err return err
} }
@ -360,6 +366,27 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink,
return nil return nil
} }
func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
entries, err := s.state.ConfigEntries()
if err != nil {
return err
}
for _, entry := range entries {
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry.GetKind()); err != nil {
return err
}
if err := encoder.Encode(entry); err != nil {
return err
}
}
return nil
}
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error { func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the indexes // Get all the indexes
iter, err := s.state.Indexes() iter, err := s.state.Indexes()
@ -565,3 +592,25 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
} }
return restore.ACLPolicy(&req) return restore.ACLPolicy(&req)
} }
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.ConfigEntry
var kind string
if err := decoder.Decode(&kind); err != nil {
return err
}
switch kind {
case structs.ServiceDefaults:
req = &structs.ServiceConfigEntry{}
case structs.ProxyDefaults:
req = &structs.ProxyConfigEntry{}
default:
return fmt.Errorf("invalid config type: %s", kind)
}
if err := decoder.Decode(&req); err != nil {
return err
}
return restore.ConfigEntry(req)
}

View File

@ -202,6 +202,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
err = fsm.state.CASetConfig(17, caConfig) err = fsm.state.CASetConfig(17, caConfig)
assert.Nil(err) assert.Nil(err)
// Config entries
serviceConfig := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}
proxyConfig := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
}
assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig))
assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig))
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()
if err != nil { if err != nil {
@ -388,6 +401,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
assert.Nil(err) assert.Nil(err)
assert.Equal(caConfig, caConf) assert.Equal(caConfig, caConf)
// Verify config entries are restored
_, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo")
assert.Nil(err)
assert.Equal(serviceConfig, serviceConfEntry)
_, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global")
assert.Nil(err)
assert.Equal(proxyConfig, proxyConfEntry)
// Snapshot // Snapshot
snap, err = fsm2.Snapshot() snap, err = fsm2.Snapshot()
if err != nil { if err != nil {

View File

@ -57,7 +57,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
return ret, nil return ret, nil
} }
// Configuration is used when restoring from a snapshot. // ConfigEntry is used when restoring from a snapshot.
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
// Insert // Insert
if err := s.tx.Insert(configTableName, c); err != nil { if err := s.tx.Insert(configTableName, c); err != nil {
@ -70,7 +70,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
return nil return nil
} }
// Configuration is called to get a given config entry. // ConfigEntry is called to get a given config entry.
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
@ -95,6 +95,27 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
return idx, conf, nil return idx, conf, nil
} }
// ConfigEntries is called to get all config entry objects.
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Get the index
idx := maxIndexTxn(tx, configTableName)
// Get all
iter, err := tx.Get(configTableName, "id")
if err != nil {
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
}
var results []structs.ConfigEntry
for v := iter.Next(); v != nil; v = iter.Next() {
results = append(results, v.(structs.ConfigEntry))
}
return idx, results, nil
}
// EnsureConfigEntry is called to upsert creation of a given config entry. // EnsureConfigEntry is called to upsert creation of a given config entry.
func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)

View File

@ -29,10 +29,26 @@ type ServiceConfigEntry struct {
RaftIndex RaftIndex
} }
func (s *ServiceConfigEntry) GetKind() string { func (e *ServiceConfigEntry) GetKind() string {
return ServiceDefaults return ServiceDefaults
} }
func (e *ServiceConfigEntry) GetName() string {
return e.Name
}
func (e *ServiceConfigEntry) Normalize() error {
return nil
}
func (e *ServiceConfigEntry) Validate() error {
return nil
}
func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex {
return &e.RaftIndex
}
type ConnectConfiguration struct { type ConnectConfiguration struct {
SidecarProxy bool SidecarProxy bool
} }
@ -75,24 +91,24 @@ type ProxyConfigEntry struct {
RaftIndex RaftIndex
} }
func (p *ProxyConfigEntry) GetKind() string { func (e *ProxyConfigEntry) GetKind() string {
return ProxyDefaults return ProxyDefaults
} }
func (p *ProxyConfigEntry) GetName() string { func (e *ProxyConfigEntry) GetName() string {
return p.Name return e.Name
} }
func (p *ProxyConfigEntry) Normalize() error { func (e *ProxyConfigEntry) Normalize() error {
return nil return nil
} }
func (p *ProxyConfigEntry) Validate() error { func (e *ProxyConfigEntry) Validate() error {
return nil return nil
} }
func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex { func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex {
return &p.RaftIndex return &e.RaftIndex
} }
type ConfigEntryOp string type ConfigEntryOp string

View File

@ -33,29 +33,31 @@ type RaftIndex struct {
// These are serialized between Consul servers and stored in Consul snapshots, // These are serialized between Consul servers and stored in Consul snapshots,
// so entries must only ever be added. // so entries must only ever be added.
const ( const (
RegisterRequestType MessageType = 0 RegisterRequestType MessageType = 0
DeregisterRequestType = 1 DeregisterRequestType = 1
KVSRequestType = 2 KVSRequestType = 2
SessionRequestType = 3 SessionRequestType = 3
ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat)
TombstoneRequestType = 5 TombstoneRequestType = 5
CoordinateBatchUpdateType = 6 CoordinateBatchUpdateType = 6
PreparedQueryRequestType = 7 PreparedQueryRequestType = 7
TxnRequestType = 8 TxnRequestType = 8
AutopilotRequestType = 9 AutopilotRequestType = 9
AreaRequestType = 10 AreaRequestType = 10
ACLBootstrapRequestType = 11 ACLBootstrapRequestType = 11
IntentionRequestType = 12 IntentionRequestType = 12
ConnectCARequestType = 13 ConnectCARequestType = 13
ConnectCAProviderStateType = 14 ConnectCAProviderStateType = 14
ConnectCAConfigType = 15 // FSM snapshots only. ConnectCAConfigType = 15 // FSM snapshots only.
IndexRequestType = 16 // FSM snapshots only. IndexRequestType = 16 // FSM snapshots only.
ACLTokenSetRequestType = 17 ACLTokenSetRequestType = 17
ACLTokenDeleteRequestType = 18 ACLTokenDeleteRequestType = 18
ACLPolicySetRequestType = 19 ACLPolicySetRequestType = 19
ACLPolicyDeleteRequestType = 20 ACLPolicyDeleteRequestType = 20
ConnectCALeafRequestType = 21 ConnectCALeafRequestType = 21
ConfigEntryRequestType = 22 ConfigEntryRequestType = 22 // FSM snapshots only.
ServiceConfigEntryRequestType = 23
ProxyConfigEntryRequestType = 24
) )
const ( const (