Server : LiteSpeed
System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64
User : nandedex ( 1060)
PHP Version : 8.1.33
Disable Function : NONE
Directory :  /opt/cppython/lib/python3.8/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]


Current File : //opt/cppython/lib/python3.8/test/__pycache__/test_threading.cpython-38.opt-1.pyc
U

>��g��@s4dZddlZddlmZmZmZmZddlmZm	Z	ddl
Z
ddlZddlZddl
Z
ddlZddlZddlZddlZddlZddlZddlmZddlmZdZGdd	�d	e�ZGd
d�de
j�ZGdd
�d
ej�ZGdd�de�ZGdd�de�ZGdd�de�ZGdd�de�Z Gdd�de
j�Z!Gdd�de�Z"Gdd�de�Z#Gdd�dej$�Z$Gdd�dej%�Z&e�'e
j(dkd �Gd!d"�d"ej%��Z)Gd#d$�d$ej*�Z*Gd%d&�d&ej%�Z+Gd'd(�d(ej,�Z,Gd)d*�d*ej-�Z-Gd+d,�d,ej.�Z.Gd-d.�d.ej/�Z/Gd/d0�d0ej�Z0Gd1d2�d2ej�Z1e2d3k�r0e�3�dS)4z!
Tests for the threading module.
�N)�verbose�
import_module�cpython_only�requires_type_collecting)�assert_python_ok�assert_python_failure)�
lock_tests)�support)Znetbsd5zhp-ux11c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�CountercCs
d|_dS�Nr��value��self�r�2/opt/cppython/lib/python3.8/test/test_threading.py�__init__"szCounter.__init__cCs|jd7_dS�N�rrrrr�inc$szCounter.inccCs|jd8_dSrrrrrr�dec&szCounter.deccCs|jS�Nrrrrr�get(szCounter.getN)�__name__�
__module__�__qualname__rrrrrrrrr
!sr
c@seZdZdd�Zdd�ZdS)�
TestThreadcCs,tjj||d�||_||_||_||_dS)N��name)�	threading�Threadr�testcase�sema�mutex�nrunning)rrr!r"r#r$rrrr,s
zTestThread.__init__c
Cs�t��d}tr&td|j|df�|j��|j�8|j��trTt|j��d�|j	�
|j��d�W5QRXt�|�tr�td|jd�|j�@|j�
�|j	�|j��d�tr�td	|j|j��f�W5QRXW5QRXdS)
Ng��@ztask %s will run for %.1f usecg��.Aztasks are running�Ztask�donerz$%s is finished. %d tasks are running)�randomr�printrr"r#r$rrr!ZassertLessEqual�time�sleeprZassertGreaterEqual)r�delayrrr�run3s*�


�zTestThread.runN)rrrrr,rrrrr+src@seZdZdd�Zdd�ZdS)�BaseTestCasecCstj��|_dSr)�testr	Zthreading_setup�_threadsrrrr�setUpMszBaseTestCase.setUpcCstjj|j�tj��dSr)r.r	Zthreading_cleanupr/�
reap_childrenrrrr�tearDownPszBaseTestCase.tearDownN)rrrr0r2rrrrr-Lsr-c@sneZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Ze�eed�d �d!d"��Ze�eed�d#�d$d%��Zd&d'�Ze�eed�d(�e�eed)�d*�d+d,���Ze�ejekd-�e�eed�d(�e�eed)�d*�d.d/����Zed0d1��Z d2d3�Z!d4d5�Z"d6d7�Z#d8d9�Z$e%d:d;��Z&e%d<d=��Z'd>S)?�ThreadTestsc	Cs0d}tjdd�}t��}t�}g}t|�D]F}td|||||�}|�|�|�|j�|�	t
|�d�|��q*ttd�r�t
dd�|D��t��hB}|�d|�|�t|�|d	�tr�td
�|D]D}|��|�|���|�|jd�|�|j�|�	t
|�d�q�t�rtd
�|�|��d�dS)N�
r%rz<thread %d>z^<TestThread\(.*, initial\)>$�
get_native_idcss|]}|jVqdSr)�	native_id)�.0�trrr�	<genexpr>msz/ThreadTests.test_various_ops.<locals>.<genexpr>rz!waiting for all tasks to completerz#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r�BoundedSemaphore�RLockr
�ranger�append�assertIsNone�ident�assertRegex�repr�start�hasattr�setr5�assertNotIn�assertEqual�lenrr(�join�assertFalse�is_alive�assertNotEqual�assertIsNotNoner)	rZNUMTASKSr"r#Z
numrunning�threads�ir8Z
native_idsrrr�test_various_opsYs4


zThreadTests.test_various_opsc	sr|�t��j���fdd�}t���g�t���*t�|d�}��	�|�
�d|�W5QRXtj�d=dS)Ncs��t��j����dSr)r=r�
currentThreadr?rDr�r&r?rr�f�sz9ThreadTests.test_ident_of_no_threading_threads.<locals>.frr)rLrrPr?�Eventr	�wait_threads_exit�_thread�start_new_thread�waitrF�_active)rrR�tidrrQr�"test_ident_of_no_threading_threads}s
z.ThreadTests.test_ident_of_no_threading_threadscCsRtrtd�zt�d�Wn tjk
r:t�d��YnX|��t�d�dS)Nz!with 256 KiB thread stack size...i�4platform does not support changing thread stack sizer�	rr(r�
stack_sizerU�error�unittestZSkipTestrOrrrr�test_various_ops_small_stack�s�
z(ThreadTests.test_various_ops_small_stackcCsRtrtd�zt�d�Wn tjk
r:t�d��YnX|��t�d�dS)Nzwith 1 MiB thread stack size...ir[rr\rrrr�test_various_ops_large_stack�s�
z(ThreadTests.test_various_ops_large_stackc	Cs�dd�}t��}|��t���t�||f�}|��W5QRX|�|tj�|�	tj|tj
�|�tj|���|�
ttj|�d�tj|=dS)NcSst��|��dSr)r�current_thread�release)r#rrrrR�sz*ThreadTests.test_foreign_thread.<locals>.f�_DummyThread)r�Lock�acquirer	rTrUrV�assertInrX�assertIsInstancerd�
assertTruerJr@rA)rrRr#rYrrr�test_foreign_thread�s
zThreadTests.test_foreign_threadc	s�td�}|jj}|j|jf|_Gdd�dt��|���}t��}|�	|t
�|�|d�z|||�}qdWn�k
r|YnX|�d�z|�
|d�Wntk
r�YnXt���t���G���fdd�dtj�}|�}d	|_|��tr�td
�t�rtd�|d|�}|�
|d�t�r*td
����}|�|�t�rJtd�|�|j�t�rdtd�||j|�}|�
|d�t�r�td��jdd�|�|j�t�r�td�|j�r�|��dS)N�ctypesc@seZdZdS)z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)rrrrrrr�AsyncExc�srlrzAsyncExc not raisedrcseZdZ���fdd�ZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.WorkercsPt��|_d|_z���t�d�qWn"�k
rJd|_���YnXdS)NFg�������?T)r�	get_ident�id�finishedrDr)r*r�rlZworker_saw_exceptionZworker_startedrrr,�s
z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN�rrrr,rrprr�Worker�srrTz    started worker threadz     trying nonsensical thread id���z,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exceptionr4��timeoutz    all OK -- joining worker)rZ	pythonapiZPyThreadState_SetAsyncExcZc_ulongZ	py_objectZargtypes�	Exceptionrrmrh�intZ
assertGreaterZfailrF�UnboundLocalErrorrSr �daemonrBrr(rWrirIrornrH)	rrkZ
set_async_exc�	exceptionrY�resultrrr8�retrrpr�test_PyThreadState_SetAsyncExc�sb





z*ThreadTests.test_PyThreadState_SetAsyncExccCsXdd�}tj}|t_z6tjdd�d�}|�tj|j�|�|tjkd�W5|t_XdS)NcWst���dSr)r�ThreadError��argsrrr�fail_new_threadsz7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadcSsdSrrrrrr�<lambda>�z0ThreadTests.test_limbo_cleanup.<locals>.<lambda>��targetz:Failed to cleanup _limbo map on failure of Thread.start().)r�_start_new_threadr �assertRaisesr~rBrI�_limbo)rr�r�r8rrr�test_limbo_cleanups�zThreadTests.test_limbo_cleanupcCs(td�tdd�\}}}|�|d�dS)Nrk�-caNif 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            �*)rrrF�r�rc�out�errrrr�test_finalize_runnning_thread"sz)ThreadTests.test_finalize_runnning_threadcCstdd�dS)Nr�aPif 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )rrrrr�test_finalize_with_traceCsz$ThreadTests.test_finalize_with_tracecCs0tdd�\}}}|�|��d�|�|d�dS)Nr�a�if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5Woke up, sleep function is: <built-in function sleep>r�)rrF�stripr�rrr�test_join_nondaemon_on_shutdown\s

�z+ThreadTests.test_join_nondaemon_on_shutdownc	Cs~tj}t��}z^tdd�D]N}t�|d�tjdd�d�}|��|��|�}|�	||d||f�qW5t�|�XdS)Nr�dg-C��6*?cSsdSrrrrrrr�xr�z7ThreadTests.test_enumerate_after_join.<locals>.<lambda>r�z&#1703448 triggered after %d trials: %s)
r�	enumerate�sys�getswitchinterval�setswitchintervalr<r rBrHrE)r�enum�old_intervalrNr8�lrrr�test_enumerate_after_joinps
�
z%ThreadTests.test_enumerate_after_joincCs�Gdd�dt�}|dd�}t�|�}|j��~|j|�dt�|��d�|dd�}t�|�}|j��~|j|�dt�|��d�dS)Nc@seZdZdd�Zdd�ZdS)zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctioncSs.||_tj|j|fd|id�|_|j��dS)N�yet_another)r�r��kwargs)�should_raiserr �_run�threadrB)rr�rrrr�s�zMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__cSs|jr
t�dSr)r��
SystemExit)rZ	other_refr�rrrr��szIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)rrrrr�rrrr�RunSelfFunction�s	r�F)r�z%d references still around)�msgT)�object�weakref�refr�rHr>r��getrefcount)rr�Z
cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectrrr�test_no_refcycle_through_target�s&



��



��z+ThreadTests.test_no_refcycle_through_targetc	Csht��}|��|�d�|��|�d�|�td��|��W5QRXt�	�}|�
�t��dS)NTrzuse is_alive())rr �isDaemon�	setDaemon�getName�setNameZassertWarnsRegex�DeprecationWarning�isAliverS�isSet�activeCount)rr8�errr�test_old_threading_api�s

z"ThreadTests.test_old_threading_apicCs2t��}|�dt|��d|_|�dt|��dS�NryT)rr rErAryrg�rr8rrr�test_repr_daemon�szThreadTests.test_repr_daemoncCsHt��}|�|j�tjdd�}|�|j�tjdd�}|�|j�dS)NF�ryT)rr rIryrir�rrr�test_daemon_param�szThreadTests.test_daemon_param�forkztest needs fork()cCs0d}td|�\}}}|�|d�|�|d�dS)Na�if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        r�r��rrF)r�code�_r�r�rrr�test_dummy_thread_after_fork�sz(ThreadTests.test_dummy_thread_after_fork�needs os.fork()cCs�t��}|�tj|�tj�d�td�D]~}tjdd�d�}|�	�t
��}|dkrnt
�|�
�rfdnd�q*|��t
�|d�\}}|�t
�|��|�dt
�|��q*dS)	Ng���ư>�cSsdSrrrrrrr��r�z6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>r�r�r4)r�r��
addCleanupr�r.r	r<rr rB�osr��_exitrJrH�waitpidri�	WIFEXITEDrF�WEXITSTATUS)rr�rNr8�pid�statusrrr�test_is_alive_after_fork�sz$ThreadTests.test_is_alive_after_forkcsht��}��|jd���|jt��j���|jt����fdd�}tj|d�}|��|�	�dS)N�
MainThreadcs��t��jt��j�dSr)rKr�main_threadr?rbrrrrrR�s�z'ThreadTests.test_main_thread.<locals>.fr�)
rr�rFrr?rbrmr rBrH)r�mainrR�thrrr�test_main_thread�szThreadTests.test_main_threadztest needs os.fork()r�ztest needs os.waitpid()cCs@d}td|�\}}}|���dd�}|�|d�|�|d�dS)Nakif 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        r��
�r�zMainThread
True
True
�r�decode�replacerF�rr�r�r�r��datarrr�test_main_thread_after_fork�s
z'ThreadTests.test_main_thread_after_fork�due to known OS bugcCs@d}td|�\}}}|���dd�}|�|d�|�|d�dS)Na�if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        r�r�r�r�zThread-1
True
True
r�r�rrr�/test_main_thread_after_fork_from_nonmain_threads
z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|�\}}}|��}|�|d�|�|��dgd�dS)Na�if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        r�r�zGC: True True True�)rr�rF�
splitlinesr�rrr� test_main_thread_during_shutdown,s
�z,ThreadTests.test_main_thread_during_shutdowncCs$d}td|�\}}}|�|d�dS)Na�if 1:
            import os
            import threading
            import time
            import random

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_Finalize() is called.
                random_sleep()
                tls.x = Sleeper()
                random_sleep()

            threading.Thread(target=f).start()
            random_sleep()
        r�r�r�)rr�r�r�r�rrr�test_finalization_shutdownJsz&ThreadTests.test_finalization_shutdowncs�t���t�����������fdd�}tj|d�}|�|jd�|�����|�|�	��|j}|�
