Converts sessions and ACLs over to iterators.

This commit is contained in:
James Phillips 2015-10-19 14:56:22 -07:00
parent d459d94b3f
commit 9a2fdff4c4
7 changed files with 85 additions and 58 deletions

View File

@ -445,14 +445,14 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error { encoder *codec.Encoder) error {
sessions, err := s.state.SessionDump() iter, err := s.state.Sessions()
if err != nil { if err != nil {
return err return err
} }
for _, s := range sessions { for si := iter.Next(); si != nil; si = iter.Next() {
sink.Write([]byte{byte(structs.SessionRequestType)}) sink.Write([]byte{byte(structs.SessionRequestType)})
if err := encoder.Encode(s); err != nil { if err := encoder.Encode(si.(*structs.Session)); err != nil {
return err return err
} }
} }
@ -461,14 +461,14 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink, func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
encoder *codec.Encoder) error { encoder *codec.Encoder) error {
acls, err := s.state.ACLDump() iter, err := s.state.ACLs()
if err != nil { if err != nil {
return err return err
} }
for _, s := range acls { for ai := iter.Next(); ai != nil; ai = iter.Next() {
sink.Write([]byte{byte(structs.ACLRequestType)}) sink.Write([]byte{byte(structs.ACLRequestType)})
if err := encoder.Encode(s); err != nil { if err := encoder.Encode(ai.(*structs.ACL)); err != nil {
return err return err
} }
} }
@ -493,21 +493,22 @@ func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error { encoder *codec.Encoder) error {
stones, err := s.state.TombstoneDump() iter, err := s.state.Tombstones()
if err != nil { if err != nil {
return err return err
} }
for _, s := range stones { for ti := iter.Next(); ti != nil; ti = iter.Next() {
sink.Write([]byte{byte(structs.TombstoneRequestType)}) sink.Write([]byte{byte(structs.TombstoneRequestType)})
// For historical reasons, these are serialized in the snapshots // For historical reasons, these are serialized in the snapshots
// as KV entries. We want to keep the snapshot format compatible // as KV entries. We want to keep the snapshot format compatible
// with pre-0.6 versions for now. // with pre-0.6 versions for now.
stone := ti.(*state.Tombstone)
fake := &structs.DirEntry{ fake := &structs.DirEntry{
Key: s.Key, Key: stone.Key,
RaftIndex: structs.RaftIndex{ RaftIndex: structs.RaftIndex{
ModifyIndex: s.Index, ModifyIndex: stone.Index,
}, },
} }
if err := encoder.Encode(fake); err != nil { if err := encoder.Encode(fake); err != nil {

View File

@ -5,6 +5,7 @@ import (
"os" "os"
"testing" "testing"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
) )
@ -474,12 +475,19 @@ func TestFSM_SnapshotRestore(t *testing.T) {
func() { func() {
snap := fsm2.state.Snapshot() snap := fsm2.state.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(dump) != 1 { stone := iter.Next().(*state.Tombstone)
t.Fatalf("bad: %#v", dump) if stone == nil {
t.Fatalf("missing tombstone")
}
if stone.Key != "/remove" || stone.Index != 12 {
t.Fatalf("bad: %v", stone)
}
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
} }
}() }()
} }
@ -1015,12 +1023,12 @@ func TestFSM_TombstoneReap(t *testing.T) {
// Verify the tombstones are gone // Verify the tombstones are gone
snap := fsm.state.Snapshot() snap := fsm.state.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(dump) != 0 { if iter.Next() != nil {
t.Fatalf("bad: %#v", dump) t.Fatalf("unexpected extra tombstones")
} }
} }

View File

@ -590,12 +590,15 @@ func TestLeader_ReapTombstones(t *testing.T) {
func() { func() {
snap := state.Snapshot() snap := state.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(dump) != 1 { if iter.Next() == nil {
t.Fatalf("bad: %#v", dump) t.Fatalf("missing tombstones")
}
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
} }
}() }()
@ -604,11 +607,11 @@ func TestLeader_ReapTombstones(t *testing.T) {
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
snap := state.Snapshot() snap := state.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
return false, err return false, err
} }
return len(dump) == 0, nil return iter.Next() == nil, nil
}, func(err error) { }, func(err error) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
}) })

View File

@ -62,17 +62,13 @@ func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string) (uint64, error)
} }
// DumpTxn returns all the tombstones. // DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) { func (g *Graveyard) DumpTxn(tx *memdb.Txn) (memdb.ResultIterator, error) {
stones, err := tx.Get("tombstones", "id") iter, err := tx.Get("tombstones", "id")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed querying tombstones: %s", err) return nil, err
} }
var dump []*Tombstone return iter, nil
for stone := stones.Next(); stone != nil; stone = stones.Next() {
dump = append(dump, stone.(*Tombstone))
}
return dump, nil
} }
// RestoreTxn is used when restoring from a snapshot. For general inserts, use // RestoreTxn is used when restoring from a snapshot. For general inserts, use

