server: create new memdb table for storing system metadata (#8703)
This adds a new very tiny memdb table and corresponding raft operation for updating a very small effective map[string]string collection of "system metadata". This can persistently record a fact about the Consul state machine itself. The first use of this feature will come in a later PR.
This commit is contained in:
parent
c06954bebd
commit
a77b518542
|
@ -0,0 +1,4 @@
|
|||
```release-note:feature
|
||||
server: create new memdb table for storing system metadata
|
||||
```
|
||||
|
|
@ -37,6 +37,7 @@ func init() {
|
|||
registerCommand(structs.ACLAuthMethodSetRequestType, (*FSM).applyACLAuthMethodSetOperation)
|
||||
registerCommand(structs.ACLAuthMethodDeleteRequestType, (*FSM).applyACLAuthMethodDeleteOperation)
|
||||
registerCommand(structs.FederationStateRequestType, (*FSM).applyFederationStateOperation)
|
||||
registerCommand(structs.SystemMetadataRequestType, (*FSM).applySystemMetadataOperation)
|
||||
}
|
||||
|
||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||
|
@ -568,3 +569,26 @@ func (c *FSM) applyFederationStateOperation(buf []byte, index uint64) interface{
|
|||
return fmt.Errorf("invalid federation state operation type: %v", req.Op)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.SystemMetadataRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
switch req.Op {
|
||||
case structs.SystemMetadataUpsert:
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "upsert"}})
|
||||
if err := c.state.SystemMetadataSet(index, req.Entry); err != nil {
|
||||
return err
|
||||
}
|
||||
return true
|
||||
case structs.SystemMetadataDelete:
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "delete"}})
|
||||
return c.state.SystemMetadataDelete(index, req.Entry)
|
||||
default:
|
||||
return fmt.Errorf("invalid system metadata operation type: %v", req.Op)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ func init() {
|
|||
registerRestorer(structs.ACLBindingRuleSetRequestType, restoreBindingRule)
|
||||
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
|
||||
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
|
||||
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
|
||||
}
|
||||
|
||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
|
@ -74,6 +75,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
|
|||
if err := s.persistFederationStates(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistSystemMetadata(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistIndex(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -464,6 +468,23 @@ func (s *snapshot) persistFederationStates(sink raft.SnapshotSink, encoder *code
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistSystemMetadata(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
entries, err := s.state.SystemMetadataEntries()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if _, err := sink.Write([]byte{byte(structs.SystemMetadataRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(entry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
// Get all the indexes
|
||||
iter, err := s.state.Indexes()
|
||||
|
@ -712,3 +733,11 @@ func restoreFederationState(header *snapshotHeader, restore *state.Restore, deco
|
|||
}
|
||||
return restore.FederationState(req.State)
|
||||
}
|
||||
|
||||
func restoreSystemMetadata(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.SystemMetadataEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return restore.SystemMetadataEntry(&req)
|
||||
}
|
||||
|
|
|
@ -399,6 +399,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
ServiceID: "web",
|
||||
}))
|
||||
|
||||
// system metadata
|
||||
systemMetadataEntry := &structs.SystemMetadataEntry{
|
||||
Key: "key1", Value: "val1",
|
||||
}
|
||||
require.NoError(t, fsm.state.SystemMetadataSet(25, systemMetadataEntry))
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -660,6 +666,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.Equal(t, len(nodes), nodeCount)
|
||||
require.NotZero(t, idx)
|
||||
|
||||
// Verify system metadata is restored.
|
||||
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, systemMetadataLoaded, 1)
|
||||
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0])
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
const systemMetadataTableName = "system-metadata"
|
||||
|
||||
func systemMetadataTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: systemMetadataTableName,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": {
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Key",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
func init() {
|
||||
registerSchema(systemMetadataTableSchema)
|
||||
}
|
||||
|
||||
// SystemMetadataEntries used to pull all the system metadata entries for the snapshot.
|
||||
func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, error) {
|
||||
entries, err := s.tx.Get(systemMetadataTableName, "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ret []*structs.SystemMetadataEntry
|
||||
for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() {
|
||||
ret = append(ret, wrapped.(*structs.SystemMetadataEntry))
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// SystemMetadataEntry is used when restoring from a snapshot.
|
||||
func (s *Restore) SystemMetadataEntry(entry *structs.SystemMetadataEntry) error {
|
||||
// Insert
|
||||
if err := s.tx.Insert(systemMetadataTableName, entry); err != nil {
|
||||
return fmt.Errorf("failed restoring system metadata object: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, systemMetadataTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SystemMetadataSet is called to do an upsert of a set of system metadata entries.
|
||||
func (s *Store) SystemMetadataSet(idx uint64, entry *structs.SystemMetadataEntry) error {
|
||||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := systemMetadataSetTxn(tx, idx, entry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// systemMetadataSetTxn upserts a system metadata inside of a transaction.
|
||||
func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntry) error {
|
||||
// The only validation we care about is non-empty keys.
|
||||
if entry.Key == "" {
|
||||
return fmt.Errorf("missing key on system metadata")
|
||||
}
|
||||
|
||||
// Check for existing.
|
||||
var existing *structs.SystemMetadataEntry
|
||||
existingRaw, err := tx.First(systemMetadataTableName, "id", entry.Key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed system metadata lookup: %s", err)
|
||||
}
|
||||
|
||||
if existingRaw != nil {
|
||||
existing = existingRaw.(*structs.SystemMetadataEntry)
|
||||
}
|
||||
|
||||
// Set the indexes
|
||||
if existing != nil {
|
||||
entry.CreateIndex = existing.CreateIndex
|
||||
entry.ModifyIndex = idx
|
||||
} else {
|
||||
entry.CreateIndex = idx
|
||||
entry.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Insert the system metadata and update the index
|
||||
if err := tx.Insert(systemMetadataTableName, entry); err != nil {
|
||||
return fmt.Errorf("failed inserting system metadata: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SystemMetadataGet is called to get a system metadata.
|
||||
func (s *Store) SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
return systemMetadataGetTxn(tx, ws, key)
|
||||
}
|
||||
|
||||
func systemMetadataGetTxn(tx ReadTxn, ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, systemMetadataTableName)
|
||||
|
||||
// Get the existing contents.
|
||||
watchCh, existing, err := tx.FirstWatch(systemMetadataTableName, "id", key)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
if existing == nil {
|
||||
return idx, nil, nil
|
||||
}
|
||||
|
||||
entry, ok := existing.(*structs.SystemMetadataEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("system metadata %q is an invalid type: %T", key, existing)
|
||||
}
|
||||
|
||||
return idx, entry, nil
|
||||
}
|
||||
|
||||
// SystemMetadataList is called to get all system metadata objects.
|
||||
func (s *Store) SystemMetadataList(ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
return systemMetadataListTxn(tx, ws)
|
||||
}
|
||||
|
||||
func systemMetadataListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, systemMetadataTableName)
|
||||
|
||||
iter, err := tx.Get(systemMetadataTableName, "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
var results []*structs.SystemMetadataEntry
|
||||
for v := iter.Next(); v != nil; v = iter.Next() {
|
||||
results = append(results, v.(*structs.SystemMetadataEntry))
|
||||
}
|
||||
return idx, results, nil
|
||||
}
|
||||
|
||||
func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEntry) error {
|
||||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := systemMetadataDeleteTxn(tx, idx, entry.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error {
|
||||
// Try to retrieve the existing system metadata.
|
||||
existing, err := tx.First(systemMetadataTableName, "id", key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed system metadata lookup: %s", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the system metadata from the DB and update the index.
|
||||
if err := tx.Delete(systemMetadataTableName, existing); err != nil {
|
||||
return fmt.Errorf("failed removing system metadata: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStore_SystemMetadata(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
|
||||
m := make(map[string]string)
|
||||
for _, entry := range entries {
|
||||
m[entry.Key] = entry.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
checkListAndGet := func(t *testing.T, expect map[string]string) {
|
||||
// List all
|
||||
_, entries, err := s.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, len(expect))
|
||||
require.Equal(t, expect, mapify(entries))
|
||||
|
||||
// Read each
|
||||
for expectKey, expectVal := range expect {
|
||||
_, entry, err := s.SystemMetadataGet(nil, expectKey)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, entry)
|
||||
require.Equal(t, expectVal, entry.Value)
|
||||
}
|
||||
}
|
||||
|
||||
checkListAndGet(t, map[string]string{})
|
||||
|
||||
var nextIndex uint64
|
||||
|
||||
// Create 3 keys
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key1", Value: "val1",
|
||||
}))
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key2", Value: "val2",
|
||||
}))
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key3",
|
||||
}))
|
||||
|
||||
checkListAndGet(t, map[string]string{
|
||||
"key1": "val1",
|
||||
"key2": "val2",
|
||||
"key3": "",
|
||||
})
|
||||
|
||||
// Missing results are nil
|
||||
_, entry, err := s.SystemMetadataGet(nil, "key4")
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, entry)
|
||||
|
||||
// Delete one that exists and one that does not
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key2",
|
||||
}))
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key4",
|
||||
}))
|
||||
|
||||
checkListAndGet(t, map[string]string{
|
||||
"key1": "val1",
|
||||
"key3": "",
|
||||
})
|
||||
|
||||
// Update one that exists and add another one.
|
||||
nextIndex++
|
||||
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key3", Value: "val3",
|
||||
}))
|
||||
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
|
||||
Key: "key4", Value: "val4",
|
||||
}))
|
||||
|
||||
checkListAndGet(t, map[string]string{
|
||||
"key1": "val1",
|
||||
"key3": "val3",
|
||||
"key4": "val4",
|
||||
})
|
||||
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLeader_SystemMetadata_CRUD(t *testing.T) {
|
||||
// This test is a little strange because it is testing behavior that
|
||||
// doesn't have an exposed RPC. We're just testing the full round trip of
|
||||
// raft+fsm For now,
|
||||
|
||||
dir1, srv := testServerWithConfig(t, nil)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv.Shutdown()
|
||||
codec := rpcClient(t, srv)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, srv.RPC, "dc1")
|
||||
|
||||
state := srv.fsm.State()
|
||||
|
||||
// Initially empty
|
||||
_, entries, err := state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 0)
|
||||
|
||||
// Create 3
|
||||
require.NoError(t, setSystemMetadataKey(srv, "key1", "val1"))
|
||||
require.NoError(t, setSystemMetadataKey(srv, "key2", "val2"))
|
||||
require.NoError(t, setSystemMetadataKey(srv, "key3", ""))
|
||||
|
||||
mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
|
||||
m := make(map[string]string)
|
||||
for _, entry := range entries {
|
||||
m[entry.Key] = entry.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
_, entries, err = state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 3)
|
||||
|
||||
require.Equal(t, map[string]string{
|
||||
"key1": "val1",
|
||||
"key2": "val2",
|
||||
"key3": "",
|
||||
}, mapify(entries))
|
||||
|
||||
// Update one and delete one.
|
||||
require.NoError(t, setSystemMetadataKey(srv, "key3", "val3"))
|
||||
require.NoError(t, deleteSystemMetadataKey(srv, "key1"))
|
||||
|
||||
_, entries, err = state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 2)
|
||||
|
||||
require.Equal(t, map[string]string{
|
||||
"key2": "val2",
|
||||
"key3": "val3",
|
||||
}, mapify(entries))
|
||||
}
|
||||
|
||||
// Note when this behavior is actually used, consider promoting these 2
|
||||
// functions out of test code.
|
||||
|
||||
func setSystemMetadataKey(s *Server, key, val string) error {
|
||||
args := &structs.SystemMetadataRequest{
|
||||
Op: structs.SystemMetadataUpsert,
|
||||
Entry: &structs.SystemMetadataEntry{
|
||||
Key: key, Value: val,
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteSystemMetadataKey(s *Server, key string) error {
|
||||
args := &structs.SystemMetadataRequest{
|
||||
Op: structs.SystemMetadataDelete,
|
||||
Entry: &structs.SystemMetadataEntry{
|
||||
Key: key,
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -68,6 +68,7 @@ const (
|
|||
ACLAuthMethodDeleteRequestType = 28
|
||||
ChunkingStateType = 29
|
||||
FederationStateRequestType = 30
|
||||
SystemMetadataRequestType = 31
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package structs
|
||||
|
||||
// SystemMetadataOp is the operation for a request related to system metadata.
|
||||
type SystemMetadataOp string
|
||||
|
||||
const (
|
||||
SystemMetadataUpsert SystemMetadataOp = "upsert"
|
||||
SystemMetadataDelete SystemMetadataOp = "delete"
|
||||
)
|
||||
|
||||
// SystemMetadataRequest is used to upsert and delete system metadata.
|
||||
type SystemMetadataRequest struct {
|
||||
// Datacenter is the target for this request.
|
||||
Datacenter string
|
||||
|
||||
// Op is the type of operation being requested.
|
||||
Op SystemMetadataOp
|
||||
|
||||
// Entry is the key to modify.
|
||||
Entry *SystemMetadataEntry
|
||||
|
||||
// WriteRequest is a common struct containing ACL tokens and other
|
||||
// write-related common elements for requests.
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
type SystemMetadataEntry struct {
|
||||
Key string
|
||||
Value string `json:",omitempty"`
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (c *SystemMetadataRequest) RequestDatacenter() string {
|
||||
return c.Datacenter
|
||||
}
|
Loading…
Reference in New Issue