003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/zmq/tests
usr
/
local
/
lib
/
python3.8
/
site-packages
/
zmq
/
tests
/
📁
..
📄
__init__.py
(7.75 KB)
📁
__pycache__
📄
conftest.py
(40 B)
📄
test_asyncio.py
(15.57 KB)
📄
test_auth.py
(20.72 KB)
📄
test_cffi_backend.py
(8.79 KB)
📄
test_constants.py
(4.49 KB)
📄
test_context.py
(12.1 KB)
📄
test_cython.py
(1.03 KB)
📄
test_decorators.py
(9.4 KB)
📄
test_device.py
(6.02 KB)
📄
test_draft.py
(1.42 KB)
📄
test_error.py
(1.17 KB)
📄
test_etc.py
(545 B)
📄
test_future.py
(10.62 KB)
📄
test_imports.py
(1.76 KB)
📄
test_includes.py
(997 B)
📄
test_ioloop.py
(3.84 KB)
📄
test_log.py
(6.51 KB)
📄
test_message.py
(11.68 KB)
📄
test_monitor.py
(3 KB)
📄
test_monqueue.py
(7.97 KB)
📄
test_multipart.py
(941 B)
📄
test_mypy.py
(1.57 KB)
📄
test_pair.py
(1.24 KB)
📄
test_poll.py
(7.17 KB)
📄
test_proxy_steerable.py
(3.7 KB)
📄
test_pubsub.py
(1.07 KB)
📄
test_reqrep.py
(1.8 KB)
📄
test_retry_eintr.py
(2.88 KB)
📄
test_security.py
(8 KB)
📄
test_socket.py
(23.09 KB)
📄
test_ssh.py
(235 B)
📄
test_version.py
(1.3 KB)
📄
test_win32_shim.py
(1.68 KB)
📄
test_z85.py
(2.15 KB)
📄
test_zmqstream.py
(2.35 KB)
Editing: test_reqrep.py
# Copyright (C) PyZMQ Developers # Distributed under the terms of the Modified BSD License. from unittest import TestCase import zmq from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest class TestReqRep(BaseZMQTestCase): def test_basic(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) msg1 = b'message 1' msg2 = self.ping_pong(s1, s2, msg1) self.assertEqual(msg1, msg2) def test_multiple(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) for i in range(10): msg1 = i * b' ' msg2 = self.ping_pong(s1, s2, msg1) self.assertEqual(msg1, msg2) def test_bad_send_recv(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) if zmq.zmq_version() != '2.1.8': # this doesn't work on 2.1.8 for copy in (True, False): self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy) self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy) # I have to have this or we die on an Abort trap. msg1 = b'asdf' msg2 = self.ping_pong(s1, s2, msg1) self.assertEqual(msg1, msg2) def test_json(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) o = dict(a=10, b=list(range(10))) o2 = self.ping_pong_json(s1, s2, o) def test_pyobj(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) o = dict(a=10, b=range(10)) o2 = self.ping_pong_pyobj(s1, s2, o) def test_large_msg(self): s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP) msg1 = 10000 * b'X' for i in range(10): msg2 = self.ping_pong(s1, s2, msg1) self.assertEqual(msg1, msg2) if have_gevent: class TestReqRepGreen(GreenTest, TestReqRep): pass
Upload File
Create Folder