DynamoDB Nested Values Bug (#4570)
* Add tests to ExerciseBackend to expose nested-values bug * Update DynamoDB physical backend Delete and hasChildren logic to prevent overzealous cleanup of folders and values
This commit is contained in:
parent
a0c5ac1211
commit
4e0bc43bf8
|
@ -340,15 +340,33 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
|
||||||
},
|
},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
// clean up now empty 'folders'
|
// Clean up empty "folders" by looping through all levels of the path to the item being deleted looking for
|
||||||
|
// children. Loop from deepest path to shallowest, and only consider items children if they are not going to be
|
||||||
|
// deleted by our batch delete request. If a path has no valid children, then it should be considered an empty
|
||||||
|
// "folder" and be deleted along with the original item in our batch job. Because we loop from deepest path to
|
||||||
|
// shallowest, once we find a path level that contains valid children we can stop the cleanup operation.
|
||||||
prefixes := physical.Prefixes(key)
|
prefixes := physical.Prefixes(key)
|
||||||
sort.Sort(sort.Reverse(sort.StringSlice(prefixes)))
|
sort.Sort(sort.Reverse(sort.StringSlice(prefixes)))
|
||||||
for _, prefix := range prefixes {
|
for index, prefix := range prefixes {
|
||||||
hasChildren, err := d.hasChildren(prefix)
|
// Because delete batches its requests, we need to pass keys we know are going to be deleted into
|
||||||
|
// hasChildren so it can exclude those when it determines if there WILL be any children left after
|
||||||
|
// the delete operations have completed.
|
||||||
|
var excluded []string
|
||||||
|
if index == 0 {
|
||||||
|
// This is the value we know for sure is being deleted
|
||||||
|
excluded = append(excluded, recordKeyForVaultKey(key))
|
||||||
|
} else {
|
||||||
|
// The previous path doesn't count as a child, since if we're still looping, we've found no children
|
||||||
|
excluded = append(excluded, recordKeyForVaultKey(prefixes[index-1]))
|
||||||
|
}
|
||||||
|
|
||||||
|
hasChildren, err := d.hasChildren(prefix, excluded)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !hasChildren {
|
if !hasChildren {
|
||||||
|
// If there are no children other than ones we know are being deleted then cleanup empty "folder" pointers
|
||||||
requests = append(requests, &dynamodb.WriteRequest{
|
requests = append(requests, &dynamodb.WriteRequest{
|
||||||
DeleteRequest: &dynamodb.DeleteRequest{
|
DeleteRequest: &dynamodb.DeleteRequest{
|
||||||
Key: map[string]*dynamodb.AttributeValue{
|
Key: map[string]*dynamodb.AttributeValue{
|
||||||
|
@ -357,6 +375,12 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
// This loop starts at the deepest path and works backwards looking for children
|
||||||
|
// once a deeper level of the path has been found to have children there is no
|
||||||
|
// more cleanup that needs to happen, otherwise we might remove folder pointers
|
||||||
|
// to that deeper path making it "undiscoverable" with the list operation
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,9 +430,12 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// hasChildren returns true if there exist items below a certain path prefix.
|
// hasChildren returns true if there exist items below a certain path prefix.
|
||||||
// To do so, the method fetches such items from DynamoDB. If there are more
|
// To do so, the method fetches such items from DynamoDB. This method is primarily
|
||||||
// than one item (which is the "directory" item), there are children.
|
// used by Delete. Because DynamoDB requests are batched this method is being called
|
||||||
func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
|
// before any deletes take place. To account for that hasChildren accepts a slice of
|
||||||
|
// strings representing values we expect to find that should NOT be counted as children
|
||||||
|
// because they are going to be deleted.
|
||||||
|
func (d *DynamoDBBackend) hasChildren(prefix string, exclude []string) (bool, error) {
|
||||||
prefix = strings.TrimSuffix(prefix, "/")
|
prefix = strings.TrimSuffix(prefix, "/")
|
||||||
prefix = escapeEmptyPath(prefix)
|
prefix = escapeEmptyPath(prefix)
|
||||||
|
|
||||||
|
@ -424,9 +451,10 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// Avoid fetching too many items from DynamoDB for performance reasons.
|
// Avoid fetching too many items from DynamoDB for performance reasons.
|
||||||
// We need at least two because one is the directory item, all others
|
// We want to know if there are any children we don't expect to see.
|
||||||
// are children.
|
// Answering that question requires fetching a minimum of one more item
|
||||||
Limit: aws.Int64(2),
|
// than the number we expect. In most cases this value will be 2
|
||||||
|
Limit: aws.Int64(int64(len(exclude) + 1)),
|
||||||
}
|
}
|
||||||
|
|
||||||
d.permitPool.Acquire()
|
d.permitPool.Acquire()
|
||||||
|
@ -436,7 +464,23 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return len(out.Items) > 1, nil
|
var childrenExist bool
|
||||||
|
for _, item := range out.Items {
|
||||||
|
for _, excluded := range exclude {
|
||||||
|
// Check if we've found an item we didn't expect to. Look for "folder" pointer keys (trailing slash)
|
||||||
|
// and regular value keys (no trailing slash)
|
||||||
|
if *item["Key"].S != excluded && *item["Key"].S != fmt.Sprintf("%s/", excluded) {
|
||||||
|
childrenExist = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if childrenExist {
|
||||||
|
// We only need to find ONE child we didn't expect to.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return childrenExist, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LockWith is used for mutual exclusion based on the given key.
|
// LockWith is used for mutual exclusion based on the given key.
|
||||||
|
|
|
@ -181,6 +181,54 @@ func ExerciseBackend(t testing.TB, b Backend) {
|
||||||
if len(keys) != 0 {
|
if len(keys) != 0 {
|
||||||
t.Errorf("should be empty at end: %v", keys)
|
t.Errorf("should be empty at end: %v", keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When the root path is empty, adding and removing deep nested values should not break listing
|
||||||
|
e = &Entry{Key: "foo/nested1/nested2/value1", Value: []byte("baz")}
|
||||||
|
err = b.Put(context.Background(), e)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("deep nest: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e = &Entry{Key: "foo/nested1/nested2/value2", Value: []byte("baz")}
|
||||||
|
err = b.Put(context.Background(), e)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("deep nest: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = b.Delete(context.Background(), "foo/nested1/nested2/value2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to remove deep nest: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys, err = b.List(context.Background(), "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("listing of root failed after deletion: %v", err)
|
||||||
|
}
|
||||||
|
if len(keys) == 0 {
|
||||||
|
t.Errorf("root is returning empty after deleting a single nested value, expected nested1/: %v", keys)
|
||||||
|
keys, err = b.List(context.Background(), "foo/nested1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("listing of expected nested path 'foo/nested1' failed: %v", err)
|
||||||
|
}
|
||||||
|
// prove that the root should not be empty and that foo/nested1 exists
|
||||||
|
if len(keys) != 0 {
|
||||||
|
t.Logf(" keys can still be listed from nested1/ so it's not empty, expected nested2/: %v", keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup left over listing bug test value
|
||||||
|
err = b.Delete(context.Background(), "foo/nested1/nested2/value1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to remove deep nest: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys, err = b.List(context.Background(), "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("listing of root failed after delete of deep nest: %v", err)
|
||||||
|
}
|
||||||
|
if len(keys) != 0 {
|
||||||
|
t.Errorf("should be empty at end: %v", keys)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExerciseBackend_ListPrefix(t testing.TB, b Backend) {
|
func ExerciseBackend_ListPrefix(t testing.TB, b Backend) {
|
||||||
|
|
Loading…
Reference in a new issue