003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/salt/ext/tornado/test
usr
/
local
/
lib
/
python3.8
/
site-packages
/
salt
/
ext
/
tornado
/
test
/
📁
..
📄
__init__.py
(20 B)
📄
__main__.py
(430 B)
📁
__pycache__
📄
asyncio_test.py
(4.78 KB)
📄
auth_test.py
(22.19 KB)
📄
concurrent_test.py
(13.66 KB)
📄
curl_httpclient_test.py
(4.83 KB)
📄
escape_test.py
(10.9 KB)
📄
gen_test.py
(46.36 KB)
📄
http1connection_test.py
(2.02 KB)
📄
httpclient_test.py
(26.79 KB)
📄
httpserver_test.py
(41.43 KB)
📄
httputil_test.py
(17.17 KB)
📄
import_test.py
(1.73 KB)
📄
ioloop_test.py
(24.55 KB)
📄
iostream_test.py
(42.53 KB)
📄
locale_test.py
(5.96 KB)
📄
locks_test.py
(15.74 KB)
📄
log_test.py
(9.51 KB)
📄
netutil_test.py
(7.6 KB)
📄
options_test.py
(10.83 KB)
📄
process_test.py
(11.44 KB)
📄
queues_test.py
(12.79 KB)
📄
resolve_test_helper.py
(514 B)
📄
routing_test.py
(7.48 KB)
📄
runtests.py
(7.59 KB)
📄
simple_httpclient_test.py
(30.33 KB)
📄
stack_context_test.py
(10.9 KB)
📄
tcpclient_test.py
(11.31 KB)
📄
tcpserver_test.py
(2.32 KB)
📄
template_test.py
(18.63 KB)
📄
testing_test.py
(8.57 KB)
📄
twisted_test.py
(27.33 KB)
📄
util.py
(4.65 KB)
📄
util_test.py
(7.29 KB)
📄
web_test.py
(111.58 KB)
📄
websocket_test.py
(22.37 KB)
📄
windows_test.py
(877 B)
📄
wsgi_test.py
(3.71 KB)
Editing: tcpclient_test.py
#!/usr/bin/env python
#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# pylint: skip-file

from __future__ import absolute_import, division, print_function

from contextlib import closing
import os
import socket

from salt.ext.tornado.concurrent import Future
from salt.ext.tornado.netutil import bind_sockets, Resolver
from salt.ext.tornado.queues import Queue
from salt.ext.tornado.tcpclient import TCPClient, _Connector
from salt.ext.tornado.tcpserver import TCPServer
from salt.ext.tornado.testing import AsyncTestCase, gen_test
from salt.ext.tornado.test.util import skipIfNoIPv6, unittest, refusing_port, skipIfNonUnix

# Fake address families for testing. Used in place of AF_INET
# and AF_INET6 because some installations do not have AF_INET6.
AF1, AF2 = 1, 2


class TestTCPServer(TCPServer):
    """TCPServer bound to 'localhost' that keeps every accepted stream.

    Accepted streams are appended to ``self.streams`` and also put on
    ``self.queue`` so a test can wait for the server side of a connection.
    ``self.port`` holds the port of the first bound socket.
    """

    def __init__(self, family):
        super(TestTCPServer, self).__init__()
        self.streams = []
        self.queue = Queue()
        # Passing None as the port to bind_sockets; the actual port is then
        # read back from the first socket.
        listeners = bind_sockets(None, 'localhost', family)
        self.add_sockets(listeners)
        first_sockname = listeners[0].getsockname()
        self.port = first_sockname[1]

    def handle_stream(self, stream, address):
        """Record the accepted stream and hand it to whoever awaits the queue."""
        self.streams.append(stream)
        self.queue.put(stream)

    def stop(self):
        """Stop the server, then close every stream accepted so far."""
        super(TestTCPServer, self).stop()
        for accepted in self.streams:
            accepted.close()


