2016-02-13 00:50:37 +00:00
package memdb
import (
2016-03-01 20:43:24 +00:00
"bytes"
2016-02-13 00:50:37 +00:00
"fmt"
"strings"
2016-03-01 20:43:24 +00:00
"sync/atomic"
"unsafe"
2016-02-13 00:50:37 +00:00
2019-10-16 16:10:12 +00:00
iradix "github.com/hashicorp/go-immutable-radix"
2016-02-13 00:50:37 +00:00
)
const (
id = "id"
)
2016-03-01 20:43:24 +00:00
2018-10-19 16:04:07 +00:00
var (
// ErrNotFound is returned when the requested item is not found
ErrNotFound = fmt . Errorf ( "not found" )
)
2016-02-13 00:50:37 +00:00
// tableIndex is a tuple of (Table, Index) used for lookups
type tableIndex struct {
Table string
Index string
}
// Txn is a transaction against a MemDB.
// This can be a read or write transaction.
type Txn struct {
db * MemDB
write bool
rootTxn * iradix . Txn
after [ ] func ( )
2020-06-02 18:18:19 +00:00
// changes is used to track the changes performed during the transaction. If
// it is nil at transaction start then changes are not tracked.
changes Changes
2016-02-13 00:50:37 +00:00
modified map [ tableIndex ] * iradix . Txn
}
2020-06-02 18:18:19 +00:00
// TrackChanges enables change tracking for the transaction. If called at any
// point before commit, subsequent mutations will be recorded and can be
// retrieved using ChangeSet. Once this has been called on a transaction it
// can't be unset. As with other Txn methods it's not safe to call this from a
// different goroutine than the one making mutations or committing the
// transaction.
func ( txn * Txn ) TrackChanges ( ) {
if txn . changes == nil {
txn . changes = make ( Changes , 0 , 1 )
}
}
2016-02-13 00:50:37 +00:00
// readableIndex returns a transaction usable for reading the given
// index in a table. If a write transaction is in progress, we may need
// to use an existing modified txn.
func ( txn * Txn ) readableIndex ( table , index string ) * iradix . Txn {
// Look for existing transaction
if txn . write && txn . modified != nil {
key := tableIndex { table , index }
exist , ok := txn . modified [ key ]
if ok {
return exist
}
}
// Create a read transaction
path := indexPath ( table , index )
raw , _ := txn . rootTxn . Get ( path )
indexTxn := raw . ( * iradix . Tree ) . Txn ( )
return indexTxn
}
// writableIndex returns a transaction usable for modifying the
// given index in a table.
func ( txn * Txn ) writableIndex ( table , index string ) * iradix . Txn {
if txn . modified == nil {
txn . modified = make ( map [ tableIndex ] * iradix . Txn )
}
// Look for existing transaction
key := tableIndex { table , index }
exist , ok := txn . modified [ key ]
if ok {
return exist
}
// Start a new transaction
path := indexPath ( table , index )
raw , _ := txn . rootTxn . Get ( path )
indexTxn := raw . ( * iradix . Tree ) . Txn ( )
2017-01-24 07:41:18 +00:00
// If we are the primary DB, enable mutation tracking. Snapshots should
// not notify, otherwise we will trigger watches on the primary DB when
// the writes will not be visible.
indexTxn . TrackMutate ( txn . db . primary )
2016-02-13 00:50:37 +00:00
// Keep this open for the duration of the txn
txn . modified [ key ] = indexTxn
return indexTxn
}
// Abort is used to cancel this transaction.
// This is a noop for read transactions.
func ( txn * Txn ) Abort ( ) {
// Noop for a read transaction
if ! txn . write {
return
}
// Check if already aborted or committed
if txn . rootTxn == nil {
return
}
// Clear the txn
txn . rootTxn = nil
txn . modified = nil
2020-06-02 18:18:19 +00:00
txn . changes = nil
2016-02-13 00:50:37 +00:00
// Release the writer lock since this is invalid
txn . db . writer . Unlock ( )
}
// Commit is used to finalize this transaction.
// This is a noop for read transactions.
func ( txn * Txn ) Commit ( ) {
// Noop for a read transaction
if ! txn . write {
return
}
// Check if already aborted or committed
if txn . rootTxn == nil {
return
}
// Commit each sub-transaction scoped to (table, index)
for key , subTxn := range txn . modified {
path := indexPath ( key . Table , key . Index )
2017-07-25 22:28:43 +00:00
final := subTxn . CommitOnly ( )
2016-02-13 00:50:37 +00:00
txn . rootTxn . Insert ( path , final )
}
// Update the root of the DB
2017-07-25 22:28:43 +00:00
newRoot := txn . rootTxn . CommitOnly ( )
2016-03-01 20:43:24 +00:00
atomic . StorePointer ( & txn . db . root , unsafe . Pointer ( newRoot ) )
2016-02-13 00:50:37 +00:00
2017-07-25 22:28:43 +00:00
// Now issue all of the mutation updates (this is safe to call
// even if mutation tracking isn't enabled); we do this after
// the root pointer is swapped so that waking responders will
// see the new state.
for _ , subTxn := range txn . modified {
subTxn . Notify ( )
}
txn . rootTxn . Notify ( )
2016-02-13 00:50:37 +00:00
// Clear the txn
txn . rootTxn = nil
txn . modified = nil
// Release the writer lock since this is invalid
txn . db . writer . Unlock ( )
// Run the deferred functions, if any
for i := len ( txn . after ) ; i > 0 ; i -- {
fn := txn . after [ i - 1 ]
fn ( )
}
}
// Insert is used to add or update an object into the given table
func ( txn * Txn ) Insert ( table string , obj interface { } ) error {
if ! txn . write {
return fmt . Errorf ( "cannot insert in read-only transaction" )
}
// Get the table schema
tableSchema , ok := txn . db . schema . Tables [ table ]
if ! ok {
return fmt . Errorf ( "invalid table '%s'" , table )
}
// Get the primary ID of the object
idSchema := tableSchema . Indexes [ id ]
2017-01-09 19:23:09 +00:00
idIndexer := idSchema . Indexer . ( SingleIndexer )
ok , idVal , err := idIndexer . FromObject ( obj )
2016-02-13 00:50:37 +00:00
if err != nil {
return fmt . Errorf ( "failed to build primary index: %v" , err )
}
if ! ok {
return fmt . Errorf ( "object missing primary index" )
}
// Lookup the object by ID first, to see if this is an update
idTxn := txn . writableIndex ( table , id )
existing , update := idTxn . Get ( idVal )
// On an update, there is an existing object with the given
// primary ID. We do the update by deleting the current object
// and inserting the new object.
for name , indexSchema := range tableSchema . Indexes {
indexTxn := txn . writableIndex ( table , name )
2016-03-01 20:43:24 +00:00
// Determine the new index value
2017-01-09 19:23:09 +00:00
var (
ok bool
vals [ ] [ ] byte
err error
)
switch indexer := indexSchema . Indexer . ( type ) {
case SingleIndexer :
var val [ ] byte
ok , val , err = indexer . FromObject ( obj )
vals = [ ] [ ] byte { val }
case MultiIndexer :
ok , vals , err = indexer . FromObject ( obj )
}
2016-03-01 20:43:24 +00:00
if err != nil {
return fmt . Errorf ( "failed to build index '%s': %v" , name , err )
}
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if ok && ! indexSchema . Unique {
2017-01-09 19:23:09 +00:00
for i := range vals {
vals [ i ] = append ( vals [ i ] , idVal ... )
}
2016-03-01 20:43:24 +00:00
}
2016-02-13 00:50:37 +00:00
// Handle the update by deleting from the index first
if update {
2017-01-09 19:23:09 +00:00
var (
okExist bool
valsExist [ ] [ ] byte
err error
)
switch indexer := indexSchema . Indexer . ( type ) {
case SingleIndexer :
var valExist [ ] byte
okExist , valExist , err = indexer . FromObject ( existing )
valsExist = [ ] [ ] byte { valExist }
case MultiIndexer :
okExist , valsExist , err = indexer . FromObject ( existing )
}
2016-02-13 00:50:37 +00:00
if err != nil {
return fmt . Errorf ( "failed to build index '%s': %v" , name , err )
}
2016-03-01 20:43:24 +00:00
if okExist {
2017-01-09 19:23:09 +00:00
for i , valExist := range valsExist {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if ! indexSchema . Unique {
valExist = append ( valExist , idVal ... )
}
// If we are writing to the same index with the same value,
// we can avoid the delete as the insert will overwrite the
// value anyways.
if i >= len ( vals ) || ! bytes . Equal ( valExist , vals [ i ] ) {
indexTxn . Delete ( valExist )
}
2016-02-13 00:50:37 +00:00
}
}
}
2016-03-01 20:43:24 +00:00
// If there is no index value, either this is an error or an expected
// case and we can skip updating
2016-02-13 00:50:37 +00:00
if ! ok {
if indexSchema . AllowMissing {
continue
} else {
return fmt . Errorf ( "missing value for index '%s'" , name )
}
}
2016-03-01 20:43:24 +00:00
// Update the value of the index
2017-01-09 19:23:09 +00:00
for _ , val := range vals {
indexTxn . Insert ( val , obj )
}
2016-02-13 00:50:37 +00:00
}
2020-06-02 18:18:19 +00:00
if txn . changes != nil {
txn . changes = append ( txn . changes , Change {
Table : table ,
Before : existing , // might be nil on a create
After : obj ,
primaryKey : idVal ,
} )
}
2016-02-13 00:50:37 +00:00
return nil
}
// Delete is used to delete a single object from the given table
// This object must already exist in the table
func ( txn * Txn ) Delete ( table string , obj interface { } ) error {
if ! txn . write {
return fmt . Errorf ( "cannot delete in read-only transaction" )
}
// Get the table schema
tableSchema , ok := txn . db . schema . Tables [ table ]
if ! ok {
return fmt . Errorf ( "invalid table '%s'" , table )
}
// Get the primary ID of the object
idSchema := tableSchema . Indexes [ id ]
2017-01-09 19:23:09 +00:00
idIndexer := idSchema . Indexer . ( SingleIndexer )
ok , idVal , err := idIndexer . FromObject ( obj )
2016-02-13 00:50:37 +00:00
if err != nil {
return fmt . Errorf ( "failed to build primary index: %v" , err )
}
if ! ok {
return fmt . Errorf ( "object missing primary index" )
}
// Lookup the object by ID first, check fi we should continue
idTxn := txn . writableIndex ( table , id )
existing , ok := idTxn . Get ( idVal )
if ! ok {
2018-10-19 16:04:07 +00:00
return ErrNotFound
2016-02-13 00:50:37 +00:00
}
// Remove the object from all the indexes
for name , indexSchema := range tableSchema . Indexes {
indexTxn := txn . writableIndex ( table , name )
// Handle the update by deleting from the index first
2017-01-09 19:23:09 +00:00
var (
ok bool
vals [ ] [ ] byte
err error
)
switch indexer := indexSchema . Indexer . ( type ) {
case SingleIndexer :
var val [ ] byte
ok , val , err = indexer . FromObject ( existing )
vals = [ ] [ ] byte { val }
case MultiIndexer :
ok , vals , err = indexer . FromObject ( existing )
}
2016-02-13 00:50:37 +00:00
if err != nil {
return fmt . Errorf ( "failed to build index '%s': %v" , name , err )
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
2017-01-09 19:23:09 +00:00
for _ , val := range vals {
if ! indexSchema . Unique {
val = append ( val , idVal ... )
}
indexTxn . Delete ( val )
2016-02-13 00:50:37 +00:00
}
}
}
2020-06-02 18:18:19 +00:00
if txn . changes != nil {
txn . changes = append ( txn . changes , Change {
Table : table ,
Before : existing ,
After : nil , // Now nil indicates deletion
primaryKey : idVal ,
} )
}
2016-02-13 00:50:37 +00:00
return nil
}
2017-07-25 22:28:43 +00:00
// DeletePrefix is used to delete an entire subtree based on a prefix.
// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
func ( txn * Txn ) DeletePrefix ( table string , prefix_index string , prefix string ) ( bool , error ) {
if ! txn . write {
return false , fmt . Errorf ( "cannot delete in read-only transaction" )
}
if ! strings . HasSuffix ( prefix_index , "_prefix" ) {
return false , fmt . Errorf ( "Index name for DeletePrefix must be a prefix index, Got %v " , prefix_index )
}
deletePrefixIndex := strings . TrimSuffix ( prefix_index , "_prefix" )
// Get an iterator over all of the keys with the given prefix.
entries , err := txn . Get ( table , prefix_index , prefix )
if err != nil {
return false , fmt . Errorf ( "failed kvs lookup: %s" , err )
}
// Get the table schema
tableSchema , ok := txn . db . schema . Tables [ table ]
if ! ok {
return false , fmt . Errorf ( "invalid table '%s'" , table )
}
foundAny := false
for entry := entries . Next ( ) ; entry != nil ; entry = entries . Next ( ) {
if ! foundAny {
foundAny = true
}
// Get the primary ID of the object
idSchema := tableSchema . Indexes [ id ]
idIndexer := idSchema . Indexer . ( SingleIndexer )
ok , idVal , err := idIndexer . FromObject ( entry )
if err != nil {
return false , fmt . Errorf ( "failed to build primary index: %v" , err )
}
if ! ok {
return false , fmt . Errorf ( "object missing primary index" )
}
2020-06-02 18:18:19 +00:00
if txn . changes != nil {
// Record the deletion
idTxn := txn . writableIndex ( table , id )
existing , ok := idTxn . Get ( idVal )
if ok {
txn . changes = append ( txn . changes , Change {
Table : table ,
Before : existing ,
After : nil , // Now nil indicates deletion
primaryKey : idVal ,
} )
}
}
2017-07-25 22:28:43 +00:00
// Remove the object from all the indexes except the given prefix index
for name , indexSchema := range tableSchema . Indexes {
if name == deletePrefixIndex {
continue
}
indexTxn := txn . writableIndex ( table , name )
// Handle the update by deleting from the index first
var (
ok bool
vals [ ] [ ] byte
err error
)
switch indexer := indexSchema . Indexer . ( type ) {
case SingleIndexer :
var val [ ] byte
ok , val , err = indexer . FromObject ( entry )
vals = [ ] [ ] byte { val }
case MultiIndexer :
ok , vals , err = indexer . FromObject ( entry )
}
if err != nil {
return false , fmt . Errorf ( "failed to build index '%s': %v" , name , err )
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
for _ , val := range vals {
if ! indexSchema . Unique {
val = append ( val , idVal ... )
}
indexTxn . Delete ( val )
}
}
}
2020-06-02 18:18:19 +00:00
2017-07-25 22:28:43 +00:00
}
if foundAny {
indexTxn := txn . writableIndex ( table , deletePrefixIndex )
ok = indexTxn . DeletePrefix ( [ ] byte ( prefix ) )
if ! ok {
panic ( fmt . Errorf ( "prefix %v matched some entries but DeletePrefix did not delete any " , prefix ) )
}
return true , nil
}
return false , nil
}
2016-02-13 00:50:37 +00:00
// DeleteAll is used to delete all the objects in a given table
// matching the constraints on the index
func ( txn * Txn ) DeleteAll ( table , index string , args ... interface { } ) ( int , error ) {
if ! txn . write {
return 0 , fmt . Errorf ( "cannot delete in read-only transaction" )
}
// Get all the objects
iter , err := txn . Get ( table , index , args ... )
if err != nil {
return 0 , err
}
// Put them into a slice so there are no safety concerns while actually
// performing the deletes
var objs [ ] interface { }
for {
obj := iter . Next ( )
if obj == nil {
break
}
objs = append ( objs , obj )
}
// Do the deletes
num := 0
2016-03-01 20:43:24 +00:00
for _ , obj := range objs {
2016-02-13 00:50:37 +00:00
if err := txn . Delete ( table , obj ) ; err != nil {
return num , err
}
num ++
}
return num , nil
}
2017-01-24 07:41:18 +00:00
// FirstWatch is used to return the first matching object for
// the given constraints on the index along with the watch channel
func ( txn * Txn ) FirstWatch ( table , index string , args ... interface { } ) ( <- chan struct { } , interface { } , error ) {
2016-02-13 00:50:37 +00:00
// Get the index value
indexSchema , val , err := txn . getIndexValue ( table , index , args ... )
if err != nil {
2017-01-24 07:41:18 +00:00
return nil , nil , err
2016-02-13 00:50:37 +00:00
}
// Get the index itself
indexTxn := txn . readableIndex ( table , indexSchema . Name )
// Do an exact lookup
if indexSchema . Unique && val != nil && indexSchema . Name == index {
2017-01-24 07:41:18 +00:00
watch , obj , ok := indexTxn . GetWatch ( val )
2016-02-13 00:50:37 +00:00
if ! ok {
2017-01-24 07:41:18 +00:00
return watch , nil , nil
2016-02-13 00:50:37 +00:00
}
2017-01-24 07:41:18 +00:00
return watch , obj , nil
2016-02-13 00:50:37 +00:00
}
// Handle non-unique index by using an iterator and getting the first value
iter := indexTxn . Root ( ) . Iterator ( )
2017-01-24 07:41:18 +00:00
watch := iter . SeekPrefixWatch ( val )
2016-02-13 00:50:37 +00:00
_ , value , _ := iter . Next ( )
2017-01-24 07:41:18 +00:00
return watch , value , nil
}
// First is used to return the first matching object for
// the given constraints on the index
func ( txn * Txn ) First ( table , index string , args ... interface { } ) ( interface { } , error ) {
_ , val , err := txn . FirstWatch ( table , index , args ... )
return val , err
2016-02-13 00:50:37 +00:00
}
2016-03-01 20:43:24 +00:00
// LongestPrefix is used to fetch the longest prefix match for the given
// constraints on the index. Note that this will not work with the memdb
// StringFieldIndex because it adds null terminators which prevent the
// algorithm from correctly finding a match (it will get to right before the
// null and fail to find a leaf node). This should only be used where the prefix
// given is capable of matching indexed entries directly, which typically only
// applies to a custom indexer. See the unit test for an example.
func ( txn * Txn ) LongestPrefix ( table , index string , args ... interface { } ) ( interface { } , error ) {
// Enforce that this only works on prefix indexes.
if ! strings . HasSuffix ( index , "_prefix" ) {
return nil , fmt . Errorf ( "must use '%s_prefix' on index" , index )
}
// Get the index value.
indexSchema , val , err := txn . getIndexValue ( table , index , args ... )
if err != nil {
return nil , err
}
// This algorithm only makes sense against a unique index, otherwise the
// index keys will have the IDs appended to them.
if ! indexSchema . Unique {
return nil , fmt . Errorf ( "index '%s' is not unique" , index )
}
// Find the longest prefix match with the given index.
indexTxn := txn . readableIndex ( table , indexSchema . Name )
if _ , value , ok := indexTxn . Root ( ) . LongestPrefix ( val ) ; ok {
return value , nil
}
return nil , nil
}
2016-02-13 00:50:37 +00:00
// getIndexValue is used to get the IndexSchema and the value
// used to scan the index given the parameters. This handles prefix based
// scans when the index has the "_prefix" suffix. The index must support
// prefix iteration.
func ( txn * Txn ) getIndexValue ( table , index string , args ... interface { } ) ( * IndexSchema , [ ] byte , error ) {
// Get the table schema
tableSchema , ok := txn . db . schema . Tables [ table ]
if ! ok {
return nil , nil , fmt . Errorf ( "invalid table '%s'" , table )
}
// Check for a prefix scan
prefixScan := false
if strings . HasSuffix ( index , "_prefix" ) {
index = strings . TrimSuffix ( index , "_prefix" )
prefixScan = true
}
// Get the index schema
indexSchema , ok := tableSchema . Indexes [ index ]
if ! ok {
return nil , nil , fmt . Errorf ( "invalid index '%s'" , index )
}
// Hot-path for when there are no arguments
if len ( args ) == 0 {
return indexSchema , nil , nil
}
// Special case the prefix scanning
if prefixScan {
prefixIndexer , ok := indexSchema . Indexer . ( PrefixIndexer )
if ! ok {
return indexSchema , nil ,
fmt . Errorf ( "index '%s' does not support prefix scanning" , index )
}
val , err := prefixIndexer . PrefixFromArgs ( args ... )
if err != nil {
return indexSchema , nil , fmt . Errorf ( "index error: %v" , err )
}
return indexSchema , val , err
}
// Get the exact match index
val , err := indexSchema . Indexer . FromArgs ( args ... )
if err != nil {
return indexSchema , nil , fmt . Errorf ( "index error: %v" , err )
}
return indexSchema , val , err
}
// ResultIterator is used to iterate over a list of results
// from a Get query on a table.
type ResultIterator interface {
2017-01-24 07:41:18 +00:00
WatchCh ( ) <- chan struct { }
2016-02-13 00:50:37 +00:00
Next ( ) interface { }
}
// Get is used to construct a ResultIterator over all the
// rows that match the given constraints of an index.
func ( txn * Txn ) Get ( table , index string , args ... interface { } ) ( ResultIterator , error ) {
2019-10-16 16:10:12 +00:00
indexIter , val , err := txn . getIndexIterator ( table , index , args ... )
2016-02-13 00:50:37 +00:00
if err != nil {
return nil , err
}
// Seek the iterator to the appropriate sub-set
2017-01-24 07:41:18 +00:00
watchCh := indexIter . SeekPrefixWatch ( val )
2016-02-13 00:50:37 +00:00
// Create an iterator
iter := & radixIterator {
2017-01-24 07:41:18 +00:00
iter : indexIter ,
watchCh : watchCh ,
2016-02-13 00:50:37 +00:00
}
return iter , nil
}
2019-10-16 16:10:12 +00:00
// LowerBound is used to construct a ResultIterator over all the the range of
// rows that have an index value greater than or equal to the provide args.
// Calling this then iterating until the rows are larger than required allows
// range scans within an index. It is not possible to watch the resulting
// iterator since the radix tree doesn't efficiently allow watching on lower
// bound changes. The WatchCh returned will be nill and so will block forever.
func ( txn * Txn ) LowerBound ( table , index string , args ... interface { } ) ( ResultIterator , error ) {
indexIter , val , err := txn . getIndexIterator ( table , index , args ... )
if err != nil {
return nil , err
}
// Seek the iterator to the appropriate sub-set
indexIter . SeekLowerBound ( val )
// Create an iterator
iter := & radixIterator {
iter : indexIter ,
}
return iter , nil
}
2020-06-02 18:18:19 +00:00
// objectID is a tuple of table name and the raw internal id byte slice
// converted to a string. It's only converted to a string to make it comparable
// so this struct can be used as a map index.
type objectID struct {
Table string
IndexVal string
}
// mutInfo stores metadata about mutations to allow collapsing multiple
// mutations to the same object into one.
type mutInfo struct {
firstBefore interface { }
lastIdx int
}
// Changes returns the set of object changes that have been made in the
// transaction so far. If change tracking is not enabled it wil always return
// nil. It can be called before or after Commit. If it is before Commit it will
// return all changes made so far which may not be the same as the final
// Changes. After abort it will always return nil. As with other Txn methods
// it's not safe to call this from a different goroutine than the one making
// mutations or committing the transaction. Mutations will appear in the order
// they were performed in the transaction but multiple operations to the same
// object will be collapsed so only the effective overall change to that object
// is present. If transaction operations are dependent (e.g. copy object X to Y
// then delete X) this might mean the set of mutations is incomplete to verify
// history, but it is complete in that the net effect is preserved (Y got a new
// value, X got removed).
func ( txn * Txn ) Changes ( ) Changes {
if txn . changes == nil {
return nil
}
// De-duplicate mutations by key so all take effect at the point of the last
// write but we keep the mutations in order.
dups := make ( map [ objectID ] mutInfo )
for i , m := range txn . changes {
oid := objectID {
Table : m . Table ,
IndexVal : string ( m . primaryKey ) ,
}
// Store the latest mutation index for each key value
mi , ok := dups [ oid ]
if ! ok {
// First entry for key, store the before value
mi . firstBefore = m . Before
}
mi . lastIdx = i
dups [ oid ] = mi
}
if len ( dups ) == len ( txn . changes ) {
// No duplicates found, fast path return it as is
return txn . changes
}
// Need to remove the duplicates
cs := make ( Changes , 0 , len ( dups ) )
for i , m := range txn . changes {
oid := objectID {
Table : m . Table ,
IndexVal : string ( m . primaryKey ) ,
}
mi := dups [ oid ]
if mi . lastIdx == i {
// This was the latest value for this key copy it with the before value in
// case it's different. Note that m is not a pointer so we are not
// modifying the txn.changeSet here - it's already a copy.
m . Before = mi . firstBefore
cs = append ( cs , m )
}
}
// Store the de-duped version in case this is called again
txn . changes = cs
return cs
}
2019-10-16 16:10:12 +00:00
func ( txn * Txn ) getIndexIterator ( table , index string , args ... interface { } ) ( * iradix . Iterator , [ ] byte , error ) {
// Get the index value to scan
indexSchema , val , err := txn . getIndexValue ( table , index , args ... )
if err != nil {
return nil , nil , err
}
// Get the index itself
indexTxn := txn . readableIndex ( table , indexSchema . Name )
indexRoot := indexTxn . Root ( )
// Get an interator over the index
indexIter := indexRoot . Iterator ( )
return indexIter , val , nil
}
2016-02-13 00:50:37 +00:00
// Defer is used to push a new arbitrary function onto a stack which
// gets called when a transaction is committed and finished. Deferred
// functions are called in LIFO order, and only invoked at the end of
// write transactions.
func ( txn * Txn ) Defer ( fn func ( ) ) {
txn . after = append ( txn . after , fn )
}
// radixIterator is used to wrap an underlying iradix iterator.
2017-01-24 07:41:18 +00:00
// This is much more efficient than a sliceIterator as we are not
2016-02-13 00:50:37 +00:00
// materializing the entire view.
type radixIterator struct {
2017-01-24 07:41:18 +00:00
iter * iradix . Iterator
watchCh <- chan struct { }
}
func ( r * radixIterator ) WatchCh ( ) <- chan struct { } {
return r . watchCh
2016-02-13 00:50:37 +00:00
}
func ( r * radixIterator ) Next ( ) interface { } {
_ , value , ok := r . iter . Next ( )
if ! ok {
return nil
}
return value
}