Support multi-table transactions with MDBTable
This commit is contained in:
parent
49bc2c4075
commit
a5a8eeee93
|
@ -17,8 +17,7 @@ var (
|
|||
/*
|
||||
An MDB table is a logical representation of a table, which is a
|
||||
generic row store. It provides a simple mechanism to store rows
|
||||
using a "row id", but then accesses can be done using any number
|
||||
of named indexes
|
||||
using a row id, while maintaining any number of secondary indexes.
|
||||
*/
|
||||
type MDBTable struct {
|
||||
lastRowID uint64 // Last used rowID
|
||||
|
@ -29,6 +28,9 @@ type MDBTable struct {
|
|||
Decoder func([]byte) interface{}
|
||||
}
|
||||
|
||||
// MDBTables is used for when we have a collection of tables
|
||||
type MDBTables []*MDBTable
|
||||
|
||||
// An Index is named, and uses a series of column values to
|
||||
// map to the row-id containing the table
|
||||
type MDBIndex struct {
|
||||
|
@ -51,7 +53,9 @@ type MDBTxn struct {
|
|||
|
||||
// Abort is used to close the transaction
|
||||
func (t *MDBTxn) Abort() {
|
||||
t.tx.Abort()
|
||||
if t != nil && t.tx != nil {
|
||||
t.tx.Abort()
|
||||
}
|
||||
}
|
||||
|
||||
// Commit is used to commit a transaction
|
||||
|
@ -60,14 +64,18 @@ func (t *MDBTxn) Commit() error {
|
|||
}
|
||||
|
||||
type RowID uint64
|
||||
type IndexFunc func([]string) string
|
||||
type IndexFunc func(*MDBIndex, []string) string
|
||||
|
||||
// DefaultIndexFunc is used if no IdxFunc is provided. It joins
|
||||
// the columns using '||' which is reasonably unlikely to occur.
|
||||
// We also prefix with a byte to ensure we never have a zero length
|
||||
// key
|
||||
func DefaultIndexFunc(parts []string) string {
|
||||
return "_" + strings.Join(parts, "||")
|
||||
func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
|
||||
if len(parts) == 0 {
|
||||
return "_"
|
||||
}
|
||||
prefix := "_" + strings.Join(parts, "||") + "||"
|
||||
return prefix
|
||||
}
|
||||
|
||||
// Init is used to initialize the MDBTable and ensure it's ready
|
||||
|
@ -129,7 +137,7 @@ func (t *MDBTable) createTable() error {
|
|||
|
||||
// restoreLastRowID is used to set the last rowID that we've used
|
||||
func (t *MDBTable) restoreLastRowID() error {
|
||||
tx, err := t.StartTxn(true)
|
||||
tx, err := t.StartTxn(true, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -159,23 +167,35 @@ func (t *MDBTable) nextRowID() uint64 {
|
|||
}
|
||||
|
||||
// startTxn is used to start a transaction
|
||||
func (t *MDBTable) StartTxn(readonly bool) (*MDBTxn, error) {
|
||||
func (t *MDBTable) StartTxn(readonly bool, mdbTxn *MDBTxn) (*MDBTxn, error) {
|
||||
var txFlags uint = 0
|
||||
var tx *mdb.Txn
|
||||
var err error
|
||||
|
||||
// Ensure the modes agree
|
||||
if mdbTxn != nil {
|
||||
if mdbTxn.readonly != readonly {
|
||||
return nil, fmt.Errorf("Cannot mix read/write transactions")
|
||||
}
|
||||
tx = mdbTxn.tx
|
||||
goto EXTEND
|
||||
}
|
||||
|
||||
if readonly {
|
||||
txFlags |= mdb.RDONLY
|
||||
}
|
||||
|
||||
tx, err := t.Env.BeginTxn(nil, txFlags)
|
||||
tx, err = t.Env.BeginTxn(nil, txFlags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mdbTxn := &MDBTxn{
|
||||
mdbTxn = &MDBTxn{
|
||||
readonly: readonly,
|
||||
tx: tx,
|
||||
dbis: make(map[string]mdb.DBI),
|
||||
}
|
||||
|
||||
EXTEND:
|
||||
dbi, err := tx.DBIOpen(t.Name, 0)
|
||||
if err != nil {
|
||||
tx.Abort()
|
||||
|
@ -211,6 +231,22 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
|
|||
|
||||
// Insert is used to insert or update an object
|
||||
func (t *MDBTable) Insert(obj interface{}) error {
|
||||
// Start a new txn
|
||||
tx, err := t.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
if err := t.InsertTxn(tx, obj); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// Insert is used to insert or update an object within
|
||||
// a given transaction
|
||||
func (t *MDBTable) InsertTxn(tx *MDBTxn, obj interface{}) error {
|
||||
var n int
|
||||
// Construct the indexes keys
|
||||
indexes, err := t.objIndexKeys(obj)
|
||||
|
@ -221,13 +257,6 @@ func (t *MDBTable) Insert(obj interface{}) error {
|
|||
// Encode the obj
|
||||
raw := t.Encoder(obj)
|
||||
|
||||
// Start a new txn
|
||||
tx, err := t.StartTxn(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Scan and check if this primary key already exists
|
||||
primaryDbi := tx.dbis[t.Indexes["id"].dbiName]
|
||||
_, err = tx.tx.Get(primaryDbi, indexes["id"])
|
||||
|
@ -235,7 +264,7 @@ func (t *MDBTable) Insert(obj interface{}) error {
|
|||
goto AFTER_DELETE
|
||||
}
|
||||
|
||||
// Delete the existing row{
|
||||
// Delete the existing row
|
||||
n, err = t.deleteWithIndex(tx, t.Indexes["id"], indexes["id"])
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -260,26 +289,30 @@ AFTER_DELETE:
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get is used to lookup one or more rows. An index an appropriate
|
||||
// fields are specified. The fields can be a prefix of the index.
|
||||
func (t *MDBTable) Get(index string, parts ...string) ([]interface{}, error) {
|
||||
// Start a readonly txn
|
||||
tx, err := t.StartTxn(true, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
return t.GetTxn(tx, index, parts...)
|
||||
}
|
||||
|
||||
// GetTxn is like Get but it operates within a specific transaction.
|
||||
// This can be used for read that span multiple tables
|
||||
func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interface{}, error) {
|
||||
// Get the associated index
|
||||
idx, key, err := t.getIndex(index, parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start a readonly txn
|
||||
tx, err := t.StartTxn(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Accumulate the results
|
||||
var results []interface{}
|
||||
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
|
||||
|
@ -306,7 +339,7 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er
|
|||
}
|
||||
|
||||
// Construct the key
|
||||
key := []byte(idx.IdxFunc(parts))
|
||||
key := idx.keyFromParts(parts...)
|
||||
return idx, key, nil
|
||||
}
|
||||
|
||||
|
@ -314,27 +347,31 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er
|
|||
// fields are specified. The fields can be a prefix of the index.
|
||||
// Returns the rows deleted or an error.
|
||||
func (t *MDBTable) Delete(index string, parts ...string) (num int, err error) {
|
||||
// Start a write txn
|
||||
tx, err := t.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
num, err = t.DeleteTxn(tx, index, parts...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return num, tx.Commit()
|
||||
}
|
||||
|
||||
// DeleteTxn is like Delete, but occurs in a specific transaction
|
||||
// that can span multiple tables.
|
||||
func (t *MDBTable) DeleteTxn(tx *MDBTxn, index string, parts ...string) (int, error) {
|
||||
// Get the associated index
|
||||
idx, key, err := t.getIndex(index, parts)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Start a write txn
|
||||
tx, err := t.StartTxn(false)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Delete with the index
|
||||
num, err = t.deleteWithIndex(tx, idx, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Attempt a commit
|
||||
return num, tx.Commit()
|
||||
return t.deleteWithIndex(tx, idx, key)
|
||||
}
|
||||
|
||||
// deleteWithIndex deletes all associated rows while scanning
|
||||
|
@ -451,8 +488,13 @@ func (i *MDBIndex) keyFromObject(obj interface{}) ([]byte, error) {
|
|||
}
|
||||
parts = append(parts, val)
|
||||
}
|
||||
key := i.IdxFunc(parts)
|
||||
return []byte(key), nil
|
||||
key := i.keyFromParts(parts...)
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// keyFromParts returns the key from component parts
|
||||
func (i *MDBIndex) keyFromParts(parts ...string) []byte {
|
||||
return []byte(i.IdxFunc(i, parts))
|
||||
}
|
||||
|
||||
// iterate is used to iterate over keys matching the prefix,
|
||||
|
@ -512,3 +554,17 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartTxn is used to create a transaction that spans a list of tables
|
||||
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
|
||||
var tx *MDBTxn
|
||||
for _, table := range t {
|
||||
newTx, err := table.StartTxn(readonly, tx)
|
||||
if err != nil {
|
||||
tx.Abort()
|
||||
return nil, err
|
||||
}
|
||||
tx = newTx
|
||||
}
|
||||
return tx, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue