Server : LiteSpeed
System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64
User : nandedex ( 1060)
PHP Version : 8.1.33
Disable Function : NONE
Directory :  /opt/cppython/lib/python3.8/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]


Current File : //opt/cppython/lib/python3.8/test/__pycache__/test_telnetlib.cpython-38.opt-1.pyc
U

>��g�2�@s,ddlZddlZddlZddlZddlZddlmZddlZejZdd�Z	Gdd�dej
�ZGdd�de�Z
Gd	d
�d
ej�ZGdd�dej�Zejd
d��Zdefdd�ZGdd�dej
�ZGdd�de�ZGdd�de�ZeZGdd�dej
�ZGdd�dej
�ZGdd�de�Zedk�r(e��dS)�N)�supportcCsT|��|��z4z|��\}}|��Wntjk
r@YnXW5|��XdS�N)�listen�set�close�accept�socket�timeout)�evtZserv�conn�addr�r
�2/opt/cppython/lib/python3.8/test/test_telnetlib.py�servers
rc@sTeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�ZdS)�GeneralTestscCsrt��|_t�tjtj�|_|j�d�t�	|j�|_
tjt|j|jfd�|_
|j
�d�|j
��|j��dS)N�<)�target�argsT)�	threading�Eventr
r�AF_INET�SOCK_STREAM�sock�
settimeoutrZ	bind_port�port�Threadr�thread�	setDaemon�start�wait��selfr
r
r�setUps

zGeneralTests.setUpcCs|j��|`dSr)r�joinr r
r
r�tearDown#s
zGeneralTests.tearDowncCst�t|j�}|j��dSr)�	telnetlib�Telnet�HOSTrrr�r!�telnetr
r
r�	testBasic'szGeneralTests.testBasicc	Cs:t�t|j��}|�|���W5QRX|�|���dSr)r%r&r'rZassertIsNotNone�
get_socketZassertIsNone)r!�tnr
r
r�testContextManager,szGeneralTests.testContextManagerc	Cs\|�t��dk�t�d�zt�t|j�}W5t�d�X|�|j	�
�d�|j	��dS)N�)�
assertTruer�getdefaulttimeout�setdefaulttimeoutr%r&r'r�assertEqualr�
gettimeoutrr(r
r
r�testTimeoutDefault1s
zGeneralTests.testTimeoutDefaultc	Csb|�t��dk�t�d�ztjt|jdd�}W5t�d�X|�|j�	�dk�|j�
�dS�Nr.)r	)r/rr0r1r%r&r'rrr3rr(r
r
r�testTimeoutNone;s
zGeneralTests.testTimeoutNonecCs2tjt|jdd�}|�|j��d�|j��dSr5)r%r&r'rr2rr3rr(r
r
r�testTimeoutValueFszGeneralTests.testTimeoutValuecCs:t��}|jt|jdd�|�|j��d�|j��dSr5)	r%r&�openr'rr2rr3rr(r
r
r�testTimeoutOpenKszGeneralTests.testTimeoutOpencCsJtjt|jdd�}|j}|�|��|�|�|��|���|j��dSr5)	r%r&r'rrr2r+�filenor)r!r)Zt_sockr
r
r�testGettersQs
zGeneralTests.testGettersN)�__name__�
__module__�__qualname__r"r$r*r-r4r6r7r9r;r
r
r
rrs

rc@s*eZdZdZd
dd�Zdd�Zdd�Zd	S)�
SocketStubz* a socket proxy that re-defines sendall() r
cCst|�|_g|_d|_dS)NF)�list�reads�writes�block)r!rAr
r
r�__init__[s
zSocketStub.__init__cCs|j�|�dSr)rB�append)r!�datar
r
r�sendall_szSocketStub.sendallcCsZd}|jr(t|�|kr(||j�d�7}qt|�|krV|j�d||d��|d|�}|S)N�r)rA�len�pop�insert)r!�size�outr
r
r�recvaszSocketStub.recvN)r
)r<r=r>�__doc__rDrGrNr
r
r
rr?Ys
r?c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�TelnetAlikecCs
t��dSr)�NotImplementedErrorr r
r
rr:kszTelnetAlike.filenocCsdSrr
r r
r
rrmrHzTelnetAlike.closecCs
|jjSr)rrCr r
r
r�
sock_availnszTelnetAlike.sock_availc	Gs>t���}tjj||f|��W5QRX|j|��7_dSr)rZcaptured_stdoutr%r&�msg�	_messages�getvalue)r!rSrrMr
r
rrSps
zTelnetAlike.msgN)r<r=r>r:rrRrSr
r
r
rrPjsrPc@sDeZdZdd�Zedd��Zddd�Zdd	�Zdd
d�Zdd
�Z	dS)�MockSelectorcCs
i|_dSr��keysr r
r
rrDxszMockSelector.__init__cCsdS)Ng����MbP?r
r r
r
r�
resolution{szMockSelector.resolutionNcCst�|d||�}||j|<|S)Nr)�	selectors�SelectorKeyrX)r!�fileobj�eventsrF�keyr
r
r�registers
zMockSelector.registercCs|j�|�Sr)rXrJ)r!r\r
r
r�
unregister�szMockSelector.unregistercCsFd}|jD]}t|t�r
|jj}q&q
|r.gSdd�|j��D�SdS)NFcSsg|]}||jf�qSr
)r])�.0r^r
r
r�
<listcomp>�sz'MockSelector.select.<locals>.<listcomp>)rX�
isinstancerPrrC�values)r!r	rCr\r
r
r�select�s

zMockSelector.selectcCs|jSrrWr r
r
r�get_map�szMockSelector.get_map)N)N)
r<r=r>rD�propertyrYr_r`rerfr
r
r
rrVvs


rVc#s0�fdd�}ztj}|t_dVW5|t_XdS)Ncst��Sr)r?)Zignored�rAr
r�new_conn�sztest_socket.<locals>.new_conn)r�create_connection)rAriZold_connr
rhr�test_socket�s
rkr
c	Cs2|D]}qt|��|dd�}d|_W5QRX|S)za return a telnetlib.Telnet object that uses a SocketStub with
        reads queued up to be read �dummyr�)rkrT)rA�cls�xr)r
r
r�test_telnet�s

rpc@seZdZdd�Zdd�ZdS)�ExpectAndReadTestCasecCstj|_tt_dSr)r%�_TelnetSelector�old_selectorrVr r
r
rr"�szExpectAndReadTestCase.setUpcCs|jt_dSr)rsr%rrr r
r
rr$�szExpectAndReadTestCase.tearDownN)r<r=r>r"r$r
r
r
rrq�srqc@sDeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dS)�	ReadTestscCstdg}t|�}|�d�}|j|d|j|j|jjfd�dddg}d�|dd	��}t|�}|�d�}|�||�dS)
zc
        read_until(expected, timeout=None)
        test the blocking version of read_util
        sxxxmatchyyy�matchsxxxmatch)rSs2xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxs2yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyrHN���)rpZ
read_untilr2ZcookedqZrawqrrAr#)r!�wantr)rFrA�expectr
r
r�test_read_until�s


zReadTests.test_read_untilcCs4dddg}d�|�}t|�}|��}|�||�dS)zJ
        read_all()
          Read all data until EOF; may block.
        ��xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxs�yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyys�zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzrHN)r#rp�read_allr2)r!rArxr)rFr
r
r�
test_read_all�s

zReadTests.test_read_allcCsBtdg�}|��}|�t|�dk�t�}|��}|�d|�dS)zQ
        read_some()
          Read at least one byte or EOF; may block.
        rz�rHN)rpZ	read_somer/rIr2)r!r)rFr
