pyo3/tests/test_coroutine.rs
Icxolu ee89b2e8e2
deprecate wrap_pyfunction with py argument (#3954)
* deprecate `wrap_pyfunction` with `py` argument

The Python token in `wrap_pyfunction` is not handled automatically by
`WrapPyFunctionArg`, for backwards compatibility. This uses deref
specialization to deprecate this variant.

* merge `Extractor`s

* add deprecation ui test, revert closure variant due to test failure

* fix nightly
2024-03-12 22:57:03 +00:00

311 lines
8.3 KiB
Rust

#![cfg(feature = "experimental-async")]
#![cfg(not(target_arch = "wasm32"))]
use std::{task::Poll, thread, time::Duration};
use futures::{channel::oneshot, future::poll_fn, FutureExt};
use portable_atomic::{AtomicBool, Ordering};
use pyo3::{
coroutine::CancelHandle,
prelude::*,
py_run,
types::{IntoPyDict, PyType},
};
#[path = "../src/tests/common.rs"]
mod common;
fn handle_windows(test: &str) -> String {
let set_event_loop_policy = r#"
import asyncio, sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
"#;
pyo3::unindent::unindent(set_event_loop_policy) + &pyo3::unindent::unindent(test)
}
#[test]
fn noop_coroutine() {
#[pyfunction]
async fn noop() -> usize {
42
}
Python::with_gil(|gil| {
let noop = wrap_pyfunction_bound!(noop, gil).unwrap();
let test = "import asyncio; assert asyncio.run(noop()) == 42";
py_run!(gil, noop, &handle_windows(test));
})
}
#[test]
fn test_coroutine_qualname() {
#[pyfunction]
async fn my_fn() {}
#[pyclass]
struct MyClass;
#[pymethods]
impl MyClass {
#[new]
fn new() -> Self {
Self
}
// TODO use &self when possible
async fn my_method(_self: Py<Self>) {}
#[classmethod]
async fn my_classmethod(_cls: Py<PyType>) {}
#[staticmethod]
async fn my_staticmethod() {}
}
Python::with_gil(|gil| {
let test = r#"
for coro, name, qualname in [
(my_fn(), "my_fn", "my_fn"),
(MyClass().my_method(), "my_method", "MyClass.my_method"),
#(MyClass().my_classmethod(), "my_classmethod", "MyClass.my_classmethod"),
(MyClass.my_staticmethod(), "my_staticmethod", "MyClass.my_staticmethod"),
]:
assert coro.__name__ == name and coro.__qualname__ == qualname
"#;
let locals = [
(
"my_fn",
wrap_pyfunction_bound!(my_fn, gil)
.unwrap()
.as_borrowed()
.as_any(),
),
("MyClass", gil.get_type_bound::<MyClass>().as_any()),
]
.into_py_dict_bound(gil);
py_run!(gil, *locals, &handle_windows(test));
})
}
#[test]
fn sleep_0_like_coroutine() {
#[pyfunction]
async fn sleep_0() -> usize {
let mut waken = false;
poll_fn(|cx| {
if !waken {
cx.waker().wake_by_ref();
waken = true;
return Poll::Pending;
}
Poll::Ready(42)
})
.await
}
Python::with_gil(|gil| {
let sleep_0 = wrap_pyfunction_bound!(sleep_0, gil).unwrap();
let test = "import asyncio; assert asyncio.run(sleep_0()) == 42";
py_run!(gil, sleep_0, &handle_windows(test));
})
}
#[pyfunction]
async fn sleep(seconds: f64) -> usize {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs_f64(seconds));
tx.send(42).unwrap();
});
rx.await.unwrap()
}
#[test]
fn sleep_coroutine() {
Python::with_gil(|gil| {
let sleep = wrap_pyfunction_bound!(sleep, gil).unwrap();
let test = r#"import asyncio; assert asyncio.run(sleep(0.1)) == 42"#;
py_run!(gil, sleep, &handle_windows(test));
})
}
#[test]
fn cancelled_coroutine() {
Python::with_gil(|gil| {
let sleep = wrap_pyfunction_bound!(sleep, gil).unwrap();
let test = r#"
import asyncio
async def main():
task = asyncio.create_task(sleep(999))
await asyncio.sleep(0)
task.cancel()
await task
asyncio.run(main())
"#;
let globals = gil.import_bound("__main__").unwrap().dict();
globals.set_item("sleep", sleep).unwrap();
let err = gil
.run_bound(
&pyo3::unindent::unindent(&handle_windows(test)),
Some(&globals),
None,
)
.unwrap_err();
assert_eq!(
err.value_bound(gil).get_type().qualname().unwrap(),
"CancelledError"
);
})
}
#[test]
fn coroutine_cancel_handle() {
#[pyfunction]
async fn cancellable_sleep(
seconds: f64,
#[pyo3(cancel_handle)] mut cancel: CancelHandle,
) -> usize {
futures::select! {
_ = sleep(seconds).fuse() => 42,
_ = cancel.cancelled().fuse() => 0,
}
}
Python::with_gil(|gil| {
let cancellable_sleep = wrap_pyfunction_bound!(cancellable_sleep, gil).unwrap();
let test = r#"
import asyncio;
async def main():
task = asyncio.create_task(cancellable_sleep(999))
await asyncio.sleep(0)
task.cancel()
return await task
assert asyncio.run(main()) == 0
"#;
let globals = gil.import_bound("__main__").unwrap().dict();
globals
.set_item("cancellable_sleep", cancellable_sleep)
.unwrap();
gil.run_bound(
&pyo3::unindent::unindent(&handle_windows(test)),
Some(&globals),
None,
)
.unwrap();
})
}
#[test]
fn coroutine_is_cancelled() {
#[pyfunction]
async fn sleep_loop(#[pyo3(cancel_handle)] cancel: CancelHandle) {
while !cancel.is_cancelled() {
sleep(0.001).await;
}
}
Python::with_gil(|gil| {
let sleep_loop = wrap_pyfunction_bound!(sleep_loop, gil).unwrap();
let test = r#"
import asyncio;
async def main():
task = asyncio.create_task(sleep_loop())
await asyncio.sleep(0)
task.cancel()
await task
asyncio.run(main())
"#;
let globals = gil.import_bound("__main__").unwrap().dict();
globals.set_item("sleep_loop", sleep_loop).unwrap();
gil.run_bound(
&pyo3::unindent::unindent(&handle_windows(test)),
Some(&globals),
None,
)
.unwrap();
})
}
#[test]
fn coroutine_panic() {
#[pyfunction]
async fn panic() {
panic!("test panic");
}
Python::with_gil(|gil| {
let panic = wrap_pyfunction_bound!(panic, gil).unwrap();
let test = r#"
import asyncio
coro = panic()
try:
asyncio.run(coro)
except BaseException as err:
assert type(err).__name__ == "PanicException"
assert str(err) == "test panic"
else:
assert False
try:
coro.send(None)
except RuntimeError as err:
assert str(err) == "cannot reuse already awaited coroutine"
else:
assert False;
"#;
py_run!(gil, panic, &handle_windows(test));
})
}
#[test]
fn test_async_method_receiver() {
#[pyclass]
struct Counter(usize);
#[pymethods]
impl Counter {
#[new]
fn new() -> Self {
Self(0)
}
async fn get(&self) -> usize {
self.0
}
async fn incr(&mut self) -> usize {
self.0 += 1;
self.0
}
}
static IS_DROPPED: AtomicBool = AtomicBool::new(false);
impl Drop for Counter {
fn drop(&mut self) {
IS_DROPPED.store(true, Ordering::SeqCst);
}
}
Python::with_gil(|gil| {
let test = r#"
import asyncio
obj = Counter()
coro1 = obj.get()
coro2 = obj.get()
try:
obj.incr() # borrow checking should fail
except RuntimeError as err:
pass
else:
assert False
assert asyncio.run(coro1) == 0
coro2.close()
coro3 = obj.incr()
try:
obj.incr() # borrow checking should fail
except RuntimeError as err:
pass
else:
assert False
try:
obj.get() # borrow checking should fail
except RuntimeError as err:
pass
else:
assert False
assert asyncio.run(coro3) == 1
"#;
let locals = [("Counter", gil.get_type_bound::<Counter>())].into_py_dict_bound(gil);
py_run!(gil, *locals, test);
});
assert!(IS_DROPPED.load(Ordering::SeqCst));
}