003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/salt/ext/tornado/platform
usr
/
local
/
lib
/
python3.8
/
site-packages
/
salt
/
ext
/
tornado
/
platform
/
📁
..
📄
__init__.py
(20 B)
📁
__pycache__
📄
asyncio.py
(7.84 KB)
📄
auto.py
(1.92 KB)
📄
caresresolver.py
(3.06 KB)
📄
common.py
(3.93 KB)
📄
epoll.py
(947 B)
📄
interface.py
(2.25 KB)
📄
kqueue.py
(3.36 KB)
📄
posix.py
(1.86 KB)
📄
select.py
(2.58 KB)
📄
twisted.py
(21.71 KB)
📄
windows.py
(713 B)
Editing: select.py
#!/usr/bin/env python # # Copyright 2012 Facebook # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Select-based IOLoop implementation. Used as a fallback for systems that don't support epoll or kqueue. """ # pylint: skip-file from __future__ import absolute_import, division, print_function import select from salt.ext.tornado.ioloop import IOLoop, PollIOLoop class _Select(object): """A simple, select()-based IOLoop implementation for non-Linux systems""" def __init__(self): self.read_fds = set() self.write_fds = set() self.error_fds = set() self.fd_sets = (self.read_fds, self.write_fds, self.error_fds) def close(self): pass def register(self, fd, events): if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds: raise IOError("fd %s already registered" % fd) if events & IOLoop.READ: self.read_fds.add(fd) if events & IOLoop.WRITE: self.write_fds.add(fd) if events & IOLoop.ERROR: self.error_fds.add(fd) # Closed connections are reported as errors by epoll and kqueue, # but as zero-byte reads by select, so when errors are requested # we need to listen for both read and error. # self.read_fds.add(fd) def modify(self, fd, events): self.unregister(fd) self.register(fd, events) def unregister(self, fd): self.read_fds.discard(fd) self.write_fds.discard(fd) self.error_fds.discard(fd) def poll(self, timeout): readable, writeable, errors = select.select( self.read_fds, self.write_fds, self.error_fds, timeout) events = {} for fd in readable: events[fd] = events.get(fd, 0) | IOLoop.READ for fd in writeable: events[fd] = events.get(fd, 0) | IOLoop.WRITE for fd in errors: events[fd] = events.get(fd, 0) | IOLoop.ERROR return events.items() class SelectIOLoop(PollIOLoop): def initialize(self, **kwargs): super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
Upload File
Create Folder