identity: group refresh shouldn't lock unless an update is needed (#8795)
This commit is contained in:
parent
d979279015
commit
33b3e6857e
|
@ -6,7 +6,9 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/errwrap"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
@ -129,6 +131,7 @@ func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
|
|||
}
|
||||
|
||||
func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs ...string) error {
|
||||
defer metrics.MeasureSince([]string{"storage_packer", "delete_items"}, time.Now())
|
||||
var err error
|
||||
switch len(itemIDs) {
|
||||
case 0:
|
||||
|
@ -254,6 +257,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo
|
|||
}
|
||||
|
||||
func (s *StoragePacker) putBucket(ctx context.Context, bucket *Bucket) error {
|
||||
defer metrics.MeasureSince([]string{"storage_packer", "put_bucket"}, time.Now())
|
||||
if bucket == nil {
|
||||
return fmt.Errorf("nil bucket entry")
|
||||
}
|
||||
|
@ -293,6 +297,8 @@ func (s *StoragePacker) putBucket(ctx context.Context, bucket *Bucket) error {
|
|||
// GetItem fetches the storage entry for a given key from its corresponding
|
||||
// bucket.
|
||||
func (s *StoragePacker) GetItem(itemID string) (*Item, error) {
|
||||
defer metrics.MeasureSince([]string{"storage_packer", "get_item"}, time.Now())
|
||||
|
||||
if itemID == "" {
|
||||
return nil, fmt.Errorf("empty item ID")
|
||||
}
|
||||
|
@ -320,6 +326,8 @@ func (s *StoragePacker) GetItem(itemID string) (*Item, error) {
|
|||
|
||||
// PutItem stores the given item in its respective bucket
|
||||
func (s *StoragePacker) PutItem(_ context.Context, item *Item) error {
|
||||
defer metrics.MeasureSince([]string{"storage_packer", "put_item"}, time.Now())
|
||||
|
||||
if item == nil {
|
||||
return fmt.Errorf("nil item")
|
||||
}
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/errwrap"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
@ -478,6 +480,8 @@ func (i *IdentityStore) entityByAliasFactorsInTxn(txn *memdb.Txn, mountAccessor,
|
|||
// CreateOrFetchEntity creates a new entity. This is used by core to
|
||||
// associate each login attempt by an alias to a unified entity in Vault.
|
||||
func (i *IdentityStore) CreateOrFetchEntity(ctx context.Context, alias *logical.Alias) (*identity.Entity, error) {
|
||||
defer metrics.MeasureSince([]string{"identity", "create_or_fetch_entity"}, time.Now())
|
||||
|
||||
var entity *identity.Entity
|
||||
var err error
|
||||
var update bool
|
||||
|
|
|
@ -6,7 +6,9 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/errwrap"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
@ -330,6 +332,7 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
|
|||
// updated, in which case, callers should send in both entity and
|
||||
// previousEntity.
|
||||
func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, entity *identity.Entity, previousEntity *identity.Entity, persist bool) error {
|
||||
defer metrics.MeasureSince([]string{"identity", "upsert_entity_txn"}, time.Now())
|
||||
var err error
|
||||
|
||||
if txn == nil {
|
||||
|
@ -485,6 +488,7 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
|
|||
// updated, in which case, callers should send in both entity and
|
||||
// previousEntity.
|
||||
func (i *IdentityStore) upsertEntity(ctx context.Context, entity *identity.Entity, previousEntity *identity.Entity, persist bool) error {
|
||||
defer metrics.MeasureSince([]string{"identity", "upsert_entity"}, time.Now())
|
||||
|
||||
// Create a MemDB transaction to update both alias and entity
|
||||
txn := i.db.Txn(true)
|
||||
|
@ -1365,6 +1369,8 @@ func (i *IdentityStore) MemDBGroupByName(ctx context.Context, groupName string,
|
|||
}
|
||||
|
||||
func (i *IdentityStore) UpsertGroup(ctx context.Context, group *identity.Group, persist bool) error {
|
||||
defer metrics.MeasureSince([]string{"identity", "upsert_group"}, time.Now())
|
||||
|
||||
txn := i.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
|
@ -1379,6 +1385,8 @@ func (i *IdentityStore) UpsertGroup(ctx context.Context, group *identity.Group,
|
|||
}
|
||||
|
||||
func (i *IdentityStore) UpsertGroupInTxn(ctx context.Context, txn *memdb.Txn, group *identity.Group, persist bool) error {
|
||||
defer metrics.MeasureSince([]string{"identity", "upsert_group_txn"}, time.Now())
|
||||
|
||||
var err error
|
||||
|
||||
if txn == nil {
|
||||
|
@ -1879,91 +1887,130 @@ func (i *IdentityStore) MemDBGroupByAliasID(aliasID string, clone bool) (*identi
|
|||
}
|
||||
|
||||
func (i *IdentityStore) refreshExternalGroupMembershipsByEntityID(ctx context.Context, entityID string, groupAliases []*logical.Alias) ([]*logical.Alias, error) {
|
||||
i.logger.Debug("refreshing external group memberships", "entity_id", entityID, "group_aliases", groupAliases)
|
||||
defer metrics.MeasureSince([]string{"identity", "refresh_external_groups"}, time.Now())
|
||||
|
||||
if entityID == "" {
|
||||
return nil, fmt.Errorf("empty entity ID")
|
||||
}
|
||||
|
||||
i.groupLock.Lock()
|
||||
defer i.groupLock.Unlock()
|
||||
refreshFunc := func(dryRun bool) (bool, []*logical.Alias, error) {
|
||||
|
||||
txn := i.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
if !dryRun {
|
||||
i.groupLock.Lock()
|
||||
defer i.groupLock.Unlock()
|
||||
}
|
||||
|
||||
oldGroups, err := i.MemDBGroupsByMemberEntityIDInTxn(txn, entityID, true, true)
|
||||
txn := i.db.Txn(!dryRun)
|
||||
defer txn.Abort()
|
||||
|
||||
oldGroups, err := i.MemDBGroupsByMemberEntityIDInTxn(txn, entityID, true, true)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
mountAccessor := ""
|
||||
if len(groupAliases) != 0 {
|
||||
mountAccessor = groupAliases[0].MountAccessor
|
||||
}
|
||||
|
||||
var newGroups []*identity.Group
|
||||
var validAliases []*logical.Alias
|
||||
for _, alias := range groupAliases {
|
||||
aliasByFactors, err := i.MemDBAliasByFactorsInTxn(txn, alias.MountAccessor, alias.Name, true, true)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
if aliasByFactors == nil {
|
||||
continue
|
||||
}
|
||||
mappingGroup, err := i.MemDBGroupByAliasIDInTxn(txn, aliasByFactors.ID, true)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
if mappingGroup == nil {
|
||||
return false, nil, fmt.Errorf("group unavailable for a valid alias ID %q", aliasByFactors.ID)
|
||||
}
|
||||
|
||||
newGroups = append(newGroups, mappingGroup)
|
||||
validAliases = append(validAliases, alias)
|
||||
}
|
||||
|
||||
diff := diffGroups(oldGroups, newGroups)
|
||||
|
||||
// Add the entity ID to all the new groups
|
||||
for _, group := range diff.New {
|
||||
if group.Type != groupTypeExternal {
|
||||
continue
|
||||
}
|
||||
|
||||
// We need to update a group, if we are in a dry run we should
|
||||
// report back that a change needs to take place.
|
||||
if dryRun {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
i.logger.Debug("adding member entity ID to external group", "member_entity_id", entityID, "group_id", group.ID)
|
||||
|
||||
group.MemberEntityIDs = append(group.MemberEntityIDs, entityID)
|
||||
|
||||
err = i.UpsertGroupInTxn(ctx, txn, group, true)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the entity ID from all the deleted groups
|
||||
for _, group := range diff.Deleted {
|
||||
if group.Type != groupTypeExternal {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the external group is from a different mount, don't remove the
|
||||
// entity ID from it.
|
||||
if mountAccessor != "" && group.Alias != nil && group.Alias.MountAccessor != mountAccessor {
|
||||
continue
|
||||
}
|
||||
|
||||
// We need to update a group, if we are in a dry run we should
|
||||
// report back that a change needs to take place.
|
||||
if dryRun {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
i.logger.Debug("removing member entity ID from external group", "member_entity_id", entityID, "group_id", group.ID)
|
||||
|
||||
group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entityID)
|
||||
|
||||
err = i.UpsertGroupInTxn(ctx, txn, group, true)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
return false, validAliases, nil
|
||||
}
|
||||
|
||||
// dryRun
|
||||
needsUpdate, validAliases, err := refreshFunc(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mountAccessor := ""
|
||||
if len(groupAliases) != 0 {
|
||||
mountAccessor = groupAliases[0].MountAccessor
|
||||
if needsUpdate || len(groupAliases) > 0 {
|
||||
i.logger.Debug("refreshing external group memberships", "entity_id", entityID, "group_aliases", groupAliases)
|
||||
}
|
||||
|
||||
var newGroups []*identity.Group
|
||||
var validAliases []*logical.Alias
|
||||
for _, alias := range groupAliases {
|
||||
aliasByFactors, err := i.MemDBAliasByFactors(alias.MountAccessor, alias.Name, true, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if aliasByFactors == nil {
|
||||
continue
|
||||
}
|
||||
mappingGroup, err := i.MemDBGroupByAliasID(aliasByFactors.ID, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if mappingGroup == nil {
|
||||
return nil, fmt.Errorf("group unavailable for a valid alias ID %q", aliasByFactors.ID)
|
||||
}
|
||||
|
||||
newGroups = append(newGroups, mappingGroup)
|
||||
validAliases = append(validAliases, alias)
|
||||
if !needsUpdate {
|
||||
return validAliases, nil
|
||||
}
|
||||
|
||||
diff := diffGroups(oldGroups, newGroups)
|
||||
|
||||
// Add the entity ID to all the new groups
|
||||
for _, group := range diff.New {
|
||||
if group.Type != groupTypeExternal {
|
||||
continue
|
||||
}
|
||||
|
||||
i.logger.Debug("adding member entity ID to external group", "member_entity_id", entityID, "group_id", group.ID)
|
||||
|
||||
group.MemberEntityIDs = append(group.MemberEntityIDs, entityID)
|
||||
|
||||
err = i.UpsertGroupInTxn(ctx, txn, group, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Run the update
|
||||
_, validAliases, err = refreshFunc(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Remove the entity ID from all the deleted groups
|
||||
for _, group := range diff.Deleted {
|
||||
if group.Type != groupTypeExternal {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the external group is from a different mount, don't remove the
|
||||
// entity ID from it.
|
||||
if mountAccessor != "" && group.Alias != nil && group.Alias.MountAccessor != mountAccessor {
|
||||
continue
|
||||
}
|
||||
|
||||
i.logger.Debug("removing member entity ID from external group", "member_entity_id", entityID, "group_id", group.ID)
|
||||
|
||||
group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entityID)
|
||||
|
||||
err = i.UpsertGroupInTxn(ctx, txn, group, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
|
||||
return validAliases, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue