Cleanup and simplify lock usage in database plugin (#15944)
Cleanup and simplify lock usage in database plugin Following up from discussions in #15923 and #15933, I wanted to split out a separate PR that drastically reduced the complexity of the use of the databaseBackend lock. We no longer need it at all for the `credRotationQueue`, and we can move it to be solely used in a few, small connections map management functions. Co-authored-by: Calvin Leung Huang <1883212+calvn@users.noreply.github.com>
This commit is contained in:
parent
6205cf6999
commit
99bc3e7b0e
|
@ -55,13 +55,8 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend,
|
|||
}
|
||||
|
||||
b.credRotationQueue = queue.New()
|
||||
// Create a context with a cancel method for processing any WAL entries and
|
||||
// populating the queue
|
||||
initCtx := context.Background()
|
||||
ictx, cancel := context.WithCancel(initCtx)
|
||||
b.cancelQueue = cancel
|
||||
// Load queue and kickoff new periodic ticker
|
||||
go b.initQueue(ictx, conf, conf.System.ReplicationState())
|
||||
go b.initQueue(b.queueCtx, conf, conf.System.ReplicationState())
|
||||
return b, nil
|
||||
}
|
||||
|
||||
|
@ -103,28 +98,28 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {
|
|||
|
||||
b.logger = conf.Logger
|
||||
b.connections = make(map[string]*dbPluginInstance)
|
||||
|
||||
b.queueCtx, b.cancelQueueCtx = context.WithCancel(context.Background())
|
||||
b.roleLocks = locksutil.CreateLocks()
|
||||
|
||||
return &b
|
||||
}
|
||||
|
||||
type databaseBackend struct {
|
||||
// connLock is used to synchronize access to the connections map
|
||||
connLock sync.RWMutex
|
||||
// connections holds configured database connections by config name
|
||||
connections map[string]*dbPluginInstance
|
||||
logger log.Logger
|
||||
|
||||
*framework.Backend
|
||||
sync.RWMutex
|
||||
// CredRotationQueue is an in-memory priority queue used to track Static Roles
|
||||
// credRotationQueue is an in-memory priority queue used to track Static Roles
|
||||
// that require periodic rotation. Backends will have a PriorityQueue
|
||||
// initialized on setup, but only backends that are mounted by a primary
|
||||
// server or mounted as a local mount will perform the rotations.
|
||||
//
|
||||
// cancelQueue is used to remove the priority queue and terminate the
|
||||
// background ticker.
|
||||
credRotationQueue *queue.PriorityQueue
|
||||
cancelQueue context.CancelFunc
|
||||
// queueCtx is the context for the priority queue
|
||||
queueCtx context.Context
|
||||
// cancelQueueCtx is used to terminate the background ticker
|
||||
cancelQueueCtx context.CancelFunc
|
||||
|
||||
// roleLocks is used to lock modifications to roles in the queue, to ensure
|
||||
// concurrent requests are not modifying the same role and possibly causing
|
||||
|
@ -132,6 +127,47 @@ type databaseBackend struct {
|
|||
roleLocks []*locksutil.LockEntry
|
||||
}
|
||||
|
||||
func (b *databaseBackend) connGet(name string) *dbPluginInstance {
|
||||
b.connLock.RLock()
|
||||
defer b.connLock.RUnlock()
|
||||
return b.connections[name]
|
||||
}
|
||||
|
||||
func (b *databaseBackend) connPop(name string) *dbPluginInstance {
|
||||
b.connLock.Lock()
|
||||
defer b.connLock.Unlock()
|
||||
dbi := b.connections[name]
|
||||
delete(b.connections, name)
|
||||
return dbi
|
||||
}
|
||||
|
||||
func (b *databaseBackend) connPopIfEqual(name, id string) *dbPluginInstance {
|
||||
b.connLock.Lock()
|
||||
defer b.connLock.Unlock()
|
||||
dbi, ok := b.connections[name]
|
||||
if ok && dbi.id == id {
|
||||
delete(b.connections, name)
|
||||
return dbi
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *databaseBackend) connPut(name string, newDbi *dbPluginInstance) *dbPluginInstance {
|
||||
b.connLock.Lock()
|
||||
defer b.connLock.Unlock()
|
||||
dbi := b.connections[name]
|
||||
b.connections[name] = newDbi
|
||||
return dbi
|
||||
}
|
||||
|
||||
func (b *databaseBackend) connClear() map[string]*dbPluginInstance {
|
||||
b.connLock.Lock()
|
||||
defer b.connLock.Unlock()
|
||||
old := b.connections
|
||||
b.connections = make(map[string]*dbPluginInstance)
|
||||
return old
|
||||
}
|
||||
|
||||
func (b *databaseBackend) DatabaseConfig(ctx context.Context, s logical.Storage, name string) (*DatabaseConfig, error) {
|
||||
entry, err := s.Get(ctx, fmt.Sprintf("config/%s", name))
|
||||
if err != nil {
|
||||
|
@ -236,22 +272,8 @@ func (b *databaseBackend) GetConnection(ctx context.Context, s logical.Storage,
|
|||
}
|
||||
|
||||
func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name string, config *DatabaseConfig) (*dbPluginInstance, error) {
|
||||
b.RLock()
|
||||
unlockFunc := b.RUnlock
|
||||
defer func() { unlockFunc() }()
|
||||
|
||||
dbi, ok := b.connections[name]
|
||||
if ok {
|
||||
return dbi, nil
|
||||
}
|
||||
|
||||
// Upgrade lock
|
||||
b.RUnlock()
|
||||
b.Lock()
|
||||
unlockFunc = b.Unlock
|
||||
|
||||
dbi, ok = b.connections[name]
|
||||
if ok {
|
||||
dbi := b.connGet(name)
|
||||
if dbi != nil {
|
||||
return dbi, nil
|
||||
}
|
||||
|
||||
|
@ -280,38 +302,34 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri
|
|||
id: id,
|
||||
name: name,
|
||||
}
|
||||
b.connections[name] = dbi
|
||||
oldConn := b.connPut(name, dbi)
|
||||
if oldConn != nil {
|
||||
err := oldConn.Close()
|
||||
if err != nil {
|
||||
b.Logger().Warn("Error closing database connection", "error", err)
|
||||
}
|
||||
}
|
||||
return dbi, nil
|
||||
}
|
||||
|
||||
// invalidateQueue cancels any background queue loading and destroys the queue.
|
||||
func (b *databaseBackend) invalidateQueue() {
|
||||
// cancel context before grabbing lock to start closing any open connections
|
||||
// this is safe to do without the lock since it is only written to once in initialization
|
||||
// and can be canceled multiple times safely
|
||||
if b.cancelQueue != nil {
|
||||
b.cancelQueue()
|
||||
}
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
b.credRotationQueue = nil
|
||||
}
|
||||
|
||||
// ClearConnection closes the database connection and
|
||||
// removes it from the b.connections map.
|
||||
func (b *databaseBackend) ClearConnection(name string) error {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
return b.clearConnection(name)
|
||||
}
|
||||
|
||||
func (b *databaseBackend) clearConnection(name string) error {
|
||||
db, ok := b.connections[name]
|
||||
if ok {
|
||||
db := b.connPop(name)
|
||||
if db != nil {
|
||||
// Ignore error here since the database client is always killed
|
||||
db.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearConnectionId closes the database connection with a specific id and
|
||||
// removes it from the b.connections map.
|
||||
func (b *databaseBackend) ClearConnectionId(name, id string) error {
|
||||
db := b.connPopIfEqual(name, id)
|
||||
if db != nil {
|
||||
// Ignore error here since the database client is always killed
|
||||
db.Close()
|
||||
delete(b.connections, name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -324,33 +342,26 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) {
|
|||
// and simply defer the unlock. Since we are attaching the instance and matching
|
||||
// the id in the connection map, we can safely do this.
|
||||
go func() {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
db.Close()
|
||||
|
||||
// Ensure we are deleting the correct connection
|
||||
mapDB, ok := b.connections[db.name]
|
||||
if ok && db.id == mapDB.id {
|
||||
delete(b.connections, db.name)
|
||||
}
|
||||
// Delete the connection if it is still active.
|
||||
b.connPopIfEqual(db.name, db.id)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// clean closes all connections from all database types
|
||||
// and cancels any rotation queue loading operation.
|
||||
func (b *databaseBackend) clean(ctx context.Context) {
|
||||
// invalidateQueue acquires it's own lock on the backend, removes queue, and
|
||||
// terminates the background ticker
|
||||
b.invalidateQueue()
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
for _, db := range b.connections {
|
||||
db.Close()
|
||||
func (b *databaseBackend) clean(_ context.Context) {
|
||||
// kill the queue and terminate the background ticker
|
||||
if b.cancelQueueCtx != nil {
|
||||
b.cancelQueueCtx()
|
||||
}
|
||||
|
||||
connections := b.connClear()
|
||||
for _, db := range connections {
|
||||
go db.Close()
|
||||
}
|
||||
b.connections = make(map[string]*dbPluginInstance)
|
||||
}
|
||||
|
||||
const backendHelp = `
|
||||
|
|
|
@ -344,16 +344,14 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
|
|||
|
||||
b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
// Close and remove the old connection
|
||||
b.clearConnection(name)
|
||||
|
||||
b.connections[name] = &dbPluginInstance{
|
||||
oldConn := b.connPut(name, &dbPluginInstance{
|
||||
database: dbw,
|
||||
name: name,
|
||||
id: id,
|
||||
})
|
||||
if oldConn != nil {
|
||||
oldConn.Close()
|
||||
}
|
||||
|
||||
err = storeConfig(ctx, req.Storage, name, config)
|
||||
|
|
|
@ -78,22 +78,19 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Take out the backend lock since we are swapping out the connection
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
// Take the write lock on the instance
|
||||
dbi.Lock()
|
||||
defer dbi.Unlock()
|
||||
|
||||
defer func() {
|
||||
dbi.Unlock()
|
||||
// Even on error, still remove the connection
|
||||
b.ClearConnectionId(name, dbi.id)
|
||||
}()
|
||||
defer func() {
|
||||
// Close the plugin
|
||||
dbi.closed = true
|
||||
if err := dbi.database.Close(); err != nil {
|
||||
b.Logger().Error("error closing the database plugin connection", "err", err)
|
||||
}
|
||||
// Even on error, still remove the connection
|
||||
delete(b.connections, name)
|
||||
}()
|
||||
|
||||
generator, err := newPasswordGenerator(nil)
|
||||
|
|
|
@ -642,14 +642,11 @@ func (b *databaseBackend) loadStaticWALs(ctx context.Context, s logical.Storage)
|
|||
// actually available. This is needed because both runTicker and initQueue
|
||||
// operate in go-routines, and could be accessing the queue concurrently
|
||||
func (b *databaseBackend) pushItem(item *queue.Item) error {
|
||||
b.RLock()
|
||||
unlockFunc := b.RUnlock
|
||||
defer func() { unlockFunc() }()
|
||||
|
||||
if b.credRotationQueue != nil {
|
||||
select {
|
||||
case <-b.queueCtx.Done():
|
||||
default:
|
||||
return b.credRotationQueue.Push(item)
|
||||
}
|
||||
|
||||
b.Logger().Warn("no queue found during push item")
|
||||
return nil
|
||||
}
|
||||
|
@ -658,9 +655,9 @@ func (b *databaseBackend) pushItem(item *queue.Item) error {
|
|||
// actually available. This is needed because both runTicker and initQueue
|
||||
// operate in go-routines, and could be accessing the queue concurrently
|
||||
func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
if b.credRotationQueue != nil {
|
||||
select {
|
||||
case <-b.queueCtx.Done():
|
||||
default:
|
||||
return b.credRotationQueue.Pop()
|
||||
}
|
||||
return nil, queue.ErrEmpty
|
||||
|
@ -670,9 +667,9 @@ func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
|
|||
// actually available. This is needed because both runTicker and initQueue
|
||||
// operate in go-routines, and could be accessing the queue concurrently
|
||||
func (b *databaseBackend) popFromRotationQueueByKey(name string) (*queue.Item, error) {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
if b.credRotationQueue != nil {
|
||||
select {
|
||||
case <-b.queueCtx.Done():
|
||||
default:
|
||||
item, err := b.credRotationQueue.PopByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Reference in a new issue