pyo3/tests/test_buffer_protocol.rs

132 lines
3.5 KiB
Rust
Raw Normal View History

#![cfg(not(Py_LIMITED_API))]
use pyo3::buffer::PyBuffer;
2018-09-21 21:32:48 +00:00
use pyo3::class::PyBufferProtocol;
use pyo3::exceptions::PyBufferError;
use pyo3::ffi;
use pyo3::prelude::*;
use pyo3::types::IntoPyDict;
2020-02-25 11:15:17 +00:00
use pyo3::AsPyPointer;
2019-03-24 16:19:15 +00:00
use std::ffi::CStr;
2019-02-01 13:01:18 +00:00
use std::os::raw::{c_int, c_void};
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[pyclass]
struct TestBufferClass {
2017-05-20 06:18:54 +00:00
vec: Vec<u8>,
drop_called: Arc<AtomicBool>,
2017-05-20 06:18:54 +00:00
}
#[pyproto]
impl PyBufferProtocol for TestBufferClass {
2020-02-25 11:15:17 +00:00
fn bf_getbuffer(slf: PyRefMut<Self>, view: *mut ffi::Py_buffer, flags: c_int) -> PyResult<()> {
if view.is_null() {
2020-08-25 19:33:36 +00:00
return Err(PyBufferError::new_err("View is null"));
}
if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2020-08-25 19:33:36 +00:00
return Err(PyBufferError::new_err("Object is not writable"));
}
unsafe {
(*view).obj = slf.as_ptr();
ffi::Py_INCREF((*view).obj);
}
let bytes = &slf.vec;
unsafe {
(*view).buf = bytes.as_ptr() as *mut c_void;
(*view).len = bytes.len() as isize;
(*view).readonly = 1;
(*view).itemsize = 1;
(*view).format = ptr::null_mut();
if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2019-03-20 20:48:36 +00:00
let msg = CStr::from_bytes_with_nul(b"B\0").unwrap();
(*view).format = msg.as_ptr() as *mut _;
}
(*view).ndim = 1;
(*view).shape = ptr::null_mut();
if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
(*view).shape = (&((*view).len)) as *const _ as *mut _;
}
(*view).strides = ptr::null_mut();
if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
(*view).strides = &((*view).itemsize) as *const _ as *mut _;
}
(*view).suboffsets = ptr::null_mut();
(*view).internal = ptr::null_mut();
}
Ok(())
}
fn bf_releasebuffer(_slf: PyRefMut<Self>, _view: *mut ffi::Py_buffer) {}
}
impl Drop for TestBufferClass {
fn drop(&mut self) {
print!("dropped");
self.drop_called.store(true, Ordering::Relaxed);
}
}
#[test]
fn test_buffer() {
let drop_called = Arc::new(AtomicBool::new(false));
{
let gil = Python::acquire_gil();
let py = gil.python();
let instance = Py::new(
py,
TestBufferClass {
vec: vec![b' ', b'2', b'3'],
drop_called: drop_called.clone(),
},
)
.unwrap();
let env = [("ob", instance)].into_py_dict(py);
py.run("assert bytes(ob) == b' 23'", None, Some(env))
.unwrap();
}
assert!(drop_called.load(Ordering::Relaxed));
}
#[test]
fn test_buffer_referenced() {
let drop_called = Arc::new(AtomicBool::new(false));
let buf = {
let input = vec![b' ', b'2', b'3'];
let gil = Python::acquire_gil();
let py = gil.python();
let instance: PyObject = TestBufferClass {
vec: input.clone(),
drop_called: drop_called.clone(),
}
.into_py(py);
2020-06-04 13:00:47 +00:00
let buf = PyBuffer::<u8>::get(instance.as_ref(py)).unwrap();
assert_eq!(buf.to_vec(py).unwrap(), input);
drop(instance);
buf
};
assert!(!drop_called.load(Ordering::Relaxed));
{
let _py = Python::acquire_gil().python();
drop(buf);
}
assert!(drop_called.load(Ordering::Relaxed));
}