Merge pull request #2 from davidhewitt/new-nativetypes
Thread-safe release pools
This commit is contained in:
commit
f2b347a9f6
221
src/gil.rs
221
src/gil.rs
|
@ -3,7 +3,8 @@
|
|||
//! Interaction with python's global interpreter lock
|
||||
|
||||
use crate::{ffi, internal_tricks::Unsendable, Python};
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
use parking_lot::Mutex;
|
||||
use std::cell::{Cell, RefCell, UnsafeCell};
|
||||
use std::{any, mem::ManuallyDrop, ptr::NonNull, sync};
|
||||
|
||||
static START: sync::Once = sync::Once::new();
|
||||
|
@ -16,6 +17,13 @@ thread_local! {
|
|||
///
|
||||
/// As a result, if this thread has the GIL, GIL_COUNT is greater than zero.
|
||||
static GIL_COUNT: Cell<u32> = Cell::new(0);
|
||||
|
||||
/// These are objects owned by the current thread, to be released when the GILPool drops.
|
||||
static OWNED_OBJECTS: RefCell<Vec<NonNull<ffi::PyObject>>> = RefCell::new(Vec::with_capacity(256));
|
||||
|
||||
/// These are non-python objects such as (String) owned by the current thread, to be released
|
||||
/// when the GILPool drops.
|
||||
static OWNED_ANYS: RefCell<Vec<Box<dyn any::Any>>> = RefCell::new(Vec::with_capacity(256));
|
||||
}
|
||||
|
||||
/// Check whether the GIL is acquired.
|
||||
|
@ -136,85 +144,75 @@ impl GILGuard {
|
|||
impl Drop for GILGuard {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
// Must drop the objects in the pool before releasing the GILGuard
|
||||
ManuallyDrop::drop(&mut self.pool);
|
||||
ffi::PyGILState_Release(self.gstate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of release pool
|
||||
struct ReleasePoolImpl {
|
||||
owned: Vec<NonNull<ffi::PyObject>>,
|
||||
pointers: *mut Vec<NonNull<ffi::PyObject>>,
|
||||
obj: Vec<Box<dyn any::Any>>,
|
||||
p: parking_lot::Mutex<*mut Vec<NonNull<ffi::PyObject>>>,
|
||||
}
|
||||
|
||||
impl ReleasePoolImpl {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
owned: Vec::with_capacity(256),
|
||||
pointers: Box::into_raw(Box::new(Vec::with_capacity(256))),
|
||||
obj: Vec::with_capacity(8),
|
||||
p: parking_lot::Mutex::new(Box::into_raw(Box::new(Vec::with_capacity(256)))),
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn release_pointers(&mut self) {
|
||||
let mut v = self.p.lock();
|
||||
let vec = &mut **v;
|
||||
if vec.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// switch vectors
|
||||
std::mem::swap(&mut self.pointers, &mut *v);
|
||||
drop(v);
|
||||
|
||||
// release PyObjects
|
||||
for ptr in vec.iter_mut() {
|
||||
ffi::Py_DECREF(ptr.as_ptr());
|
||||
}
|
||||
vec.set_len(0);
|
||||
}
|
||||
|
||||
pub unsafe fn drain(&mut self, _py: Python, owned: usize) {
|
||||
// Release owned objects(call decref)
|
||||
for i in owned..self.owned.len() {
|
||||
ffi::Py_DECREF(self.owned[i].as_ptr());
|
||||
}
|
||||
self.owned.truncate(owned);
|
||||
self.release_pointers();
|
||||
self.obj.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Sync wrapper of ReleasePoolImpl
|
||||
/// Thread-safe storage for objects which were dropped while the GIL was not held.
|
||||
struct ReleasePool {
|
||||
value: UnsafeCell<Option<ReleasePoolImpl>>,
|
||||
pointers_to_drop: Mutex<*mut Vec<NonNull<ffi::PyObject>>>,
|
||||
pointers_being_dropped: UnsafeCell<*mut Vec<NonNull<ffi::PyObject>>>,
|
||||
}
|
||||
|
||||
impl ReleasePool {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
value: UnsafeCell::new(None),
|
||||
pointers_to_drop: parking_lot::const_mutex(std::ptr::null_mut()),
|
||||
pointers_being_dropped: UnsafeCell::new(std::ptr::null_mut()),
|
||||
}
|
||||
}
|
||||
/// # Safety
|
||||
/// This function is not thread safe. Thus, the caller has to have GIL.
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
unsafe fn get_or_init(&self) -> &mut ReleasePoolImpl {
|
||||
(*self.value.get()).get_or_insert_with(ReleasePoolImpl::new)
|
||||
|
||||
fn register_pointer(&self, obj: NonNull<ffi::PyObject>) {
|
||||
let mut storage = self.pointers_to_drop.lock();
|
||||
if storage.is_null() {
|
||||
*storage = Box::into_raw(Box::new(Vec::with_capacity(256)))
|
||||
}
|
||||
unsafe {
|
||||
(**storage).push(obj);
|
||||
}
|
||||
}
|
||||
|
||||
fn release_pointers(&self, _py: Python) {
|
||||
let mut v = self.pointers_to_drop.lock();
|
||||
|
||||
if v.is_null() {
|
||||
// No pointers have been registered
|
||||
return;
|
||||
}
|
||||
|
||||
unsafe {
|
||||
// Function is safe to call because GIL is held, so only one thread can be inside this
|
||||
// block at a time
|
||||
|
||||
let vec = &mut **v;
|
||||
if vec.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// switch vectors
|
||||
std::mem::swap(&mut *self.pointers_being_dropped.get(), &mut *v);
|
||||
drop(v);
|
||||
|
||||
// release PyObjects
|
||||
for ptr in vec.iter_mut() {
|
||||
ffi::Py_DECREF(ptr.as_ptr());
|
||||
}
|
||||
vec.set_len(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static POOL: ReleasePool = ReleasePool::new();
|
||||
|
||||
unsafe impl Sync for ReleasePool {}
|
||||
|
||||
static POOL: ReleasePool = ReleasePool::new();
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct GILPool {
|
||||
owned: usize,
|
||||
owned_objects_start: usize,
|
||||
owned_anys_start: usize,
|
||||
// Stable solution for impl !Send
|
||||
no_send: Unsendable,
|
||||
}
|
||||
|
@ -226,10 +224,10 @@ impl GILPool {
|
|||
pub unsafe fn new() -> GILPool {
|
||||
increment_gil_count();
|
||||
// Release objects that were dropped since last GIL acquisition
|
||||
let pool = POOL.get_or_init();
|
||||
pool.release_pointers();
|
||||
POOL.release_pointers(Python::assume_gil_acquired());
|
||||
GILPool {
|
||||
owned: pool.owned.len(),
|
||||
owned_objects_start: OWNED_OBJECTS.with(|o| o.borrow().len()),
|
||||
owned_anys_start: OWNED_ANYS.with(|o| o.borrow().len()),
|
||||
no_send: Unsendable::default(),
|
||||
}
|
||||
}
|
||||
|
@ -241,37 +239,68 @@ impl GILPool {
|
|||
impl Drop for GILPool {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let pool = POOL.get_or_init();
|
||||
pool.drain(self.python(), self.owned);
|
||||
OWNED_OBJECTS.with(|owned_objects| {
|
||||
// Note: inside this closure we must be careful to not hold a borrow too long, because
|
||||
// while calling Py_DECREF we may cause other callbacks to run which will need to
|
||||
// register objects into the GILPool.
|
||||
let len = owned_objects.borrow().len();
|
||||
for i in self.owned_objects_start..len {
|
||||
let ptr = owned_objects.borrow().get_unchecked(i).as_ptr();
|
||||
ffi::Py_DECREF(ptr);
|
||||
}
|
||||
// If this assertion fails, something weird is going on where another GILPool that was
|
||||
// created after this one has not yet been dropped.
|
||||
debug_assert!(owned_objects.borrow().len() == len);
|
||||
owned_objects
|
||||
.borrow_mut()
|
||||
.truncate(self.owned_objects_start);
|
||||
});
|
||||
|
||||
OWNED_ANYS.with(|owned_anys| owned_anys.borrow_mut().truncate(self.owned_anys_start));
|
||||
}
|
||||
decrement_gil_count();
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn register_any<'p, T: 'static>(obj: T) -> &'p T {
|
||||
let pool = POOL.get_or_init();
|
||||
|
||||
pool.obj.push(Box::new(obj));
|
||||
pool.obj
|
||||
.last()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
.downcast_ref::<T>()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Register a Python object pointer inside the release pool, to have reference count decreased
|
||||
/// next time the GIL is acquired in pyo3.
|
||||
///
|
||||
/// # Safety
|
||||
/// The object must be an owned Python reference.
|
||||
pub unsafe fn register_pointer(obj: NonNull<ffi::PyObject>) {
|
||||
let pool = POOL.get_or_init();
|
||||
if gil_is_acquired() {
|
||||
ffi::Py_DECREF(obj.as_ptr())
|
||||
} else {
|
||||
(**pool.p.lock()).push(obj);
|
||||
POOL.register_pointer(obj);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register an owned object inside the GILPool.
|
||||
///
|
||||
/// # Safety
|
||||
/// The object must be an owned Python reference.
|
||||
pub unsafe fn register_owned(_py: Python, obj: NonNull<ffi::PyObject>) {
|
||||
let pool = POOL.get_or_init();
|
||||
pool.owned.push(obj);
|
||||
debug_assert!(gil_is_acquired());
|
||||
OWNED_OBJECTS.with(|objs| objs.borrow_mut().push(obj));
|
||||
}
|
||||
|
||||
/// Register any value inside the GILPool.
|
||||
///
|
||||
/// # Safety
|
||||
/// It is the caller's responsibility to ensure that the inferred lifetime 'p is not inferred by
|
||||
/// the Rust compiler to outlast the current GILPool.
|
||||
pub unsafe fn register_any<'p, T: 'static>(obj: T) -> &'p T {
|
||||
debug_assert!(gil_is_acquired());
|
||||
OWNED_ANYS.with(|owned_anys| {
|
||||
let boxed = Box::new(obj);
|
||||
let value_ref: &T = &*boxed;
|
||||
|
||||
// Sneaky - extend the lifetime of the reference so that the box can be moved
|
||||
let value_ref_extended_lifetime = std::mem::transmute(value_ref);
|
||||
|
||||
owned_anys.borrow_mut().push(boxed);
|
||||
value_ref_extended_lifetime
|
||||
})
|
||||
}
|
||||
|
||||
/// Increment pyo3's internal GIL count - to be called whenever GILPool or GILGuard is created.
|
||||
|
@ -295,7 +324,7 @@ fn decrement_gil_count() {
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::{GILPool, GIL_COUNT, POOL};
|
||||
use super::{GILPool, GIL_COUNT, OWNED_OBJECTS};
|
||||
use crate::{ffi, gil, AsPyPointer, IntoPyPointer, PyObject, Python, ToPyObject};
|
||||
use std::ptr::NonNull;
|
||||
|
||||
|
@ -309,6 +338,10 @@ mod test {
|
|||
obj.to_object(py)
|
||||
}
|
||||
|
||||
fn owned_object_count() -> usize {
|
||||
OWNED_OBJECTS.with(|objs| objs.borrow().len())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_owned() {
|
||||
let gil = Python::acquire_gil();
|
||||
|
@ -319,18 +352,16 @@ mod test {
|
|||
let _ref = obj.clone_ref(py);
|
||||
|
||||
unsafe {
|
||||
let p = POOL.get_or_init();
|
||||
|
||||
{
|
||||
let gil = Python::acquire_gil();
|
||||
gil::register_owned(gil.python(), NonNull::new_unchecked(obj.into_ptr()));
|
||||
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
|
||||
assert_eq!(p.owned.len(), 1);
|
||||
assert_eq!(owned_object_count(), 1);
|
||||
}
|
||||
{
|
||||
let _gil = Python::acquire_gil();
|
||||
assert_eq!(p.owned.len(), 0);
|
||||
assert_eq!(owned_object_count(), 0);
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 1);
|
||||
}
|
||||
}
|
||||
|
@ -346,26 +377,24 @@ mod test {
|
|||
let obj_ptr = obj.as_ptr();
|
||||
|
||||
unsafe {
|
||||
let p = POOL.get_or_init();
|
||||
|
||||
{
|
||||
let _pool = GILPool::new();
|
||||
assert_eq!(p.owned.len(), 0);
|
||||
assert_eq!(owned_object_count(), 0);
|
||||
|
||||
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
|
||||
|
||||
assert_eq!(p.owned.len(), 1);
|
||||
assert_eq!(owned_object_count(), 1);
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
|
||||
{
|
||||
let _pool = GILPool::new();
|
||||
let obj = get_object();
|
||||
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
|
||||
assert_eq!(p.owned.len(), 2);
|
||||
assert_eq!(owned_object_count(), 2);
|
||||
}
|
||||
assert_eq!(p.owned.len(), 1);
|
||||
assert_eq!(owned_object_count(), 1);
|
||||
}
|
||||
{
|
||||
assert_eq!(p.owned.len(), 0);
|
||||
assert_eq!(owned_object_count(), 0);
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 1);
|
||||
}
|
||||
}
|
||||
|
@ -381,10 +410,8 @@ mod test {
|
|||
let obj_ptr = obj.as_ptr();
|
||||
|
||||
unsafe {
|
||||
let p = POOL.get_or_init();
|
||||
|
||||
{
|
||||
assert_eq!(p.owned.len(), 0);
|
||||
assert_eq!(owned_object_count(), 0);
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
|
||||
}
|
||||
|
||||
|
@ -404,10 +431,8 @@ mod test {
|
|||
let obj_ptr = obj.as_ptr();
|
||||
|
||||
unsafe {
|
||||
let p = POOL.get_or_init();
|
||||
|
||||
{
|
||||
assert_eq!(p.owned.len(), 0);
|
||||
assert_eq!(owned_object_count(), 0);
|
||||
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
|
||||
}
|
||||
|
||||
|
|
|
@ -54,14 +54,8 @@ macro_rules! assert_check_only {
|
|||
};
|
||||
}
|
||||
|
||||
// Because of the relase pool unsoundness reported in https://github.com/PyO3/pyo3/issues/756,
|
||||
// we need to stop other threads before calling `py.import()`.
|
||||
// TODO(kngwyu): Remove this variable
|
||||
static MUTEX: parking_lot::Mutex<()> = parking_lot::const_mutex(());
|
||||
|
||||
#[test]
|
||||
fn test_date_check() {
|
||||
let _lock = MUTEX.lock();
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "date", "2018, 1, 1").unwrap();
|
||||
|
@ -73,7 +67,6 @@ fn test_date_check() {
|
|||
|
||||
#[test]
|
||||
fn test_time_check() {
|
||||
let _lock = MUTEX.lock();
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "time", "12, 30, 15").unwrap();
|
||||
|
@ -85,7 +78,6 @@ fn test_time_check() {
|
|||
|
||||
#[test]
|
||||
fn test_datetime_check() {
|
||||
let _lock = MUTEX.lock();
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "datetime", "2018, 1, 1, 13, 30, 15")
|
||||
|
@ -100,7 +92,6 @@ fn test_datetime_check() {
|
|||
|
||||
#[test]
|
||||
fn test_delta_check() {
|
||||
let _lock = MUTEX.lock();
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "timedelta", "1, -3").unwrap();
|
||||
|
@ -115,7 +106,6 @@ fn test_datetime_utc() {
|
|||
use assert_approx_eq::assert_approx_eq;
|
||||
use pyo3::types::PyDateTime;
|
||||
|
||||
let _lock = MUTEX.lock();
|
||||
let gil = Python::acquire_gil();
|
||||
let py = gil.python();
|
||||
let datetime = py.import("datetime").map_err(|e| e.print(py)).unwrap();
|
||||
|
|
Loading…
Reference in New Issue