003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/salt/ext/tornado/test
usr
/
local
/
lib
/
python3.8
/
site-packages
/
salt
/
ext
/
tornado
/
test
/
π
..
π
__init__.py
(20 B)
π
__main__.py
(430 B)
π
__pycache__
π
asyncio_test.py
(4.78 KB)
π
auth_test.py
(22.19 KB)
π
concurrent_test.py
(13.66 KB)
π
curl_httpclient_test.py
(4.83 KB)
π
escape_test.py
(10.9 KB)
π
gen_test.py
(46.36 KB)
π
http1connection_test.py
(2.02 KB)
π
httpclient_test.py
(26.79 KB)
π
httpserver_test.py
(41.43 KB)
π
httputil_test.py
(17.17 KB)
π
import_test.py
(1.73 KB)
π
ioloop_test.py
(24.55 KB)
π
iostream_test.py
(42.53 KB)
π
locale_test.py
(5.96 KB)
π
locks_test.py
(15.74 KB)
π
log_test.py
(9.51 KB)
π
netutil_test.py
(7.6 KB)
π
options_test.py
(10.83 KB)
π
process_test.py
(11.44 KB)
π
queues_test.py
(12.79 KB)
π
resolve_test_helper.py
(514 B)
π
routing_test.py
(7.48 KB)
π
runtests.py
(7.59 KB)
π
simple_httpclient_test.py
(30.33 KB)
π
stack_context_test.py
(10.9 KB)
π
tcpclient_test.py
(11.31 KB)
π
tcpserver_test.py
(2.32 KB)
π
template_test.py
(18.63 KB)
π
testing_test.py
(8.57 KB)
π
twisted_test.py
(27.33 KB)
π
util.py
(4.65 KB)
π
util_test.py
(7.29 KB)
π
web_test.py
(111.58 KB)
π
websocket_test.py
(22.37 KB)
π
windows_test.py
(877 B)
π
wsgi_test.py
(3.71 KB)
Editing: curl_httpclient_test.py
# coding: utf-8
# pylint: skip-file
"""Tests for the pycurl-backed ``CurlAsyncHTTPClient``.

Every test case in this module is skipped when the optional ``pycurl``
module cannot be imported.
"""
from __future__ import absolute_import, division, print_function

from hashlib import md5

from salt.ext.tornado.escape import utf8
from salt.ext.tornado.httpclient import HTTPRequest
from salt.ext.tornado.stack_context import ExceptionStackContext
from salt.ext.tornado.testing import AsyncHTTPTestCase
from salt.ext.tornado.test import httpclient_test
from salt.ext.tornado.test.util import unittest
from salt.ext.tornado.web import Application, RequestHandler

try:
    import pycurl  # type: ignore
except ImportError:
    pycurl = None

# CurlAsyncHTTPClient is only imported when pycurl is available; the test
# cases below are skipped otherwise, so the name is never used unbound.
if pycurl is not None:
    from salt.ext.tornado.curl_httpclient import CurlAsyncHTTPClient


@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
    """Run the shared HTTP client test suite against the curl client."""

    def get_http_client(self):
        """Return a ``CurlAsyncHTTPClient`` bound to the test IOLoop."""
        client = CurlAsyncHTTPClient(io_loop=self.io_loop,
                                     defaults=dict(allow_ipv6=False))
        # make sure AsyncHTTPClient magic doesn't give us the wrong class
        self.assertIsInstance(client, CurlAsyncHTTPClient)
        return client