|jdd�d����|�|jdd�d�|�|�	��|��|�
|�	��|�|j�|�
�dS)Ncs������t�d�dS)N�{�G�z�?)rcrfr)r*r�Zfinish�startedrrrRssz'ThreadTests.test_tstate_lock.<locals>.fr�rrtF�)rU�
allocate_lockrfrr �assertIs�_tstate_lockrBrirJrIrcr>rH)rrRr8�tstate_lockrr�r�test_tstate_lockms&zThreadTests.test_tstate_lockcs�t���t�����������fdd�}tj|d�}|�����|�dt|�����d}t	d�D]}|t|�kr�q�t
�d�qn|�|t|��|��dS)Ncs������dSr)rcrfrr�rrrR�sz(ThreadTests.test_repr_stopped.<locals>.fr�r��stoppedi�r�)
rUr�rfrr rBrgrArcr<r)r*rH)rrRr8ZLOOKING_FORrNrr�r�test_repr_stopped�s"zThreadTests.test_repr_stoppedcs�tdd�D]�}t�|���fdd�t|�D�}|D]}|��q2|D]}|��qD�fdd�t|�D�}|D]}|��ql|D]}|��q~|�t�j�q
dS)Nrr4csg|]}tj�jd��qS�r�)rr rf�r7r���bsrr�
<listcomp>�s�z;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>csg|]}tj�jd��qSr�)rr rcr�r�rrr��s�)r<rr:rBrHr��
ValueErrorrc)r�limitrMr8rr�r�test_BoundedSemaphore_limit�s"

�


�

z'ThreadTests.test_BoundedSemaphore_limitc	s��fdd��dd����fdd��d�_t��}t���z4t���ddl}|���td�D]
}��q`W5t�|�XdS)	Ncs�Srr)�frame�event�arg)�
noop_tracerrr��sz9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracecssdVqdS)N�	generatorrrrrrr��sz8ThreadTests.test_frame_tstate_tracing.<locals>.generatorcs�jdkr���_t�j�Sr)�gen�nextr)�callbackr�rrr��s
z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackrr%)r�r��gettrace�settracer�	_testcapiZcall_in_temporary_c_threadr<)rZ	old_tracer�r.r)r�r�r�r�test_frame_tstate_tracing�s


z%ThreadTests.test_frame_tstate_tracingc
Cs�dD]�}|j|d��lt��}tj|j|d�}|��|j}|sP|�|tj�n|�	|tj�|�
�|��|�	|tj�W5QRXqdS)N)FTr�)r�ry)ZsubTestrrSr rWrBr�rg�_shutdown_locksrErDrH)rryr�r�r�rrr�test_shutdown_locks�szThreadTests.test_shutdown_locksN)(rrrrOrZr`rarjr}r�r�r�r�r�r�r�r�r�r_�
skipUnlessrCr�r�r�r�r��skipIfr��platform�platforms_to_skipr�rr�r�r�r�r�rr�r�rrrrr3UsJ$X!



