core: fix data races in blocked eval chan handling (#14142)

Similar to the deployment watcher fix in #14121 - the server code loves these mutable structs so we need to guard access to the struct fields with locks.

Capturing ch := b.capacityChangeCh is sufficient to satisfy the data race detector, but I noticed it was also possible to leak goroutines:

Since the watchCapacity loop is in charge of receiving from capacityChangeCh and exits when stopCh is closed, senders to capacityChangeCh also must exit when stopCh is closed. Otherwise they may block forever if capacityChangeCh is full because it will never be received on again. I did not find evidence of this occurring in my meager smattering of prod goroutine dumps I have laying around, but this isn't surprising as the chan has a buffer of 8096! I would imagine that is sufficient to handle "late" sends and then just get GC'd away when the last reference to the old chan is dropped. This is just additional safety/correctness.
This commit is contained in:
Michael Schurter 2022-08-16 12:33:53 -07:00 committed by GitHub
parent 0c3cfb073a
commit cdf5a74998
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 3 deletions

View File

@ -413,11 +413,18 @@ func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[computedClass] = index
// Capture chan in lock as Flush overwrites it
ch := b.capacityChangeCh
done := b.stopCh
b.l.Unlock()
b.capacityChangeCh <- &capacityUpdate{
select {
case <-done:
case ch <- &capacityUpdate{
computedClass: computedClass,
index: index,
}:
}
}
@ -441,11 +448,16 @@ func (b *BlockedEvals) UnblockQuota(quota string, index uint64) {
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[quota] = index
ch := b.capacityChangeCh
done := b.stopCh
b.l.Unlock()
b.capacityChangeCh <- &capacityUpdate{
select {
case <-done:
case ch <- &capacityUpdate{
quotaChange: quota,
index: index,
}:
}
}
@ -472,12 +484,16 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
// Capture chan inside the lock to prevent a race with it getting reset
// in Flush.
ch := b.capacityChangeCh
done := b.stopCh
b.l.Unlock()
ch <- &capacityUpdate{
select {
case <-done:
case ch <- &capacityUpdate{
computedClass: class,
quotaChange: quota,
index: index,
}:
}
}