identity: Add batch entity deletion endpoint (#8785)
* identity: Add batch entity deletion endpoint
* Update the parameter description
* Update error message
* Update helper/storagepacker/storagepacker.go
* Review feedback
* Update vault/identity_store_entities.go

Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com>
Co-authored-by: Calvin Leung Huang <cleung2010@gmail.com>
Parent: 4f982ea12e
Commit: 21cdba6fb5
helper/storagepacker/storagepacker.go

@@ -127,113 +127,85 @@ func (s *StoragePacker) BucketKey(itemID string) string {
 // DeleteItem removes the item from the respective bucket
 func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
-	return s.DeleteMultipleItems(context.Background(), nil, itemID)
+	return s.DeleteMultipleItems(context.Background(), nil, []string{itemID})
 }

-func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs ...string) error {
+func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs []string) error {
 	defer metrics.MeasureSince([]string{"storage_packer", "delete_items"}, time.Now())
-	var err error
-	switch len(itemIDs) {
-	case 0:
-		// Nothing
+	if len(itemIDs) == 0 {
 		return nil
-
-	case 1:
-		logger = hclog.NewNullLogger()
-		fallthrough
-
-	default:
-		lockIndexes := make(map[string]struct{}, len(s.storageLocks))
-		for _, itemID := range itemIDs {
-			bucketKey := s.BucketKey(itemID)
-			if _, ok := lockIndexes[bucketKey]; !ok {
-				lockIndexes[bucketKey] = struct{}{}
-			}
-		}
-
-		lockKeys := make([]string, 0, len(lockIndexes))
-		for k := range lockIndexes {
-			lockKeys = append(lockKeys, k)
-		}
-
-		locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
-		for _, lock := range locks {
-			lock.Lock()
-			defer lock.Unlock()
-		}
 	}

 	if logger == nil {
 		logger = hclog.NewNullLogger()
 	}

-	bucketCache := make(map[string]*Bucket, len(s.storageLocks))
+	// Sort the ids by the bucket they will be deleted from
+	lockKeys := make([]string, 0)
+	byBucket := make(map[string]map[string]struct{})
+	for _, id := range itemIDs {
+		bucketKey := s.BucketKey(id)
+		bucket, ok := byBucket[bucketKey]
+		if !ok {
+			bucket = make(map[string]struct{})
+			byBucket[bucketKey] = bucket
+
+			// Add the lock key once
+			lockKeys = append(lockKeys, bucketKey)
+		}
+
+		bucket[id] = struct{}{}
+	}
+
+	locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
+	for _, lock := range locks {
+		lock.Lock()
+		defer lock.Unlock()
+	}

 	logger.Debug("deleting multiple items from storagepacker; caching and deleting from buckets", "total_items", len(itemIDs))

-	var pctDone int
-	for idx, itemID := range itemIDs {
-		bucketKey := s.BucketKey(itemID)
-
-		bucket, bucketFound := bucketCache[bucketKey]
-		if !bucketFound {
-			// Read from storage
-			storageEntry, err := s.view.Get(context.Background(), bucketKey)
-			if err != nil {
-				return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
-			}
-			if storageEntry == nil {
-				return nil
-			}
-
-			uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
-			if err != nil {
-				return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
-			}
-			if notCompressed {
-				uncompressedData = storageEntry.Value
-			}
-
-			bucket = new(Bucket)
-			err = proto.Unmarshal(uncompressedData, bucket)
-			if err != nil {
-				return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
-			}
-		}
-
-		// Look for a matching storage entry
-		foundIdx := -1
-		for itemIdx, item := range bucket.Items {
-			if item.ID == itemID {
-				foundIdx = itemIdx
-				break
-			}
-		}
-
-		// If there is a match, remove it from the collection and persist the
-		// resulting collection
-		if foundIdx != -1 {
-			bucket.Items[foundIdx] = bucket.Items[len(bucket.Items)-1]
-			bucket.Items = bucket.Items[:len(bucket.Items)-1]
-			if !bucketFound {
-				bucketCache[bucketKey] = bucket
-			}
-		}
-
-		newPctDone := idx * 100.0 / len(itemIDs)
-		if int(newPctDone) > pctDone {
-			pctDone = int(newPctDone)
-			logger.Trace("bucket item removal progress", "percent", pctDone, "items_removed", idx)
-		}
-	}
-
-	logger.Debug("persisting buckets", "total_buckets", len(bucketCache))
-
-	// Persist all buckets in the cache; these will be the ones that had
-	// deletions
-	pctDone = 0
-	for _, bucket := range bucketCache {
+	// For each bucket, load it from storage, remove the necessary items, and
+	// write it back out to storage
+	pctDone := 0
+	idx := 0
+	for bucketKey, itemsToRemove := range byBucket {
+		// Read bucket from storage
+		storageEntry, err := s.view.Get(context.Background(), bucketKey)
+		if err != nil {
+			return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
+		}
+		if storageEntry == nil {
+			logger.Warn("could not find bucket", "bucket", bucketKey)
+			continue
+		}
+
+		uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
+		if err != nil {
+			return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
+		}
+		if notCompressed {
+			uncompressedData = storageEntry.Value
+		}
+
+		bucket := new(Bucket)
+		err = proto.Unmarshal(uncompressedData, bucket)
+		if err != nil {
+			return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
+		}
+
+		// Look for matching storage entries and delete them from the list.
+		for i := 0; i < len(bucket.Items); i++ {
+			if _, ok := itemsToRemove[bucket.Items[i].ID]; ok {
+				bucket.Items[i] = bucket.Items[len(bucket.Items)-1]
+				bucket.Items = bucket.Items[:len(bucket.Items)-1]
+
+				// Since we just moved a value to position i we need to
+				// decrement i so we replay this position
+				i--
+			}
+		}
+
+		// Fail early if the context is canceled; the storage calls would fail anyway
+		if ctx.Err() != nil {
+			return ctx.Err()
@@ -244,7 +216,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs []string) error {
 			return err
 		}

-		newPctDone := idx * 100.0 / len(bucketCache)
+		newPctDone := idx * 100.0 / len(byBucket)
 		if int(newPctDone) > pctDone {
 			pctDone = int(newPctDone)
 			logger.Trace("bucket persistence progress", "percent", pctDone, "buckets_persisted", idx)
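The rewritten loop above removes matched items from `bucket.Items` in place by swapping each match with the last element and truncating; the `i--` is what keeps the loop correct when the swapped-in element is itself a match. A minimal standalone sketch of that pattern (the `Item` struct here is a hypothetical stand-in for the storagepacker's generated protobuf type):

```go
package main

import "fmt"

// Item stands in for the storagepacker's generated protobuf Item type.
type Item struct{ ID string }

// removeMatching deletes every item whose ID is in toRemove by swapping it
// with the last element and shrinking the slice. No allocation, O(n) time,
// but the order of the surviving items is not preserved.
func removeMatching(items []Item, toRemove map[string]struct{}) []Item {
	for i := 0; i < len(items); i++ {
		if _, ok := toRemove[items[i].ID]; ok {
			items[i] = items[len(items)-1]
			items = items[:len(items)-1]
			// The element just swapped into position i has not been examined
			// yet, so step back and replay this position.
			i--
		}
	}
	return items
}

func main() {
	items := []Item{{"a"}, {"b"}, {"c"}, {"d"}}
	out := removeMatching(items, map[string]struct{}{"b": {}, "d": {}})
	fmt.Println(out) // "a" and "c" survive; order is not guaranteed
}
```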
helper/storagepacker/storagepacker_test.go

@@ -217,7 +217,7 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) {
 		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
 	}

-	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete...)
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -237,3 +237,56 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) {
 		}
 	}
 }
+
+func TestStoragePacker_DeleteMultiple_ALL(t *testing.T) {
+	storagePacker, err := NewStoragePacker(&logical.InmemStorage{}, log.New(&log.LoggerOptions{Name: "storagepackertest"}), "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx := context.Background()
+
+	// Persist 10,000 storage entries
+	itemsToDelete := make([]string, 0, 10000)
+	for i := 0; i < 10000; i++ {
+		item := &Item{
+			ID: fmt.Sprintf("item%d", i),
+		}
+
+		err = storagePacker.PutItem(ctx, item)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Verify that it can be read
+		fetchedItem, err := storagePacker.GetItem(item.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem == nil {
+			t.Fatalf("failed to read the stored item")
+		}
+
+		if item.ID != fetchedItem.ID {
+			t.Fatalf("bad: item ID; expected: %q\n actual: %q\n", item.ID, fetchedItem.ID)
+		}
+
+		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
+	}
+
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the deletion was successful
+	for _, item := range itemsToDelete {
+		fetchedItem, err := storagePacker.GetItem(item)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem != nil {
+			t.Fatal("item not deleted")
+		}
+	}
+}
vault/identity_store_entities.go

@@ -89,6 +89,21 @@ func entityPaths(i *IdentityStore) []*framework.Path {
 			HelpSynopsis:    strings.TrimSpace(entityHelp["entity-id"][0]),
 			HelpDescription: strings.TrimSpace(entityHelp["entity-id"][1]),
 		},
+		{
+			Pattern: "entity/batch-delete",
+			Fields: map[string]*framework.FieldSchema{
+				"entity_ids": {
+					Type:        framework.TypeCommaStringSlice,
+					Description: "Entity IDs to delete",
+				},
+			},
+			Callbacks: map[logical.Operation]framework.OperationFunc{
+				logical.UpdateOperation: i.handleEntityBatchDelete(),
+			},
+
+			HelpSynopsis:    strings.TrimSpace(entityHelp["batch-delete"][0]),
+			HelpDescription: strings.TrimSpace(entityHelp["batch-delete"][1]),
+		},
 		{
 			Pattern: "entity/name/?$",
 			Callbacks: map[logical.Operation]framework.OperationFunc{
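Because `entity_ids` is declared as `framework.TypeCommaStringSlice`, callers can send either a JSON list or a single comma-separated string. A quick sketch of that decoding behavior (not part of the diff; it assumes the `sdk/framework` import path Vault uses at this point):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/vault/sdk/framework"
)

func main() {
	schema := map[string]*framework.FieldSchema{
		"entity_ids": {Type: framework.TypeCommaStringSlice},
	}

	// Both raw forms decode to the same []string.
	for _, raw := range []interface{}{
		"id-1,id-2,id-3",                      // comma-separated string
		[]interface{}{"id-1", "id-2", "id-3"}, // JSON list
	} {
		d := &framework.FieldData{
			Raw:    map[string]interface{}{"entity_ids": raw},
			Schema: schema,
		}
		fmt.Println(d.Get("entity_ids").([]string))
	}
}
```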
@@ -420,7 +435,7 @@ func (i *IdentityStore) pathEntityIDDelete() framework.OperationFunc {
 			return nil, nil
 		}

-		err = i.handleEntityDeleteCommon(ctx, txn, entity)
+		err = i.handleEntityDeleteCommon(ctx, txn, entity, true)
 		if err != nil {
 			return nil, err
 		}
@@ -464,7 +479,7 @@ func (i *IdentityStore) pathEntityNameDelete() framework.OperationFunc {
 			return nil, nil
 		}

-		err = i.handleEntityDeleteCommon(ctx, txn, entity)
+		err = i.handleEntityDeleteCommon(ctx, txn, entity, true)
 		if err != nil {
 			return nil, err
 		}
@@ -475,7 +490,83 @@ func (i *IdentityStore) pathEntityNameDelete() framework.OperationFunc {
 	}
 }

-func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity) error {
+// handleEntityBatchDelete deletes a batch of entities by their IDs
+func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc {
+	return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
+		entityIDs := d.Get("entity_ids").([]string)
+		if len(entityIDs) == 0 {
+			return logical.ErrorResponse("missing entity ids to delete"), nil
+		}
+
+		// Sort the ids by the bucket they will be deleted from
+		byBucket := make(map[string]map[string]struct{})
+		for _, id := range entityIDs {
+			bucketKey := i.entityPacker.BucketKey(id)
+
+			bucket, ok := byBucket[bucketKey]
+			if !ok {
+				bucket = make(map[string]struct{})
+				byBucket[bucketKey] = bucket
+			}
+
+			bucket[id] = struct{}{}
+		}
+
+		deleteIdsForBucket := func(entityIDs []string) error {
+			i.lock.Lock()
+			defer i.lock.Unlock()
+
+			// Create a MemDB transaction to delete entities from the inmem database
+			// without altering storage. Batch deletion on storage bucket items is
+			// performed directly through entityPacker.
+			txn := i.db.Txn(true)
+			defer txn.Abort()
+
+			for _, entityID := range entityIDs {
+				// Fetch the entity using its ID
+				entity, err := i.MemDBEntityByIDInTxn(txn, entityID, true)
+				if err != nil {
+					return err
+				}
+				if entity == nil {
+					continue
+				}
+
+				err = i.handleEntityDeleteCommon(ctx, txn, entity, false)
+				if err != nil {
+					return err
+				}
+			}
+
+			// Write all updates for this bucket.
+			err := i.entityPacker.DeleteMultipleItems(ctx, i.logger, entityIDs)
+			if err != nil {
+				return err
+			}
+
+			txn.Commit()
+			return nil
+		}
+
+		for _, bucket := range byBucket {
+			ids := make([]string, len(bucket))
+			i := 0
+			for id := range bucket {
+				ids[i] = id
+				i++
+			}
+
+			err := deleteIdsForBucket(ids)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		return nil, nil
+	}
+}
+
+func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity, update bool) error {
 	ns, err := namespace.FromContext(ctx)
 	if err != nil {
 		return err
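Grouping the IDs up front is what makes the handler cheap: each storage bucket is locked, loaded, rewritten, and persisted exactly once, however many of its entities are being deleted. A rough standalone illustration of the grouping step (the modulo `bucketKey` below is a hypothetical stand-in for `StoragePacker.BucketKey`, which hashes an ID onto a fixed set of buckets):

```go
package main

import "fmt"

// bucketKey is a hypothetical stand-in for StoragePacker.BucketKey.
func bucketKey(id string) string {
	return fmt.Sprintf("packer/buckets/%d", id[len(id)-1]%4)
}

func main() {
	ids := []string{"ent-1", "ent-2", "ent-3", "ent-4", "ent-5", "ent-6"}

	// Sort the ids by the bucket they would be deleted from.
	byBucket := make(map[string][]string)
	for _, id := range ids {
		k := bucketKey(id)
		byBucket[k] = append(byBucket[k], id)
	}

	// One lock/load/persist cycle per bucket instead of one per entity.
	for k, bucketIDs := range byBucket {
		fmt.Printf("bucket %s: delete %v in a single persist\n", k, bucketIDs)
	}
}
```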
@@ -511,10 +602,12 @@ func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity, update bool) error {
 		return err
 	}

-	// Delete the entity from storage
-	err = i.entityPacker.DeleteItem(ctx, entity.ID)
-	if err != nil {
-		return err
+	if update {
+		// Delete the entity from storage
+		err = i.entityPacker.DeleteItem(ctx, entity.ID)
+		if err != nil {
+			return err
+		}
 	}

 	return nil
@@ -772,4 +865,8 @@ var entityHelp = map[string][2]string{
 		"Merge two or more entities together",
 		"",
 	},
+	"batch-delete": {
+		"Delete all of the entities provided",
+		"",
+	},
 }
vault/identity_store_entities_test.go

@@ -407,6 +407,58 @@ func TestIdentityStore_EntityCreateUpdate(t *testing.T) {
 	}
 }

+func TestIdentityStore_BatchDelete(t *testing.T) {
+	ctx := namespace.RootContext(nil)
+	is, _, _ := testIdentityStoreWithGithubAuth(ctx, t)
+
+	ids := make([]string, 10000)
+	for i := 0; i < 10000; i++ {
+		entityData := map[string]interface{}{
+			"name": fmt.Sprintf("entity-%d", i),
+		}
+
+		entityReq := &logical.Request{
+			Operation: logical.UpdateOperation,
+			Path:      "entity",
+			Data:      entityData,
+		}
+
+		// Create the entity
+		resp, err := is.HandleRequest(ctx, entityReq)
+		if err != nil || (resp != nil && resp.IsError()) {
+			t.Fatalf("err:%v resp:%#v", err, resp)
+		}
+		ids[i] = resp.Data["id"].(string)
+	}
+
+	deleteReq := &logical.Request{
+		Operation: logical.UpdateOperation,
+		Path:      "entity/batch-delete",
+		Data: map[string]interface{}{
+			"entity_ids": ids,
+		},
+	}
+
+	resp, err := is.HandleRequest(ctx, deleteReq)
+	if err != nil || (resp != nil && resp.IsError()) {
+		t.Fatalf("err:%v resp:%#v", err, resp)
+	}
+
+	for _, entityID := range ids {
+		// Read the entity
+		resp, err := is.HandleRequest(ctx, &logical.Request{
+			Operation: logical.ReadOperation,
+			Path:      "entity/id/" + entityID,
+		})
+		if err != nil || (resp != nil && resp.IsError()) {
+			t.Fatalf("err:%v resp:%#v", err, resp)
+		}
+		if resp != nil {
+			t.Fatal(resp)
+		}
+	}
+}
+
 func TestIdentityStore_CloneImmutability(t *testing.T) {
 	alias := &identity.Alias{
 		ID: "testaliasid",
@@ -174,6 +174,44 @@ $ curl \
     http://127.0.0.1:8200/v1/identity/entity/id/8d6a45e5-572f-8f13-d226-cd0d1ec57297
 ```

+## Batch Delete Entities
+
+This endpoint deletes all of the entities provided.
+
+| Method | Path                            |
+| :----- | :------------------------------ |
+| `POST` | `/identity/entity/batch-delete` |
+
+### Parameters
+
+- `entity_ids` `([]string: <required>)` – List of entity identifiers to delete.
+
+### Sample Payload
+
+```json
+{
+  "entity_ids": [
+    "02fe5a88-912b-6794-62ed-db873ef86a95",
+    "3bf81bc9-44df-8138-57f9-724a9ae36d04",
+    "627fba68-98c9-c012-71ba-bfb349585ce1",
+    "6c4c805b-b384-3d0e-4d51-44d349887b96",
+    "70a72feb-35d1-c775-0813-8efaa8b4b9b5",
+    "f1092a67-ce34-48fd-161d-c13a367bc1cd",
+    "faedd89a-0d82-c197-c8f9-93a3e6cf0cd0"
+  ]
+}
+```
+
+### Sample Request
+
+```shell
+$ curl \
+    --header "X-Vault-Token: ..." \
+    --request POST \
+    --data @payload.json \
+    http://127.0.0.1:8200/v1/identity/entity/batch-delete
+```
+
 ## List Entities by ID

 This endpoint returns a list of available entities by their identifiers.
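Beyond curl, the same request can be issued from Go with the official client library. A short sketch (it assumes `VAULT_ADDR` and `VAULT_TOKEN` are set in the environment and uses `github.com/hashicorp/vault/api`):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// NewClient picks up VAULT_ADDR and VAULT_TOKEN from the environment.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// POST /v1/identity/entity/batch-delete with the IDs to remove.
	_, err = client.Logical().Write("identity/entity/batch-delete", map[string]interface{}{
		"entity_ids": []string{
			"02fe5a88-912b-6794-62ed-db873ef86a95",
			"3bf81bc9-44df-8138-57f9-724a9ae36d04",
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("batch delete submitted")
}
```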