Encode config entry FSM messages in a generic type

This commit is contained in:
Kyle Havlovitz 2019-03-27 23:56:35 -07:00
parent 96a460c0cf
commit ace5c7a1cb
6 changed files with 109 additions and 72 deletions

View File

@ -29,8 +29,7 @@ func init() {
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation) registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation)
registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation)
} }
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -431,30 +430,14 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
} }
func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} { func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return c.applyConfigEntryOperation(index, req)
}
func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{ req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{}, Entry: &structs.ProxyConfigEntry{},
} }
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
if err := c.applyConfigEntryOperation(index, req); err != nil {
panic(err)
}
return true
}
func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error {
switch req.Op { switch req.Op {
case structs.ConfigEntryUpsert: case structs.ConfigEntryUpsert:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),

View File

@ -1368,21 +1368,24 @@ func TestFSM_ConfigEntry(t *testing.T) {
entry := &structs.ProxyConfigEntry{ entry := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults, Kind: structs.ProxyDefaults,
Name: "global", Name: "global",
ProxyConfig: structs.ConnectProxyConfig{ Config: map[string]interface{}{
DestinationServiceName: "foo", "DestinationServiceName": "foo",
}, },
} }
// Create a new request. // Create a new request.
req := structs.ConfigEntryRequest{ req := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert, Op: structs.ConfigEntryUpsert,
Entry: entry, Entry: entry,
} }
{ {
buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req) buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(err) require.NoError(err)
require.True(fsm.Apply(makeLog(buf)).(bool)) resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
} }
// Verify it's in the state store. // Verify it's in the state store.
@ -1391,6 +1394,14 @@ func TestFSM_ConfigEntry(t *testing.T) {
require.NoError(err) require.NoError(err)
entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1 entry.RaftIndex.ModifyIndex = 1
proxyConf, ok := config.(*structs.ProxyConfigEntry)
require.True(ok)
// Read the map[string]interface{} back out.
value, _ := proxyConf.Config["DestinationServiceName"].([]uint8)
proxyConf.Config["DestinationServiceName"] = structs.Uint8ToString(value)
require.Equal(entry, config) require.Equal(entry, config)
} }
} }

View File

@ -1,8 +1,6 @@
package fsm package fsm
import ( import (
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -377,10 +375,10 @@ func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink,
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil { if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
return err return err
} }
if err := encoder.Encode(entry.GetKind()); err != nil { req := &structs.ConfigEntryRequest{
return err Entry: entry,
} }
if err := encoder.Encode(entry); err != nil { if err := encoder.Encode(req); err != nil {
return err return err
} }
} }
@ -594,23 +592,9 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
} }
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.ConfigEntry var req structs.ConfigEntryRequest
var kind string
if err := decoder.Decode(&kind); err != nil {
return err
}
switch kind {
case structs.ServiceDefaults:
req = &structs.ServiceConfigEntry{}
case structs.ProxyDefaults:
req = &structs.ProxyConfigEntry{}
default:
return fmt.Errorf("invalid config type: %s", kind)
}
if err := decoder.Decode(&req); err != nil { if err := decoder.Decode(&req); err != nil {
return err return err
} }
return restore.ConfigEntry(req) return restore.ConfigEntry(req.Entry)
} }

View File

@ -317,8 +317,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
// Verify ACL Token is restored // Verify ACL Token is restored
_, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID) _, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID)
require.NoError(err) require.NoError(err)
require.Equal(t, token.AccessorID, a.AccessorID) require.Equal(token.AccessorID, a.AccessorID)
require.Equal(t, token.ModifyIndex, a.ModifyIndex) require.Equal(token.ModifyIndex, a.ModifyIndex)
// Verify the acl-token-bootstrap index was restored // Verify the acl-token-bootstrap index was restored
canBootstrap, index, err := fsm2.state.CanBootstrapACLToken() canBootstrap, index, err := fsm2.state.CanBootstrapACLToken()

View File

@ -3,6 +3,8 @@ package structs
import ( import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/go-msgpack/codec"
) )
const ( const (
@ -163,3 +165,62 @@ type ConfigEntryRequest struct {
Op ConfigEntryOp Op ConfigEntryOp
Entry ConfigEntry Entry ConfigEntry
} }
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
// bs will grow if needed but allocate enough to avoid reallocation in common
// case.
bs := make([]byte, 128)
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
// Encode kind first
err = enc.Encode(r.Entry.GetKind())
if err != nil {
return nil, err
}
// Then actual value using alias trick to avoid infinite recursion
type Alias ConfigEntryRequest
err = enc.Encode(struct {
*Alias
}{
Alias: (*Alias)(r),
})
if err != nil {
return nil, err
}
return bs, nil
}
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
// First decode the kind prefix
var kind string
dec := codec.NewDecoderBytes(data, msgpackHandle)
if err := dec.Decode(&kind); err != nil {
return err
}
// Then decode the real thing with appropriate kind of ConfigEntry
r.Entry = makeConfigEntry(kind)
// Alias juggling to prevent infinite recursive calls back to this decode
// method.
type Alias ConfigEntryRequest
as := struct {
*Alias
}{
Alias: (*Alias)(r),
}
if err := dec.Decode(&as); err != nil {
return err
}
return nil
}
func makeConfigEntry(kind string) ConfigEntry {
switch kind {
case ServiceDefaults:
return &ServiceConfigEntry{}
case ProxyDefaults:
return &ProxyConfigEntry{}
default:
panic("invalid kind")
}
}

View File

@ -33,31 +33,29 @@ type RaftIndex struct {
// These are serialized between Consul servers and stored in Consul snapshots, // These are serialized between Consul servers and stored in Consul snapshots,
// so entries must only ever be added. // so entries must only ever be added.
const ( const (
RegisterRequestType MessageType = 0 RegisterRequestType MessageType = 0
DeregisterRequestType = 1 DeregisterRequestType = 1
KVSRequestType = 2 KVSRequestType = 2
SessionRequestType = 3 SessionRequestType = 3
ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat) ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat)
TombstoneRequestType = 5 TombstoneRequestType = 5
CoordinateBatchUpdateType = 6 CoordinateBatchUpdateType = 6
PreparedQueryRequestType = 7 PreparedQueryRequestType = 7
TxnRequestType = 8 TxnRequestType = 8
AutopilotRequestType = 9 AutopilotRequestType = 9
AreaRequestType = 10 AreaRequestType = 10
ACLBootstrapRequestType = 11 ACLBootstrapRequestType = 11
IntentionRequestType = 12 IntentionRequestType = 12
ConnectCARequestType = 13 ConnectCARequestType = 13
ConnectCAProviderStateType = 14 ConnectCAProviderStateType = 14
ConnectCAConfigType = 15 // FSM snapshots only. ConnectCAConfigType = 15 // FSM snapshots only.
IndexRequestType = 16 // FSM snapshots only. IndexRequestType = 16 // FSM snapshots only.
ACLTokenSetRequestType = 17 ACLTokenSetRequestType = 17
ACLTokenDeleteRequestType = 18 ACLTokenDeleteRequestType = 18
ACLPolicySetRequestType = 19 ACLPolicySetRequestType = 19
ACLPolicyDeleteRequestType = 20 ACLPolicyDeleteRequestType = 20
ConnectCALeafRequestType = 21 ConnectCALeafRequestType = 21
ConfigEntryRequestType = 22 // FSM snapshots only. ConfigEntryRequestType = 22 // FSM snapshots only.
ServiceConfigEntryRequestType = 23
ProxyConfigEntryRequestType = 24
) )
const ( const (