From 00dd979de069cfb2d57945901dcc97f3d3548b92 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Mon, 2 Mar 2026 21:23:59 +0900 Subject: [PATCH 1/2] Use patched parking_lot_core with fork-safe HASHTABLE reset parking_lot_core's global HASHTABLE retains stale ThreadData after fork(), causing segfaults when contended locks enter park(). Use the patched version from youknowone/parking_lot (rustpython branch) which registers a pthread_atfork handler to reset the hash table. Unskip test_asyncio TestFork. Add Manager+fork integration test. --- Cargo.lock | 3 +- Cargo.toml | 1 + Lib/test/test_asyncio/test_unix_events.py | 2 - extra_tests/test_manager_fork_debug.py | 149 ++++++++++++++++++++++ 4 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 extra_tests/test_manager_fork_debug.py diff --git a/Cargo.lock b/Cargo.lock index e2a2f05a73..cf3a9293a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2258,8 +2258,7 @@ dependencies = [ [[package]] name = "parking_lot_core" version = "0.9.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +source = "git+https://github.com/youknowone/parking_lot?branch=rustpython#4392edbe879acc9c0dd94eda53d2205d3ab912c9" dependencies = [ "cfg-if", "libc", diff --git a/Cargo.toml b/Cargo.toml index 664340c23c..5e2b532c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ opt-level = 3 lto = "thin" [patch.crates-io] +parking_lot_core = { git = "https://github.com/youknowone/parking_lot", branch = "rustpython" } # REDOX START, Uncomment when you want to compile/check with redoxer # REDOX END diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 0faf32f79e..520f5c733c 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1179,8 +1179,6 @@ async def runner(): wsock.close() -# TODO: RUSTPYTHON, fork() segfaults due to stale parking_lot global state -@unittest.skip("TODO: RUSTPYTHON") @support.requires_fork() class TestFork(unittest.TestCase): diff --git a/extra_tests/test_manager_fork_debug.py b/extra_tests/test_manager_fork_debug.py new file mode 100644 index 0000000000..6110f7e369 --- /dev/null +++ b/extra_tests/test_manager_fork_debug.py @@ -0,0 +1,149 @@ +"""Minimal reproduction of multiprocessing Manager + fork failure.""" + +import multiprocessing +import os +import sys +import time +import traceback + +import pytest + +pytestmark = pytest.mark.skipif(not hasattr(os, "fork"), reason="requires os.fork") + + +def test_basic_manager(): + """Test Manager without fork - does it work at all?""" + print("=== Test 1: Basic Manager (no fork) ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + ev = manager.Event() + print(f" Event created: {ev}") + ev.set() + print(f" Event set, is_set={ev.is_set()}") + assert ev.is_set() + print(" PASS") + finally: + manager.shutdown() + + +def test_manager_with_process(): + """Test Manager shared between parent and child process.""" + print("\n=== Test 2: Manager with forked child ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + result = manager.Value("i", 0) + ev = manager.Event() + + def child_fn(): + try: + ev.set() + result.value = 42 + except Exception as e: + print(f" CHILD ERROR: {e}", file=sys.stderr) + traceback.print_exc() + sys.exit(1) + + print(f" Starting child process...") + process = ctx.Process(target=child_fn) + process.start() + print(f" Waiting for child (pid={process.pid})...") + process.join(timeout=10) + + if process.exitcode != 0: + print(f" FAIL: child exited with code {process.exitcode}") + return False + + print(f" Child done. result={result.value}, event={ev.is_set()}") + assert result.value == 42 + assert ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +def test_manager_server_alive_after_fork(): + """Test that Manager server survives after forking a child.""" + print("\n=== Test 3: Manager server alive after fork ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + ev = manager.Event() + + # Fork a child that does nothing with the manager + pid = os.fork() + if pid == 0: + # Child - exit immediately + os._exit(0) + + # Parent - wait for child + os.waitpid(pid, 0) + + # Now try to use the manager in the parent + print(f" After fork, trying to use Manager in parent...") + ev.set() + print(f" ev.is_set() = {ev.is_set()}") + assert ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +def test_manager_server_alive_after_fork_with_child_usage(): + """Test that Manager server survives when child also uses it.""" + print("\n=== Test 4: Manager server alive after fork + child usage ===") + ctx = multiprocessing.get_context("fork") + manager = ctx.Manager() + try: + child_ev = manager.Event() + parent_ev = manager.Event() + + def child_fn(): + try: + child_ev.set() + except Exception as e: + print(f" CHILD ERROR: {e}", file=sys.stderr) + traceback.print_exc() + sys.exit(1) + + process = ctx.Process(target=child_fn) + process.start() + process.join(timeout=10) + + if process.exitcode != 0: + print(f" FAIL: child exited with code {process.exitcode}") + return False + + # Now use manager in parent AFTER child is done + print(f" Child done. Trying parent usage...") + parent_ev.set() + print(f" child_ev={child_ev.is_set()}, parent_ev={parent_ev.is_set()}") + assert child_ev.is_set() + assert parent_ev.is_set() + print(" PASS") + return True + finally: + manager.shutdown() + + +if __name__ == "__main__": + test_basic_manager() + + passed = 0 + total = 10 + for i in range(total): + print(f"\n--- Iteration {i + 1}/{total} ---") + ok = True + ok = ok and test_manager_with_process() + ok = ok and test_manager_server_alive_after_fork() + ok = ok and test_manager_server_alive_after_fork_with_child_usage() + if ok: + passed += 1 + else: + print(f" FAILED on iteration {i + 1}") + + print(f"\n=== Results: {passed}/{total} passed ===") + sys.exit(0 if passed == total else 1) From 147d89a6123ae7e2f5b05dbaed386267adaa4693 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Tue, 3 Mar 2026 22:30:37 +0900 Subject: [PATCH 2/2] Unskip fork-related flaky tests after parking_lot fix With parking_lot_core's HASHTABLE now properly reset via pthread_atfork, fork-related segfaults and connection errors in multiprocessing tests should be resolved. Remove skip/expectedFailure markers from: - test_concurrent_futures/test_wait.py (6 tests) - test_concurrent_futures/test_process_pool.py (1 test) - test_multiprocessing_fork/test_manager.py (all WithManagerTest*) - test_multiprocessing_fork/test_misc.py (5 tests) - test_multiprocessing_fork/test_threads.py (2 tests) - _test_multiprocessing.py (2 shared_memory tests) Keep test_repr_rlock skipped (flaky thread start latency, not fork-related). --- Lib/test/_test_multiprocessing.py | 4 +--- .../test_process_pool.py | 1 - Lib/test/test_concurrent_futures/test_wait.py | 15 --------------- .../test_multiprocessing_fork/test_manager.py | 17 ----------------- .../test_multiprocessing_fork/test_misc.py | 19 ------------------- .../test_multiprocessing_fork/test_threads.py | 9 --------- 6 files changed, 1 insertion(+), 64 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d05ed68144..f20c147669 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1459,7 +1459,7 @@ def _acquire_release(lock, timeout, l=None, n=1): for _ in range(n): lock.release() - @unittest.skip("TODO: RUSTPYTHON; flaky timeout") + @unittest.skip("TODO: RUSTPYTHON; flaky timeout - thread start latency") def test_repr_rlock(self): if self.TYPE != 'processes': self.skipTest('test not appropriate for {}'.format(self.TYPE)) @@ -4415,7 +4415,6 @@ def test_shared_memory_across_processes(self): sms.close() - @unittest.skip("TODO: RUSTPYTHON; flaky") @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") def test_shared_memory_SharedMemoryServer_ignores_sigint(self): # bpo-36368: protect SharedMemoryManager server process from @@ -4440,7 +4439,6 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self): smm.shutdown() - @unittest.skip("TODO: RUSTPYTHON: sem_unlink cleanup race causes spurious stderr output") @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") @resource_tracker_format_subtests def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index ef318dfc7e..5d4e9677f5 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -85,7 +85,6 @@ def test_traceback(self): self.assertIn('raise RuntimeError(123) # some comment', f1.getvalue()) - @unittest.skip('TODO: RUSTPYTHON flaky EOFError') @hashlib_helper.requires_hashdigest('md5') def test_ressources_gced_in_workers(self): # Ensure that argument for a job are correctly gc-ed after the job diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py index 818e0d51a2..6749a690f6 100644 --- a/Lib/test/test_concurrent_futures/test_wait.py +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -200,20 +200,5 @@ def future_func(): def setUpModule(): setup_module() -class ProcessPoolForkWaitTest(ProcessPoolForkWaitTest): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_first_completed(self): super().test_first_completed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault") - def test_first_completed_some_already_completed(self): super().test_first_completed_some_already_completed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON flaky") - def test_first_exception(self): super().test_first_exception() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_first_exception_one_already_failed(self): super().test_first_exception_one_already_failed() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON flaky") - def test_first_exception_some_already_complete(self): super().test_first_exception_some_already_complete() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON Fatal Python error: Segmentation fault") - def test_timeout(self): super().test_timeout() # TODO: RUSTPYTHON - - if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_manager.py b/Lib/test/test_multiprocessing_fork/test_manager.py index f8d7eddd65..9efbb83bbb 100644 --- a/Lib/test/test_multiprocessing_fork/test_manager.py +++ b/Lib/test/test_multiprocessing_fork/test_manager.py @@ -3,22 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', only_type="manager") -import sys # TODO: RUSTPYTHON -class WithManagerTestCondition(WithManagerTestCondition): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON, times out') - def test_notify_all(self): super().test_notify_all() # TODO: RUSTPYTHON - -class WithManagerTestQueue(WithManagerTestQueue): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON, times out') - def test_fork(self): super().test_fork() # TODO: RUSTPYTHON - -local_globs = globals().copy() # TODO: RUSTPYTHON -for name, base in local_globs.items(): # TODO: RUSTPYTHON - if name.startswith('WithManagerTest') and issubclass(base, unittest.TestCase): # TODO: RUSTPYTHON - base = unittest.skipIf( # TODO: RUSTPYTHON - sys.platform == 'linux', # TODO: RUSTPYTHON - 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError' - )(base) # TODO: RUSTPYTHON - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_misc.py b/Lib/test/test_multiprocessing_fork/test_misc.py index bcf0858258..891a494020 100644 --- a/Lib/test/test_multiprocessing_fork/test_misc.py +++ b/Lib/test/test_multiprocessing_fork/test_misc.py @@ -3,24 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', exclude_types=True) -import sys # TODO: RUSTPYTHON -class TestManagerExceptions(TestManagerExceptions): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_queue_get(self): super().test_queue_get() # TODO: RUSTPYTHON - -@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") -class TestInitializers(TestInitializers): pass # TODO: RUSTPYTHON - -class TestStartMethod(TestStartMethod): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_nested_startmethod(self): super().test_nested_startmethod() # TODO: RUSTPYTHON - -@unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") -class TestSyncManagerTypes(TestSyncManagerTypes): pass # TODO: RUSTPYTHON - -class MiscTestCase(MiscTestCase): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', "TODO: RUSTPYTHON flaky") - def test_forked_thread_not_started(self): super().test_forked_thread_not_started() # TODO: RUSTPYTHON - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_multiprocessing_fork/test_threads.py b/Lib/test/test_multiprocessing_fork/test_threads.py index 1065ebf7fe..1670e34cb1 100644 --- a/Lib/test/test_multiprocessing_fork/test_threads.py +++ b/Lib/test/test_multiprocessing_fork/test_threads.py @@ -3,14 +3,5 @@ install_tests_in_module_dict(globals(), 'fork', only_type="threads") -import os, sys # TODO: RUSTPYTHON -class WithThreadsTestPool(WithThreadsTestPool): # TODO: RUSTPYTHON - @unittest.skip("TODO: RUSTPYTHON; flaky environment pollution when running rustpython -m test --fail-env-changed due to unknown reason") - def test_terminate(self): super().test_terminate() # TODO: RUSTPYTHON - -class WithThreadsTestManagerRestart(WithThreadsTestManagerRestart): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError') - def test_rapid_restart(self): super().test_rapid_restart() # TODO: RUSTPYTHON - if __name__ == '__main__': unittest.main()