csi: update leader's ACL in volumewatcher (#11891)

The volumewatcher that runs on the leader needs to make RPC calls
rather than writing to raft (as we do in the deploymentwatcher)
because the unpublish workflow needs to make RPC calls to the
clients. This requires that the volumewatcher has access to the
leader's ACL token.

But when leadership transitions, the new leader creates a new leader
ACL token. This ACL token needs to be passed into the volumewatcher
when we enable it, otherwise the volumewatcher can find itself with a
stale token.
This commit is contained in:
Tim Gross 2022-01-24 11:49:50 -05:00
parent 460416e787
commit 4e559c6255
4 changed files with 16 additions and 12 deletions

3
.changelog/11891.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions.
```

View File

@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.nodeDrainer.SetEnabled(true, s.State()) s.nodeDrainer.SetEnabled(true, s.State())
// Enable the volume watcher, since we are now the leader // Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State()) s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())
// Restore the eval broker state // Restore the eval broker state
if err := s.restoreEvals(); err != nil { if err := s.restoreEvals(); err != nil {
@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error {
s.nodeDrainer.SetEnabled(false, nil) s.nodeDrainer.SetEnabled(false, nil)
// Disable the volume watcher // Disable the volume watcher
s.volumeWatcher.SetEnabled(false, nil) s.volumeWatcher.SetEnabled(false, nil, "")
// Disable any enterprise systems required. // Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil { if err := s.revokeEnterpriseLeadership(); err != nil {

View File

@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W
// SetEnabled is used to control if the watcher is enabled. The // SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being // watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a // enabled the state and leader's ACL is passed in as it is no longer
// leader election has taken place. // valid once a leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) { func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) {
w.wlock.Lock() w.wlock.Lock()
defer w.wlock.Unlock() defer w.wlock.Unlock()
wasEnabled := w.enabled wasEnabled := w.enabled
w.enabled = enabled w.enabled = enabled
w.leaderAcl = leaderAcl
if state != nil { if state != nil {
w.state = state w.state = state

View File

@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
index := uint64(100) index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State(), "")
plugin := mock.CSIPlugin() plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State()) node := testNode(plugin, srv.State())
@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
return 1 == len(watcher.watchers) return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond) }, time.Second, 10*time.Millisecond)
watcher.SetEnabled(false, nil) watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
} }
@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID) vol := testVolume(plugin, alloc, node.ID)
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State(), "")
index++ index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// watches for that change will fire on the new watcher // watches for that change will fire on the new watcher
// step-down (this is sync) // step-down (this is sync)
watcher.SetEnabled(false, nil) watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
// allocation is now invalid // allocation is now invalid
@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// create a new watcher and enable it to simulate the leadership // create a new watcher and enable it to simulate the leadership
// transition // transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State(), "")
require.Eventually(func() bool { require.Eventually(func() bool {
watcher.wlock.RLock() watcher.wlock.RLock()
@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
index := uint64(100) index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin() plugin := mock.CSIPlugin()
@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin() plugin := mock.CSIPlugin()