##
'r3c@s�eZdZdd�Zdd�Ze�eed�d�e�	e
jekd�dd	���Z
e�eed�d�e�	e
jekd�d
d���Ze�	e
jekd�dd
��Ze�eed�d�e�	e
jekd�dd���Ze�eed�d�dd��ZdS)�ThreadJoinOnShutdowncCs8d|}td|�\}}}|���dd�}|�|d�dS)Na�if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
r�r�r�zend of main
end of thread
r�)r�scriptr�r�r�r�rrr�
_run_and_joins
�z"ThreadJoinOnShutdown._run_and_joincCsd}|�|�dS)Nz�if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            �r�rrrrr�test_1_join_on_shutdownsz,ThreadJoinOnShutdown.test_1_join_on_shutdownr�r�r�cCsd}|�|�dS)NaGif 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            rrrrr�test_2_join_in_forked_processsz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCsd}|�|�dS)Na:if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            rrrrr�!test_3_join_in_forked_from_thread.sz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|�\}}}|�|�dS)Nacif True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    with open(os.__file__, 'rb') as in_f:
                        stuff = in_f.read(200)
                        with open(os.devnull, 'wb') as null_f:
                            null_f.write(stuff)
                            time.sleep(random.random() / 1995)
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            r�)rrI�rrr�r�r�rrr�test_4_daemon_threadsGs!z*ThreadJoinOnShutdown.test_4_daemon_threadscCsNdd�}g}td�D]"}tj|d�}|�|�|��q|D]}|��q<dS)NcSs,t��}|dkrt�|d�n
t�d�dSr)r�r�r�r�)r�rrr�do_fork_and_waitvszIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_wait�r�)r<rr r=rBrH)rrrMrNr8rrr�test_reinit_tls_after_forkps	

z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCs�g}td�D]&}tjdd�d�}|�|�|��qt��}|dkrltt�	��dkr`t�
d�q�t�
d�nt�|d�\}}|�d|�|D]}|�
�q�dS)Nr
cSs
t�d�S)Ng333333�?)r)r*rrrrr��r�zKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>r�rr)r<rr r=rBr�r�rGr��_current_framesr�r�rFrH)rrMrNr8r�r�r�rrr�$test_clear_threads_states_after_fork�s

z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)rrrrrr_r�rCr�r�r�rrrr	rrrrrrrr�s
(rc@s(eZdZdd�Zdd�Zedd��ZdS)�SubinterpThreadingTestscCsbt��\}}|�tj|�|�tj|�d|f}tj�|�}|�|d�|�t�|d�d�dS)Naif 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            rr�x�	r��piper��closer.r	Zrun_in_subinterprF�read�r�r�wr�r|rrr�test_threads_join�s�z)SubinterpThreadingTests.test_threads_joincCsbt��\}}|�tj|�|�tj|�d|f}tj�|�}|�|d�|�t�|d�d�dS)Na�if 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            rrrrrrrr�test_threads_join_2�s�z+SubinterpThreadingTests.test_threads_join_2c	CsHd}d|f}tj���td|�\}}}W5QRX|�d|���dS)NaAif 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            r�z:Fatal Python error: Py_EndInterpreter: not the last thread)r.r	ZSuppressCrashReportrrgr�)rZsubinterp_coderr�r�r�rrr�test_daemon_threads_fatal_error�s��z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)rrrrrrrrrrrr�s'rc@s`eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
dd��Zdd�Zdd�Z
dS)�ThreadingExceptionTestscCs*t��}|��|�t|j�|��dSr)rr rBr��RuntimeErrorrH�rr�rrr�test_start_thread_againsz/ThreadingExceptionTests.test_start_thread_againcCst��}|�t|j�dSr)rrbr�rrH)rrbrrr�test_joining_current_thread
sz3ThreadingExceptionTests.test_joining_current_threadcCst��}|�t|j�dSr)rr r�rrHrrrr�test_joining_inactive_threadsz4ThreadingExceptionTests.test_joining_inactive_threadcCs.t��}|��|�tt|dd�|��dSr�)rr rBr�r�setattrrHrrrr�test_daemonize_active_threadsz4ThreadingExceptionTests.test_daemonize_active_threadcCst��}|�t|j�dSr)rrer�rrc)r�lockrrr�test_releasing_unacquired_locksz6ThreadingExceptionTests.test_releasing_unacquired_lockcCshd}d}tjtjd|gtjtjd�}|��\}}|���dd�}|�|j	dd|���|�||�dS)	Naif True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
r�)�stdout�stderrr�r�rzUnexpected error: )
�
subprocess�Popenr��
executable�PIPE�communicater�r�rF�
returncode)rrZexpected_output�pr'r(r�rrr�test_recursion_limits�z,ThreadingExceptionTests.test_recursion_limitcCs\d}td|�\}}}|�|d�|��}|�d|�|�d|�|�d|�|�d|�dS)Na�if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r�r��Exception in thread�"Traceback (most recent call last):�ZeroDivisionError�Unhandled exception�rrFr�rgrEr
rrr�test_print_exception;sz,ThreadingExceptionTests.test_print_exceptioncCs\d}td|�\}}}|�|d�|��}|�d|�|�d|�|�d|�|�d|�dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            r�r�r1r2r3r4r5r
rrr�%test_print_exception_stderr_is_none_1Vsz=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|�\}}}|�|d�|�d|���dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r�r�r4)rrFrEr�r
rrr�%test_print_exception_stderr_is_none_2tsz=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXdd��G�fdd�dtj�}|�}|��|��|�|j�|�|jt�d|_dS)NcSs�dSrrrrrr�
bare_raise�szOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisecseZdZdZ�fdd�ZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Nc
s8z
��Wn(tk
r2}z
||_W5d}~XYnXdSr)rv�exc)rr:�r9rrr,�s
zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)rrrr:r,rr;rr�
Issue27558�sr<)rr rBrHrLr:rhr)rr<r�rr;r�#test_bare_raise_in_brand_new_thread�s	z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadN)rrrr r!r"r$r&r0r6rr7r8r=rrrrrs
rc@seZdZdd�ZdS)�
ThreadRunFailcCstd��dS)N�
run failed�r�rrrrr,�szThreadRunFail.runNrqrrrrr>�sr>c@s:eZdZdd�Zejdd��Zdd�Zdd�Zd	d
�Z	dS)�ExceptHookTestsc	Cszt�d�� }tdd�}|��|��W5QRX|����}|�d|j�d�|�|�d|�|�d|�|�d|�dS)	Nr(zexcepthook threadr�Exception in thread �:
�#Traceback (most recent call last):
z   raise ValueError("run failed")zValueError: run failed)	r	�captured_outputr>rBrH�getvaluer�rgr)rr(r�rrr�test_excepthook�s
zExceptHookTests.test_excepthookcCs�t�d��^}ztd��WnJtk
rb}z,t�t��d��}zt�|�W5d}XW5d}~XYnXW5QRX|�	��
�}|�dt���d�|�|�d|�|�d|�|�d|�dS)	Nr(ZbugrBrCrDz  raise ValueError("bug")zValueError: bug)N)
r	rEr�rvr�ExceptHookArgsr��exc_info�
excepthookrFr�rgrm)rr(r:r�rrr�test_excepthook_thread_None�s"z+ExceptHookTests.test_excepthook_thread_Nonec	CsRGdd�dtj�}t�d��}|�}|��|��W5QRX|�|��d�dS)Nc@seZdZdd�ZdS)z4ExceptHookTests.test_system_exit.<locals>.ThreadExitcSst�d�dSr)r��exitrrrrr,�sz8ExceptHookTests.test_system_exit.<locals>.ThreadExit.runNrqrrrr�
ThreadExit�srMr(r�)rr r	rErBrHrFrF)rrMr(r�rrr�test_system_exit�sz ExceptHookTests.test_system_exitc	s�d��fdd�}ztt�td|��t�}|��|��W5QRX|��jt�|�t	�j
�d�|��j�j
j�|�
�j|�W5d�XdS)Ncs|�dSrr)Z	hook_argsrrr�hook�sz4ExceptHookTests.test_custom_excepthook.<locals>.hookrJr?)r	�	swap_attrrr>rBrHrF�exc_typer��str�	exc_value�
exc_traceback�
__traceback__r�r�)rrOr�rrr�test_custom_excepthook�sz&ExceptHookTests.test_custom_excepthookcs�dd�}d��fdd�}t�td|��Lt�td|��2t�d��}t�}|��|��W5QRXW5QRXW5QRX|�|�	�d�|��d�dS)	NcSstd��dS)N�threading_hook failedr@rrrr�threading_hook�szCExceptHookTests.test_custom_excepthook_fail.<locals>.threading_hookcst|��dSr)rR)rQrSrT�Zerr_strrr�sys_hook�sz=ExceptHookTests.test_custom_excepthook_fail.<locals>.sys_hookrJr(z#Exception in threading.excepthook:
rW)
r	rPrr�rEr>rBrHrFrF)rrXrZr(r�rrYr�test_custom_excepthook_fail�s ��&
�z+ExceptHookTests.test_custom_excepthook_failN)
rrrrGr	rrKrNrVr[rrrrrA�s

rAc@s$eZdZdd�Zdd�Zdd�ZdS)�
TimerTestscCst�|�g|_t��|_dSr)r-r0�
callback_argsrrS�callback_eventrrrrr0s
zTimerTests.setUpcCs�t�d|j�}|��|j��|j�d�d|jd<|j�	�t�d|j�}|��|j��|�
t|j�d�|�
|jdifdifg�|�
�|�
�dS)Nr�ZblahZbarZfoor�r)r�Timer�
_callback_spyrBr^rWr�r=r��clearrFrGr]rH)rZtimer1Ztimer2rrr� test_init_immutable_default_args	s



z+TimerTests.test_init_immutable_default_argscOs*|j�|dd�|��f�|j��dSr)r]r=�copyr^rD)rr�r�rrrr`szTimerTests._callback_spyN)rrrr0rbr`rrrrr\sr\c@seZdZeej�ZdS)�	LockTestsN)rrr�staticmethodrre�locktyperrrrrdsrdc@seZdZeej�ZdS)�PyRLockTestsN)rrrrer�_PyRLockrfrrrrrg!srgzRLock not implemented in Cc@seZdZeej�ZdS)�CRLockTestsN)rrrrer�_CRLockrfrrrrri$sric@seZdZeej�ZdS)�
EventTestsN)rrrrerrSZ	eventtyperrrrrk(srkc@seZdZeej�ZdS)�ConditionAsRLockTestsN)rrrrer�	Conditionrfrrrrrl+srlc@seZdZeej�ZdS)�ConditionTestsN)rrrrerrmZcondtyperrrrrn/srnc@seZdZeej�ZdS)�SemaphoreTestsN)rrrrer�	Semaphore�semtyperrrrro2sroc@seZdZeej�ZdS)�BoundedSemaphoreTestsN)rrrrerr:rqrrrrrr5srrc@seZdZeej�ZdS)�BarrierTestsN)rrrrer�BarrierZbarriertyperrrrrs8srsc@seZdZdd�ZdS)�MiscTestCasecCs&dh}ddh}tj|td||d�dS)Nr~rPr�)rrU)�extra�	blacklist)r	Zcheck__all__r)rrvrwrrr�test__all__=s
�zMiscTestCase.test__all__N)rrrrxrrrrru<sruc@s$eZdZdd�Zdd�Zdd�ZdS)�InterruptMainTestsc	CsFdd�}tj|d�}|�t��|��|��W5QRX|��dS)NcSst��dSr)rU�interrupt_mainrrrr�call_interruptHszHInterruptMainTests.test_interrupt_main_subthread.<locals>.call_interruptr�)rr r��KeyboardInterruptrBrH)rr{r8rrr�test_interrupt_main_subthreadEsz0InterruptMainTests.test_interrupt_main_subthreadc	Cs"|�t��t��W5QRXdSr)r�r|rUrzrrrr�test_interrupt_main_mainthreadPsz1InterruptMainTests.test_interrupt_main_mainthreadc
CsVt�tj�}z4t�tjtj�t��t�tjtj�t��W5t�tj|�XdSr)�signal�	getsignal�SIGINT�SIG_IGNrUrz�SIG_DFL)r�handlerrrr�test_interrupt_main_noerrorVsz.InterruptMainTests.test_interrupt_main_noerrorN)rrrr}r~r�rrrrryDsry�__main__)4�__doc__Ztest.supportr.rrrrZtest.support.script_helperrrr'r�rUrr)r_r�r�r)rrr	rr�r
r rZTestCaser-r3rrrr>rAr\rdZ
RLockTestsrgr�rjrirkrlrnrorrrsruryrr�rrrr�<module>s^
!	.%_#Z 


F1le Man4ger