Server : LiteSpeed
System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64
User : nandedex ( 1060)
PHP Version : 8.1.33
Disable Function : NONE
Directory :  /opt/cppython/lib/python3.8/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]


Current File : //opt/cppython/lib/python3.8/test/__pycache__/test_signal.cpython-38.opt-2.pyc
U

>��gv��@s�ddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZddl
mZmZzddlZWnek
r�dZYnXGdd�de
j�Ze
�ejdkd�Gdd	�d	e
j��Ze
�ejdkd
�Gdd�de
j��ZGd
d�de
j�Ze
�ejdkd�Gdd�de
j��Ze
�eed�d�Gdd�de
j��Ze
�ejdkd�Gdd�de
j��Ze
�ejdkd�Gdd�de
j��ZGdd�de
j�ZGdd�de
j�Z Gdd�de
j�Z!dd �Z"e#d!k�r�e
�$�dS)"�N)�support)�assert_python_ok�spawn_pythonc@seZdZdd�ZdS)�GenericTestscCs�tt�D]�}tt|�}|dkr.|�|tj�q|dkrF|�|tj�q|�d�rj|�d�sj|�|tj�q|�d�r|�|tj�|�t	j
d�qdS)N>�SIG_IGN�SIG_DFL>�SIG_SETMASK�	SIG_BLOCK�SIG_UNBLOCKZSIGZSIG_ZCTRL_�win32)�dir�signal�getattr�assertIsInstance�Handlers�Sigmasks�
startswith�Signals�assertEqual�sys�platform)�self�name�sig�r�//opt/cppython/lib/python3.8/test/test_signal.py�
test_enumss

zGenericTests.test_enumsN)�__name__�
__module__�__qualname__rrrrrrsrrzNot valid on Windowsc@sZeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
�ej
d�dd��ZdS)�
PosixTestscGsdS�Nr�r�argsrrr�trivial_signal_handler&sz!PosixTests.trivial_signal_handlercCs8|�ttjd�|�ttjd|j�|�ttjd�dS)Ni�)�assertRaises�
ValueErrorr
�	getsignalr$�	strsignal�rrrr�,test_out_of_range_signal_number_raises_error)s
�z7PosixTests.test_out_of_range_signal_number_raises_errorcCs|�ttjtjd�dSr!)r%�	TypeErrorr
�SIGUSR1r)rrr�0test_setting_signal_handler_to_none_raises_error1s
�z;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZt�tj|j�}|�|tj�|�t�tj�|j�t�tj|�|�t�tj�|�dSr!)r
�SIGHUPr$rrrr')rZhuprrr�test_getsignal5s�zPosixTests.test_getsignalcCs@|�dt�tj��|�dt�tj��|�dt�tj��dS)NZ	InterruptZ
TerminatedZHangup)�assertInr
r(�SIGINT�SIGTERMr.r)rrr�test_strsignal=szPosixTests.test_strsignalcCs&tj�t�}tj�|d�}t|�dS)Nzsignalinterproctester.py)�os�path�dirname�__file__�joinr)rr6Zscriptrrr�test_interprocess_signalCsz#PosixTests.test_interprocess_signalcCsdt��}|�|t�|�tjj|�|�tjj|�|�d|�|�tj	|�|�
t|�tj	�dS�Nr)r
�
valid_signalsr�setr0rr1�SIGALRM�assertNotIn�NSIG�
assertLess�len�r�srrr�test_valid_signalsHszPosixTests.test_valid_signals�sys.executable required.cCs<tjtjddgtjd�}|�d|j�|�|jt	j
�dS)N�-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)��stderr�KeyboardInterrupt)�
subprocess�runr�
executable�PIPEr0rHr�
returncoder
r1)r�processrrr�!test_keyboard_interrupt_exit_codeQs��z,PosixTests.test_keyboard_interrupt_exit_codeN)rrrr$r*r-r/r3r9rD�unittest�
skipUnlessrrLrPrrrrr $s	r zWindows specificc@s2eZdZdd�Zdd�Ze�ejd�dd��Z	dS)	�WindowsSignalTestscCsdt��}|�|t�|�t|�d�|�tjj|�|�	d|�|�	tj
|�|�t|�tj
�dS)N�r)r
r;rr<ZassertGreaterEqualrAr0rr1r>r?r@rBrrrrDesz%WindowsSignalTests.test_valid_signalsc	Cs�dd�}t�}tjtjtjtjtjtjtjfD]0}t�	|�dk	r.t�|t�||��|�
|�q.|�|�|�t
��t�d|�W5QRX|�t
��t�d|�W5QRXdS)NcSsdSr!r)�x�yrrr�<lambda>p�z3WindowsSignalTests.test_issue9324.<locals>.<lambda>����)r<r
�SIGABRTZSIGBREAK�SIGFPE�SIGILLr1�SIGSEGVr2r'�add�
assertTruer%r&)r�handler�checkedrrrr�test_issue9324ns �
z!WindowsSignalTests.test_issue9324rEcCs<tjtjddgtjd�}|�d|j�d}|�|j|�dS)NrFzraise KeyboardInterruptrGrIl:)	rJrKrrLrMr0rHrrN)rrOZSTATUS_CONTROL_C_EXITrrrrP�s
�z4WindowsSignalTests.test_keyboard_interrupt_exit_codeN)
rrrrDrcrQrRrrLrPrrrrrSbs	rSc@sNeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Ze�	e
jdkd�d
d��ZdS)�
WakeupFDTestsc	CsL|�t��tjtjd�W5QRX|�t��t�tjd�W5QRXdS)N)�signumF)r%r+r
�
set_wakeup_fdr1r)rrr�test_invalid_call�szWakeupFDTests.test_invalid_callcCs t��}|�ttftj|�dSr!)rZmake_bad_fdr%r&�OSErrorr
rf)r�fdrrr�test_invalid_fd�s

