Updates hashicorp/go-immutable-radix to pick up leaf panic fixes.

This fixes #2724 by properly tracking leaf updates during very large
delete transactions.
This commit is contained in:
James Phillips 2017-02-13 16:47:49 -08:00
parent cabf2db2c7
commit d9f4d4949b
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 69 additions and 46 deletions

View File

@ -71,7 +71,7 @@ type Txn struct {
// trackOverflow flag, which will cause us to use a more expensive // trackOverflow flag, which will cause us to use a more expensive
// algorithm to perform the notifications. Mutation tracking is only // algorithm to perform the notifications. Mutation tracking is only
// performed if trackMutate is true. // performed if trackMutate is true.
trackChannels map[*chan struct{}]struct{} trackChannels map[chan struct{}]struct{}
trackOverflow bool trackOverflow bool
trackMutate bool trackMutate bool
} }
@ -97,24 +97,30 @@ func (t *Txn) TrackMutate(track bool) {
// overflow flag if we can no longer track any more. This limits the amount of // overflow flag if we can no longer track any more. This limits the amount of
// state that will accumulate during a transaction and we have a slower algorithm // state that will accumulate during a transaction and we have a slower algorithm
// to switch to if we overflow. // to switch to if we overflow.
func (t *Txn) trackChannel(ch *chan struct{}) { func (t *Txn) trackChannel(ch chan struct{}) {
// In overflow, make sure we don't store any more objects. // In overflow, make sure we don't store any more objects.
if t.trackOverflow { if t.trackOverflow {
return return
} }
// Create the map on the fly when we need it.
if t.trackChannels == nil {
t.trackChannels = make(map[*chan struct{}]struct{})
}
// If this would overflow the state we reject it and set the flag (since // If this would overflow the state we reject it and set the flag (since
// we aren't tracking everything that's required any longer). // we aren't tracking everything that's required any longer).
if len(t.trackChannels) >= defaultModifiedCache { if len(t.trackChannels) >= defaultModifiedCache {
// Mark that we are in the overflow state
t.trackOverflow = true t.trackOverflow = true
// Clear the map so that the channels can be garbage collected. It is
// safe to do this since we have already overflowed and will be using
// the slow notify algorithm.
t.trackChannels = nil
return return
} }
// Create the map on the fly when we need it.
if t.trackChannels == nil {
t.trackChannels = make(map[chan struct{}]struct{})
}
// Otherwise we are good to track it. // Otherwise we are good to track it.
t.trackChannels[ch] = struct{}{} t.trackChannels[ch] = struct{}{}
} }
@ -134,25 +140,31 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
} }
// If this node has already been modified, we can continue to use it // If this node has already been modified, we can continue to use it
// during this transaction. If a node gets kicked out of cache then we // during this transaction. We know that we don't need to track it for
// *may* notify for its mutation if we end up copying the node again, // a node update since the node is writable, but if this is for a leaf
// but we don't make any guarantees about notifying for intermediate // update we track it, in case the initial write to this node didn't
// mutations that were never exposed outside of a transaction. // update the leaf.
if _, ok := t.writable.Get(n); ok { if _, ok := t.writable.Get(n); ok {
if t.trackMutate && forLeafUpdate && n.leaf != nil {
t.trackChannel(n.leaf.mutateCh)
}
return n return n
} }
// Mark this node as being mutated. // Mark this node as being mutated.
if t.trackMutate { if t.trackMutate {
t.trackChannel(&(n.mutateCh)) t.trackChannel(n.mutateCh)
} }
// Mark its leaf as being mutated, if appropriate. // Mark its leaf as being mutated, if appropriate.
if t.trackMutate && forLeafUpdate && n.leaf != nil { if t.trackMutate && forLeafUpdate && n.leaf != nil {
t.trackChannel(&(n.leaf.mutateCh)) t.trackChannel(n.leaf.mutateCh)
} }
// Copy the existing node. // Copy the existing node. If you have set forLeafUpdate it will be
// safe to replace this leaf with another after you get your node for
// writing. You MUST replace it, because the channel associated with
// this leaf will be closed when this transaction is committed.
nc := &Node{ nc := &Node{
mutateCh: make(chan struct{}), mutateCh: make(chan struct{}),
leaf: n.leaf, leaf: n.leaf,
@ -171,6 +183,29 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
return nc return nc
} }
// mergeChild is called to collapse the given node with its child. This is only
// called when the given node is not a leaf and has a single edge.
func (t *Txn) mergeChild(n *Node) {
// Mark the child node as being mutated since we are about to abandon
// it. We don't need to mark the leaf since we are retaining it if it
// is there.
e := n.edges[0]
child := e.node
if t.trackMutate {
t.trackChannel(child.mutateCh)
}
// Merge the nodes.
n.prefix = concat(n.prefix, child.prefix)
n.leaf = child.leaf
if len(child.edges) != 0 {
n.edges = make([]edge, len(child.edges))
copy(n.edges, child.edges)
} else {
n.edges = nil
}
}
// insert does a recursive insertion // insert does a recursive insertion
func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) { func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
// Handle key exhaustion // Handle key exhaustion
@ -285,7 +320,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
// Check if this node should be merged // Check if this node should be merged
if n != t.root && len(nc.edges) == 1 { if n != t.root && len(nc.edges) == 1 {
nc.mergeChild() t.mergeChild(nc)
} }
return nc, n.leaf return nc, n.leaf
} }
@ -305,7 +340,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
} }
// Copy this node. WATCH OUT - it's safe to pass "false" here because we // Copy this node. WATCH OUT - it's safe to pass "false" here because we
// will only ADD a leaf via nc.mergeChilde() if there isn't one due to // will only ADD a leaf via nc.mergeChild() if there isn't one due to
// the !nc.isLeaf() check in the logic just below. This is pretty subtle, // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
// so be careful if you change any of the logic here. // so be careful if you change any of the logic here.
nc := t.writeNode(n, false) nc := t.writeNode(n, false)
@ -314,7 +349,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
if newChild.leaf == nil && len(newChild.edges) == 0 { if newChild.leaf == nil && len(newChild.edges) == 0 {
nc.delEdge(label) nc.delEdge(label)
if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() { if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
nc.mergeChild() t.mergeChild(nc)
} }
} else { } else {
nc.edges[idx].node = newChild nc.edges[idx].node = newChild
@ -371,15 +406,16 @@ func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
// Commit is used to finalize the transaction and return a new tree. If mutation // Commit is used to finalize the transaction and return a new tree. If mutation
// tracking is turned on then notifications will also be issued. // tracking is turned on then notifications will also be issued.
func (t *Txn) Commit() *Tree { func (t *Txn) Commit() *Tree {
nt := t.commit() nt := t.CommitOnly()
if t.trackMutate { if t.trackMutate {
t.notify() t.Notify()
} }
return nt return nt
} }
// commit is an internal helper for Commit(), useful for unit tests. // CommitOnly is used to finalize the transaction and return a new tree, but
func (t *Txn) commit() *Tree { // does not issue any notifications until Notify is called.
func (t *Txn) CommitOnly() *Tree {
nt := &Tree{t.root, t.size} nt := &Tree{t.root, t.size}
t.writable = nil t.writable = nil
return nt return nt
@ -448,16 +484,21 @@ func (t *Txn) slowNotify() {
} }
} }
// notify is used along with TrackMutate to trigger notifications. This should // Notify is used along with TrackMutate to trigger notifications. This must
// only be done once a transaction is committed. // only be done once a transaction is committed via CommitOnly, and it is called
func (t *Txn) notify() { // automatically by Commit.
func (t *Txn) Notify() {
if !t.trackMutate {
return
}
// If we've overflowed the tracking state we can't use it in any way and // If we've overflowed the tracking state we can't use it in any way and
// need to do a full tree compare. // need to do a full tree compare.
if t.trackOverflow { if t.trackOverflow {
t.slowNotify() t.slowNotify()
} else { } else {
for ch := range t.trackChannels { for ch := range t.trackChannels {
close(*ch) close(ch)
} }
} }

View File

@ -91,24 +91,6 @@ func (n *Node) delEdge(label byte) {
} }
} }
func (n *Node) mergeChild() {
e := n.edges[0]
child := e.node
n.prefix = concat(n.prefix, child.prefix)
if child.leaf != nil {
n.leaf = new(leafNode)
*n.leaf = *child.leaf
} else {
n.leaf = nil
}
if len(child.edges) != 0 {
n.edges = make([]edge, len(child.edges))
copy(n.edges, child.edges)
} else {
n.edges = nil
}
}
func (n *Node) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) { func (n *Node) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
search := k search := k
watch := n.mutateCh watch := n.mutateCh

6
vendor/vendor.json vendored
View File

@ -444,10 +444,10 @@
"revisionTime": "2017-02-11T01:34:15Z" "revisionTime": "2017-02-11T01:34:15Z"
}, },
{ {
"checksumSHA1": "jPxyofQxI1PRPq6LPc6VlcRn5fI=", "checksumSHA1": "zvmksNyW6g+Fd/bywd4vcn8rp+M=",
"path": "github.com/hashicorp/go-immutable-radix", "path": "github.com/hashicorp/go-immutable-radix",
"revision": "76b5f4e390910df355bfb9b16b41899538594a05", "revision": "d0852f9e7b91ec9633735052bdab00bf802b353c",
"revisionTime": "2017-01-13T02:29:29Z" "revisionTime": "2017-02-14T00:45:45Z"
}, },
{ {
"checksumSHA1": "K8Fsgt1llTXP0EwqdBzvSGdKOKc=", "checksumSHA1": "K8Fsgt1llTXP0EwqdBzvSGdKOKc=",