View File

@ -199,10 +199,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
dump, err := g.DumpTxn(tx) iter, err := g.DumpTxn(tx)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump return dump
}() }()
@ -241,10 +245,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
dump, err := g.DumpTxn(tx) iter, err := g.DumpTxn(tx)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump return dump
}() }()
if !reflect.DeepEqual(dump, expected) { if !reflect.DeepEqual(dump, expected) {

View File

@ -156,7 +156,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
return iter, nil return iter, nil
} }
// KVSDump is used to pull the full list of KVS entries for use during snapshots. // KVs is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix") iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil { if err != nil {
@ -165,28 +165,27 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
return iter, nil return iter, nil
} }
// TombstoneDump is used to pull all the tombstones from the graveyard. // Tombstones is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) TombstoneDump() ([]*Tombstone, error) { func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
return s.store.kvsGraveyard.DumpTxn(s.tx) return s.store.kvsGraveyard.DumpTxn(s.tx)
} }
// SessionDump is used to pull the full list of sessions for use during snapshots. // Sessions is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) SessionDump() (structs.Sessions, error) { func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
sessions, err := s.tx.Get("sessions", "id") iter, err := s.tx.Get("sessions", "id")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err) return nil, err
} }
return iter, nil
var dump structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
dump = append(dump, session.(*structs.Session))
}
return dump, nil
} }
// ACLDump is used to pull all the ACLs from the snapshot. // ACLs is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLDump() (structs.ACLs, error) { func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
return aclListTxn(s.tx) iter, err := s.tx.Get("acls", "id")
if err != nil {
return nil, err
}
return iter, nil
} }
// maxIndex is a helper used to retrieve the highest known index // maxIndex is a helper used to retrieve the highest known index
@ -2094,7 +2093,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...) idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
// Return the ACLs. // Return the ACLs.
acls, err := aclListTxn(tx) acls, err := s.aclListTxn(tx)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err) return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
} }
@ -2103,7 +2102,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
// aclListTxn is used to list out all of the ACLs in the state store. This is a // aclListTxn is used to list out all of the ACLs in the state store. This is a
// function vs. a method so it can be called from the snapshotter. // function vs. a method so it can be called from the snapshotter.
func aclListTxn(tx *memdb.Txn) (structs.ACLs, error) { func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
// Query all of the ACLs in the state store // Query all of the ACLs in the state store
acls, err := tx.Get("acls", "id") acls, err := tx.Get("acls", "id")
if err != nil { if err != nil {

View File

@ -286,12 +286,12 @@ func TestStateStore_ReapTombstones(t *testing.T) {
// Make sure the tombstones are actually gone. // Make sure the tombstones are actually gone.
snap := s.Snapshot() snap := s.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(dump) != 0 { if iter.Next() != nil {
t.Fatalf("bad: %#v", dump) t.Fatalf("unexpected extra tombstones")
} }
} }
@ -3264,10 +3264,14 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
} }
// Verify the snapshot. // Verify the snapshot.
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
if len(dump) != 1 { if len(dump) != 1 {
t.Fatalf("bad %#v", dump) t.Fatalf("bad %#v", dump)
} }
@ -3312,12 +3316,12 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
// But make sure the tombstone is actually gone. // But make sure the tombstone is actually gone.
snap := s.Snapshot() snap := s.Snapshot()
defer snap.Close() defer snap.Close()
dump, err := snap.TombstoneDump() iter, err := snap.Tombstones()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(dump) != 0 { if iter.Next() != nil {
t.Fatalf("bad %#v", dump) t.Fatalf("unexpected extra tombstones")
} }
}() }()
} }
@ -3656,10 +3660,14 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
if idx := snap.LastIndex(); idx != 7 { if idx := snap.LastIndex(); idx != 7 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
dump, err := snap.SessionDump() iter, err := snap.Sessions()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var dump structs.Sessions
for si := iter.Next(); si != nil; si = iter.Next() {
dump = append(dump, si.(*structs.Session))
}
if !reflect.DeepEqual(dump, sessions) { if !reflect.DeepEqual(dump, sessions) {
t.Fatalf("bad: %#v", dump) t.Fatalf("bad: %#v", dump)
} }
@ -4319,10 +4327,14 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
if idx := snap.LastIndex(); idx != 2 { if idx := snap.LastIndex(); idx != 2 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
dump, err := snap.ACLDump() iter, err := snap.ACLs()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var dump structs.ACLs
for ai := iter.Next(); ai != nil; ai = iter.Next() {
dump = append(dump, ai.(*structs.ACL))
}
if !reflect.DeepEqual(dump, acls) { if !reflect.DeepEqual(dump, acls) {
t.Fatalf("bad: %#v", dump) t.Fatalf("bad: %#v", dump)
} }