003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/zmq/tests
usr
/
local
/
lib
/
python3.8
/
site-packages
/
zmq
/
tests
/
📁
..
📄
__init__.py
(7.75 KB)
📁
__pycache__
📄
conftest.py
(40 B)
📄
test_asyncio.py
(15.57 KB)
📄
test_auth.py
(20.72 KB)
📄
test_cffi_backend.py
(8.79 KB)
📄
test_constants.py
(4.49 KB)
📄
test_context.py
(12.1 KB)
📄
test_cython.py
(1.03 KB)
📄
test_decorators.py
(9.4 KB)
📄
test_device.py
(6.02 KB)
📄
test_draft.py
(1.42 KB)
📄
test_error.py
(1.17 KB)
📄
test_etc.py
(545 B)
📄
test_future.py
(10.62 KB)
📄
test_imports.py
(1.76 KB)
📄
test_includes.py
(997 B)
📄
test_ioloop.py
(3.84 KB)
📄
test_log.py
(6.51 KB)
📄
test_message.py
(11.68 KB)
📄
test_monitor.py
(3 KB)
📄
test_monqueue.py
(7.97 KB)
📄
test_multipart.py
(941 B)
📄
test_mypy.py
(1.57 KB)
📄
test_pair.py
(1.24 KB)
📄
test_poll.py
(7.17 KB)
📄
test_proxy_steerable.py
(3.7 KB)
📄
test_pubsub.py
(1.07 KB)
📄
test_reqrep.py
(1.8 KB)
📄
test_retry_eintr.py
(2.88 KB)
📄
test_security.py
(8 KB)
📄
test_socket.py
(23.09 KB)
📄
test_ssh.py
(235 B)
📄
test_version.py
(1.3 KB)
📄
test_win32_shim.py
(1.68 KB)
📄
test_z85.py
(2.15 KB)
📄
test_zmqstream.py
(2.35 KB)
Editing: test_zmqstream.py
# -*- coding: utf8 -*- # Copyright (C) PyZMQ Developers # Distributed under the terms of the Modified BSD License. from __future__ import absolute_import import asyncio from unittest import TestCase import pytest import zmq try: import tornado from tornado import gen from zmq.eventloop import ioloop, zmqstream except ImportError: tornado = None # type: ignore class TestZMQStream(TestCase): def setUp(self): if tornado is None: pytest.skip() if asyncio: asyncio.set_event_loop(asyncio.new_event_loop()) self.context = zmq.Context() self.loop = ioloop.IOLoop() self.loop.make_current() self.push = zmqstream.ZMQStream(self.context.socket(zmq.PUSH)) self.pull = zmqstream.ZMQStream(self.context.socket(zmq.PULL)) port = self.push.bind_to_random_port('tcp://127.0.0.1') self.pull.connect('tcp://127.0.0.1:%i' % port) self.stream = self.push def tearDown(self): self.loop.close(all_fds=True) self.context.term() ioloop.IOLoop.clear_current() def run_until_timeout(self, timeout=10): timed_out = [] @gen.coroutine def sleep_timeout(): yield gen.sleep(timeout) timed_out[:] = ['timed out'] self.loop.stop() self.loop.add_callback(lambda: sleep_timeout()) self.loop.start() assert not timed_out def test_callable_check(self): """Ensure callable check works (py3k).""" self.stream.on_send(lambda *args: None) self.stream.on_recv(lambda *args: None) self.assertRaises(AssertionError, self.stream.on_recv, 1) self.assertRaises(AssertionError, self.stream.on_send, 1) self.assertRaises(AssertionError, self.stream.on_recv, zmq) def test_on_recv_basic(self): sent = [b'basic'] def callback(msg): assert msg == sent self.loop.stop() self.loop.add_callback(lambda: self.push.send_multipart(sent)) self.pull.on_recv(callback) self.run_until_timeout() def test_on_recv_wake(self): sent = [b'wake'] def callback(msg): assert msg == sent self.loop.stop() self.pull.on_recv(callback) self.loop.call_later(1, lambda: self.push.send_multipart(sent)) self.run_until_timeout()
Upload File
Create Folder