open-consul/agent/consul/state/memdb.go

215 lines
6.9 KiB
Go
Raw Normal View History

package state
import (
"fmt"
2020-10-26 15:42:27 +00:00
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// ReadTxn is the read-only subset of transaction operations. It is
// implemented by memdb.Txn.
type ReadTxn interface {
	// First looks up a single object in table using index and args.
	First(table string, index string, args ...interface{}) (interface{}, error)
	// FirstWatch is like First but additionally returns a channel alongside
	// the object (presumably one that signals when the result may have
	// changed; confirm against go-memdb).
	FirstWatch(table string, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
	// Get returns an iterator over the objects in table selected by index
	// and args.
	Get(table string, index string, args ...interface{}) (memdb.ResultIterator, error)
}
// AbortTxn extends ReadTxn with the ability to abort, which ends the
// transaction.
type AbortTxn interface {
	ReadTxn

	// Abort ends the transaction.
	Abort()
}
// ReadDB is a database handle that only hands out read-only transactions.
type ReadDB interface {
	// ReadTxn starts a read-only transaction. The returned transaction can be
	// ended with Abort.
	ReadTxn() AbortTxn
}
// WriteTxn is the set of operations available on a write transaction: all of
// ReadTxn plus mutations. It is implemented by memdb.Txn.
type WriteTxn interface {
	ReadTxn

	// Insert adds obj to table.
	Insert(table string, obj interface{}) error
	// Delete removes obj from table.
	Delete(table string, obj interface{}) error
	// DeleteAll removes the objects in table selected by index and args and
	// returns a count.
	DeleteAll(table string, index string, args ...interface{}) (int, error)
	// DeletePrefix removes objects from table by prefix on the given index.
	DeletePrefix(table, index, prefix string) (bool, error)
	// Defer registers a callback with the transaction.
	Defer(func())
}
// Changes pairs a set of memdb.Changes with the index at which they were
// made.
type Changes struct {
	// Index is the latest index at the time these changes were committed.
	Index uint64
	// Changes is the wrapped set of changes recorded by memdb.
	Changes memdb.Changes
}
2020-06-03 16:59:10 +00:00
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are:
// 1. Used to update our internal usage tracking
// 2. Sent to the eventPublisher which will create and emit change events
2020-06-03 16:59:10 +00:00
type changeTrackerDB struct {
db *memdb.MemDB
publisher EventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
// EventPublisher is the event publishing dependency held by changeTrackerDB.
// Events generated from committed changes are handed to Publish.
type EventPublisher interface {
	// Publish hands a batch of events to the publisher.
	Publish([]stream.Event)
	// RegisterHandler associates a snapshot function with a topic.
	// NOTE(review): the meaning of the bool argument is not visible in this
	// file; confirm against the stream package.
	RegisterHandler(stream.Topic, stream.SnapshotFunc, bool) error
	// Subscribe creates a subscription for the given request.
	Subscribe(*stream.SubscribeRequest) (*stream.Subscription, error)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
// code may use it to create a read-only transaction, but it will panic if called
// with write=true, because write transactions must be created with WriteTxn so
// that they carry an index.
//
// Deprecated: use either ReadTxn, or WriteTxn.
func (c *changeTrackerDB) Txn(write bool) *memdb.Txn {
	if write {
		panic("don't use db.Txn(true), use db.WriteTxn(idx uint64)")
	}
	return c.ReadTxn()
}
// ReadTxn opens a read-only transaction on the underlying memdb database.
func (c *changeTrackerDB) ReadTxn() *memdb.Txn {
	readOnly := c.db.Txn(false)
	return readOnly
}
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
// It will track changes and publish events for the changes when Commit
// is called.
//
// The idx argument must be the index of the current Raft operation. Almost
// all mutations to state should happen as part of a raft apply so the index of
// the log being applied can be passed to WriteTxn.
// The exceptional cases are transactions that are executed on an empty
// memdb.DB as part of Restore, and those executed by tests where we insert
// data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
2020-06-03 17:21:00 +00:00
t := &txn{
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
// publish converts changes into events using processChanges and hands them to
// the publisher. It returns an error, without publishing anything, if the
// events could not be generated. The underlying error is wrapped so callers
// can inspect it with errors.Is / errors.As.
func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
	events, err := c.processChanges(tx, changes)
	if err != nil {
		return fmt.Errorf("failed generating events from changes: %w", err)
	}
	c.publisher.Publish(events)
	return nil
}
// WriteTxnRestore returns a wrapped read-write transaction intended only for
// Restore, where the entire contents of the Store are replaced.
//
// The transaction carries a zero index because a restore does not happen at a
// single index: it rewrites many values that were originally written across
// many indexes. No publish function is attached, so committing it does not
// publish change events to subscribers.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
	inner := c.db.Txn(true)
	// We enable change tracking so that usage data is correctly populated.
	inner.TrackChanges()
	return &txn{
		Txn:   inner,
		Index: 0,
	}
}
2020-06-03 17:21:00 +00:00
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
//
// This can not be done with txn.Defer because the callback passed to Defer is
// invoked after commit completes, and because the callback can not return an
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
2020-06-03 17:21:00 +00:00
type txn struct {
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only, or WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(tx ReadTxn, changes Changes) error
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
// underlying transaction.
//
// Note that this function, unlike memdb.Txn, returns an error which must be checked
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
2020-06-03 17:21:00 +00:00
func (tx *txn) Commit() error {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if len(changes.Changes) > 0 {
if err := updateUsage(tx, changes); err != nil {
return err
}
}
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
if err := tx.publish(tx.Txn, changes); err != nil {
return err
}
}
tx.Txn.Commit()
return nil
}
// readDB is a memdb.MemDB that only exposes read-only transactions through
// its ReadTxn method.
type readDB memdb.MemDB

// ReadTxn opens a read-only transaction on the underlying memdb.MemDB.
func (db *readDB) ReadTxn() AbortTxn {
	underlying := (*memdb.MemDB)(db)
	return underlying.Txn(false)
}
var (
EventTopicServiceHealth = pbsubscribe.Topic_ServiceHealth
EventTopicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
EventTopicMeshConfig = pbsubscribe.Topic_MeshConfig
EventTopicServiceResolver = pbsubscribe.Topic_ServiceResolver
EventTopicIngressGateway = pbsubscribe.Topic_IngressGateway
EventTopicServiceIntentions = pbsubscribe.Topic_ServiceIntentions
EventTopicServiceDefaults = pbsubscribe.Topic_ServiceDefaults
EventTopicServiceList = pbsubscribe.Topic_ServiceList
Native API Gateway Config Entries (#15897) * Stub Config Entries for Consul Native API Gateway (#15644) * Add empty InlineCertificate struct and protobuf * apigateway stubs * Stub HTTPRoute in api pkg * Stub HTTPRoute in structs pkg * Simplify api.APIGatewayConfigEntry to be consistent w/ other entries * Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry * Add TCPRoute to MakeConfigEntry, return unique Kind * Stub BoundAPIGatewayConfigEntry in agent * Add RaftIndex to APIGatewayConfigEntry stub * Add new config entry kinds to validation allow-list * Add RaftIndex to other added config entry stubs * Update usage metrics assertions to include new cfg entries * Add Meta and acl.EnterpriseMeta to all new ConfigEntry types * Remove unnecessary Services field from added config entry types * Implement GetMeta(), GetEnterpriseMeta() for added config entry types * Add meta field to proto, name consistently w/ existing config entries * Format config_entry.proto * Add initial implementation of CanRead + CanWrite for new config entry types * Add unit tests for decoding of new config entry types * Add unit tests for parsing of new config entry types * Add unit tests for API Gateway config entry ACLs * Return typed PermissionDeniedError on BoundAPIGateway CanWrite * Add unit tests for added config entry ACLs * Add BoundAPIGateway type to AllConfigEntryKinds * Return proper kind from BoundAPIGateway * Add docstrings for new config entry types * Add missing config entry kinds to proto def * Update usagemetrics_oss_test.go * Use utility func for returning PermissionDeniedError * EventPublisher subscriptions for Consul Native API Gateway (#15757) * Create new event topics in subscribe proto * Add tests for PBSubscribe func * Make configs singular, add all configs to PBToStreamSubscribeRequest * Add snapshot methods * Add config_entry_events tests * Add config entry kind to topic for new configs * Add unit tests for snapshot methods * Start adding integration test 
* Test using the new controller code * Update agent/consul/state/config_entry_events.go * Check value of error * Add controller stubs for API Gateway (#15837) * update initial stub implementation * move files, clean up mutex references * Remove embed, use idiomatic names for constructors * Remove stray file introduced in merge * Add APIGateway validation (#15847) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * APIGateway InlineCertificate validation (#15856) * Add APIGateway validation * Add additional validations * Add protobuf definitions * Tabs to spaces * Add API structs * Move struct fields around a bit * Add validation for InlineCertificate * Fix ACL test * APIGateway BoundAPIGateway validation (#15858) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * Add validation for BoundAPIGateway * APIGateway TCPRoute validation (#15855) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Add TCPRoute normalization and validation * Add forgotten Status * Add some more field docs in api package * Fix test * Format imports * Rename snapshot test variable names * Add plumbing for Native API GW Subscriptions (#16003) Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com> Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00
EventTopicAPIGateway = pbsubscribe.Topic_APIGateway
EventTopicTCPRoute = pbsubscribe.Topic_TCPRoute
EventTopicHTTPRoute = pbsubscribe.Topic_HTTPRoute
EventTopicInlineCertificate = pbsubscribe.Topic_InlineCertificate
EventTopicBoundAPIGateway = pbsubscribe.Topic_BoundAPIGateway
)
// processDBChanges runs every registered change handler, in order, against
// the given transaction and changes, and returns the concatenation of the
// events they produce. The first handler error aborts processing and is
// returned with no events.
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
	handlers := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
		aclChangeUnsubscribeEvent,
		caRootsChangeEvents,
		ServiceHealthEventsFromChanges,
		ServiceListUpdateEventsFromChanges,
		ConfigEntryEventsFromChanges,
		// TODO: add other table handlers here.
	}

	var collected []stream.Event
	for i := range handlers {
		produced, err := handlers[i](tx, changes)
		if err != nil {
			return nil, err
		}
		collected = append(collected, produced...)
	}
	return collected, nil
}