From 7e567aa8a91ccb5fd92f24c782112eb1585376da Mon Sep 17 00:00:00 2001
From: Bazaah
Date: Sat, 31 Jul 2021 19:21:47 +0000
Subject: [PATCH] lib/queue: add Queue, a stable min binary heap

The structure will be how tokens are returned by the Scanner, in
place of the current Vec. This change is occurring because:

The genesis of this structure is a need in the Scanner for fast
pops and fast inserts. A binary heap gives me both, namely O(1)
inserts and O(log(n)) pops -- with allocations amortized.

The need for fast inserts comes from how YAML handles implicit
keys... in that you don't know whether you have one until you hit
a value (': '). The easiest solution is just to save these
potential implicit keys and then insert them into the token list
at the correct position, but this would require memcopy'ing
everything >key.pos and potentially cause many more reallocations
than required.

Enter the Queue. I couldn't just use std::BinaryHeap, for two
reasons:

1. It's a max heap
2. It's not stable; the order of equal elements is unspecified

The Queue fixes both of these problems: first by innately using
std::Reverse, and second by guaranteeing that equal elements are
returned in the order added. These two attributes allow me to use
Scanner.stats.read (the number of bytes consumed so far) and a bit
of elbow grease to get my tokens out in the right order.
---
 src/lib.rs   |   1 +
 src/queue.rs | 374 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 375 insertions(+)
 create mode 100644 src/queue.rs

diff --git a/src/lib.rs b/src/lib.rs
index 7686cae..559b61c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@
 #![allow(clippy::suspicious_else_formatting)]
 
 mod error;
+mod queue;
 mod reader;
 mod scanner;
 mod token;
diff --git a/src/queue.rs b/src/queue.rs
new file mode 100644
index 0000000..2ecba26
--- /dev/null
+++ b/src/queue.rs
@@ -0,0 +1,374 @@
+//! The Queue is a stable min heap structure that uses std's
+//! BinaryHeap at its core. This means that it has ~O(1)
+//! insert and O(log(n)) pop operations.
+//!
+//! While it does have a worst case O(n) insert if the data
+//! is pathological, we will _mostly_ be inserting elements
+//! in sorted order, only occasionally requiring out of
+//! order inserts, and never more than +-3 elements apart.
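+//!
+//! A minimal sketch of intended usage (illustrative only; the
+//! type is crate-private, so this block is not compiled as a
+//! doctest):
+//!
+//! ```ignore
+//! let mut queue = Queue::new();
+//!
+//! // Inserts may arrive out of order...
+//! queue.push(2);
+//! queue.push(1);
+//! queue.push(3);
+//!
+//! // ...but pops always yield the minimum first
+//! assert_eq!(queue.pop(), Some(1));
+//! assert_eq!(queue.pop(), Some(2));
+//! assert_eq!(queue.pop(), Some(3));
+//! assert_eq!(queue.pop(), None);
+//! ```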
+
+use std::{
+    cmp::{Ordering, Reverse},
+    collections::BinaryHeap,
+    fmt::{self, Debug},
+    iter::FromIterator,
+};
+
+/// A min heap data structure that keeps a stable ordering
+/// of elements, ensuring that otherwise equal items are
+/// returned in the order added
+pub(crate) struct Queue<T>
+{
+    heap: BinaryHeap<Reverse<QueueEntry<T>>>,
+    increment: usize,
+}
+
+impl<T> Queue<T>
+where
+    T: Ord,
+{
+    pub fn new() -> Self
+    {
+        Self::default()
+    }
+
+    pub fn push(&mut self, item: T)
+    {
+        // Stamp the item with the current tie breaker mark
+        let entry = QueueEntry::new(self.increment(), item);
+
+        self.heap.push(Reverse(entry))
+    }
+
+    pub fn pop(&mut self) -> Option<T>
+    {
+        if let Some(Reverse(QueueEntry { entry, mark: _ })) = self.heap.pop()
+        {
+            return Some(entry);
+        };
+
+        None
+    }
+
+    pub fn sort(&mut self)
+    {
+        let heap = std::mem::take(&mut self.heap);
+        let heap = heap.into_sorted_vec();
+
+        self.heap = BinaryHeap::from(heap);
+    }
+
+    pub fn into_sorted_vec(self) -> Vec<T>
+    {
+        self.into_iter().collect()
+    }
+
+    pub fn len(&self) -> usize
+    {
+        self.heap.len()
+    }
+
+    pub fn is_empty(&self) -> bool
+    {
+        self.heap.is_empty()
+    }
+
+    pub fn capacity(&self) -> usize
+    {
+        self.heap.capacity()
+    }
+
+    pub fn reserve(&mut self, additional: usize)
+    {
+        self.heap.reserve(additional)
+    }
+
+    /// Monotonically increasing counter, used as the tie
+    /// breaker for otherwise equal elements
+    fn increment(&mut self) -> usize
+    {
+        self.increment += 1;
+
+        self.increment
+    }
+}
+
+impl<T> IntoIterator for Queue<T>
+where
+    T: Ord,
+{
+    type Item = T;
+
+    type IntoIter = QueueIntoIter<T>;
+
+    fn into_iter(self) -> Self::IntoIter
+    {
+        Self::IntoIter::new(self)
+    }
+}
+
+impl<T> Default for Queue<T>
+where
+    T: Ord,
+{
+    fn default() -> Self
+    {
+        Queue {
+            heap: Default::default(),
+            increment: 0,
+        }
+    }
+}
+
+impl<T> From<Vec<T>> for Queue<T>
+where
+    T: Ord,
+{
+    fn from(v: Vec<T>) -> Self
+    {
+        Self::from_iter(v)
+    }
+}
+
+impl<T> FromIterator<T> for Queue<T>
+where
+    T: Ord,
+{
+    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self
+    {
+        let iter = iter.into_iter();
+        let capacity = match iter.size_hint()
+        {
+            (_, Some(upper)) => upper,
+            (lower, None) => lower,
+        };
+
+        let mut heap = BinaryHeap::with_capacity(capacity);
+        let mut increment = 0;
+
+        iter.for_each(|item| {
+            increment += 1;
+            heap.push(Reverse(QueueEntry::new(increment, item)))
+        });
+
+        Self { heap, increment }
+    }
+}
+
+impl<T> Clone for Queue<T>
+where
+    T: Clone,
+{
+    fn clone(&self) -> Self
+    {
+        Self {
+            heap: self.heap.clone(),
+            increment: self.increment,
+        }
+    }
+}
+
+impl<T> Debug for Queue<T>
+where
+    T: Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
+    {
+        f.debug_list().entries(self.heap.iter()).finish()
+    }
+}
+
+pub(crate) struct QueueIntoIter<T>
+{
+    inner: Queue<T>,
+}
+
+impl<T> QueueIntoIter<T>
+where
+    T: Ord,
+{
+    pub fn new(q: Queue<T>) -> Self
+    {
+        Self { inner: q }
+    }
+
+    pub fn into_inner(self) -> Queue<T>
+    {
+        self.inner
+    }
+}
+
+impl<T> Iterator for QueueIntoIter<T>
+where
+    T: Ord,
+{
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item>
+    {
+        self.inner.pop()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>)
+    {
+        let exact = self.inner.len();
+
+        (exact, Some(exact))
+    }
+}
+
+/// Entry wrapper that ensures when an entry's ordering is
+/// equal a tie breaker is held via mark
+struct QueueEntry<T>
+{
+    entry: T,
+    mark: usize,
+}
+
+impl<T> QueueEntry<T>
+{
+    pub fn new(mark: usize, entry: T) -> Self
+    {
+        Self { entry, mark }
+    }
+}
+
+impl<T> PartialEq for QueueEntry<T>
+where
+    T: PartialEq,
+{
+    fn eq(&self, other: &Self) -> bool
+    {
+        self.entry.eq(&other.entry)
+    }
+}
+
+impl<T> Eq for QueueEntry<T> where T: Eq {}
+
+impl<T> PartialOrd for QueueEntry<T>
+where
+    T: Ord,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering>
+    {
+        Some(self.cmp(other))
+    }
+}
+
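+// Note: ordering is primarily by the wrapped entry; when two
+// entries compare equal, the insertion mark breaks the tie. With
+// illustrative values:
+//
+//   QueueEntry { entry: 5, mark: 1 } < QueueEntry { entry: 5, mark: 2 }
+//
+// This is what guarantees that equal elements are returned in
+// the order they were added.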
+impl<T> Ord for QueueEntry<T>
+where
+    T: Ord,
+{
+    fn cmp(&self, other: &Self) -> Ordering
+    {
+        match self.entry.cmp(&other.entry)
+        {
+            Ordering::Less => Ordering::Less,
+            Ordering::Greater => Ordering::Greater,
+            // Equal entries defer to insertion order
+            Ordering::Equal => self.mark.cmp(&other.mark),
+        }
+    }
+}
+
+impl<T> Clone for QueueEntry<T>
+where
+    T: Clone,
+{
+    fn clone(&self) -> Self
+    {
+        let QueueEntry { entry, mark } = self;
+
+        Self::new(*mark, entry.clone())
+    }
+}
+
+impl<T> Debug for QueueEntry<T>
+where
+    T: Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
+    {
+        f.debug_struct("QueueEntry")
+            .field("entry", &self.entry)
+            .field("mark", &self.mark)
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests
+{
+    use pretty_assertions::assert_eq;
+
+    use super::*;
+
+    macro_rules! t {
+        ($msg:expr, $ord:expr) => {
+            T::new($msg, $ord)
+        };
+    }
+
+    #[test]
+    fn stable_ordering()
+    {
+        let data = vec![0, 0, 0, 1, 2, 2];
+        let expected = vec!["one", "two", "three", "four", "five", "six"];
+
+        assert!(data.len() == expected.len());
+
+        let test = data
+            .into_iter()
+            .zip(expected.iter().copied())
+            .fold(Queue::new(), |mut q, (num, msg)| {
+                q.push(t!(msg, num));
+                q
+            });
+
+        for (T { msg, ord }, expected) in test.into_iter().zip(expected)
+        {
+            assert_eq!(
+                expected, msg,
+                "Expected stable ordering for '{}', got '{}' (number: {})",
+                expected, msg, ord
+            );
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    struct T
+    {
+        msg: &'static str,
+        ord: isize,
+    }
+
+    impl T
+    {
+        fn new(msg: &'static str, ord: isize) -> Self
+        {
+            Self { msg, ord }
+        }
+    }
+
+    impl PartialEq for T
+    {
+        fn eq(&self, other: &T) -> bool
+        {
+            self.ord == other.ord
+        }
+    }
+
+    impl Eq for T {}
+
+    impl PartialOrd for T
+    {
+        fn partial_cmp(&self, other: &T) -> Option<Ordering>
+        {
+            self.ord.partial_cmp(&other.ord)
+        }
+    }
+
+    impl Ord for T
+    {
+        fn cmp(&self, other: &Self) -> std::cmp::Ordering
+        {
+            self.ord.cmp(&other.ord)
+        }
+    }
+}