003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/zmq/tests
usr
/
local
/
lib
/
python3.8
/
site-packages
/
zmq
/
tests
/
📁
..
📄
__init__.py
(7.75 KB)
📁
__pycache__
📄
conftest.py
(40 B)
📄
test_asyncio.py
(15.57 KB)
📄
test_auth.py
(20.72 KB)
📄
test_cffi_backend.py
(8.79 KB)
📄
test_constants.py
(4.49 KB)
📄
test_context.py
(12.1 KB)
📄
test_cython.py
(1.03 KB)
📄
test_decorators.py
(9.4 KB)
📄
test_device.py
(6.02 KB)
📄
test_draft.py
(1.42 KB)
📄
test_error.py
(1.17 KB)
📄
test_etc.py
(545 B)
📄
test_future.py
(10.62 KB)
📄
test_imports.py
(1.76 KB)
📄
test_includes.py
(997 B)
📄
test_ioloop.py
(3.84 KB)
📄
test_log.py
(6.51 KB)
📄
test_message.py
(11.68 KB)
📄
test_monitor.py
(3 KB)
📄
test_monqueue.py
(7.97 KB)
📄
test_multipart.py
(941 B)
📄
test_mypy.py
(1.57 KB)
📄
test_pair.py
(1.24 KB)
📄
test_poll.py
(7.17 KB)
📄
test_proxy_steerable.py
(3.7 KB)
📄
test_pubsub.py
(1.07 KB)
📄
test_reqrep.py
(1.8 KB)
📄
test_retry_eintr.py
(2.88 KB)
📄
test_security.py
(8 KB)
📄
test_socket.py
(23.09 KB)
📄
test_ssh.py
(235 B)
📄
test_version.py
(1.3 KB)
📄
test_win32_shim.py
(1.68 KB)
📄
test_z85.py
(2.15 KB)
📄
test_zmqstream.py
(2.35 KB)
Editing: test_poll.py
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

import os
import sys
import time

from pytest import mark

import zmq
from zmq.tests import PollZMQTestCase, have_gevent, GreenTest


def wait():
    """Pause for 0.25 s so sockets have time to connect or deliver messages."""
    time.sleep(0.25)


