2019-07-12 14:10:48 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-10-28 17:49:57 +00:00
|
|
|
"fmt"
|
2019-07-12 14:10:48 +00:00
|
|
|
"testing"
|
|
|
|
|
2019-10-04 17:08:45 +00:00
|
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
2019-10-28 17:49:57 +00:00
|
|
|
"github.com/stretchr/testify/mock"
|
2019-07-12 14:10:48 +00:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestReplicationRestart(t *testing.T) {
|
2019-10-04 17:08:45 +00:00
|
|
|
mgr := NewLeaderRoutineManager(testutil.TestLogger(t))
|
|
|
|
|
2019-07-12 14:10:48 +00:00
|
|
|
config := ReplicatorConfig{
|
|
|
|
Name: "mock",
|
2019-10-28 17:49:57 +00:00
|
|
|
Delegate: &FunctionReplicator{
|
|
|
|
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
|
|
return 1, false, nil
|
|
|
|
},
|
2019-07-12 14:10:48 +00:00
|
|
|
},
|
2019-10-04 17:08:45 +00:00
|
|
|
|
2019-07-12 14:10:48 +00:00
|
|
|
Rate: 1,
|
|
|
|
Burst: 1,
|
|
|
|
}
|
|
|
|
|
|
|
|
repl, err := NewReplicator(&config)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-10-04 17:08:45 +00:00
|
|
|
mgr.Start("mock", repl.Run)
|
|
|
|
mgr.Stop("mock")
|
|
|
|
mgr.Start("mock", repl.Run)
|
2019-07-12 14:10:48 +00:00
|
|
|
// Previously this would have segfaulted
|
2019-10-04 17:08:45 +00:00
|
|
|
mgr.Stop("mock")
|
2019-07-12 14:10:48 +00:00
|
|
|
}
|
2019-10-28 17:49:57 +00:00
|
|
|
|
|
|
|
type indexReplicatorTestDelegate struct {
|
|
|
|
mock.Mock
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) SingularNoun() string {
|
|
|
|
return "test"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) PluralNoun() string {
|
|
|
|
return "tests"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) MetricName() string {
|
|
|
|
return "test"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
|
|
|
|
ret := d.Called(lastRemoteIndex)
|
|
|
|
return ret.Int(0), ret.Get(1), ret.Get(2).(uint64), ret.Error(3)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) FetchLocal() (int, interface{}, error) {
|
|
|
|
ret := d.Called()
|
|
|
|
return ret.Int(0), ret.Get(1), ret.Error(2)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) DiffRemoteAndLocalState(local interface{}, remote interface{}, lastRemoteIndex uint64) (*IndexReplicatorDiff, error) {
|
|
|
|
ret := d.Called(local, remote, lastRemoteIndex)
|
|
|
|
return ret.Get(0).(*IndexReplicatorDiff), ret.Error(1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) PerformDeletions(ctx context.Context, deletions interface{}) (exit bool, err error) {
|
|
|
|
// ignore the context for the call
|
|
|
|
ret := d.Called(deletions)
|
|
|
|
return ret.Bool(0), ret.Error(1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *indexReplicatorTestDelegate) PerformUpdates(ctx context.Context, updates interface{}) (exit bool, err error) {
|
|
|
|
// ignore the context for the call
|
|
|
|
ret := d.Called(updates)
|
|
|
|
return ret.Bool(0), ret.Error(1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestIndexReplicator(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
t.Run("Remote Fetch Error", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(0)).Return(0, nil, uint64(0), fmt.Errorf("induced error"))
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 0)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.Contains(t, err.Error(), "failed to retrieve tests: induced error")
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Local Fetch Error", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(1), nil)
|
|
|
|
delegate.On("FetchLocal").Return(0, nil, fmt.Errorf("induced error"))
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.Contains(t, err.Error(), "failed to retrieve local tests: induced error")
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Diff Error", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(1), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
// this also is verifying that when the remote index goes backwards then we reset the index to 0
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(0)).Return(&IndexReplicatorDiff{}, fmt.Errorf("induced error"))
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.Contains(t, err.Error(), "failed to diff test local and remote states: induced error")
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("No Change", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(3)).Return(&IndexReplicatorDiff{}, nil)
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(4), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.NoError(t, err)
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Deletion Error", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(3)).Return(&IndexReplicatorDiff{NumDeletions: 1}, nil)
|
|
|
|
delegate.On("PerformDeletions", nil).Return(false, fmt.Errorf("induced error"))
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.Contains(t, err.Error(), "failed to apply local test deletions: induced error")
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Deletion Exit", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(3)).Return(&IndexReplicatorDiff{NumDeletions: 1}, nil)
|
|
|
|
delegate.On("PerformDeletions", nil).Return(true, nil)
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.True(t, done)
|
|
|
|
require.NoError(t, err)
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Update Error", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(3)).Return(&IndexReplicatorDiff{NumUpdates: 1}, nil)
|
|
|
|
delegate.On("PerformUpdates", nil).Return(false, fmt.Errorf("induced error"))
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.Contains(t, err.Error(), "failed to apply local test updates: induced error")
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("Update Exit", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(1, nil, uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, nil, nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", nil, nil, uint64(3)).Return(&IndexReplicatorDiff{NumUpdates: 1}, nil)
|
|
|
|
delegate.On("PerformUpdates", nil).Return(true, nil)
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(0), idx)
|
|
|
|
require.True(t, done)
|
|
|
|
require.NoError(t, err)
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("All Good", func(t *testing.T) {
|
|
|
|
delegate := &indexReplicatorTestDelegate{}
|
|
|
|
|
|
|
|
replicator := IndexReplicator{
|
|
|
|
Delegate: delegate,
|
|
|
|
Logger: testutil.TestLogger(t),
|
|
|
|
}
|
|
|
|
|
|
|
|
delegate.On("FetchRemote", uint64(3)).Return(3, "bcd", uint64(4), nil)
|
|
|
|
delegate.On("FetchLocal").Return(1, "a", nil)
|
|
|
|
delegate.On("DiffRemoteAndLocalState", "a", "bcd", uint64(3)).Return(&IndexReplicatorDiff{NumDeletions: 1, Deletions: "a", NumUpdates: 3, Updates: "bcd"}, nil)
|
|
|
|
delegate.On("PerformDeletions", "a").Return(false, nil)
|
|
|
|
delegate.On("PerformUpdates", "bcd").Return(false, nil)
|
|
|
|
|
|
|
|
idx, done, err := replicator.Replicate(context.Background(), 3)
|
|
|
|
|
|
|
|
require.Equal(t, uint64(4), idx)
|
|
|
|
require.False(t, done)
|
|
|
|
require.NoError(t, err)
|
|
|
|
delegate.AssertExpectations(t)
|
|
|
|
})
|
|
|
|
}
|