class TCPClientTest(AsyncTestCase):
    """Connection tests for TCPClient against a local TestTCPServer."""

    def setUp(self):
        super(TCPClientTest, self).setUp()
        self.server = None
        self.client = TCPClient()

    def start_server(self, family):
        """Start a TestTCPServer for ``family`` and return its port."""
        running_on_travis = 'TRAVIS' in os.environ
        if running_on_travis and family == socket.AF_UNSPEC:
            self.skipTest("dual-stack servers often have port conflicts on travis")
        self.server = TestTCPServer(family)
        return self.server.port

    def stop_server(self):
        """Stop and forget the server, if one was started."""
        if self.server is None:
            return
        self.server.stop()
        self.server = None

    def tearDown(self):
        self.client.close()
        self.stop_server()
        super(TCPClientTest, self).tearDown()

    def skipIfLocalhostV4(self):
        """Skip the running test unless 'localhost' resolves to AF_INET6."""
        # The port used here doesn't matter, but some systems require it
        # to be non-zero if we do not also pass AI_PASSIVE.
        Resolver().resolve('localhost', 80, callback=self.stop)
        resolved = self.wait()
        # Each resolved entry carries its address family in position 0.
        resolved_families = {entry[0] for entry in resolved}
        if socket.AF_INET6 not in resolved_families:
            self.skipTest("localhost does not resolve to ipv6")

    @gen_test
    def do_test_connect(self, family, host, source_ip=None, source_port=None):
        """Connect to a fresh server and check that b"hello" arrives intact.

        ``family`` selects the server's address family, ``host`` is what the
        client connects to, and ``source_ip``/``source_port`` are forwarded
        unchanged to ``TCPClient.connect``.
        """
        port = self.start_server(family)
        client_stream = yield self.client.connect(
            host, port, source_ip=source_ip, source_port=source_port)
        server_stream = yield self.server.queue.get()
        with closing(client_stream):
            client_stream.write(b"hello")
            received = yield server_stream.read_bytes(5)
            self.assertEqual(received, b"hello")

    # IPv4-only server, reached through a literal IPv4 address.
    def test_connect_ipv4_ipv4(self):
        self.do_test_connect(socket.AF_INET, '127.0.0.1')

    # IPv4-only server, reached through the name 'localhost'.
    def test_connect_ipv4_dual(self):
        self.do_test_connect(socket.AF_INET, 'localhost')

    # IPv6-only server, reached through a literal IPv6 address.
    @skipIfNoIPv6
    def test_connect_ipv6_ipv6(self):
        self.skipIfLocalhostV4()
        self.do_test_connect(socket.AF_INET6, '::1')

    # IPv6-only server, reached through the name 'localhost'.
    @skipIfNoIPv6
    def test_connect_ipv6_dual(self):
        self.skipIfLocalhostV4()
        resolver_name = Resolver.configured_class().__name__
        if resolver_name.endswith('TwistedResolver'):
            self.skipTest('TwistedResolver does not support multiple addresses')
        self.do_test_connect(socket.AF_INET6, 'localhost')

    # AF_UNSPEC server, reached through a literal IPv4 address.
    def test_connect_unspec_ipv4(self):
        self.do_test_connect(socket.AF_UNSPEC, '127.0.0.1')

    # AF_UNSPEC server, reached through a literal IPv6 address.
    @skipIfNoIPv6
    def test_connect_unspec_ipv6(self):
        self.skipIfLocalhostV4()
        self.do_test_connect(socket.AF_UNSPEC, '::1')

    # AF_UNSPEC server, reached through the name 'localhost'.
    def test_connect_unspec_dual(self):
        self.do_test_connect(socket.AF_UNSPEC, 'localhost')

    # Connecting to a port obtained from refusing_port() must raise IOError.
    @gen_test
    def test_refused_ipv4(self):
        release_port, port = refusing_port()
        self.addCleanup(release_port)
        with self.assertRaises(IOError):
            yield self.client.connect('127.0.0.1', port)

    def test_source_ip_fail(self):
        '''
        Fail when trying to use the source IP Address '8.8.8.8'.
        '''
        with self.assertRaises(socket.error):
            self.do_test_connect(socket.AF_INET, '127.0.0.1',
                                 source_ip='8.8.8.8')

    def test_source_ip_success(self):
        '''
        Success when trying to use the source IP Address '127.0.0.1'
        '''
        self.do_test_connect(socket.AF_INET, '127.0.0.1',
                             source_ip='127.0.0.1')

    @skipIfNonUnix
    def test_source_port_fail(self):
        '''
        Fail when trying to use source port 1.
        '''
        with self.assertRaises(socket.error):
            self.do_test_connect(socket.AF_INET, '127.0.0.1',
                                 source_port=1)