class TestPoll(PollZMQTestCase):
    """Tests for ``Poller`` event reporting across several socket patterns.

    ``Poller`` is a class attribute so that subclasses can substitute another
    poller implementation and re-run the same tests.
    """

    Poller = zmq.Poller

    def test_pair(self):
        """PAIR sockets report POLLOUT when idle and POLLIN once a message is pending."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

        # Sleep to allow sockets to connect.
        wait()

        poller = self.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, zmq.POLLIN | zmq.POLLOUT)

        # Poll result should contain both sockets, and both should be
        # send-ready only.
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT)
        self.assertEqual(socks[s2], zmq.POLLOUT)

        # Send on both, wait, and expect POLLOUT | POLLIN on each.
        s1.send(b'msg1')
        s2.send(b'msg2')
        wait()
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT | zmq.POLLIN)
        self.assertEqual(socks[s2], zmq.POLLOUT | zmq.POLLIN)

        # After receiving, both should be back to POLLOUT only.
        s1.recv()
        s2.recv()
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT)
        self.assertEqual(socks[s2], zmq.POLLOUT)

        poller.unregister(s1)
        poller.unregister(s2)

    def test_reqrep(self):
        """Walk a REP/REQ pair through one request/reply cycle, checking poll state."""
        s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)

        # Sleep to allow sockets to connect.
        wait()

        poller = self.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, zmq.POLLIN | zmq.POLLOUT)

        # s1 (REP) has no events yet; s2 (REQ) is send-ready.
        socks = dict(poller.poll())
        self.assertNotIn(s1, socks)
        self.assertEqual(socks[s2], zmq.POLLOUT)

        # s2 reports no events immediately after sending its request.
        s2.send(b'msg1')
        socks = dict(poller.poll())
        self.assertNotIn(s2, socks)

        # After a delay the request is readable on s1.
        time.sleep(0.5)
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLIN)

        # Once s1 has received the request it becomes send-ready.
        s1.recv()
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT)

        # s1 reports no events after sending its reply.
        s1.send(b'msg2')
        socks = dict(poller.poll())
        self.assertNotIn(s1, socks)

        # After a delay the reply is readable on s2.
        time.sleep(0.5)
        socks = dict(poller.poll())
        self.assertEqual(socks[s2], zmq.POLLIN)

        # Once s2 has received the reply it is send-ready again.
        s2.recv()
        socks = dict(poller.poll())
        self.assertEqual(socks[s2], zmq.POLLOUT)

        poller.unregister(s1)
        poller.unregister(s2)

    def test_no_events(self):
        """Registering a socket with flags=0 leaves it out of (or removes it from) the poller."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, 0)
        self.assertIn(s1, poller)
        self.assertNotIn(s2, poller)

        # Re-registering an already registered socket with 0 drops it.
        poller.register(s1, 0)
        self.assertNotIn(s1, poller)

    def test_pubsub(self):
        """PUB stays send-ready; SUB only reports POLLIN while a message is pending."""
        s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
        s2.setsockopt(zmq.SUBSCRIBE, b'')

        # Sleep to allow sockets to connect.
        wait()

        poller = self.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, zmq.POLLIN)

        # The publisher is send-ready; the subscriber has nothing to read.
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT)
        self.assertNotIn(s2, socks)

        # s1 stays in POLLOUT after a send.
        s1.send(b'msg1')
        socks = dict(poller.poll())
        self.assertEqual(socks[s1], zmq.POLLOUT)

        # s2 is POLLIN after waiting for delivery.
        wait()
        socks = dict(poller.poll())
        self.assertEqual(socks[s2], zmq.POLLIN)

        # s2 reports no events after recv.
        s2.recv()
        socks = dict(poller.poll())
        self.assertNotIn(s2, socks)

        poller.unregister(s1)
        poller.unregister(s2)

    @mark.skipif(sys.platform.startswith('win'), reason='Windows')
    def test_raw(self):
        """Poll a plain file object (the read end of an OS pipe) for readability."""
        read_fd, write_fd = os.pipe()
        # The ``with`` block closes both pipe ends even when an assertion
        # fails; on exit the writer is closed first, then the reader.
        with os.fdopen(read_fd, 'rb') as reader, os.fdopen(write_fd, 'wb') as writer:
            poller = self.Poller()
            poller.register(reader, zmq.POLLIN)

            # Nothing has been written yet, so a short poll reports nothing.
            socks = dict(poller.poll(1))
            assert socks == {}

            writer.write(b'x')
            writer.flush()

            # The event is keyed by the reader's file descriptor.
            socks = dict(poller.poll(1))
            assert socks == {reader.fileno(): zmq.POLLIN}

    def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)

        # A sub-millisecond timeout must return quickly.
        tic = time.time()
        poller.poll(0.005)
        toc = time.time()
        self.assertLess(toc - tic, 0.1)

        # 5 ms: longer than 1 ms but well under 100 ms.
        tic = time.time()
        poller.poll(5)
        toc = time.time()
        self.assertLess(toc - tic, 0.1)
        self.assertGreater(toc - tic, 0.001)

        # 500 ms: longer than 100 ms but under one second.
        tic = time.time()
        poller.poll(500)
        toc = time.time()
        self.assertLess(toc - tic, 1)
        self.assertGreater(toc - tic, 0.1)


class TestSelect(PollZMQTestCase):
    """Tests for the ``zmq.select`` convenience function."""

    def test_pair(self):
        """Connected idle PAIR sockets are writable but not readable."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

        # Sleep to allow sockets to connect.
        wait()

        rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2])
        self.assertIn(s1, wlist)
        self.assertIn(s2, wlist)
        self.assertNotIn(s1, rlist)
        self.assertNotIn(s2, rlist)

    @mark.flaky(reruns=3)
    def test_timeout(self):
        """make sure select timeout has the right units (seconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

        tic = time.time()
        zmq.select([s1, s2], [], [], 0.005)
        toc = time.time()
        self.assertLess(toc - tic, 1)
        self.assertGreater(toc - tic, 0.001)

        tic = time.time()
        zmq.select([s1, s2], [], [], 0.25)
        toc = time.time()
        self.assertLess(toc - tic, 1)
        self.assertGreater(toc - tic, 0.1)


if have_gevent:
    import gevent
    from zmq import green as gzmq

    class TestPollGreen(GreenTest, TestPoll):
        """Re-run the ``TestPoll`` suite with the green Poller, plus wakeup tests."""

        Poller = gzmq.Poller

        def test_wakeup(self):
            """A green poll with a 10 s timeout returns early once a message is sent."""
            s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
            poller = self.Poller()
            poller.register(s2, zmq.POLLIN)

            tic = time.time()
            receiver = gevent.spawn(lambda: poller.poll(10000))
            gevent.spawn(lambda: s1.send(b'msg1'))
            receiver.join()
            toc = time.time()
            self.assertLess(toc - tic, 1)

        def test_socket_poll(self):
            """``socket.poll`` with a 10 s timeout returns early once a message is sent."""
            s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

            tic = time.time()
            receiver = gevent.spawn(lambda: s2.poll(10000))
            gevent.spawn(lambda: s1.send(b'msg1'))
            receiver.join()
            toc = time.time()
            self.assertLess(toc - tic, 1)
Upload File
Create Folder