003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/salt/ext/tornado/test
usr
/
local
/
lib
/
python3.8
/
site-packages
/
salt
/
ext
/
tornado
/
test
/
📁
..
📄
__init__.py
(20 B)
📄
__main__.py
(430 B)
📁
__pycache__
📄
asyncio_test.py
(4.78 KB)
📄
auth_test.py
(22.19 KB)
📄
concurrent_test.py
(13.66 KB)
📄
curl_httpclient_test.py
(4.83 KB)
📄
escape_test.py
(10.9 KB)
📄
gen_test.py
(46.36 KB)
📄
http1connection_test.py
(2.02 KB)
📄
httpclient_test.py
(26.79 KB)
📄
httpserver_test.py
(41.43 KB)
📄
httputil_test.py
(17.17 KB)
📄
import_test.py
(1.73 KB)
📄
ioloop_test.py
(24.55 KB)
📄
iostream_test.py
(42.53 KB)
📄
locale_test.py
(5.96 KB)
📄
locks_test.py
(15.74 KB)
📄
log_test.py
(9.51 KB)
📄
netutil_test.py
(7.6 KB)
📄
options_test.py
(10.83 KB)
📄
process_test.py
(11.44 KB)
📄
queues_test.py
(12.79 KB)
📄
resolve_test_helper.py
(514 B)
📄
routing_test.py
(7.48 KB)
📄
runtests.py
(7.59 KB)
📄
simple_httpclient_test.py
(30.33 KB)
📄
stack_context_test.py
(10.9 KB)
📄
tcpclient_test.py
(11.31 KB)
📄
tcpserver_test.py
(2.32 KB)
📄
template_test.py
(18.63 KB)
📄
testing_test.py
(8.57 KB)
📄
twisted_test.py
(27.33 KB)
📄
util.py
(4.65 KB)
📄
util_test.py
(7.29 KB)
📄
web_test.py
(111.58 KB)
📄
websocket_test.py
(22.37 KB)
📄
windows_test.py
(877 B)
📄
wsgi_test.py
(3.71 KB)
Editing: locks_test.py
# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # pylint: skip-file from __future__ import absolute_import, division, print_function from datetime import timedelta from salt.ext.tornado import gen, locks from salt.ext.tornado.gen import TimeoutError from salt.ext.tornado.testing import gen_test, AsyncTestCase from salt.ext.tornado.test.util import unittest, skipBefore35, exec_test class ConditionTest(AsyncTestCase): def setUp(self): super(ConditionTest, self).setUp() self.history = [] def record_done(self, future, key): """Record the resolution of a Future returned by Condition.wait.""" def callback(_): if not future.result(): # wait() resolved to False, meaning it timed out. self.history.append('timeout') else: self.history.append(key) future.add_done_callback(callback) def test_repr(self): c = locks.Condition() self.assertIn('Condition', repr(c)) self.assertNotIn('waiters', repr(c)) c.wait() self.assertIn('waiters', repr(c)) @gen_test def test_notify(self): c = locks.Condition() self.io_loop.call_later(0.01, c.notify) yield c.wait() def test_notify_1(self): c = locks.Condition() self.record_done(c.wait(), 'wait1') self.record_done(c.wait(), 'wait2') c.notify(1) self.history.append('notify1') c.notify(1) self.history.append('notify2') self.assertEqual(['wait1', 'notify1', 'wait2', 'notify2'], self.history) def test_notify_n(self): c = locks.Condition() for i in range(6): self.record_done(c.wait(), i) c.notify(3) # Callbacks execute in the order they were registered. self.assertEqual(list(range(3)), self.history) c.notify(1) self.assertEqual(list(range(4)), self.history) c.notify(2) self.assertEqual(list(range(6)), self.history) def test_notify_all(self): c = locks.Condition() for i in range(4): self.record_done(c.wait(), i) c.notify_all() self.history.append('notify_all') # Callbacks execute in the order they were registered. self.assertEqual( list(range(4)) + ['notify_all'], self.history) @gen_test def test_wait_timeout(self): c = locks.Condition() wait = c.wait(timedelta(seconds=0.01)) self.io_loop.call_later(0.02, c.notify) # Too late. yield gen.sleep(0.03) self.assertFalse((yield wait)) @gen_test def test_wait_timeout_preempted(self): c = locks.Condition() # This fires before the wait times out. self.io_loop.call_later(0.01, c.notify) wait = c.wait(timedelta(seconds=0.02)) yield gen.sleep(0.03) yield wait # No TimeoutError. @gen_test def test_notify_n_with_timeout(self): # Register callbacks 0, 1, 2, and 3. Callback 1 has a timeout. # Wait for that timeout to expire, then do notify(2) and make # sure everyone runs. Verifies that a timed-out callback does # not count against the 'n' argument to notify(). c = locks.Condition() self.record_done(c.wait(), 0) self.record_done(c.wait(timedelta(seconds=0.01)), 1) self.record_done(c.wait(), 2) self.record_done(c.wait(), 3) # Wait for callback 1 to time out. yield gen.sleep(0.02) self.assertEqual(['timeout'], self.history) c.notify(2) yield gen.sleep(0.01) self.assertEqual(['timeout', 0, 2], self.history) self.assertEqual(['timeout', 0, 2], self.history) c.notify() self.assertEqual(['timeout', 0, 2, 3], self.history) @gen_test def test_notify_all_with_timeout(self): c = locks.Condition() self.record_done(c.wait(), 0) self.record_done(c.wait(timedelta(seconds=0.01)), 1) self.record_done(c.wait(), 2) # Wait for callback 1 to time out. yield gen.sleep(0.02) self.assertEqual(['timeout'], self.history) c.notify_all() self.assertEqual(['timeout', 0, 2], self.history) @gen_test def test_nested_notify(self): # Ensure no notifications lost, even if notify() is reentered by a # waiter calling notify(). c = locks.Condition() # Three waiters. futures = [c.wait() for _ in range(3)] # First and second futures resolved. Second future reenters notify(), # resolving third future. futures[1].add_done_callback(lambda _: c.notify()) c.notify(2) self.assertTrue(all(f.done() for f in futures)) @gen_test def test_garbage_collection(self): # Test that timed-out waiters are occasionally cleaned from the queue. c = locks.Condition() for _ in range(101): c.wait(timedelta(seconds=0.01)) future = c.wait() self.assertEqual(102, len(c._waiters)) # Let first 101 waiters time out, triggering a collection. yield gen.sleep(0.02) self.assertEqual(1, len(c._waiters)) # Final waiter is still active. self.assertFalse(future.done()) c.notify() self.assertTrue(future.done()) class EventTest(AsyncTestCase): def test_repr(self): event = locks.Event() self.assertTrue('clear' in str(event)) self.assertFalse('set' in str(event)) event.set() self.assertFalse('clear' in str(event)) self.assertTrue('set' in str(event)) def test_event(self): e = locks.Event() future_0 = e.wait() e.set() future_1 = e.wait() e.clear() future_2 = e.wait() self.assertTrue(future_0.done()) self.assertTrue(future_1.done()) self.assertFalse(future_2.done()) @gen_test def test_event_timeout(self): e = locks.Event() with self.assertRaises(TimeoutError): yield e.wait(timedelta(seconds=0.01)) # After a timed-out waiter, normal operation works. self.io_loop.add_timeout(timedelta(seconds=0.01), e.set) yield e.wait(timedelta(seconds=1)) def test_event_set_multiple(self): e = locks.Event() e.set() e.set() self.assertTrue(e.is_set()) def test_event_wait_clear(self): e = locks.Event() f0 = e.wait() e.clear() f1 = e.wait() e.set() self.assertTrue(f0.done()) self.assertTrue(f1.done()) class SemaphoreTest(AsyncTestCase): def test_negative_value(self): self.assertRaises(ValueError, locks.Semaphore, value=-1) def test_repr(self): sem = locks.Semaphore() self.assertIn('Semaphore', repr(sem)) self.assertIn('unlocked,value:1', repr(sem)) sem.acquire() self.assertIn('locked', repr(sem)) self.assertNotIn('waiters', repr(sem)) sem.acquire() self.assertIn('waiters', repr(sem)) def test_acquire(self): sem = locks.Semaphore() f0 = sem.acquire() self.assertTrue(f0.done()) # Wait for release(). f1 = sem.acquire() self.assertFalse(f1.done()) f2 = sem.acquire() sem.release() self.assertTrue(f1.done()) self.assertFalse(f2.done()) sem.release() self.assertTrue(f2.done()) sem.release() # Now acquire() is instant. self.assertTrue(sem.acquire().done()) self.assertEqual(0, len(sem._waiters)) @gen_test def test_acquire_timeout(self): sem = locks.Semaphore(2) yield sem.acquire() yield sem.acquire() acquire = sem.acquire(timedelta(seconds=0.01)) self.io_loop.call_later(0.02, sem.release) # Too late. yield gen.sleep(0.3) with self.assertRaises(gen.TimeoutError): yield acquire sem.acquire() f = sem.acquire() self.assertFalse(f.done()) sem.release() self.assertTrue(f.done()) @gen_test def test_acquire_timeout_preempted(self): sem = locks.Semaphore(1) yield sem.acquire() # This fires before the wait times out. self.io_loop.call_later(0.01, sem.release) acquire = sem.acquire(timedelta(seconds=0.02)) yield gen.sleep(0.03) yield acquire # No TimeoutError. def test_release_unacquired(self): # Unbounded releases are allowed, and increment the semaphore's value. sem = locks.Semaphore() sem.release() sem.release() # Now the counter is 3. We can acquire three times before blocking. self.assertTrue(sem.acquire().done()) self.assertTrue(sem.acquire().done()) self.assertTrue(sem.acquire().done()) self.assertFalse(sem.acquire().done()) @gen_test def test_garbage_collection(self): # Test that timed-out waiters are occasionally cleaned from the queue. sem = locks.Semaphore(value=0) futures = [sem.acquire(timedelta(seconds=0.01)) for _ in range(101)] future = sem.acquire() self.assertEqual(102, len(sem._waiters)) # Let first 101 waiters time out, triggering a collection. yield gen.sleep(0.02) self.assertEqual(1, len(sem._waiters)) # Final waiter is still active. self.assertFalse(future.done()) sem.release() self.assertTrue(future.done()) # Prevent "Future exception was never retrieved" messages. for future in futures: self.assertRaises(TimeoutError, future.result) class SemaphoreContextManagerTest(AsyncTestCase): @gen_test def test_context_manager(self): sem = locks.Semaphore() with (yield sem.acquire()) as yielded: self.assertTrue(yielded is None) # Semaphore was released and can be acquired again. self.assertTrue(sem.acquire().done()) @skipBefore35 @gen_test def test_context_manager_async_await(self): # Repeat the above test using 'async with'. sem = locks.Semaphore() namespace = exec_test(globals(), locals(), """ async def f(): async with sem as yielded: self.assertTrue(yielded is None) """) yield namespace['f']() # Semaphore was released and can be acquired again. self.assertTrue(sem.acquire().done()) @gen_test def test_context_manager_exception(self): sem = locks.Semaphore() with self.assertRaises(ZeroDivisionError): with (yield sem.acquire()): 1 / 0 # Semaphore was released and can be acquired again. self.assertTrue(sem.acquire().done()) @gen_test def test_context_manager_timeout(self): sem = locks.Semaphore() with (yield sem.acquire(timedelta(seconds=0.01))): pass # Semaphore was released and can be acquired again. self.assertTrue(sem.acquire().done()) @gen_test def test_context_manager_timeout_error(self): sem = locks.Semaphore(value=0) with self.assertRaises(gen.TimeoutError): with (yield sem.acquire(timedelta(seconds=0.01))): pass # Counter is still 0. self.assertFalse(sem.acquire().done()) @gen_test def test_context_manager_contended(self): sem = locks.Semaphore() history = [] @gen.coroutine def f(index): with (yield sem.acquire()): history.append('acquired %d' % index) yield gen.sleep(0.01) history.append('release %d' % index) yield [f(i) for i in range(2)] expected_history = [] for i in range(2): expected_history.extend(['acquired %d' % i, 'release %d' % i]) self.assertEqual(expected_history, history) @gen_test def test_yield_sem(self): # Ensure we catch a "with (yield sem)", which should be # "with (yield sem.acquire())". with self.assertRaises(gen.BadYieldError): with (yield locks.Semaphore()): pass def test_context_manager_misuse(self): # Ensure we catch a "with sem", which should be # "with (yield sem.acquire())". with self.assertRaises(RuntimeError): with locks.Semaphore(): pass class BoundedSemaphoreTest(AsyncTestCase): def test_release_unacquired(self): sem = locks.BoundedSemaphore() self.assertRaises(ValueError, sem.release) # Value is 0. sem.acquire() # Block on acquire(). future = sem.acquire() self.assertFalse(future.done()) sem.release() self.assertTrue(future.done()) # Value is 1. sem.release() self.assertRaises(ValueError, sem.release) class LockTests(AsyncTestCase): def test_repr(self): lock = locks.Lock() # No errors. repr(lock) lock.acquire() repr(lock) def test_acquire_release(self): lock = locks.Lock() self.assertTrue(lock.acquire().done()) future = lock.acquire() self.assertFalse(future.done()) lock.release() self.assertTrue(future.done()) @gen_test def test_acquire_fifo(self): lock = locks.Lock() self.assertTrue(lock.acquire().done()) N = 5 history = [] @gen.coroutine def f(idx): with (yield lock.acquire()): history.append(idx) futures = [f(i) for i in range(N)] self.assertFalse(any(future.done() for future in futures)) lock.release() yield futures self.assertEqual(list(range(N)), history) @skipBefore35 @gen_test def test_acquire_fifo_async_with(self): # Repeat the above test using `async with lock:` # instead of `with (yield lock.acquire()):`. lock = locks.Lock() self.assertTrue(lock.acquire().done()) N = 5 history = [] namespace = exec_test(globals(), locals(), """ async def f(idx): async with lock: history.append(idx) """) futures = [namespace['f'](i) for i in range(N)] lock.release() yield futures self.assertEqual(list(range(N)), history) @gen_test def test_acquire_timeout(self): lock = locks.Lock() lock.acquire() with self.assertRaises(gen.TimeoutError): yield lock.acquire(timeout=timedelta(seconds=0.01)) # Still locked. self.assertFalse(lock.acquire().done()) def test_multi_release(self): lock = locks.Lock() self.assertRaises(RuntimeError, lock.release) lock.acquire() lock.release() self.assertRaises(RuntimeError, lock.release) @gen_test def test_yield_lock(self): # Ensure we catch a "with (yield lock)", which should be # "with (yield lock.acquire())". with self.assertRaises(gen.BadYieldError): with (yield locks.Lock()): pass def test_context_manager_misuse(self): # Ensure we catch a "with lock", which should be # "with (yield lock.acquire())". with self.assertRaises(RuntimeError): with locks.Lock(): pass if __name__ == '__main__': unittest.main()
Upload File
Create Folder