class TestConnectorSplit(unittest.TestCase):
    """Checks how _Connector.split partitions an address list by family."""

    def test_one_family(self):
        # These addresses aren't in the right format, but split doesn't care.
        primary, secondary = _Connector.split([(AF1, 'a'), (AF1, 'b')])
        self.assertEqual(primary, [(AF1, 'a'), (AF1, 'b')])
        self.assertEqual(secondary, [])

    def test_mixed(self):
        # Interleaved families: each output list keeps its input order.
        primary, secondary = _Connector.split(
            [(AF1, 'a'), (AF2, 'b'), (AF1, 'c'), (AF2, 'd')])
        self.assertEqual(primary, [(AF1, 'a'), (AF1, 'c')])
        self.assertEqual(secondary, [(AF2, 'b'), (AF2, 'd')])


class ConnectorTest(AsyncTestCase):
    """Drives _Connector with fake connect futures that tests resolve by hand."""

    class FakeStream(object):
        """Minimal stream stand-in that only remembers whether it was closed."""

        def __init__(self):
            self.closed = False

        def close(self):
            self.closed = True

    def setUp(self):
        super(ConnectorTest, self).setUp()
        # (family, address) -> Future for each connect attempt still pending.
        self.connect_futures = {}
        # address -> FakeStream handed out for a successful attempt.
        self.streams = {}
        self.addrinfo = [(AF1, 'a'), (AF1, 'b'),
                         (AF2, 'c'), (AF2, 'd')]

    def tearDown(self):
        # Unless explicitly checked (and popped) in the test, we shouldn't
        # be closing any streams
        for leftover in self.streams.values():
            self.assertFalse(leftover.closed)
        super(ConnectorTest, self).tearDown()

    def create_stream(self, af, addr):
        """Connect callback given to _Connector: register and return a Future."""
        pending = Future()
        self.connect_futures[(af, addr)] = pending
        return pending

    def assert_pending(self, *keys):
        """Assert that exactly the given (family, address) attempts are pending."""
        self.assertEqual(sorted(self.connect_futures), sorted(keys))

    def resolve_connect(self, af, addr, success):
        """Finish the pending attempt for (af, addr) with a stream or an IOError."""
        pending = self.connect_futures.pop((af, addr))
        if not success:
            pending.set_exception(IOError())
            return
        fake = ConnectorTest.FakeStream()
        self.streams[addr] = fake
        pending.set_result(fake)

    def start_connect(self, addrinfo):
        """Build a _Connector over ``addrinfo`` and start it; return both."""
        conn = _Connector(addrinfo, self.io_loop, self.create_stream)
        # Give it a huge timeout; we'll trigger timeouts manually.
        return conn, conn.start(3600)

    def test_immediate_success(self):
        _conn, future = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', True)
        self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))

    def test_immediate_failure(self):
        # Fail with just one address.
        _conn, future = self.start_connect([(AF1, 'a')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assertRaises(IOError, future.result)

    def test_one_family_second_try(self):
        _conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', True)
        self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))

    def test_one_family_second_try_failure(self):
        _conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', False)
        self.assertRaises(IOError, future.result)

    def test_one_family_second_try_timeout(self):
        conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
        self.assert_pending((AF1, 'a'))
        # trigger the timeout while the first lookup is pending;
        # nothing happens.
        conn.on_timeout()
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.resolve_connect(AF1, 'b', True)
        self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))

    def test_two_families_immediate_failure(self):
        _conn, future = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'), (AF2, 'c'))
        self.resolve_connect(AF1, 'b', False)
        self.resolve_connect(AF2, 'c', True)
        self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))

    def test_two_families_timeout(self):
        conn, future = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        conn.on_timeout()
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF2, 'c', True)
        self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))
        # resolving 'a' after the connection has completed doesn't start 'b'
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending()

    def test_success_after_timeout(self):
        conn, future = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        conn.on_timeout()
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF1, 'a', True)
        self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
        # resolving 'c' after completion closes the connection.
        self.resolve_connect(AF2, 'c', True)
        late_stream = self.streams.pop('c')
        self.assertTrue(late_stream.closed)

    def test_all_fail(self):
        conn, future = self.start_connect(self.addrinfo)
        self.assert_pending((AF1, 'a'))
        conn.on_timeout()
        self.assert_pending((AF1, 'a'), (AF2, 'c'))
        self.resolve_connect(AF2, 'c', False)
        self.assert_pending((AF1, 'a'), (AF2, 'd'))
        self.resolve_connect(AF2, 'd', False)
        # one queue is now empty
        self.assert_pending((AF1, 'a'))
        self.resolve_connect(AF1, 'a', False)
        self.assert_pending((AF1, 'b'))
        self.assertFalse(future.done())
        self.resolve_connect(AF1, 'b', False)
        self.assertRaises(IOError, future.result)
Upload File
Create Folder