r
r�test_read_some�s
zReadTests.test_read_somecCstd}t|g�}t||�}d|j_|�d|��d|j_d}z||�7}Wq:tk
r`YqdYq:Xq:|�||�dS)z
        read_*_eager()
          Read all data available already queued or on the socket,
          without blocking.
        �dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxTrHFN)rp�getattrrrCr2�EOFError)r!�	func_namerwr)�funcrFr
r
r�_read_eager�s

zReadTests._read_eagercCs|�d�|�d�dS)NZ
read_eagerZread_very_eager)r�r r
r
r�test_read_eager�s
zReadTests.test_read_eagercCsVd}t|g�}|�d|���|jjr0|��q|��}|�||�|�t|j�dS�NrrH)rpr2�read_very_lazyrrA�	fill_rawqZassertRaisesr�)r!rwr)rFr
r
rr��s

zReadTests.read_very_lazycCs~d}t|g�}|�d|���d}z |��}||7}|s@|��Wntk
rZYqnYnX|�|�|��q"|�||�dSr�)rpr2Z	read_lazyr�r�r/�
startswith)r!rwr)rFZ	read_datar
r
r�test_read_lazys

zReadTests.test_read_lazyN)
r<r=r>ryr|r~r�r�r�r�r
r
r
rrt�s
rtc@seZdZddd�Zdd�ZdS)�nego_collectorNcCsd|_||_d|_dS)NrH)�seen�	sb_getter�sb_seen)r!r�r
r
rrDsznego_collector.__init__cCs<|j||7_|tjkr8|jr8|��}|j|7_dSr)r��tl�SEr�r�)r!r�cmd�optZsb_datar
r
r�do_negosznego_collector.do_nego)N)r<r=r>rDr�r
r
r
rr�s
r�c@seZdZdZdd�ZdS)�
WriteTestszKThe only thing that write does is replace each tl.IAC for
    tl.IAC+tl.IACcCszddtjddtjtjdtjtjdg}|D]@}t�}|�|�d�|jj�}|�|�tjtjtj�|�q4dS)Nsdata sample without IACsdata sample withs one IACsa fews iacsrH)	r��IACrp�writer#rrBr2�replace)r!Zdata_samplerFr)Zwrittenr
r
r�
test_write's�
zWriteTests.test_writeN)r<r=r>rOr�r
r
r
rr�#sr�c@s`eZdZejejejejejej	ej
ejgZdd�Z
dd�Zdd�Zdd�Zd	d
�Zdd�Zd
S)�OptionTestscCs�t|�}td�|��}t�}|�|j�|��}|j}|�t|�dk�|�	|dd�|j
�|�|dd�tj
�|�|t||��d|_dS)z helper for testing IAC + cmd rHrNr}�)rprIr#r��set_option_negotiation_callbackr�r{r�r/�assertIn�cmdsr2r�ZNOOPTr�)r!rFr)Zdata_len�nego�txtr�r
r
r�
_test_command7szOptionTests._test_commandcCs^|jD]<}|�tj|g�|�dtj|dg�|�dtj|dg�q|�dd�|jD��dS)Nrsdyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy�
xxxxxxxxxx�
yyyyyyyyyycSsg|]}tj|�qSr
)r�r�)rar�r
r
rrbKsz1OptionTests.test_IAC_commands.<locals>.<listcomp>)r�r�r�r�)r!r�r
r
r�test_IAC_commandsEs

zOptionTests.test_IAC_commandscCs0tjtjtjtjtjtjtjtjtjtjtjtjtjtjdtjtjtjtjdtjtjtjtjtjtjdtjtjdtjtjg}t|�}t|j�}|�|j�|�	�}|�
|d�tjtjdtjdtjd}|�
|j|�|�
d|���d|_dS)NsaasbbsccsddrHsaabb)
r�r�ZSBr�rpr�Zread_sb_datar�r�r{r2r�r�)r!�sendr)r�r�Zwant_sb_datar
r
r�test_SB_commandsMs"&&*�
$zOptionTests.test_SB_commandscCs�dtjtdg�dftjtjtdg�dftjtjtdg�dftjtjtdg�dftjtjtdg�dfg}|D]2\}}t|g�}|�d�|�	�}|�
||j�q|dS)	N)�az: recv b''
�Xz: IAC 88 not recognized
r}z: IAC DO 1
z
: IAC DONT 1
z
: IAC WILL 1
z
: IAC WONT 1
)r�r��bytesZDOZDONTZWILLZWONTrp�set_debuglevelr{r�rT)r!Zgiven_a_expect_b�a�br)r�r
r
r�test_debuglevel_reads_s�


z!OptionTests.test_debuglevel_readscCs0t�}|�d�|�d�d}|�||j�dS)Nr}sxxxzsend b'xxx'
)rpr�r�r�rT)r!r)Zexpectedr
r
r�test_debuglevel_writers


z!OptionTests.test_debuglevel_writec	CsJtg��tdd�}d|_W5QRX|�d�|�d�|�|jd�dS)Nrl�0rmr}�testz0.*test)rkrPrTr�rSZassertRegexr(r
r
r�test_debug_accepts_str_portys



z'OptionTests.test_debug_accepts_str_portN)r<r=r>r�ZAOZAYTZBRKZECZELZGAZIPZNOPr�r�r�r�r�r�r�r
r
r
rr�3s$r�c@seZdZdd�ZdS)�ExpectTestscCsBdddg}t|�}|�dg�\}}}|�|d�|dd���dS)z�
        expect(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        r�rur�rHNrv)rprxr2r#)r!rwr)�_rFr
r
r�test_expect�s
zExpectTests.test_expectN)r<r=r>r�r
r
r
rr��sr��__main__)rrZr%r�
contextlibr�rZunittestr'rZTestCaser�objectr?r&rP�BaseSelectorrV�contextmanagerrkrprqrtr�r�r�r�r�r<�mainr
r
r
r�<module>s0B 

bP


F1le Man4ger