003 File Manager
Current Path:
/usr/local/lib/python3.8/site-packages/salt/transport
usr
/
local
/
lib
/
python3.8
/
site-packages
/
salt
/
transport
/
📁
..
📄
__init__.py
(1.42 KB)
📁
__pycache__
📄
client.py
(7.52 KB)
📄
frame.py
(2.76 KB)
📄
ipc.py
(27.31 KB)
📄
local.py
(1.37 KB)
📁
mixins
📄
server.py
(3.15 KB)
📄
tcp.py
(62.49 KB)
📄
zeromq.py
(49.23 KB)
Editing: local.py
import logging import salt.utils.files from salt.transport.client import ReqChannel log = logging.getLogger(__name__) class LocalChannel(ReqChannel): """ Local channel for testing purposes """ def __init__(self, opts, **kwargs): self.opts = opts self.kwargs = kwargs self.tries = 0 def close(self): """ Close the local channel. Currently a NOOP """ def send(self, load, tries=3, timeout=60, raw=False): if self.tries == 0: log.debug("LocalChannel load: %s", load) # data = json.loads(load) # {'path': 'apt-cacher-ng/map.jinja', 'saltenv': 'base', 'cmd': '_serve_file', 'loc': 0} # f = open(data['path']) with salt.utils.files.fopen(load["path"]) as f: ret = { "data": "".join(f.readlines()), "dest": load["path"], } print("returning", ret) else: # end of buffer ret = { "data": None, "dest": None, } self.tries = self.tries + 1 return ret def crypted_transfer_decode_dictentry( self, load, dictkey=None, tries=3, timeout=60 ): super().crypted_transfer_decode_dictentry( load, dictkey=dictkey, tries=tries, timeout=timeout )
Upload File
Create Folder