agent/consul/state: initial work on intentions memdb table
This commit is contained in:
parent
171e95d265
commit
c05bed86e1
|
@ -0,0 +1,136 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
const (
|
||||
intentionsTableName = "connect-intentions"
|
||||
)
|
||||
|
||||
// intentionsTableSchema returns a new table schema used for storing
|
||||
// intentions for Connect.
|
||||
func intentionsTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: intentionsTableName,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.UUIDFieldIndex{
|
||||
Field: "ID",
|
||||
},
|
||||
},
|
||||
"destination": &memdb.IndexSchema{
|
||||
Name: "destination",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "DestinationNS",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "DestinationName",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"source": &memdb.IndexSchema{
|
||||
Name: "source",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "SourceNS",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "SourceName",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
registerSchema(intentionsTableSchema)
|
||||
}
|
||||
|
||||
// IntentionSet creates or updates an intention.
|
||||
func (s *Store) IntentionSet(idx uint64, ixn *structs.Intention) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := s.intentionSetTxn(tx, idx, ixn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// intentionSetTxn is the inner method used to insert an intention with
|
||||
// the proper indexes into the state store.
|
||||
func (s *Store) intentionSetTxn(tx *memdb.Txn, idx uint64, ixn *structs.Intention) error {
|
||||
// ID is required
|
||||
if ixn.ID == "" {
|
||||
return ErrMissingIntentionID
|
||||
}
|
||||
|
||||
// Check for an existing intention
|
||||
existing, err := tx.First(intentionsTableName, "id", ixn.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed intention looup: %s", err)
|
||||
}
|
||||
if existing != nil {
|
||||
ixn.CreateIndex = existing.(*structs.Intention).CreateIndex
|
||||
} else {
|
||||
ixn.CreateIndex = idx
|
||||
}
|
||||
ixn.ModifyIndex = idx
|
||||
|
||||
// Insert
|
||||
if err := tx.Insert(intentionsTableName, ixn); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{intentionsTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IntentionGet returns the given intention by ID.
|
||||
func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, intentionsTableName)
|
||||
|
||||
// Look up by its ID.
|
||||
watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed intention lookup: %s", err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
// Convert the interface{} if it is non-nil
|
||||
var result *structs.Intention
|
||||
if intention != nil {
|
||||
result = intention.(*structs.Intention)
|
||||
}
|
||||
|
||||
return idx, result, nil
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
func TestStore_IntentionGet_none(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Querying with no results returns nil.
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, res, err := s.IntentionGet(ws, testUUID())
|
||||
if idx != 0 || res != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_IntentionSetGet_basic(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Call Get to populate the watch set
|
||||
ws := memdb.NewWatchSet()
|
||||
_, _, err := s.IntentionGet(ws, testUUID())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Build a valid intention
|
||||
ixn := &structs.Intention{
|
||||
ID: testUUID(),
|
||||
}
|
||||
|
||||
// Inserting a with empty ID is disallowed.
|
||||
if err := s.IntentionSet(1, ixn); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex(intentionsTableName); idx != 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back out and verify it.
|
||||
expected := &structs.Intention{
|
||||
ID: ixn.ID,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
}
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err := s.IntentionGet(ws, ixn.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != expected.CreateIndex {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("bad: %v", actual)
|
||||
}
|
||||
|
||||
// Change a value and test updating
|
||||
ixn.SourceNS = "foo"
|
||||
if err := s.IntentionSet(2, ixn); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
if idx := s.maxIndex(intentionsTableName); idx != 2 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated
|
||||
expected.SourceNS = ixn.SourceNS
|
||||
expected.ModifyIndex = 2
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err = s.IntentionGet(ws, ixn.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != expected.ModifyIndex {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("bad: %#v", actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_IntentionSet_emptyId(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
_, _, err := s.IntentionGet(ws, testUUID())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Inserting a with empty ID is disallowed.
|
||||
if err := s.IntentionSet(1, &structs.Intention{}); err == nil {
|
||||
t.Fatalf("expected %#v, got: %#v", ErrMissingIntentionID, err)
|
||||
}
|
||||
|
||||
// Index is not updated if nothing is saved.
|
||||
if idx := s.maxIndex(intentionsTableName); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
|
@ -28,6 +28,10 @@ var (
|
|||
// ErrMissingQueryID is returned when a Query set is called on
|
||||
// a Query with an empty ID.
|
||||
ErrMissingQueryID = errors.New("Missing Query ID")
|
||||
|
||||
// ErrMissingIntentionID is returned when an Intention set is called
|
||||
// with an Intention with an empty ID.
|
||||
ErrMissingIntentionID = errors.New("Missing Intention ID")
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
package structs
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Intention defines an intention for the Connect Service Graph. This defines
|
||||
// the allowed or denied behavior of a connection between two services using
|
||||
// Connect.
|
||||
type Intention struct {
|
||||
// ID is the UUID-based ID for the intention, always generated by Consul.
|
||||
ID string
|
||||
|
||||
// SourceNS, SourceName are the namespace and name, respectively, of
|
||||
// the source service. Either of these may be the wildcard "*", but only
|
||||
// the full value can be a wildcard. Partial wildcards are not allowed.
|
||||
// The source may also be a non-Consul service, as specified by SourceType.
|
||||
//
|
||||
// DestinationNS, DestinationName is the same, but for the destination
|
||||
// service. The same rules apply. The destination is always a Consul
|
||||
// service.
|
||||
SourceNS, SourceName string
|
||||
DestinationNS, DestinationName string
|
||||
|
||||
// SourceType is the type of the value for the source.
|
||||
SourceType IntentionSourceType
|
||||
|
||||
// Action is whether this is a whitelist or blacklist intention.
|
||||
Action IntentionAction
|
||||
|
||||
// DefaultAddr, DefaultPort of the local listening proxy (if any) to
|
||||
// make this connection.
|
||||
DefaultAddr string
|
||||
DefaultPort int
|
||||
|
||||
// Meta is arbitrary metadata associated with the intention. This is
|
||||
// opaque to Consul but is served in API responses.
|
||||
Meta map[string]string
|
||||
|
||||
// CreatedAt and UpdatedAt keep track of when this record was created
|
||||
// or modified.
|
||||
CreatedAt, UpdatedAt time.Time
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
// IntentionAction is the action that the intention represents. This
|
||||
// can be "allow" or "deny" to whitelist or blacklist intentions.
|
||||
type IntentionAction string
|
||||
|
||||
const (
|
||||
IntentionActionAllow IntentionAction = "allow"
|
||||
IntentionActionDeny IntentionAction = "deny"
|
||||
)
|
||||
|
||||
// IntentionSourceType is the type of the source within an intention.
|
||||
type IntentionSourceType string
|
||||
|
||||
const (
|
||||
// IntentionSourceConsul is a service within the Consul catalog.
|
||||
IntentionSourceConsul IntentionSourceType = "consul"
|
||||
)
|
Loading…
Reference in New Issue