2020-03-19 13:11:20 +00:00
|
|
|
package state
|
|
|
|
|
|
|
|
import (
|
2020-07-06 22:44:51 +00:00
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
2020-09-15 19:04:33 +00:00
|
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
2020-03-19 13:11:20 +00:00
|
|
|
"github.com/hashicorp/go-memdb"
|
|
|
|
)
|
|
|
|
|
2020-07-06 22:44:51 +00:00
|
|
|
// ReadTxn is implemented by memdb.Txn to perform read operations.
|
|
|
|
type ReadTxn interface {
|
|
|
|
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
|
2020-08-11 20:31:23 +00:00
|
|
|
First(table, index string, args ...interface{}) (interface{}, error)
|
|
|
|
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
|
2020-07-06 22:44:51 +00:00
|
|
|
Abort()
|
|
|
|
}
|
|
|
|
|
2020-09-02 15:24:19 +00:00
|
|
|
// WriteTxn is implemented by memdb.Txn to perform write operations.
|
|
|
|
type WriteTxn interface {
|
|
|
|
ReadTxn
|
|
|
|
Insert(table string, obj interface{}) error
|
|
|
|
Commit() error
|
|
|
|
}
|
|
|
|
|
2020-07-06 22:44:51 +00:00
|
|
|
// Changes wraps a memdb.Changes to include the index at which these changes
|
|
|
|
// were made.
|
|
|
|
type Changes struct {
|
|
|
|
// Index is the latest index at the time these changes were committed.
|
|
|
|
Index uint64
|
|
|
|
Changes memdb.Changes
|
|
|
|
}
|
|
|
|
|
2020-06-03 16:59:10 +00:00
|
|
|
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
|
2020-09-02 15:24:22 +00:00
|
|
|
// all write transactions. When the transaction is committed the changes are:
|
|
|
|
// 1. Used to update our internal usage tracking
|
|
|
|
// 2. Sent to the eventPublisher which will create and emit change events
|
2020-06-03 16:59:10 +00:00
|
|
|
type changeTrackerDB struct {
|
2020-07-06 22:44:51 +00:00
|
|
|
db *memdb.MemDB
|
2020-09-09 20:26:11 +00:00
|
|
|
publisher *stream.EventPublisher
|
2020-07-06 22:44:51 +00:00
|
|
|
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
|
2020-06-17 22:15:45 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 20:05:08 +00:00
|
|
|
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
|
|
|
// code may use it to create a read-only transaction, but it will panic if called
|
|
|
|
// with write=true.
|
|
|
|
//
|
|
|
|
// Deprecated: use either ReadTxn, or WriteTxn.
|
2020-07-06 18:24:30 +00:00
|
|
|
func (c *changeTrackerDB) Txn(write bool) *txn {
|
2020-03-19 13:11:20 +00:00
|
|
|
if write {
|
|
|
|
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
|
|
|
|
}
|
2020-07-06 18:24:30 +00:00
|
|
|
return c.ReadTxn()
|
2020-06-02 20:05:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ReadTxn returns a read-only transaction which behaves exactly the same as
|
|
|
|
// memdb.Txn
|
2020-07-06 18:24:30 +00:00
|
|
|
//
|
|
|
|
// TODO: this could return a regular memdb.Txn if all the state functions accepted
|
2020-07-08 18:45:18 +00:00
|
|
|
// the ReadTxn interface
|
2020-07-06 18:24:30 +00:00
|
|
|
func (c *changeTrackerDB) ReadTxn() *txn {
|
|
|
|
return &txn{Txn: c.db.Txn(false)}
|
2020-03-19 13:11:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
|
2020-06-02 20:05:08 +00:00
|
|
|
// It will track changes and publish events for the changes when Commit
|
|
|
|
// is called.
|
|
|
|
//
|
|
|
|
// The idx argument must be the index of the current Raft operation. Almost
|
|
|
|
// all mutations to state should happen as part of a raft apply so the index of
|
|
|
|
// the log being applied can be passed to WriteTxn.
|
|
|
|
// The exceptional cases are transactions that are executed on an empty
|
|
|
|
// memdb.DB as part of Restore, and those executed by tests where we insert
|
2020-03-19 13:11:20 +00:00
|
|
|
// data directly into the DB. These cases may use WriteTxnRestore.
|
2020-07-06 18:24:30 +00:00
|
|
|
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
|
2020-06-03 17:21:00 +00:00
|
|
|
t := &txn{
|
2020-07-06 22:44:51 +00:00
|
|
|
Txn: c.db.Txn(true),
|
|
|
|
Index: idx,
|
|
|
|
publish: c.publish,
|
2020-03-19 13:11:20 +00:00
|
|
|
}
|
|
|
|
t.Txn.TrackChanges()
|
|
|
|
return t
|
|
|
|
}
|
|
|
|
|
2020-07-21 22:20:34 +00:00
|
|
|
func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
|
|
|
|
events, err := c.processChanges(tx, changes)
|
2020-07-06 22:44:51 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed generating events from changes: %v", err)
|
|
|
|
}
|
2020-07-08 04:31:22 +00:00
|
|
|
c.publisher.Publish(events)
|
2020-07-06 22:44:51 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-09-02 15:24:16 +00:00
|
|
|
// WriteTxnRestore returns a wrapped RW transaction that should only be used in
|
|
|
|
// Restore where we need to replace the entire contents of the Store.
|
|
|
|
// WriteTxnRestore uses a zero index since the whole restore doesn't really
|
|
|
|
// occur at one index - the effect is to write many values that were previously
|
2020-09-02 15:24:22 +00:00
|
|
|
// written across many indexes. WriteTxnRestore also does not publish any
|
|
|
|
// change events to subscribers.
|
2020-07-06 18:24:30 +00:00
|
|
|
func (c *changeTrackerDB) WriteTxnRestore() *txn {
|
2020-09-02 15:24:16 +00:00
|
|
|
t := &txn{
|
2020-07-06 18:24:30 +00:00
|
|
|
Txn: c.db.Txn(true),
|
2020-03-19 13:11:20 +00:00
|
|
|
Index: 0,
|
|
|
|
}
|
2020-09-02 15:24:16 +00:00
|
|
|
|
|
|
|
// We enable change tracking so that usage data is correctly populated.
|
|
|
|
t.Txn.TrackChanges()
|
|
|
|
return t
|
2020-03-19 13:11:20 +00:00
|
|
|
}
|
|
|
|
|
2020-06-03 17:21:00 +00:00
|
|
|
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
|
2020-06-02 20:05:08 +00:00
|
|
|
//
|
|
|
|
// This can not be done with txn.Defer because the callback passed to Defer is
|
|
|
|
// invoked after commit completes, and because the callback can not return an
|
|
|
|
// error. Any errors from the callback would be lost, which would result in a
|
|
|
|
// missing change event, even though the state store had changed.
|
2020-06-03 17:21:00 +00:00
|
|
|
type txn struct {
|
2020-06-17 22:15:45 +00:00
|
|
|
*memdb.Txn
|
2020-06-02 20:05:08 +00:00
|
|
|
// Index in raft where the write is occurring. The value is zero for a
|
2020-07-08 18:45:18 +00:00
|
|
|
// read-only, or WriteTxnRestore transaction.
|
2020-06-02 20:05:08 +00:00
|
|
|
// Index is stored so that it may be passed along to any subscribers as part
|
|
|
|
// of a change event.
|
2020-07-06 18:24:30 +00:00
|
|
|
Index uint64
|
2020-07-21 22:20:34 +00:00
|
|
|
publish func(tx ReadTxn, changes Changes) error
|
2020-03-19 13:11:20 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 20:05:08 +00:00
|
|
|
// Commit first pushes changes to EventPublisher, then calls Commit on the
|
|
|
|
// underlying transaction.
|
2020-03-19 13:11:20 +00:00
|
|
|
//
|
2020-06-02 20:05:08 +00:00
|
|
|
// Note that this function, unlike memdb.Txn, returns an error which must be checked
|
|
|
|
// by the caller. A non-nil error indicates that a commit failed and was not
|
|
|
|
// applied.
|
2020-06-03 17:21:00 +00:00
|
|
|
func (tx *txn) Commit() error {
|
2020-09-02 15:24:16 +00:00
|
|
|
changes := Changes{
|
|
|
|
Index: tx.Index,
|
|
|
|
Changes: tx.Txn.Changes(),
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(changes.Changes) > 0 {
|
|
|
|
if err := updateUsage(tx, changes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-07-06 18:24:30 +00:00
|
|
|
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
|
|
|
|
// In those cases changes should also be empty, and there will be nothing
|
|
|
|
// to publish.
|
|
|
|
if tx.publish != nil {
|
2020-07-21 22:20:34 +00:00
|
|
|
if err := tx.publish(tx.Txn, changes); err != nil {
|
2020-06-17 22:15:45 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2020-03-19 13:11:20 +00:00
|
|
|
|
|
|
|
tx.Txn.Commit()
|
|
|
|
return nil
|
|
|
|
}
|
2020-07-06 22:44:51 +00:00
|
|
|
|
2020-07-15 15:54:50 +00:00
|
|
|
var (
|
2020-09-15 19:04:33 +00:00
|
|
|
topicServiceHealth = pbsubscribe.Topic_ServiceHealth
|
|
|
|
topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
|
2020-07-15 15:54:50 +00:00
|
|
|
)
|
|
|
|
|
2020-07-06 22:44:51 +00:00
|
|
|
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
2020-07-15 15:54:50 +00:00
|
|
|
var events []stream.Event
|
|
|
|
fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
|
|
|
|
aclChangeUnsubscribeEvent,
|
|
|
|
ServiceHealthEventsFromChanges,
|
|
|
|
// TODO: add other table handlers here.
|
|
|
|
}
|
|
|
|
for _, fn := range fns {
|
|
|
|
e, err := fn(tx, changes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
events = append(events, e...)
|
|
|
|
}
|
|
|
|
return events, nil
|
2020-07-06 22:44:51 +00:00
|
|
|
}
|
|
|
|
|
2020-07-15 15:54:50 +00:00
|
|
|
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
|
|
|
return stream.SnapshotHandlers{
|
2020-09-15 19:04:33 +00:00
|
|
|
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
|
|
|
|
topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
|
2020-07-15 15:54:50 +00:00
|
|
|
}
|
2020-07-06 22:44:51 +00:00
|
|
|
}
|