197 lines
6.0 KiB
Go
197 lines
6.0 KiB
Go
|
// Copyright (c) HashiCorp, Inc.
|
||
|
// SPDX-License-Identifier: MPL-2.0
|
||
|
|
||
|
// Package queue provides Vault plugins with a Priority Queue. It can be used
|
||
|
// as an in-memory list of queue.Item sorted by their priority, and offers
|
||
|
// methods to find or remove items by their key. Internally it uses
|
||
|
// container/heap; see Example Priority Queue:
|
||
|
// https://golang.org/pkg/container/heap/#example__priorityQueue
|
||
|
package queue
|
||
|
|
||
|
import (
|
||
|
"container/heap"
|
||
|
"errors"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/mitchellh/copystructure"
|
||
|
)
|
||
|
|
||
|
// ErrEmpty is returned for queues with no items
|
||
|
var ErrEmpty = errors.New("queue is empty")
|
||
|
|
||
|
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
|
||
|
// already exists. The queue does not attempt to update, instead returns this
|
||
|
// error. If an Item needs to be updated or replaced, pop the item first.
|
||
|
var ErrDuplicateItem = errors.New("duplicate item")
|
||
|
|
||
|
// New initializes the internal data structures and returns a new
|
||
|
// PriorityQueue
|
||
|
func New() *PriorityQueue {
|
||
|
pq := PriorityQueue{
|
||
|
data: make(queue, 0),
|
||
|
dataMap: make(map[string]*Item),
|
||
|
}
|
||
|
heap.Init(&pq.data)
|
||
|
return &pq
|
||
|
}
|
||
|
|
||
|
// PriorityQueue facilitates queue of Items, providing Push, Pop, and
|
||
|
// PopByKey convenience methods. The ordering (priority) is an int64 value
|
||
|
// with the smallest value is the highest priority. PriorityQueue maintains both
|
||
|
// an internal slice for the queue as well as a map of the same items with their
|
||
|
// keys as the index. This enables users to find specific items by key. The map
|
||
|
// must be kept in sync with the data slice.
|
||
|
// See https://golang.org/pkg/container/heap/#example__priorityQueue
|
||
|
type PriorityQueue struct {
|
||
|
// data is the internal structure that holds the queue, and is operated on by
|
||
|
// heap functions
|
||
|
data queue
|
||
|
|
||
|
// dataMap represents all the items in the queue, with unique indexes, used
|
||
|
// for finding specific items. dataMap is kept in sync with the data slice
|
||
|
dataMap map[string]*Item
|
||
|
|
||
|
// lock is a read/write mutex, and used to facilitate read/write locks on the
|
||
|
// data and dataMap fields
|
||
|
lock sync.RWMutex
|
||
|
}
|
||
|
|
||
|
// queue is the internal data structure used to satisfy heap.Interface. This
|
||
|
// prevents users from calling Pop and Push heap methods directly
|
||
|
type queue []*Item
|
||
|
|
||
|
// Item is something managed in the priority queue
|
||
|
type Item struct {
|
||
|
// Key is a unique string used to identify items in the internal data map
|
||
|
Key string
|
||
|
// Value is an unspecified type that implementations can use to store
|
||
|
// information
|
||
|
Value interface{}
|
||
|
|
||
|
// Priority determines ordering in the queue, with the lowest value being the
|
||
|
// highest priority
|
||
|
Priority int64
|
||
|
|
||
|
// index is an internal value used by the heap package, and should not be
|
||
|
// modified by any consumer of the priority queue
|
||
|
index int
|
||
|
}
|
||
|
|
||
|
// Len returns the count of items in the Priority Queue
|
||
|
func (pq *PriorityQueue) Len() int {
|
||
|
pq.lock.RLock()
|
||
|
defer pq.lock.RUnlock()
|
||
|
return pq.data.Len()
|
||
|
}
|
||
|
|
||
|
// Pop pops the highest priority item from the queue. This is a
|
||
|
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
|
||
|
// invoke heap functions directly
|
||
|
func (pq *PriorityQueue) Pop() (*Item, error) {
|
||
|
pq.lock.Lock()
|
||
|
defer pq.lock.Unlock()
|
||
|
|
||
|
if pq.data.Len() == 0 {
|
||
|
return nil, ErrEmpty
|
||
|
}
|
||
|
|
||
|
item := heap.Pop(&pq.data).(*Item)
|
||
|
delete(pq.dataMap, item.Key)
|
||
|
return item, nil
|
||
|
}
|
||
|
|
||
|
// Push pushes an item on to the queue. This is a wrapper/convenience
|
||
|
// method that calls heap.Push, so consumers do not need to invoke heap
|
||
|
// functions directly. Items must have unique Keys, and Items in the queue
|
||
|
// cannot be updated. To modify an Item, users must first remove it and re-push
|
||
|
// it after modifications
|
||
|
func (pq *PriorityQueue) Push(i *Item) error {
|
||
|
if i == nil || i.Key == "" {
|
||
|
return errors.New("error adding item: Item Key is required")
|
||
|
}
|
||
|
|
||
|
pq.lock.Lock()
|
||
|
defer pq.lock.Unlock()
|
||
|
|
||
|
if _, ok := pq.dataMap[i.Key]; ok {
|
||
|
return ErrDuplicateItem
|
||
|
}
|
||
|
// Copy the item value(s) so that modifications to the source item does not
|
||
|
// affect the item on the queue
|
||
|
clone, err := copystructure.Copy(i)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
pq.dataMap[i.Key] = clone.(*Item)
|
||
|
heap.Push(&pq.data, clone)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// PopByKey searches the queue for an item with the given key and removes it
|
||
|
// from the queue if found. Returns nil if not found. This method must fix the
|
||
|
// queue after removing any key.
|
||
|
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
|
||
|
pq.lock.Lock()
|
||
|
defer pq.lock.Unlock()
|
||
|
|
||
|
item, ok := pq.dataMap[key]
|
||
|
if !ok {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
// Remove the item the heap and delete it from the dataMap
|
||
|
itemRaw := heap.Remove(&pq.data, item.index)
|
||
|
delete(pq.dataMap, key)
|
||
|
|
||
|
if itemRaw != nil {
|
||
|
if i, ok := itemRaw.(*Item); ok {
|
||
|
return i, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
// Len returns the number of items in the queue data structure. Do not use this
|
||
|
// method directly on the queue, use PriorityQueue.Len() instead.
|
||
|
func (q queue) Len() int { return len(q) }
|
||
|
|
||
|
// Less returns whether the Item with index i should sort before the Item with
|
||
|
// index j in the queue. This method is used by the queue to determine priority
|
||
|
// internally; the Item with the lower value wins. (priority zero is higher
|
||
|
// priority than 1). The priority of Items with equal values is undetermined.
|
||
|
func (q queue) Less(i, j int) bool {
|
||
|
return q[i].Priority < q[j].Priority
|
||
|
}
|
||
|
|
||
|
// Swap swaps things in-place; part of sort.Interface
|
||
|
func (q queue) Swap(i, j int) {
|
||
|
q[i], q[j] = q[j], q[i]
|
||
|
q[i].index = i
|
||
|
q[j].index = j
|
||
|
}
|
||
|
|
||
|
// Push is used by heap.Interface to push items onto the heap. This method is
|
||
|
// invoked by container/heap, and should not be used directly.
|
||
|
// See: https://golang.org/pkg/container/heap/#Interface
|
||
|
func (q *queue) Push(x interface{}) {
|
||
|
n := len(*q)
|
||
|
item := x.(*Item)
|
||
|
item.index = n
|
||
|
*q = append(*q, item)
|
||
|
}
|
||
|
|
||
|
// Pop is used by heap.Interface to pop items off of the heap. This method is
|
||
|
// invoked by container/heap, and should not be used directly.
|
||
|
// See: https://golang.org/pkg/container/heap/#Interface
|
||
|
func (q *queue) Pop() interface{} {
|
||
|
old := *q
|
||
|
n := len(old)
|
||
|
item := old[n-1]
|
||
|
old[n-1] = nil // avoid memory leak
|
||
|
item.index = -1 // for safety
|
||
|
*q = old[0 : n-1]
|
||
|
return item
|
||
|
}
|