class DigestAuthHandler(RequestHandler):
    """Minimal HTTP Digest authentication endpoint with fixed credentials."""

    def get(self):
        """Challenge unauthenticated requests; verify authenticated ones.

        Without an ``Authorization`` header the response is a 401 carrying a
        ``WWW-Authenticate: Digest`` challenge.  With one, the header fields
        are checked against the fixed values below and the body is ``ok``
        when the client's ``response`` digest matches, ``fail`` otherwise.
        """
        realm = 'test'
        opaque = 'asdf'
        # Real implementations would use a random nonce.
        nonce = "1234"
        username = 'foo'
        password = 'bar'

        auth_header = self.request.headers.get('Authorization', None)
        if auth_header is None:
            self.set_status(401)
            self.set_header('WWW-Authenticate',
                            'Digest realm="%s", nonce="%s", opaque="%s"' %
                            (realm, nonce, opaque))
            return

        auth_mode, params = auth_header.split(' ', 1)
        assert auth_mode == 'Digest'

        # Parse the comma-separated key=value pairs, unquoting quoted values.
        param_dict = {}
        for pair in params.split(','):
            k, v = pair.strip().split('=', 1)
            # startswith/endswith tolerate an empty value, unlike v[0]/v[-1].
            if v.startswith('"') and v.endswith('"'):
                v = v[1:-1]
            param_dict[k] = v

        assert param_dict['realm'] == realm
        assert param_dict['opaque'] == opaque
        assert param_dict['nonce'] == nonce
        assert param_dict['username'] == username
        assert param_dict['uri'] == self.request.path

        # Expected digest: MD5(MD5(user:realm:password):nonce:MD5(method:path))
        h1 = md5(utf8('%s:%s:%s' % (username, realm, password))).hexdigest()
        h2 = md5(utf8('%s:%s' % (self.request.method,
                                 self.request.path))).hexdigest()
        digest = md5(utf8('%s:%s:%s' % (h1, nonce, h2))).hexdigest()
        if digest == param_dict['response']:
            self.write('ok')
        else:
            self.write('fail')


class CustomReasonHandler(RequestHandler):
    """Respond with status 200 and a non-standard reason phrase."""

    def get(self):
        self.set_status(200, "Custom reason")


class CustomFailReasonHandler(RequestHandler):
    """Respond with status 400 and a non-standard reason phrase."""

    def get(self):
        self.set_status(400, "Custom reason")


@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
    """Tests specific to ``CurlAsyncHTTPClient``."""

    def setUp(self):
        super(CurlHTTPClientTestCase, self).setUp()
        self.http_client = self.create_client()

    def get_app(self):
        """Return the application serving the handlers used by these tests."""
        return Application([
            ('/digest', DigestAuthHandler),
            ('/custom_reason', CustomReasonHandler),
            ('/custom_fail_reason', CustomFailReasonHandler),
        ])

    def create_client(self, **kwargs):
        """Build a fresh curl client on the test IOLoop.

        Extra keyword arguments are forwarded to ``CurlAsyncHTTPClient``.
        """
        return CurlAsyncHTTPClient(self.io_loop, force_instance=True,
                                   defaults=dict(allow_ipv6=False),
                                   **kwargs)

    def test_prepare_curl_callback_stack_context(self):
        """An exception in ``prepare_curl_callback`` reaches the handler."""
        exc_info = []

        def error_handler(typ, value, tb):
            exc_info.append((typ, value, tb))
            self.stop()
            return True

        # The request is constructed inside the exception context.
        # NOTE(review): the fetch is placed outside the ``with`` block on the
        # assumption that the context is captured when the request is
        # created -- confirm against the upstream test.
        with ExceptionStackContext(error_handler):
            request = HTTPRequest(self.get_url('/'),
                                  prepare_curl_callback=lambda curl: 1 / 0)
        self.http_client.fetch(request, callback=self.stop)
        self.wait()
        self.assertEqual(1, len(exc_info))
        self.assertIs(exc_info[0][0], ZeroDivisionError)

    def test_digest_auth(self):
        """Digest credentials supplied to ``fetch`` satisfy the handler."""
        response = self.fetch('/digest', auth_mode='digest',
                              auth_username='foo', auth_password='bar')
        self.assertEqual(response.body, b'ok')

    def test_custom_reason(self):
        """A custom reason phrase on a 200 is exposed as ``response.reason``."""
        response = self.fetch('/custom_reason')
        self.assertEqual(response.reason, "Custom reason")

    def test_fail_custom_reason(self):
        """A custom reason phrase on a 400 appears in the error message."""
        response = self.fetch('/custom_fail_reason')
        self.assertEqual(str(response.error), "HTTP 400: Custom reason")

    def test_failed_setup(self):
        """Repeated fetches of a non-ASCII URL each report an error.

        The client is limited to a single connection and the fetch is
        repeated five times; presumably this checks that the connection slot
        is released after every failure -- confirm against upstream.
        """
        self.http_client = self.create_client(max_clients=1)
        for _ in range(5):
            response = self.fetch(u'/ユニコード')
            self.assertIsNot(response.error, None)
Upload File
Create Folder