Makes all delete loops use a separate slice to protect the iterator.
This commit is contained in:
parent
768f6fd8db
commit
8504646900
|
@ -99,11 +99,19 @@ func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error {
|
||||||
return fmt.Errorf("failed querying tombstones: %s", err)
|
return fmt.Errorf("failed querying tombstones: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Find eligible tombstones.
|
||||||
|
var objs []interface{}
|
||||||
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
||||||
if stone.(*Tombstone).Index <= idx {
|
if stone.(*Tombstone).Index <= idx {
|
||||||
if err := tx.Delete("tombstones", stone); err != nil {
|
objs = append(objs, stone)
|
||||||
return fmt.Errorf("failed deleting tombstone: %s", err)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete the tombstones in a separate loop so we don't trash the
|
||||||
|
// iterator.
|
||||||
|
for _, obj := range objs {
|
||||||
|
if err := tx.Delete("tombstones", obj); err != nil {
|
||||||
|
return fmt.Errorf("failed deleting tombstone: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -481,9 +481,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var sids []string
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
svc := service.(*structs.ServiceNode)
|
svc := service.(*structs.ServiceNode)
|
||||||
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, svc.ServiceID); err != nil {
|
sids = append(sids, svc.ServiceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, sid := range sids {
|
||||||
|
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, sid); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -494,9 +500,15 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed check lookup: %s", err)
|
return fmt.Errorf("failed check lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var cids []string
|
||||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||||
hc := check.(*structs.HealthCheck)
|
hc := check.(*structs.HealthCheck)
|
||||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil {
|
cids = append(cids, hc.CheckID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, cid := range cids {
|
||||||
|
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -514,9 +526,14 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed session lookup: %s", err)
|
return fmt.Errorf("failed session lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var ids []string
|
||||||
for sess := sessions.Next(); sess != nil; sess = sessions.Next() {
|
for sess := sessions.Next(); sess != nil; sess = sessions.Next() {
|
||||||
session := sess.(*structs.Session).ID
|
ids = append(ids, sess.(*structs.Session).ID)
|
||||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||||
return fmt.Errorf("failed session delete: %s", err)
|
return fmt.Errorf("failed session delete: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -804,12 +821,20 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service check lookup: %s", err)
|
return fmt.Errorf("failed service check lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var cids []string
|
||||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||||
hc := check.(*structs.HealthCheck)
|
hc := check.(*structs.HealthCheck)
|
||||||
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, hc.CheckID); err != nil {
|
cids = append(cids, hc.CheckID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, cid := range cids {
|
||||||
|
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the index.
|
||||||
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -895,10 +920,16 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
|
||||||
return fmt.Errorf("failed session checks lookup: %s", err)
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
watches := NewDumbWatchManager(s.tableWatches)
|
var ids []string
|
||||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||||
session := mapping.(*sessionCheck).Session
|
ids = append(ids, mapping.(*sessionCheck).Session)
|
||||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
}
|
||||||
|
|
||||||
|
// Delete the session in a separate loop so we don't trash the
|
||||||
|
// iterator.
|
||||||
|
watches := NewDumbWatchManager(s.tableWatches)
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||||
return fmt.Errorf("failed deleting session: %s", err)
|
return fmt.Errorf("failed deleting session: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1011,9 +1042,14 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed session checks lookup: %s", err)
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var ids []string
|
||||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||||
session := mapping.(*sessionCheck).Session
|
ids = append(ids, mapping.(*sessionCheck).Session)
|
||||||
if err := s.deleteSessionTxn(tx, idx, watches, session); err != nil {
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
||||||
return fmt.Errorf("failed deleting session: %s", err)
|
return fmt.Errorf("failed deleting session: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1528,16 +1564,22 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
||||||
// directly so that we only update the index once. We also add
|
// directly so that we only update the index once. We also add
|
||||||
// tombstones as we go.
|
// tombstones as we go.
|
||||||
var modified bool
|
var modified bool
|
||||||
|
var objs []interface{}
|
||||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||||
e := entry.(*structs.DirEntry)
|
e := entry.(*structs.DirEntry)
|
||||||
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
||||||
return fmt.Errorf("failed adding to graveyard: %s", err)
|
return fmt.Errorf("failed adding to graveyard: %s", err)
|
||||||
}
|
}
|
||||||
|
objs = append(objs, entry)
|
||||||
|
modified = true
|
||||||
|
}
|
||||||
|
|
||||||
if err := tx.Delete("kvs", e); err != nil {
|
// Do the actual deletes in a separate loop so we don't trash the
|
||||||
|
// iterator as we go.
|
||||||
|
for _, obj := range objs {
|
||||||
|
if err := tx.Delete("kvs", obj); err != nil {
|
||||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||||
}
|
}
|
||||||
modified = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the index
|
// Update the index
|
||||||
|
@ -1912,6 +1954,9 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
||||||
switch session.Behavior {
|
switch session.Behavior {
|
||||||
case structs.SessionKeysRelease:
|
case structs.SessionKeysRelease:
|
||||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||||
|
// Note that we clone here since we are modifying the
|
||||||
|
// returned object and want to make sure our set op
|
||||||
|
// respects the transaction we are in.
|
||||||
e := entry.(*structs.DirEntry).Clone()
|
e := entry.(*structs.DirEntry).Clone()
|
||||||
e.Session = ""
|
e.Session = ""
|
||||||
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
||||||
|
@ -1944,8 +1989,14 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed session checks lookup: %s", err)
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
var objs []interface{}
|
||||||
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||||
if err := tx.Delete("session_checks", mapping); err != nil {
|
objs = append(objs, mapping)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, obj := range objs {
|
||||||
|
if err := tx.Delete("session_checks", obj); err != nil {
|
||||||
return fmt.Errorf("failed deleting session check: %s", err)
|
return fmt.Errorf("failed deleting session check: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue