Server : LiteSpeed
System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64
User : nandedex ( 1060)
PHP Version : 8.1.33
Disable Function : NONE
Directory :  /opt/cppython/lib/python3.8/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]


Current File : //opt/cppython/lib/python3.8/test/__pycache__/test_threaded_import.cpython-38.pyc
U

>��g�#�@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlm	Z	ddl
mZmZm
Z
mZmZmZmZmZdd�Zdd�Zdd	d
dd�ZGd
d�d�ZGdd�d�ZGdd�dej�Zedd��Zedkr�e�dS)�N)�mock)�verbose�run_unittest�TESTFN�reap_threads�forget�unlink�rmtree�
start_threadsc	
Cs�zvz>t|�dr"ddl}ddl}nddl}ddl}|�dd�}Wn2tk
rr}z|�|�	d��W5d}~XYnXW5|�t���t|�|k}|r�|��XdS)N�r��)
�append�	threading�	get_ident�len�set�modulefinder�random�	randrange�	Exception�with_traceback)	�N�done�
done_tasks�errors�finishedrr�x�e�r�8/opt/cppython/lib/python3.8/test/test_threaded_import.py�tasks
&r!cCstjddd�|�S)Nzos.register_at_forkT)Zcreate)rZpatch)�funcrrr �mock_register_at_fork)sr#zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        zaif 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        zimport Bzimport A)�A�B�C�Dc@s"eZdZdZdd�Zddd�ZdS)�FinderzIA dummy finder to detect concurrent access to its find_spec()
    method.cCsd|_d|_t��|_dS�Nr)�numcallsrr�Lock�lock��selfrrr �__init__GszFinder.__init__Nc	CsJt��st�|j�|jd7_W5QRX|j}t�d�|d|_dS)Nrg{�G�z�?)�imp�	lock_held�AssertionErrorr,r*r�time�sleep)r.�name�path�targetrrrr �	find_specLs
zFinder.find_spec)NN)�__name__�
__module__�__qualname__�__doc__r/r8rrrr r(Csr(c@seZdZdZddd�ZdS)�FlushingFinderzMA dummy finder which flushes sys.path_importer_cache when it gets
    called.NcCstj��dS�N)�sys�path_importer_cache�clear)r.r5r6r7rrr r8[szFlushingFinder.find_spec)NN)r9r:r;r<r8rrrr r=Wsr=c@s\eZdZdd�Zdd�Zedd��Zdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zedd��ZdS)�ThreadedImportTestscCstj�dd�|_dS�Nr)r?�modules�pop�
old_randomr-rrr �setUpaszThreadedImportTests.setUpcCs|jdk	r|jtjd<dSrC)rFr?rDr-rrr �tearDownds
zThreadedImportTests.tearDownc
st��rt�d��t���dD]�tr6td�ddd�dD]&}ztj	|=Wq:t
k
r^Yq:Xq:g�g����t�
�}t����fdd	�t��D���W5QRX��d
�}t�
�|}tr�td|dd
dd�dt���f}|��|�|�||�trtd�qdS)Nz"can't run when import lock is held)��2rIrJrIrJZTryingzthreads ...� )�end)rrc3s$|]}tjt����fd�VqdS))r7�argsN)r�Threadr!)�.0�i�rrrrrr �	<genexpr>s�
�zAThreadedImportTests.check_parallel_module_init.<locals>.<genexpr>iXz%.1f msg@�@T)�flushrLzdone: %s/%szOK.)r0r1�unittestZSkipTestr�Eventr�printr?rD�KeyErrorrAr3�	monotonicr
�range�waitr�assertFalseZ
assertTrue)r.�mock_os�modname�t0Z	completedZdtZdbg_inforrQr �check_parallel_module_initks8
�


z.ThreadedImportTests.check_parallel_module_initcCs|��dSr>)r_r-rrr �test_parallel_module_init�sz-ThreadedImportTests.test_parallel_module_initc	CsRt�}tj�d|�z*|��|�|jd�|�|j	|j�W5tj�|�XdSr))
r(r?�	meta_path�insert�remover_�
assertGreaterr*�assertEqualr)r.�finderrrr �test_parallel_meta_path�sz+ThreadedImportTests.test_parallel_meta_pathc	s�t��t�}�fdd�}tj�d|�tj�|�z4|�d�|�	�}|�
�jd�|��j
�j�W5tj�|�tj�|�XdS)Ncs��d�t�dS)N�)r8�ImportError)r6�rfrr �	path_hook�s
z?ThreadedImportTests.test_parallel_path_hooks.<locals>.path_hookrrh)r(r=r?�
path_hooksrbrarrcr8r_rdr*rer)r.Zflushing_finderrkZnumtestsrrjr �test_parallel_path_hooks�s
z,ThreadedImportTests.test_parallel_path_hookscCs<ztjd=Wntk
r YnXddl}|�|jj�dS)Nztest.threaded_import_hangersr)r?rDrWZtest.threaded_import_hangersr[Zthreaded_import_hangersr)r.�testrrr �test_import_hangers�sz'ThreadedImportTests.test_import_hangersc	
sd}t�t�|�tjt�tj�dt�|�tjj	t�t
��D]T\}}|d|i}ttj�
t|d�d��}|�|�d��W5QRX|�t|�qBt��g��fdd�}�fd	d
�}tj|d�}tj|d�}|��|��|�
�|�
�|�t��dd
h�dS)Ng�?r�delay�.py�wb�utf-8csddl}��t|dd��dS�Nrr)r$r�getattr)r$��resultsrr �	import_ab�sz<ThreadedImportTests.test_circular_imports.<locals>.import_abcsddl}��t|dd��dSrt)r%rru)r%rvrr �	import_ba�sz<ThreadedImportTests.test_circular_imports.<locals>.import_ba)r7�a�b)�os�mkdirr�
addCleanup�shutilr	r?r6rbrc�circular_imports_modules�items�open�join�write�encoder�	importlib�invalidate_cachesrrN�startrer)	r.rpr5�contents�frxry�t1�t2rrvr �test_circular_imports�s*
z)ThreadedImportTests.test_circular_importsc	Cs�d}tj�dtj�|�tjjtj�td}t|d��}|�	|�
d��W5QRX|�t|�|�tt�|�t
d�t��tt�tjt=dS)Nz�if 1:
            import threading
            def target():
                import random
            t = threading.Thread(target=target)
            t.start()
            t.join()
            t = Nonerrqrrrs�__pycache__)r?r6rbr|�curdirr~rcrr�r�r�rrr	r�r��
__import__rD)r.r\�code�filenamer�rrr �test_side_effect_import�sz+ThreadedImportTests.test_side_effect_importN)
r9r:r;rGrHr#r_r`rgrmror�r�rrrr rB_s
!

&rBc	CsVd}zt��}t�d�Wntk
r.YnXztt�W5|dk	rPt�|�XdS)Ng�h㈵��>)r?�getswitchinterval�setswitchinterval�AttributeErrorrrB)Zold_switchintervalrrr �	test_main�sr��__main__)�_impr0r|r�r?r3rrrTrZtest.supportrrrrrrr	r
r!r#r�r(r=ZTestCaserBr�r9rrrr �<module>s0(
�



F1le Man4ger