003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/zmq/tests
usr
/
local
/
lib
/
python3.8
/
site-packages
/
zmq
/
tests
/
📁
..
📄
__init__.py
(7.75 KB)
📁
__pycache__
📄
conftest.py
(40 B)
📄
test_asyncio.py
(15.57 KB)
📄
test_auth.py
(20.72 KB)
📄
test_cffi_backend.py
(8.79 KB)
📄
test_constants.py
(4.49 KB)
📄
test_context.py
(12.1 KB)
📄
test_cython.py
(1.03 KB)
📄
test_decorators.py
(9.4 KB)
📄
test_device.py
(6.02 KB)
📄
test_draft.py
(1.42 KB)
📄
test_error.py
(1.17 KB)
📄
test_etc.py
(545 B)
📄
test_future.py
(10.62 KB)
📄
test_imports.py
(1.76 KB)
📄
test_includes.py
(997 B)
📄
test_ioloop.py
(3.84 KB)
📄
test_log.py
(6.51 KB)
📄
test_message.py
(11.68 KB)
📄
test_monitor.py
(3 KB)
📄
test_monqueue.py
(7.97 KB)
📄
test_multipart.py
(941 B)
📄
test_mypy.py
(1.57 KB)
📄
test_pair.py
(1.24 KB)
📄
test_poll.py
(7.17 KB)
📄
test_proxy_steerable.py
(3.7 KB)
📄
test_pubsub.py
(1.07 KB)
📄
test_reqrep.py
(1.8 KB)
📄
test_retry_eintr.py
(2.88 KB)
📄
test_security.py
(8 KB)
📄
test_socket.py
(23.09 KB)
📄
test_ssh.py
(235 B)
📄
test_version.py
(1.3 KB)
📄
test_win32_shim.py
(1.68 KB)
📄
test_z85.py
(2.15 KB)
📄
test_zmqstream.py
(2.35 KB)
Editing: test_proxy_steerable.py
# Copyright (C) PyZMQ Developers # Distributed under the terms of the Modified BSD License. import time import struct import zmq from zmq import devices from zmq.tests import BaseZMQTestCase, SkipTest, PYPY if PYPY: # cleanup of shared Context doesn't work on PyPy devices.Device.context_factory = zmq.Context class TestProxySteerable(BaseZMQTestCase): def test_proxy_steerable(self): if zmq.zmq_version_info() < (4, 1): raise SkipTest("Steerable Proxies only in libzmq >= 4.1") dev = devices.ThreadProxySteerable(zmq.PULL, zmq.PUSH, zmq.PUSH, zmq.PAIR) iface = 'tcp://127.0.0.1' port = dev.bind_in_to_random_port(iface) port2 = dev.bind_out_to_random_port(iface) port3 = dev.bind_mon_to_random_port(iface) port4 = dev.bind_ctrl_to_random_port(iface) dev.start() time.sleep(0.25) msg = b'hello' push = self.context.socket(zmq.PUSH) push.connect("%s:%i" % (iface, port)) pull = self.context.socket(zmq.PULL) pull.connect("%s:%i" % (iface, port2)) mon = self.context.socket(zmq.PULL) mon.connect("%s:%i" % (iface, port3)) ctrl = self.context.socket(zmq.PAIR) ctrl.connect("%s:%i" % (iface, port4)) push.send(msg) self.sockets.extend([push, pull, mon, ctrl]) self.assertEqual(msg, self.recv(pull)) self.assertEqual(msg, self.recv(mon)) ctrl.send(b'TERMINATE') dev.join() def test_proxy_steerable_bind_to_random_with_args(self): if zmq.zmq_version_info() < (4, 1): raise SkipTest("Steerable Proxies only in libzmq >= 4.1") dev = devices.ThreadProxySteerable(zmq.PULL, zmq.PUSH, zmq.PUSH, zmq.PAIR) iface = 'tcp://127.0.0.1' ports = [] min, max = 5000, 5050 ports.extend( [ dev.bind_in_to_random_port(iface, min_port=min, max_port=max), dev.bind_out_to_random_port(iface, min_port=min, max_port=max), dev.bind_mon_to_random_port(iface, min_port=min, max_port=max), dev.bind_ctrl_to_random_port(iface, min_port=min, max_port=max), ] ) for port in ports: if port < min or port > max: self.fail('Unexpected port number: %i' % port) def test_proxy_steerable_statistics(self): if zmq.zmq_version_info() < (4, 3): raise SkipTest("STATISTICS only in libzmq >= 4.3") dev = devices.ThreadProxySteerable(zmq.PULL, zmq.PUSH, zmq.PUSH, zmq.PAIR) iface = 'tcp://127.0.0.1' port = dev.bind_in_to_random_port(iface) port2 = dev.bind_out_to_random_port(iface) port3 = dev.bind_mon_to_random_port(iface) port4 = dev.bind_ctrl_to_random_port(iface) dev.start() time.sleep(0.25) msg = b'hello' push = self.context.socket(zmq.PUSH) push.connect("%s:%i" % (iface, port)) pull = self.context.socket(zmq.PULL) pull.connect("%s:%i" % (iface, port2)) mon = self.context.socket(zmq.PULL) mon.connect("%s:%i" % (iface, port3)) ctrl = self.context.socket(zmq.PAIR) ctrl.connect("%s:%i" % (iface, port4)) push.send(msg) self.sockets.extend([push, pull, mon, ctrl]) self.assertEqual(msg, self.recv(pull)) self.assertEqual(msg, self.recv(mon)) ctrl.send(b'STATISTICS') stats = self.recv_multipart(ctrl) stats_int = [struct.unpack("=Q", x)[0] for x in stats] self.assertEqual(1, stats_int[0]) self.assertEqual(len(msg), stats_int[1]) self.assertEqual(1, stats_int[6]) self.assertEqual(len(msg), stats_int[7]) ctrl.send(b'TERMINATE') dev.join()
Upload File
Create Folder