�zWakeupFDTests.test_invalid_fdcCs0t��}|��}|��|�ttftj|�dSr!)�socket�fileno�closer%r&rhr
rf)r�sockrirrr�test_invalid_socket�s
�z!WakeupFDTests.test_invalid_socketcCs�t��\}}|�tj|�|�tj|�t��\}}|�tj|�|�tj|�ttd�rrt�|d�t�|d�t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)N�set_blockingFrY)	r4�pipe�
addCleanuprm�hasattrrpr
rfr)rZr1Zw1Zr2Zw2rrr�test_set_wakeup_fd_result�s

z'WakeupFDTests.test_set_wakeup_fd_resultcCs�t��}|�|j�|�d�|��}t��}|�|j�|�d�|��}t�|�|�t�|�|�|�t�d�|�|�t�d�d�dS)NFrY)rkrrrm�setblockingrlr
rfr)rZsock1�fd1Zsock2�fd2rrr� test_set_wakeup_fd_socket_result�s


z.WakeupFDTests.test_set_wakeup_fd_socket_resultrztests specific to POSIXc	Cs�t��\}}|�tj|�|�tj|�t�|d�|�t��}t�|�W5QRX|�	t
|j�d|�t�|d�t�|�t�d�dS)NTz&the fd %s must be in non-blocking modeFrY)r4rqrrrmrpr%r&r
rfr�str�	exception)rZrfdZwfd�cmrrr�test_set_wakeup_fd_blocking�s�
z)WakeupFDTests.test_set_wakeup_fd_blockingN)
rrrrgrjrortrxrQ�skipIfrrr|rrrrrd�s	rdc@steZdZe�edkd�dd�dd��Ze�edkd�dd��Zd	d
�Zdd�Z	d
d�Z
e�ee
d�d�dd��ZdS)�WakeupSignalTestsN�need _testcapiT��orderedcGs&d�ttt|��||�}td|�dS)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rF)�format�tuple�map�intr)rZ	test_bodyr�Zsignals�coderrr�check_wakeup�s �"zWakeupSignalTests.check_wakeupc	Csjd}t��\}}z4zt�|d�Wntk
r6YnX|�d�W5t�|�t�|�Xtd|�dS)Na&if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        �xz9OS doesn't report write() error on the read end of a piperF)r4rqrm�writerh�skipTestr)rr��r�wrrr�test_wakeup_write_errors"
z)WakeupSignalTests.test_wakeup_write_errorcCs|�dtj�dS)Na�def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        �r�r
r=r)rrr�test_wakeup_fd_early<s�z&WakeupSignalTests.test_wakeup_fd_earlycCs|�dtj�dS)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r�r)rrr�test_wakeup_fd_during^s�z'WakeupSignalTests.test_wakeup_fd_duringcCs|�dtjtj�dS)Nz�def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )r�r
r,r=r)rrr�test_signum|s�zWakeupSignalTests.test_signum�pthread_sigmask�need signal.pthread_sigmask()cCs|jdtjtjdd�dS)Na�def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr�)r�r
r,�SIGUSR2r)rrr�test_pending�s
�zWakeupSignalTests.test_pending)rrrrQr}�	_testcapir�r�r�r�r�rRrsr
r�rrrrr~�s&
3"�r~�
socketpairzneed socket.socketpairc@sTeZdZe�edkd�dd��Ze�edkd�dd��Ze�edkd�dd��ZdS)	�WakeupSocketSignalTestsNrcCsd}td|�dS)Na�if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rF�r�rr�rrr�test_socket�sz#WakeupSocketSignalTests.test_socketcCs.tjdkrd}nd}dj|d�}td|�dS)N�nt�sendr�a+if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        ��actionrF�r4rr�r�rr�r�rrr�test_send_error�s
!�"z'WakeupSocketSignalTests.test_send_errorcCs.tjdkrd}nd}dj|d�}td|�dS)Nr�r�r�a�if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        # Start with large chunk size to reduce the
        # number of send needed to fill the buffer.
        written = 0
        for chunk_size in (2 ** 16, 2 ** 8, 1):
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, socket.timeout):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r�rFr�r�rrr�test_warn_on_full_buffer�s
c�dz0WakeupSocketSignalTests.test_warn_on_full_buffer)	rrrrQr}r�r�r�r�rrrrr��s
#
*r�c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�SiginterruptTestc
Cs�d|f}td|���}z|j��}|jdd�\}}Wn*tjk
r^|��YW5QR�dSX||}|��}|dkr�td||f��|dkW5QR�SW5QRXdS)	Na�if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rFg@)�timeoutF)��zChild error (exit code %s): %rr�)	r�stdout�readline�communicaterJ�TimeoutExpired�kill�wait�	Exception)rZ	interruptr�rOZ
first_liner�rH�exitcoderrr�readpipe_interruptedXs"#�$
�z%SiginterruptTest.readpipe_interruptedcCs|�d�}|�|�dSr!�r�r`�rZinterruptedrrr�test_without_siginterrupt�s
z*SiginterruptTest.test_without_siginterruptcCs|�d�}|�|�dS�NTr�r�rrr�test_siginterrupt_on�s
z%SiginterruptTest.test_siginterrupt_oncCs|�d�}|�|�dS)NF)r�ZassertFalser�rrr�test_siginterrupt_off�s
z&SiginterruptTest.test_siginterrupt_offN)rrrr�r�r�r�rrrrr�Us<r�c@sneZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
�ej
dkd�dd��Zdd�Zdd�ZdS)�
ItimerTestcCs(d|_d|_d|_t�tj|j�|_dS)NFr)�hndl_called�
hndl_count�itimerr
r=�sig_alrm�	old_alarmr)rrr�setUp�szItimerTest.setUpcCs,t�tj|j�|jdk	r(t�|jd�dSr:)r
r=r�r��	setitimerr)rrr�tearDown�s
zItimerTest.tearDowncGs
d|_dSr�)r�r"rrrr��szItimerTest.sig_alrmcGsFd|_|jdkrt�d��n|jdkr4t�tjd�|jd7_dS)NTr�z.setitimer didn't disable ITIMER_VIRTUAL timer.r�)r�r�r
�ItimerErrorr��ITIMER_VIRTUALr"rrr�
sig_vtalrm�s

zItimerTest.sig_vtalrmcGsd|_t�tjd�dS)NTr)r�r
r��ITIMER_PROFr"rrr�sig_prof�szItimerTest.sig_profcCs|�tjtjdd�dS)NrYr)r%r
r�r�r)rrr�test_itimer_exc�szItimerTest.test_itimer_exccCs0tj|_t�|jd�t��|�|jd�dS)Ng�?T)r
�ITIMER_REALr�r��pauserr�r)rrr�test_itimer_real�szItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCs�tj|_t�tj|j�t�|jdd�t��}t��|dkr`tddd�}t�	|j�dkr0qjq0|�
d�|�t�	|j�d�|�|jd	�dS)
Ng333333�?皙�����?�N@�90�2	铖���r��8timeout: likely cause: machine too slow or load too highT)
r
r�r��	SIGVTALRMr�r��time�	monotonic�pow�	getitimerr�rr��rZ
start_time�_rrr�test_itimer_virtual�s
zItimerTest.test_itimer_virtualcCs�tj|_t�tj|j�t�|jdd�t��}t��|dkr`tddd�}t�	|j�dkr0qjq0|�
d�|�t�	|j�d�|�|jd�dS)	Nr�r�r�r�r�r�r�T)
r
r�r��SIGPROFr�r�r�r�r�r�r�rr�r�rrr�test_itimer_prof�s
zItimerTest.test_itimer_profcCs2tj|_t�|jd�t�d�|�|jd�dS)N���ư>r�T)r
r�r�r�r��sleeprr�r)rrr�test_setitimer_tinys
zItimerTest.test_setitimer_tinyN)rrrr�r�r�r�r�r�r�rQr}rrr�r�r�rrrrr��s
	�
r�c@s�eZdZe�eed�d�dd��Ze�eed�d�e�eed�d�dd���Ze�eed	�d
�dd��Z	e�eed�d�d
d��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd��Z
e�eed�d�dd��Ze�eed�d�dd��Ze�eed�d�dd ��Ze�eed�d�e�eed�d�d!d"���Ze�eed�d�d#d$��Ze�eed�d�d%d&��Ze�eed�d�d'd(��Ze�eed	�d
�d)d*��Zd+S),�PendingSignalsTests�
sigpendingzneed signal.sigpending()cCs|�t��t��dSr!)rr
r�r<r)rrr�test_sigpending_emptysz)PendingSignalsTests.test_sigpending_emptyr�r�cCsd}td|�dS)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rFr�r�rrr�test_sigpendingsz#PendingSignalsTests.test_sigpending�pthread_killzneed signal.pthread_kill()cCsd}td|�dS)Na�if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rFr�r�rrr�test_pthread_kill9sz%PendingSignalsTests.test_pthread_killcCsd|��|f}td|�dS)Nawif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rF)�stripr)rZblocked�testr�rrr�wait_helperRs 
�%zPendingSignalsTests.wait_helper�sigwaitzneed signal.sigwait()cCs|�tjd�dS)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        �r�r
r=r)rrr�test_sigwait�sz PendingSignalsTests.test_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCs|�tjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        r�r)rrr�test_sigwaitinfo�sz$PendingSignalsTests.test_sigwaitinfo�sigtimedwaitzneed signal.sigtimedwait()cCs|�tjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r�r)rrr�test_sigtimedwait�sz%PendingSignalsTests.test_sigtimedwaitcCs|�tjd�dS)Nz�
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        r�r)rrr�test_sigtimedwait_poll�sz*PendingSignalsTests.test_sigtimedwait_pollcCs|�tjd�dS)Nz�
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        r�r)rrr�test_sigtimedwait_timeout�sz-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}|�ttj|gd�dS)Ng�)r
r=r%r&r�)rrerrr�"test_sigtimedwait_negative_timeout�sz6PendingSignalsTests.test_sigtimedwait_negative_timeoutcCstdd�dS)NrFa�if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        r�r)rrr�test_sigwait_thread�s	z'PendingSignalsTests.test_sigwait_threadc	Cs�|�ttj�|�ttjd�|�ttjddd�|�ttjdg�|�t��t�tjtjg�W5QRX|�t��t�tjdg�W5QRX|�t��t�tjdd>g�W5QRXdS)Nr�r�r�i�ri�)r%r+r
r�rhr&r	r?r)rrr�test_pthread_sigmask_arguments�sz2PendingSignalsTests.test_pthread_sigmask_argumentscCsJt�tjt���}|�tjtj|�t�tjt���}|�|t���dSr!)r
r�r	r;rrrr
ZassertLessEqualrBrrr�"test_pthread_sigmask_valid_signals�sz6PendingSignalsTests.test_pthread_sigmask_valid_signalscCsd}td|�dS)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rFr�r�rrr�test_pthread_sigmask�sHz(PendingSignalsTests.test_pthread_sigmaskc	CsJd}td|��2}|��\}}|��}|dkr<td||f��W5QRXdS)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rFr�zChild error (exit code %s): %s)rr�r�r�)rr�rOr�rHr�rrr�test_pthread_kill_main_threadEs
�z1PendingSignalsTests.test_pthread_kill_main_threadN)rrrrQrRrsr
r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�rrrrr�s��
���
�
,�

�
	�
	�
�
�
���
�
�
K�r�c@sreZdZdd�Zdd�Zdd�Ze�ee	d�d�d	d
��Z
e�ee	d�d�dd��Ze�ee	d
�d�dd��ZdS)�
StressTestcCs t�||�}|�tj||�dSr!)r
rr)rrera�old_handlerrrr�setsigfszStressTest.setsigcs�d�g�d
��fdd�	}|�tjtjd�|�tj|�|�t���krVt�d�q>�fdd�t	t��d�D�}t
�|�}tj
r�td	|f�|S)N�cs,t���kr(��t���t�tjd�dS)Nr�)rA�appendr��perf_counterr
r�r��re�frame��N�timesrrransz5StressTest.measure_itimer_resolution.<locals>.handlerrg����MbP?cs g|]}�|d�|�qS)r�r)�.0�i)rrr�
<listcomp>|sz8StressTest.measure_itimer_resolution.<locals>.<listcomp>r�z,detected median itimer() resolution: %.6f s.)NN)rrr
r�r�r�r=rAr�r��range�
statisticsZmedianr�verbose�print)rraZ	durationsZmedrr�r�measure_itimer_resolutionjs
z$StressTest.measure_itimer_resolutioncCs4|��}|dkrdS|dkr dS|�d|f�dS)Ng-C��6?i'g{�G�z�?�dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r	r�)rZresorrr�decide_itimer_count�s�zStressTest.decide_itimer_countr�ztest needs setitimer()cs�|��}g�dd�}d
�fdd�	}|�tj|�|�tj|�|�tj|�d}t��d}||kr�t�	t�
�tj�|d7}t��|kr�t��|kr�t�d�q~t�	t�
�tj�|d7}t��|kr\t��|kr\t�d�q�q\|�
t��|d	�dS)NcSst�tjdt��d�dS)Nr���h㈵��>)r
r�r��randomr�rrr�
first_handler�sz@StressTest.test_stress_delivery_dependent.<locals>.first_handlercs��|�dSr!�r�r��Zsigsrr�second_handler�szAStressTest.test_stress_delivery_dependent.<locals>.second_handlerr�.@r�r�Some signals were lost)NN)rr�r
r�r,r=r�r�r4r��getpidrAr�r)rrrr�
expected_sigs�deadlinerrr�test_stress_delivery_dependent�s&z)StressTest.test_stress_delivery_dependentcs�|��}g��fdd�}|�tj|�|�tj|�d}t��d}||kr�t�tjdt	�	�d�t
�t
��tj�|d7}t
��|krDt��|krDt�d�q�qD|�t
��|d�dS)	Ncs��|�dSr!rr�rrrra�sz=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrrr�rr�r)rr�r
r,r=r�r�r�r�r
r4r�rrAr�r)rrrarrrrr�!test_stress_delivery_simultaneous�sz,StressTest.test_stress_delivery_simultaneousr,ztest needs SIGUSR1c	stj�d�d�d��fdd�����fdd�}���fdd�}t����}|�tj�|�tj|d	�}z�d}t���Z}|��|�d
�|��|j	dk	r�|�
|j	jt�|�
d��d�t|j	j��d
}W5QRX|s�|��d�|����W5d
�|��XdS)
NrFcs�d7�dS�Nr�rr�)�num_received_signalsrr�custom_handler�szAStressTest.test_stress_modifying_handlers.<locals>.custom_handlercs�st����d7�qdSr)r
�raise_signalr)�do_stop�num_sent_signalsrerr�set_interrupts�s
zAStressTest.test_stress_modifying_handlers.<locals>.set_interruptscs8�dkr4td�D] }�tjfD]}t��|�qqqdS)Nr
i N)rr
r)rra)rrrerr�cycle_handlers�szAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers)�targetTzSignal z ignored due to race condition)r
r,rr�	threading�Threadr8rZcatch_unraisable_exception�startZ
unraisabler�	exc_valuerhr0ryZ
assertGreaterr@)rrr r��tZignoredr{r)rrrrrer�test_stress_modifying_handlers�s:



�z)StressTest.test_stress_modifying_handlersN)
rrrr�r	rrQrRrsr
rrr'rrrrr�_s�
,�
�r�c@s6eZdZdd�Ze�ejdkd�dd��Zdd�Z	d	S)
�RaiseSignalTestc	Cs&|�t��t�tj�W5QRXdSr!)r%�KeyboardInterruptr
rr1r)rrr�test_sigintszRaiseSignalTest.test_sigintrzWindows specific testc
CsTzd}t�|�|�d�Wn2tk
rN}z|jtjkr<n�W5d}~XYnXdS)Nr�z#OSError (Invalid argument) expected)r
rZfailrh�errno�EINVAL)rr.�errr�test_invalid_argument s
z%RaiseSignalTest.test_invalid_argumentcsJd��fdd�}t�tj|�}|�tjtj|�t�tj�|���dS)NFcsd�dSr�r)�a�b�Zis_okrrra.sz-RaiseSignalTest.test_handler.<locals>.handler)r
r1rrrr`)rraZ
old_signalrr1r�test_handler,szRaiseSignalTest.test_handlerN)
rrrr*rQr}rrr.r2rrrrr(s
r(cCst��dSr!)r�
reap_childrenrrrr�tearDownModule8sr4�__main__)%r+r4r
r
rkrrJrr"r�rQr�rZtest.support.script_helperrrr��ImportErrorZTestCaserr}rr rRrSrdr~rsr�r�r�r�r�r(r4r�mainrrrr�<module>sR
=/M6@TeQ<


F1le Man4ger