allow ClusterMetadata to accept a watchset (#8299)
* allow ClusterMetadata to accept a watchset * use nil instead of empty watchset
This commit is contained in:
parent
362660e92c
commit
01e2cc5054
|
@ -2155,7 +2155,8 @@ func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
|
|||
encoder *codec.Encoder) error {
|
||||
|
||||
// Get the cluster metadata
|
||||
clusterMetadata, err := s.snap.ClusterMetadata()
|
||||
ws := memdb.NewWatchSet()
|
||||
clusterMetadata, err := s.snap.ClusterMetadata(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -2854,7 +2854,7 @@ func TestFSM_SnapshotRestore_ClusterMetadata(t *testing.T) {
|
|||
require := require.New(t)
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
out, err := state2.ClusterMetadata()
|
||||
out, err := state2.ClusterMetadata(memdb.NewWatchSet())
|
||||
require.NoError(err)
|
||||
require.Equal(clusterID, out.ClusterID)
|
||||
}
|
||||
|
@ -3153,7 +3153,8 @@ func TestFSM_ClusterMetadata(t *testing.T) {
|
|||
r.Nil(result)
|
||||
|
||||
// Verify the clusterID is set directly in the state store
|
||||
storedMetadata, err := fsm.state.ClusterMetadata()
|
||||
ws := memdb.NewWatchSet()
|
||||
storedMetadata, err := fsm.state.ClusterMetadata(ws)
|
||||
r.NoError(err)
|
||||
r.Equal(clusterID, storedMetadata.ClusterID)
|
||||
|
||||
|
@ -3167,7 +3168,7 @@ func TestFSM_ClusterMetadata(t *testing.T) {
|
|||
result = fsm.Apply(makeLog(buf))
|
||||
r.Error(result.(error))
|
||||
|
||||
storedMetadata, err = fsm.state.ClusterMetadata()
|
||||
storedMetadata, err = fsm.state.ClusterMetadata(ws)
|
||||
r.NoError(err)
|
||||
r.Equal(clusterID, storedMetadata.ClusterID)
|
||||
r.Equal(now, storedMetadata.CreateTime)
|
||||
|
|
|
@ -1650,7 +1650,7 @@ func (s *Server) ClusterID() (string, error) {
|
|||
|
||||
// try to load the cluster ID from state store
|
||||
fsmState := s.fsm.State()
|
||||
existingMeta, err := fsmState.ClusterMetadata()
|
||||
existingMeta, err := fsmState.ClusterMetadata(nil)
|
||||
if err != nil {
|
||||
s.logger.Named("core").Error("failed to get cluster ID", "error", err)
|
||||
return "", err
|
||||
|
|
|
@ -5046,15 +5046,16 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *StateStore) ClusterMetadata() (*structs.ClusterMetadata, error) {
|
||||
func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) {
|
||||
txn := s.db.Txn(false)
|
||||
defer txn.Abort()
|
||||
|
||||
// Get the cluster metadata
|
||||
m, err := txn.First("cluster_meta", "id")
|
||||
watchCh, m, err := txn.FirstWatch("cluster_meta", "id")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed cluster metadata lookup")
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
if m != nil {
|
||||
return m.(*structs.ClusterMetadata), nil
|
||||
|
|
|
@ -8481,7 +8481,7 @@ func TestStateStore_ClusterMetadata(t *testing.T) {
|
|||
err := state.ClusterSetMetadata(100, meta)
|
||||
require.NoError(err)
|
||||
|
||||
result, err := state.ClusterMetadata()
|
||||
result, err := state.ClusterMetadata(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(clusterID, result.ClusterID)
|
||||
require.Equal(now, result.CreateTime)
|
||||
|
@ -8503,7 +8503,7 @@ func TestStateStore_ClusterMetadataRestore(t *testing.T) {
|
|||
|
||||
restore.Commit()
|
||||
|
||||
out, err := state.ClusterMetadata()
|
||||
out, err := state.ClusterMetadata(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(clusterID, out.ClusterID)
|
||||
require.Equal(now, out.CreateTime)
|
||||
|
|
Loading…
Reference in New Issue