Memdb Txn Commit race condition fix (#16871)

* Add a test to reproduce the race condition

* Fix race condition by publishing the event after the commit and adding a lock to prevent out of order events.

* split publish to generate the list of events before committing the transaction.

* add changelog

* remove extra func

* Apply suggestions from code review

Co-authored-by: Dan Upton <daniel@floppy.co>

* add comment to explain test

---------

Co-authored-by: Dan Upton <daniel@floppy.co>
This commit is contained in:
Dhia Ayachi 2023-04-12 13:18:01 -04:00 committed by GitHub
parent 9a2221b07b
commit 825663b38a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 19 deletions

3
.changelog/16871.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
Fix a race condition where an event is published before the data associated is commited to memdb.
```

View File

@ -4,7 +4,7 @@
package state package state
import ( import (
"fmt" "sync"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
@ -95,21 +95,13 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{ t := &txn{
Txn: c.db.Txn(true), Txn: c.db.Txn(true),
Index: idx, Index: idx,
publish: c.publish, publish: c.publisher.Publish,
prePublish: c.processChanges,
} }
t.Txn.TrackChanges() t.Txn.TrackChanges()
return t return t
} }
func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
events, err := c.processChanges(tx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that should only be used in // WriteTxnRestore returns a wrapped RW transaction that should only be used in
// Restore where we need to replace the entire contents of the Store. // Restore where we need to replace the entire contents of the Store.
// WriteTxnRestore uses a zero index since the whole restore doesn't really // WriteTxnRestore uses a zero index since the whole restore doesn't really
@ -127,6 +119,11 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn {
return t return t
} }
type prePublishFuncType func(tx ReadTxn, changes Changes) ([]stream.Event, error)
//go:generate mockery --name publishFuncType --inpackage
type publishFuncType func(events []stream.Event)
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
// //
// This can not be done with txn.Defer because the callback passed to Defer is // This can not be done with txn.Defer because the callback passed to Defer is
@ -140,7 +137,11 @@ type txn struct {
// Index is stored so that it may be passed along to any subscribers as part // Index is stored so that it may be passed along to any subscribers as part
// of a change event. // of a change event.
Index uint64 Index uint64
publish func(tx ReadTxn, changes Changes) error publish publishFuncType
prePublish prePublishFuncType
commitLock sync.Mutex
} }
// Commit first pushes changes to EventPublisher, then calls Commit on the // Commit first pushes changes to EventPublisher, then calls Commit on the
@ -161,16 +162,30 @@ func (tx *txn) Commit() error {
} }
} }
// publish may be nil if this is a read-only or WriteTxnRestore transaction. // This lock prevents events from concurrent transactions getting published out of order.
// In those cases changes should also be empty, and there will be nothing tx.commitLock.Lock()
// to publish. defer tx.commitLock.Unlock()
if tx.publish != nil {
if err := tx.publish(tx.Txn, changes); err != nil { var events []stream.Event
var err error
// prePublish need to generate a list of events before the transaction is commited,
// as we loose the changes in the transaction after the call to Commit().
if tx.prePublish != nil {
events, err = tx.prePublish(tx.Txn, changes)
if err != nil {
return err return err
} }
} }
tx.Txn.Commit() tx.Txn.Commit()
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases events should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
tx.publish(events)
}
return nil return nil
} }

View File

@ -0,0 +1,98 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"testing"
"time"
)
func testValidSchema() *memdb.DBSchema {
return &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"main": {
Name: "main",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "ID"},
},
"foo": {
Name: "foo",
Indexer: &memdb.StringFieldIndex{Field: "Foo"},
},
},
},
},
}
}
type TestObject struct {
ID string
Foo string
}
// This test verify that the new data in a TXN is commited at the time that publishFunc is called.
// To do so, the publish func is mocked, a read on ch1 means that publish is called and blocked,
// ch2 permit to control the publish func and unblock it when receiving a signal.
func Test_txn_Commit(t *testing.T) {
db, err := memdb.NewMemDB(testValidSchema())
require.NoError(t, err)
publishFunc := mockPublishFuncType{}
tx := txn{
Txn: db.Txn(true),
Index: 0,
publish: publishFunc.Execute,
}
ch1 := make(chan struct{})
ch2 := make(chan struct{})
getCh := make(chan memdb.ResultIterator)
group := errgroup.Group{}
group.Go(func() error {
after := time.After(2 * time.Second)
select {
case <-ch1:
tx2 := txn{
Txn: db.Txn(false),
Index: 0,
publish: publishFunc.Execute,
}
get, err := tx2.Get("main", "id")
if err != nil {
return err
}
close(ch2)
getCh <- get
case <-after:
close(ch2)
return fmt.Errorf("test timed out")
}
return nil
})
publishFunc.On("Execute", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(ch1)
<-ch2
}).Return(nil)
err = tx.Insert("main", TestObject{ID: "1", Foo: "foo"})
require.NoError(t, err)
err = tx.Commit()
require.NoError(t, err)
get := <-getCh
require.NotNil(t, get)
next := get.Next()
require.NotNil(t, next)
val := next.(TestObject)
require.Equal(t, val.ID, "1")
require.Equal(t, val.Foo, "foo")
err = group.Wait()
require.NoError(t, err)
}

View File

@ -0,0 +1,33 @@
// Code generated by mockery v2.20.0. DO NOT EDIT.
package state
import (
stream "github.com/hashicorp/consul/agent/consul/stream"
mock "github.com/stretchr/testify/mock"
)
// mockPublishFuncType is an autogenerated mock type for the publishFuncType type
type mockPublishFuncType struct {
mock.Mock
}
// Execute provides a mock function with given fields: events
func (_m *mockPublishFuncType) Execute(events []stream.Event) {
_m.Called(events)
}
type mockConstructorTestingTnewMockPublishFuncType interface {
mock.TestingT
Cleanup(func())
}
// newMockPublishFuncType creates a new instance of mockPublishFuncType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockPublishFuncType(t mockConstructorTestingTnewMockPublishFuncType) *mockPublishFuncType {
mock := &mockPublishFuncType{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}