diff --git a/.changelog/16871.txt b/.changelog/16871.txt new file mode 100644 index 000000000..f1167c450 --- /dev/null +++ b/.changelog/16871.txt @@ -0,0 +1,3 @@ +```release-note:bug +Fix a race condition where an event is published before the data associated is commited to memdb. +``` diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index bd0e58b2d..4ef90f006 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -4,7 +4,7 @@ package state import ( - "fmt" + "sync" "github.com/hashicorp/go-memdb" @@ -93,23 +93,15 @@ func (c *changeTrackerDB) ReadTxn() *memdb.Txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), - Index: idx, - publish: c.publish, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publisher.Publish, + prePublish: c.processChanges, } t.Txn.TrackChanges() return t } -func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error { - events, err := c.processChanges(tx, changes) - if err != nil { - return fmt.Errorf("failed generating events from changes: %v", err) - } - c.publisher.Publish(events) - return nil -} - // WriteTxnRestore returns a wrapped RW transaction that should only be used in // Restore where we need to replace the entire contents of the Store. // WriteTxnRestore uses a zero index since the whole restore doesn't really @@ -127,6 +119,11 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn { return t } +type prePublishFuncType func(tx ReadTxn, changes Changes) ([]stream.Event, error) + +//go:generate mockery --name publishFuncType --inpackage +type publishFuncType func(events []stream.Event) + // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. // // This can not be done with txn.Defer because the callback passed to Defer is @@ -140,7 +137,11 @@ type txn struct { // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 - publish func(tx ReadTxn, changes Changes) error + publish publishFuncType + + prePublish prePublishFuncType + + commitLock sync.Mutex } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -161,16 +162,30 @@ func (tx *txn) Commit() error { } } - // publish may be nil if this is a read-only or WriteTxnRestore transaction. - // In those cases changes should also be empty, and there will be nothing - // to publish. - if tx.publish != nil { - if err := tx.publish(tx.Txn, changes); err != nil { + // This lock prevents events from concurrent transactions getting published out of order. + tx.commitLock.Lock() + defer tx.commitLock.Unlock() + + var events []stream.Event + var err error + + // prePublish need to generate a list of events before the transaction is commited, + // as we loose the changes in the transaction after the call to Commit(). + if tx.prePublish != nil { + events, err = tx.prePublish(tx.Txn, changes) + if err != nil { return err } } tx.Txn.Commit() + + // publish may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases events should also be empty, and there will be nothing + // to publish. + if tx.publish != nil { + tx.publish(events) + } return nil } diff --git a/agent/consul/state/memdb_test.go b/agent/consul/state/memdb_test.go new file mode 100644 index 000000000..32d4edf1b --- /dev/null +++ b/agent/consul/state/memdb_test.go @@ -0,0 +1,98 @@ +package state + +import ( + "fmt" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "testing" + "time" +) + +func testValidSchema() *memdb.DBSchema { + return &memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + "main": { + Name: "main", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "foo": { + Name: "foo", + Indexer: &memdb.StringFieldIndex{Field: "Foo"}, + }, + }, + }, + }, + } +} + +type TestObject struct { + ID string + Foo string +} + +// This test verify that the new data in a TXN is commited at the time that publishFunc is called. +// To do so, the publish func is mocked, a read on ch1 means that publish is called and blocked, +// ch2 permit to control the publish func and unblock it when receiving a signal. +func Test_txn_Commit(t *testing.T) { + db, err := memdb.NewMemDB(testValidSchema()) + require.NoError(t, err) + publishFunc := mockPublishFuncType{} + tx := txn{ + Txn: db.Txn(true), + Index: 0, + publish: publishFunc.Execute, + } + ch1 := make(chan struct{}) + ch2 := make(chan struct{}) + getCh := make(chan memdb.ResultIterator) + group := errgroup.Group{} + group.Go(func() error { + after := time.After(2 * time.Second) + select { + case <-ch1: + tx2 := txn{ + Txn: db.Txn(false), + Index: 0, + publish: publishFunc.Execute, + } + get, err := tx2.Get("main", "id") + if err != nil { + return err + } + close(ch2) + getCh <- get + case <-after: + close(ch2) + return fmt.Errorf("test timed out") + } + return nil + }) + + publishFunc.On("Execute", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + close(ch1) + <-ch2 + }).Return(nil) + + err = tx.Insert("main", TestObject{ID: "1", Foo: "foo"}) + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + get := <-getCh + require.NotNil(t, get) + next := get.Next() + require.NotNil(t, next) + + val := next.(TestObject) + require.Equal(t, val.ID, "1") + require.Equal(t, val.Foo, "foo") + + err = group.Wait() + require.NoError(t, err) + +} diff --git a/agent/consul/state/mock_publishFuncType.go b/agent/consul/state/mock_publishFuncType.go new file mode 100644 index 000000000..bf1c6a5ac --- /dev/null +++ b/agent/consul/state/mock_publishFuncType.go @@ -0,0 +1,33 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package state + +import ( + stream "github.com/hashicorp/consul/agent/consul/stream" + mock "github.com/stretchr/testify/mock" +) + +// mockPublishFuncType is an autogenerated mock type for the publishFuncType type +type mockPublishFuncType struct { + mock.Mock +} + +// Execute provides a mock function with given fields: events +func (_m *mockPublishFuncType) Execute(events []stream.Event) { + _m.Called(events) +} + +type mockConstructorTestingTnewMockPublishFuncType interface { + mock.TestingT + Cleanup(func()) +} + +// newMockPublishFuncType creates a new instance of mockPublishFuncType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockPublishFuncType(t mockConstructorTestingTnewMockPublishFuncType) *mockPublishFuncType { + mock := &mockPublishFuncType{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}