diff --git a/Cargo.lock b/Cargo.lock index 8ca77448604..7d3de174114 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2258,8 +2258,7 @@ dependencies = [ [[package]] name = "parking_lot_core" version = "0.9.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +source = "git+https://github.com/youknowone/parking_lot?branch=rustpython#4392edbe879acc9c0dd94eda53d2205d3ab912c9" dependencies = [ "cfg-if", "libc", diff --git a/Cargo.toml b/Cargo.toml index 664340c23cf..5e2b532c6a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ opt-level = 3 lto = "thin" [patch.crates-io] +parking_lot_core = { git = "https://github.com/youknowone/parking_lot", branch = "rustpython" } # REDOX START, Uncomment when you want to compile/check with redoxer # REDOX END diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 894cebda57b..35ce70fced2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1459,7 +1459,7 @@ def _acquire_release(lock, timeout, l=None, n=1): for _ in range(n): lock.release() - @unittest.skip("TODO: RUSTPYTHON; flaky timeout") + @unittest.skip("TODO: RUSTPYTHON; flaky timeout - thread start latency") def test_repr_rlock(self): if self.TYPE != 'processes': self.skipTest('test not appropriate for {}'.format(self.TYPE)) @@ -4415,7 +4415,6 @@ def test_shared_memory_across_processes(self): sms.close() - @unittest.skip("TODO: RUSTPYTHON; flaky") @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") def test_shared_memory_SharedMemoryServer_ignores_sigint(self): # bpo-36368: protect SharedMemoryManager server process from @@ -4440,7 +4439,6 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self): smm.shutdown() - @unittest.skip("TODO: RUSTPYTHON: sem_unlink cleanup race causes spurious stderr output") @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 
@resource_tracker_format_subtests def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 0faf32f79ea..520f5c733c3 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1179,8 +1179,6 @@ async def runner(): wsock.close() -# TODO: RUSTPYTHON, fork() segfaults due to stale parking_lot global state -@unittest.skip("TODO: RUSTPYTHON") @support.requires_fork() class TestFork(unittest.TestCase): diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index ef318dfc7e1..5d4e9677f5c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -85,7 +85,6 @@ def test_traceback(self): self.assertIn('raise RuntimeError(123) # some comment', f1.getvalue()) - @unittest.skip('TODO: RUSTPYTHON flaky EOFError') @hashlib_helper.requires_hashdigest('md5') def test_ressources_gced_in_workers(self): # Ensure that argument for a job are correctly gc-ed after the job diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py index 818e0d51a2c..6749a690f6c 100644 --- a/Lib/test/test_concurrent_futures/test_wait.py +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -200,20 +200,5 @@ def future_func(): def setUpModule(): setup_module() -class ProcessPoolForkWaitTest(ProcessPoolForkWaitTest): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_first_completed(self): super().test_first_completed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault") - def test_first_completed_some_already_completed(self): super().test_first_completed_some_already_completed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform != 'win32', 
"TODO: RUSTPYTHON flaky") - def test_first_exception(self): super().test_first_exception() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_first_exception_one_already_failed(self): super().test_first_exception_one_already_failed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON flaky") - def test_first_exception_some_already_complete(self): super().test_first_exception_some_already_complete() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault") - def test_timeout(self): super().test_timeout() # TODO: RUSTPYTHON - - if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_manager.py b/Lib/test/test_multiprocessing_fork/test_manager.py index f8d7eddd652..9efbb83bbb7 100644 --- a/Lib/test/test_multiprocessing_fork/test_manager.py +++ b/Lib/test/test_multiprocessing_fork/test_manager.py @@ -3,22 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', only_type="manager") -import sys # TODO: RUSTPYTHON -class WithManagerTestCondition(WithManagerTestCondition): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON, times out') - def test_notify_all(self): super().test_notify_all() # TODO: RUSTPYTHON - -class WithManagerTestQueue(WithManagerTestQueue): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON, times out') - def test_fork(self): super().test_fork() # TODO: RUSTPYTHON - -local_globs = globals().copy() # TODO: RUSTPYTHON -for name, base in local_globs.items(): # TODO: RUSTPYTHON - if name.startswith('WithManagerTest') and issubclass(base, unittest.TestCase): # TODO: RUSTPYTHON - base = unittest.skipIf( # TODO: RUSTPYTHON - sys.platform == 'linux', # TODO: RUSTPYTHON - 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError' - )(base) # TODO: RUSTPYTHON - if 
__name__ == '__main__': unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_misc.py b/Lib/test/test_multiprocessing_fork/test_misc.py index bcf0858258e..891a494020c 100644 --- a/Lib/test/test_multiprocessing_fork/test_misc.py +++ b/Lib/test/test_multiprocessing_fork/test_misc.py @@ -3,24 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', exclude_types=True) -import sys # TODO: RUSTPYTHON -class TestManagerExceptions(TestManagerExceptions): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_queue_get(self): super().test_queue_get() # TODO: RUSTPYTHON - -@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") -class TestInitializers(TestInitializers): pass # TODO: RUSTPYTHON - -class TestStartMethod(TestStartMethod): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_nested_startmethod(self): super().test_nested_startmethod() # TODO: RUSTPYTHON - -@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") -class TestSyncManagerTypes(TestSyncManagerTypes): pass # TODO: RUSTPYTHON - -class MiscTestCase(MiscTestCase): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_forked_thread_not_started(self): super().test_forked_thread_not_started() # TODO: RUSTPYTHON - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_threads.py b/Lib/test/test_multiprocessing_fork/test_threads.py index 1065ebf7fe4..1670e34cb17 100644 --- a/Lib/test/test_multiprocessing_fork/test_threads.py +++ b/Lib/test/test_multiprocessing_fork/test_threads.py @@ -3,14 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', only_type="threads") -import os, sys # TODO: RUSTPYTHON -class WithThreadsTestPool(WithThreadsTestPool): # TODO: RUSTPYTHON - @unittest.skip("TODO: RUSTPYTHON; flaky environment pollution when running rustpython -m test --fail-env-changed due to unknown 
reason") - def test_terminate(self): super().test_terminate() # TODO: RUSTPYTHON - -class WithThreadsTestManagerRestart(WithThreadsTestManagerRestart): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError') - def test_rapid_restart(self): super().test_rapid_restart() # TODO: RUSTPYTHON - if __name__ == '__main__': unittest.main() diff --git a/crates/common/src/lock.rs b/crates/common/src/lock.rs index af680010821..cd7df512d83 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -68,32 +68,37 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, // can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to -/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`. +/// Reset a lock to its initial (unlocked) state by zeroing its bytes. /// -/// After `fork()`, locks held by dead parent threads would deadlock in the -/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor, -/// bypassing the normal unlock path which may interact with parking_lot's -/// internal waiter queues. +/// After `fork()`, any lock held by a now-dead thread would remain +/// permanently locked. We zero the raw bytes (the unlocked state for all +/// `parking_lot` raw lock types) instead of using the normal unlock path, +/// which would interact with stale waiter queues. /// /// # Safety /// /// Must only be called from the single-threaded child process immediately /// after `fork()`, before any other thread is created. -#[cfg(unix)] -pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { - // Use Mutex::raw() to access the underlying lock without layout assumptions. - // parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell) both - // represent the unlocked state as all-zero bytes. 
+/// The type `T` must represent the unlocked state as all-zero bytes +/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.). +pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) { unsafe { - let raw = mutex.raw() as *const RawMutex as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>()); + core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>()); } } -/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`. +/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`]. +/// +/// # Safety /// -/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or -/// write locks would cause permanent deadlock in the child. +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_mutex_after_fork<T>(mutex: &PyMutex<T>) { + unsafe { zero_reinit_after_fork(mutex.raw()) } +} + +/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`]. /// /// # Safety /// @@ -101,10 +106,7 @@ pub unsafe fn reinit_mutex_after_fork<T>(mutex: &PyMutex<T>) { /// after `fork()`, before any other thread is created. #[cfg(unix)] pub unsafe fn reinit_rwlock_after_fork<T>(rwlock: &PyRwLock<T>) { - unsafe { - let raw = rwlock.raw() as *const RawRwLock as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>()); - } + unsafe { zero_reinit_after_fork(rwlock.raw()) } } /// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`. diff --git a/crates/vm/src/stdlib/imp.rs b/crates/vm/src/stdlib/imp.rs index 087556c8cf2..1d2eb4ebd3b 100644 --- a/crates/vm/src/stdlib/imp.rs +++ b/crates/vm/src/stdlib/imp.rs @@ -47,12 +47,7 @@ mod lock { pub(crate) unsafe fn reinit_after_fork() { if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() { // Held by a dead thread — reset to unlocked. - // Same pattern as RLock::_at_fork_reinit in thread.rs. 
- unsafe { - let old: &crossbeam_utils::atomic::AtomicCell = - core::mem::transmute(&IMP_LOCK); - old.swap(RawRMutex::INIT); - } + unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) }; } } } diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index 4cdb12f0d47..4a6c33c7f84 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -908,14 +908,22 @@ pub mod module { fn fork(vm: &VirtualMachine) -> i32 { warn_if_multi_threaded("fork", vm); - let pid: i32; py_os_before_fork(vm); - unsafe { - pid = libc::fork(); - } + + // Like CPython's PyOS_BeforeFork: stop all other Python threads + // so they are at safe points, not holding internal Rust/C locks. + #[cfg(feature = "threading")] + vm.state.stop_the_world.stop_the_world(vm); + + let pid = unsafe { libc::fork() }; if pid == 0 { + #[cfg(feature = "threading")] + vm.state.stop_the_world.reset_after_fork(); py_os_after_fork_child(vm); } else { + // Like CPython's PyOS_AfterFork_Parent: resume all threads + #[cfg(feature = "threading")] + vm.state.stop_the_world.start_the_world(vm); py_os_after_fork_parent(vm); } pid diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index bf22cb3c9c6..459357befe0 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -23,7 +23,6 @@ pub(crate) mod _thread { sync::{Arc, Weak}, }; use core::{cell::RefCell, time::Duration}; - use crossbeam_utils::atomic::AtomicCell; use parking_lot::{ RawMutex, RawThreadId, lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex}, @@ -150,17 +149,12 @@ pub(crate) mod _thread { Ok(()) } + #[cfg(unix)] #[pymethod] fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - // Reset the mutex to unlocked by directly writing the INIT value. - // Do NOT call unlock() here — after fork(), unlock_slow() would - // try to unpark stale waiters from dead parent threads. 
- let new_mut = RawMutex::INIT; - unsafe { - let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); - old_mutex.swap(new_mut); - } - + // Overwrite lock state to unlocked. Do NOT call unlock() here — + // after fork(), unlock_slow() would try to unpark stale waiters. + unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) }; Ok(()) } @@ -250,18 +244,13 @@ pub(crate) mod _thread { Ok(()) } + #[cfg(unix)] #[pymethod] fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - // Reset the reentrant mutex to unlocked by directly writing INIT. - // Do NOT call unlock() — after fork(), the slow path would try - // to unpark stale waiters from dead parent threads. + // Overwrite lock state to unlocked. Do NOT call unlock() here — + // after fork(), unlock_slow() would try to unpark stale waiters. self.count.store(0, core::sync::atomic::Ordering::Relaxed); - let new_mut = RawRMutex::INIT; - unsafe { - let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); - old_mutex.swap(new_mut); - } - + unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) }; Ok(()) } @@ -1021,10 +1010,7 @@ pub(crate) mod _thread { /// Reset a parking_lot::Mutex to unlocked state after fork. 
#[cfg(unix)] fn reinit_parking_lot_mutex<T>(mutex: &parking_lot::Mutex<T>) { - unsafe { - let raw = mutex.raw() as *const parking_lot::RawMutex as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::<parking_lot::RawMutex>()); - } + unsafe { rustpython_common::lock::zero_reinit_after_fork(mutex.raw()) }; } // Thread handle state enum diff --git a/crates/vm/src/vm/interpreter.rs b/crates/vm/src/vm/interpreter.rs index 8e275d1ce9e..ba075145ca9 100644 --- a/crates/vm/src/vm/interpreter.rs +++ b/crates/vm/src/vm/interpreter.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, feature = "threading"))] +use super::StopTheWorldState; use super::{Context, PyConfig, PyGlobalState, VirtualMachine, setting::Settings, thread}; use crate::{ PyResult, builtins, common::rc::PyRc, frozen::FrozenModule, getpath, py_freeze, stdlib::atexit, @@ -124,6 +126,8 @@ where monitoring: PyMutex::default(), monitoring_events: AtomicCell::new(0), instrumentation_version: AtomicU64::new(0), + #[cfg(all(unix, feature = "threading"))] + stop_the_world: StopTheWorldState::new(), }); // Create VM with the global state diff --git a/crates/vm/src/vm/mod.rs b/crates/vm/src/vm/mod.rs index 6461502a582..52cba338f82 100644 --- a/crates/vm/src/vm/mod.rs +++ b/crates/vm/src/vm/mod.rs @@ -125,6 +125,123 @@ struct ExceptionStack { stack: Vec>, } +/// Stop-the-world state for fork safety. Before `fork()`, the requester +/// stops all other Python threads so they are not holding internal locks. 
+#[cfg(all(unix, feature = "threading"))] +pub struct StopTheWorldState { + /// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`) + pub(crate) requested: AtomicBool, + /// Ident of the thread that requested the stop (like `stw->requester`) + requester: AtomicU64, + /// Signaled by suspending threads when their state transitions to SUSPENDED + notify_mutex: std::sync::Mutex<()>, + notify_cv: std::sync::Condvar, +} + +#[cfg(all(unix, feature = "threading"))] +impl StopTheWorldState { + pub const fn new() -> Self { + Self { + requested: AtomicBool::new(false), + requester: AtomicU64::new(0), + notify_mutex: std::sync::Mutex::new(()), + notify_cv: std::sync::Condvar::new(), + } + } + + /// Wake the stop-the-world requester (called by each thread that suspends). + pub(crate) fn notify_suspended(&self) { + // Just signal the condvar; the requester holds the mutex. + self.notify_cv.notify_one(); + } + + /// Try to CAS detached threads directly to SUSPENDED and check whether + /// all non-requester threads are now SUSPENDED. + /// Like CPython's `park_detached_threads`. 
+ fn park_detached_threads(&self, vm: &VirtualMachine) -> bool { + use thread::{THREAD_ATTACHED, THREAD_DETACHED, THREAD_SUSPENDED}; + let requester = self.requester.load(Ordering::Relaxed); + let registry = vm.state.thread_frames.lock(); + let mut all_suspended = true; + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + let state = slot.state.load(Ordering::Relaxed); + if state == THREAD_DETACHED { + // CAS DETACHED → SUSPENDED (park without thread cooperation) + let _ = slot.state.compare_exchange( + THREAD_DETACHED, + THREAD_SUSPENDED, + Ordering::AcqRel, + Ordering::Relaxed, + ); + all_suspended = false; // re-check on next poll + } else if state == THREAD_ATTACHED { + // Thread is in bytecode — it will see `requested` and self-suspend + all_suspended = false; + } + // THREAD_SUSPENDED → already parked + } + if all_suspended { + // Verify once more after dropping the lock + return true; + } + all_suspended + } + + /// Stop all non-requester threads. Like CPython's `stop_the_world`. + /// + /// 1. Sets `requested`, marking the requester thread. + /// 2. CAS detached threads to SUSPENDED. + /// 3. Waits (polling with 1 ms condvar timeout) for attached threads + /// to self-suspend in `check_signals`. 
+ pub fn stop_the_world(&self, vm: &VirtualMachine) { + let requester_ident = crate::stdlib::thread::get_ident(); + self.requester.store(requester_ident, Ordering::Relaxed); + self.requested.store(true, Ordering::Release); + + let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500); + loop { + if self.park_detached_threads(vm) { + break; + } + let remaining = deadline.saturating_duration_since(std::time::Instant::now()); + if remaining.is_zero() { + break; + } + // Wait up to 1 ms for a thread to notify us it suspended + let wait = remaining.min(std::time::Duration::from_millis(1)); + let guard = self.notify_mutex.lock().unwrap(); + let _ = self.notify_cv.wait_timeout(guard, wait); + } + } + + /// Resume all suspended threads. Like CPython's `start_the_world`. + pub fn start_the_world(&self, vm: &VirtualMachine) { + use thread::{THREAD_DETACHED, THREAD_SUSPENDED}; + let requester = self.requester.load(Ordering::Relaxed); + let registry = vm.state.thread_frames.lock(); + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + if slot.state.load(Ordering::Relaxed) == THREAD_SUSPENDED { + slot.state.store(THREAD_DETACHED, Ordering::Release); + } + } + drop(registry); + self.requested.store(false, Ordering::Release); + self.requester.store(0, Ordering::Relaxed); + } + + /// Reset after fork in the child (only one thread alive). + pub fn reset_after_fork(&self) { + self.requested.store(false, Ordering::Relaxed); + self.requester.store(0, Ordering::Relaxed); + } +} + pub struct PyGlobalState { pub config: PyConfig, pub module_defs: BTreeMap<&'static str, &'static builtins::PyModuleDef>, @@ -165,6 +282,9 @@ pub struct PyGlobalState { /// Incremented on every monitoring state change. Code objects compare their /// local version against this to decide whether re-instrumentation is needed. 
pub instrumentation_version: AtomicU64, + /// Stop-the-world state for pre-fork thread suspension + #[cfg(all(unix, feature = "threading"))] + pub stop_the_world: StopTheWorldState, } pub fn process_hash_secret_seed() -> u32 { @@ -1482,6 +1602,10 @@ impl VirtualMachine { return Err(self.new_exception(self.ctx.exceptions.system_exit.to_owned(), vec![])); } + // Suspend this thread if stop-the-world is in progress + #[cfg(all(unix, feature = "threading"))] + thread::suspend_if_needed(&self.state.stop_the_world); + #[cfg(not(target_arch = "wasm32"))] { crate::signal::check_signals(self) diff --git a/crates/vm/src/vm/thread.rs b/crates/vm/src/vm/thread.rs index 8dd8e0312ee..f2f111d40e4 100644 --- a/crates/vm/src/vm/thread.rs +++ b/crates/vm/src/vm/thread.rs @@ -14,6 +14,17 @@ use core::{ use itertools::Itertools; use std::thread_local; +// Thread states for stop-the-world support. +// DETACHED: not executing Python bytecode (in native code, or idle) +// ATTACHED: actively executing Python bytecode +// SUSPENDED: parked by a stop-the-world request +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_DETACHED: i32 = 0; +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_ATTACHED: i32 = 1; +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_SUSPENDED: i32 = 2; + /// Per-thread shared state for sys._current_frames() and sys._current_exceptions(). /// The exception field uses atomic operations for lock-free cross-thread reads. #[cfg(feature = "threading")] @@ -22,6 +33,9 @@ pub struct ThreadSlot { /// Readers must hold the Mutex and convert to FrameRef inside the lock. 
pub frames: parking_lot::Mutex>, pub exception: crate::PyAtomicRef>, + /// Thread state for stop-the-world: DETACHED / ATTACHED / SUSPENDED + #[cfg(unix)] + pub state: core::sync::atomic::AtomicI32, } #[cfg(feature = "threading")] @@ -57,13 +71,29 @@ pub fn with_current_vm(f: impl FnOnce(&VirtualMachine) -> R) -> R { pub fn enter_vm(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R { VM_STACK.with(|vms| { + // Outermost enter_vm: transition DETACHED → ATTACHED + #[cfg(all(unix, feature = "threading"))] + let was_outermost = vms.borrow().is_empty(); + vms.borrow_mut().push(vm.into()); // Initialize thread slot for this thread if not already done #[cfg(feature = "threading")] init_thread_slot_if_needed(vm); - scopeguard::defer! { vms.borrow_mut().pop(); } + #[cfg(all(unix, feature = "threading"))] + if was_outermost { + attach_thread(); + } + + scopeguard::defer! { + // Outermost exit: transition ATTACHED → DETACHED + #[cfg(all(unix, feature = "threading"))] + if vms.borrow().len() == 1 { + detach_thread(); + } + vms.borrow_mut().pop(); + } VM_CURRENT.set(vm, f) }) } @@ -78,6 +108,8 @@ fn init_thread_slot_if_needed(vm: &VirtualMachine) { let new_slot = Arc::new(ThreadSlot { frames: parking_lot::Mutex::new(Vec::new()), exception: crate::PyAtomicRef::from(None::), + #[cfg(unix)] + state: core::sync::atomic::AtomicI32::new(THREAD_DETACHED), }); vm.state .thread_frames @@ -88,6 +120,80 @@ fn init_thread_slot_if_needed(vm: &VirtualMachine) { }); } +/// Transition DETACHED → ATTACHED. Blocks if the thread was SUSPENDED by +/// a stop-the-world request (like `_PyThreadState_Attach` + `tstate_wait_attach`). 
+#[cfg(all(unix, feature = "threading"))] +fn attach_thread() { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + loop { + match s.state.compare_exchange( + THREAD_DETACHED, + THREAD_ATTACHED, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(THREAD_SUSPENDED) => { + // Parked by stop-the-world — wait until released to DETACHED + while s.state.load(Ordering::Acquire) == THREAD_SUSPENDED { + std::thread::yield_now(); + } + // Retry CAS + } + Err(state) => { + debug_assert!(false, "unexpected thread state in attach: {state}"); + break; + } + } + } + } + }); +} + +/// Transition ATTACHED → DETACHED (like `_PyThreadState_Detach`). +#[cfg(all(unix, feature = "threading"))] +fn detach_thread() { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + s.state.store(THREAD_DETACHED, Ordering::Release); + } + }); +} + +/// Called from check_signals when stop-the-world is requested. +/// Transitions ATTACHED → SUSPENDED and waits until released +/// (like `_PyThreadState_Suspend` + `_PyThreadState_Attach`). +#[cfg(all(unix, feature = "threading"))] +pub fn suspend_if_needed(stw: &super::StopTheWorldState) { + if !stw.requested.load(Ordering::Relaxed) { + return; + } + do_suspend(stw); +} + +#[cfg(all(unix, feature = "threading"))] +#[cold] +fn do_suspend(stw: &super::StopTheWorldState) { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + // ATTACHED → SUSPENDED + s.state.store(THREAD_SUSPENDED, Ordering::Release); + + // Notify the stop-the-world requester that we've parked + stw.notify_suspended(); + + // Wait until start_the_world sets us back to DETACHED + while s.state.load(Ordering::Acquire) == THREAD_SUSPENDED { + std::thread::yield_now(); + } + + // Re-attach (DETACHED → ATTACHED) + s.state.store(THREAD_ATTACHED, Ordering::Release); + } + }); +} + /// Push a frame pointer onto the current thread's shared frame stack. 
/// The pointed-to frame must remain alive until the matching pop. #[cfg(feature = "threading")] @@ -179,6 +285,8 @@ pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { let new_slot = Arc::new(ThreadSlot { frames: parking_lot::Mutex::new(current_frames), exception: crate::PyAtomicRef::from(vm.topmost_exception()), + #[cfg(unix)] + state: core::sync::atomic::AtomicI32::new(THREAD_ATTACHED), }); // Lock is safe: reinit_locks_after_fork() already reset it to unlocked. diff --git a/extra_tests/test_manager_fork_debug.py b/extra_tests/test_manager_fork_debug.py new file mode 100644 index 00000000000..6110f7e3699 --- /dev/null +++ b/extra_tests/test_manager_fork_debug.py @@ -0,0 +1,149 @@ +"""Minimal reproduction of multiprocessing Manager + fork failure.""" + +import multiprocessing +import os +import sys +import time +import traceback + +import pytest + +pytestmark = pytest.mark.skipif(not hasattr(os, "fork"), reason="requires os.fork") + + +def test_basic_manager(): + """Test Manager without fork - does it work at all?""" + print("=== Test 1: Basic Manager (no fork) ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + ev = manager.Event() + print(f" Event created: {ev}") + ev.set() + print(f" Event set, is_set={ev.is_set()}") + assert ev.is_set() + print(" PASS") + finally: + manager.shutdown() + + +def test_manager_with_process(): + """Test Manager shared between parent and child process.""" + print("\n=== Test 2: Manager with forked child ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + result = manager.Value("i", 0) + ev = manager.Event() + + def child_fn(): + try: + ev.set() + result.value = 42 + except Exception as e: + print(f" CHILD ERROR: {e}", file=sys.stderr) + traceback.print_exc() + sys.exit(1) + + print(f" Starting child process...") + process = ctx.Process(target=child_fn) + process.start() + print(f" Waiting for child (pid={process.pid})...") + process.join(timeout=10) + + if 
process.exitcode != 0: + print(f" FAIL: child exited with code {process.exitcode}") + return False + + print(f" Child done. result={result.value}, event={ev.is_set()}") + assert result.value == 42 + assert ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +def test_manager_server_alive_after_fork(): + """Test that Manager server survives after forking a child.""" + print("\n=== Test 3: Manager server alive after fork ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + ev = manager.Event() + + # Fork a child that does nothing with the manager + pid = os.fork() + if pid == 0: + # Child - exit immediately + os._exit(0) + + # Parent - wait for child + os.waitpid(pid, 0) + + # Now try to use the manager in the parent + print(f" After fork, trying to use Manager in parent...") + ev.set() + print(f" ev.is_set() = {ev.is_set()}") + assert ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +def test_manager_server_alive_after_fork_with_child_usage(): + """Test that Manager server survives when child also uses it.""" + print("\n=== Test 4: Manager server alive after fork + child usage ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + child_ev = manager.Event() + parent_ev = manager.Event() + + def child_fn(): + try: + child_ev.set() + except Exception as e: + print(f" CHILD ERROR: {e}", file=sys.stderr) + traceback.print_exc() + sys.exit(1) + + process = ctx.Process(target=child_fn) + process.start() + process.join(timeout=10) + + if process.exitcode != 0: + print(f" FAIL: child exited with code {process.exitcode}") + return False + + # Now use manager in parent AFTER child is done + print(f" Child done. 
Trying parent usage...") + parent_ev.set() + print(f" child_ev={child_ev.is_set()}, parent_ev={parent_ev.is_set()}") + assert child_ev.is_set() + assert parent_ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +if __name__ == "__main__": + test_basic_manager() + + passed = 0 + total = 10 + for i in range(total): + print(f"\n--- Iteration {i + 1}/{total} ---") + ok = True + ok = ok and test_manager_with_process() + ok = ok and test_manager_server_alive_after_fork() + ok = ok and test_manager_server_alive_after_fork_with_child_usage() + if ok: + passed += 1 + else: + print(f" FAILED on iteration {i + 1}") + + print(f"\n=== Results: {passed}/{total} passed ===") + sys.exit(0 if passed == total else 1)