open-vault/physical/inmem_ha.go

157 lines
3 KiB
Go
Raw Normal View History

2015-04-14 19:04:15 +00:00
package physical
import (
"fmt"
"sync"
2016-08-19 20:45:17 +00:00
log "github.com/mgutz/logxi/v1"
2015-04-14 19:04:15 +00:00
)
type InmemHABackend struct {
2017-02-17 14:15:35 +00:00
Backend
locks map[string]string
l sync.Mutex
cond *sync.Cond
2016-08-19 20:45:17 +00:00
logger log.Logger
2015-04-14 19:04:15 +00:00
}
2017-02-17 14:15:35 +00:00
type TransactionalInmemHABackend struct {
Transactional
InmemHABackend
}
2015-04-14 19:04:15 +00:00
// NewInmemHA constructs a new in-memory HA backend. This is only for testing.
2016-08-19 20:45:17 +00:00
func NewInmemHA(logger log.Logger) *InmemHABackend {
2015-04-14 19:04:15 +00:00
in := &InmemHABackend{
2017-02-17 14:15:35 +00:00
Backend: NewInmem(logger),
locks: make(map[string]string),
logger: logger,
}
in.cond = sync.NewCond(&in.l)
return in
}
func NewTransactionalInmemHA(logger log.Logger) *TransactionalInmemHABackend {
transInmem := NewTransactionalInmem(logger)
inmemHA := InmemHABackend{
Backend: transInmem,
locks: make(map[string]string),
logger: logger,
}
in := &TransactionalInmemHABackend{
InmemHABackend: inmemHA,
Transactional: transInmem,
2015-04-14 19:04:15 +00:00
}
in.cond = sync.NewCond(&in.l)
2015-04-14 19:04:15 +00:00
return in
}
// LockWith is used for mutual exclusion based on the given key.
func (i *InmemHABackend) LockWith(key, value string) (Lock, error) {
l := &InmemLock{
in: i,
key: key,
value: value,
2015-04-14 19:04:15 +00:00
}
return l, nil
2015-04-14 19:04:15 +00:00
}
// LockMapSize is used in some tests to determine whether this backend has ever
// been used for HA purposes rather than simply for storage
func (i *InmemHABackend) LockMapSize() int {
return len(i.locks)
}
// HAEnabled indicates whether the HA functionality should be exposed.
// Currently always returns true.
func (i *InmemHABackend) HAEnabled() bool {
return true
}
2015-04-14 19:04:15 +00:00
// InmemLock is an in-memory Lock implementation for the HABackend
type InmemLock struct {
in *InmemHABackend
key string
value string
2015-04-14 19:04:15 +00:00
held bool
leaderCh chan struct{}
l sync.Mutex
}
func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
i.l.Lock()
defer i.l.Unlock()
if i.held {
return nil, fmt.Errorf("lock already held")
}
// Attempt an async acquisition
didLock := make(chan struct{})
releaseCh := make(chan bool, 1)
go func() {
// Wait to acquire the lock
i.in.l.Lock()
_, ok := i.in.locks[i.key]
for ok {
i.in.cond.Wait()
_, ok = i.in.locks[i.key]
}
i.in.locks[i.key] = i.value
i.in.l.Unlock()
// Signal that lock is held
2015-04-14 19:04:15 +00:00
close(didLock)
// Handle an early abort
release := <-releaseCh
if release {
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
2015-04-14 19:04:15 +00:00
}
}()
// Wait for lock acquisition or shutdown
select {
case <-didLock:
releaseCh <- false
case <-stopCh:
releaseCh <- true
return nil, nil
}
// Create the leader channel
i.held = true
i.leaderCh = make(chan struct{})
return i.leaderCh, nil
}
func (i *InmemLock) Unlock() error {
i.l.Lock()
defer i.l.Unlock()
if !i.held {
return nil
}
close(i.leaderCh)
i.leaderCh = nil
i.held = false
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
2015-04-14 19:04:15 +00:00
return nil
}
func (i *InmemLock) Value() (bool, string, error) {
i.in.l.Lock()
val, ok := i.in.locks[i.key]
i.in.l.Unlock()
return ok, val, nil
}