import os
import pathlib
import stat
import subprocess
import textwrap
import paramiko
import toml
from typing import Any, Callable, Dict, Iterator, Optional, Tuple, Union
from .servers import servers
# Anything accepted where a filesystem path is expected.
PathLike = Union[str, pathlib.Path]
# The (stdin, stdout, stderr) triple returned by ``Remote._exec``.
StandardIO = Tuple[
    paramiko.channel.ChannelFile,
    paramiko.channel.ChannelFile,
    paramiko.channel.ChannelFile,
]
def indent(s):
    '''Return ``str(s)`` indented by four spaces using ``textwrap.indent``.'''
    prefix = ' ' * 4
    return textwrap.indent(str(s), prefix)
class Config:
    '''Configuration information for one server/software/version target.

    Settings are kept in a TOML file holding a shared ``meta`` table and one
    nested ``server -> software -> version`` table per target.
    '''

    NONE = 0   # stand-in for ``None`` inside the TOML file
    TODO = ''  # marks a value the user still has to provide
    data_fields = ('username', 'password', 'private_key_path')
    meta_fields = ('rsync', )

    def __init__(
        self,
        server: str, software: str, version: str,
        path: PathLike = 'config.toml', encoding: str = 'utf-8',
    ) -> None:
        '''Resolve the target in ``servers`` and load (or create) the file.

        - Argument:
            - server, software, version: keys looked up successively,
              starting from ``servers``
            - path: TOML file to read; it is created empty when missing
            - encoding: text encoding used to read and write ``path``
        - Raises:
            - AssertionError: a key is unknown or maps to a falsy entry;
              the message lists the accessible names
        '''
        self._keys = server, software, version
        # Explicit lookups instead of ``assert (x := ...)``: under
        # ``python -O`` asserts are stripped, which used to skip both the
        # validation and the rebinding of the looked-up objects.
        entry = self._lookup(servers, server, 'server')
        entry = self._lookup(entry, software, 'software')
        self._main = self._lookup(entry, version, 'version')  # server-software-version
        self._path = pathlib.Path(path)
        self._path.touch(exist_ok=True)
        self._encoding = encoding
        self._data = toml.loads(self._path.read_text(encoding))
        # default values for the config file
        self._data.setdefault('meta', {f: self.TODO for f in self.meta_fields})
        self._data \
            .setdefault(self._keys[0], dict()) \
            .setdefault(self._keys[1], dict()) \
            .setdefault(self._keys[2], {f: self.TODO for f in self.data_fields})

    @staticmethod
    def _lookup(mapping: Any, key: str, kind: str) -> Any:
        '''Return ``mapping.get(key)``; raise ``AssertionError`` when it is falsy.'''
        value = mapping.get(key)
        if not value:
            raise AssertionError(f'Accessible {kind}(s): ' + ', '.join(mapping))
        return value

    def __getitem__(self, key: str) -> Optional[str]:
        '''Return the target's value for ``key``.

        ``None`` is returned when the key is missing or stored as ``NONE``.
        '''
        value = self.data.get(key)
        return value if value != self.NONE else None

    def __str__(self) -> str:
        destination = f'{self["username"]}@{self.host}:{self.port}'
        return f'<Config>{"::".join(self._keys)} {destination}</Config>'

    @property
    def data(self) -> Dict[str, Any]:
        '''The table of this target (``server -> software -> version``).'''
        server, software, version = self._keys
        return self._data[server][software][version]

    @property
    def meta(self) -> Dict[str, Any]:
        '''The ``meta`` table of the config file.'''
        return self._data['meta']

    @property
    def host(self) -> str:
        return self._main.host

    @property
    def port(self) -> str:
        return self._main.port

    def input(
        self,
        input: Callable[[str], str] = input, pattern: str = '({}) >> ',
    ) -> 'Config':
        '''Ask for every value still equal to ``TODO``, then save the file.

        - Argument:
            - input: called with the prompt and returns the answer
              (the builtin ``input`` by default)
            - pattern: prompt template, formatted with the field name
        - Note:
            - an empty answer is stored as ``NONE``
        '''
        for table in (self.meta, self.data):
            # Only existing keys are reassigned, so iterating ``items()``
            # while writing is safe.
            for key, value in table.items():
                if value == self.TODO:
                    table[key] = input(pattern.format(key)) or self.NONE
        return self.save()

    def save(self) -> 'Config':
        '''Write the whole configuration back to the TOML file; return ``self``.'''
        self._path.write_text(toml.dumps(self._data), self._encoding)
        return self
class Remote:
    '''Remote connection (SSH + SFTP) built on a single paramiko transport.
    '''

    def __init__(self, config: Config) -> None:
        '''Open and authenticate the transport described by ``config``.

        The transport is closed again if authentication (or loading the
        private key) fails, so a failed constructor does not leak it.
        '''
        self._config = config
        self._ssh = self._sftp = None
        # Pure path class matching the remote platform.
        self._path = (
            pathlib.PurePosixPath if config._main.posix else pathlib.PureWindowsPath
        )
        self._trans = paramiko.Transport((config._main.host, int(config._main.port)))
        try:
            self._trans.connect(username=config['username'], **self._connect_kwargs())
        except BaseException:
            self._trans.close()
            raise

    def __enter__(self) -> 'Remote':
        return self

    def __exit__(self, *_) -> None:  # type, value, traceback
        self.close()

    def __str__(self) -> str:
        return f'<Remote>\n{indent(self._config)}\n</Remote>'

    @classmethod
    def from_config(cls, *args, **kwargs) -> 'Remote':
        '''Build a ``Remote`` from ``Config(*args, **kwargs).input()``.

        If arguments must be passed to ``Config.input``, use the full
        constructor instead.
        - Example:
            >>> with Remote.from_config(server, software, version) as remote:
            ...     pass
            >>> with Remote(Config(server, software, version).input(...)) as remote:
            ...     pass
        '''
        return cls(Config(*args, **kwargs).input())

    @property
    def config(self) -> 'Config':
        return self._config

    @property
    def ssh(self) -> paramiko.SSHClient:
        '''SSH client bound to the shared transport, created on first access.'''
        if self._ssh is None:
            self._ssh = paramiko.SSHClient()
            self._ssh._transport = self._trans
            self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return self._ssh

    @property
    def sftp(self) -> paramiko.SFTPClient:
        '''SFTP client opened on the shared transport, created on first access.'''
        if self._sftp is None:
            self._sftp = paramiko.SFTPClient.from_transport(self._trans)
        return self._sftp

    @property
    def Path(self) -> type:
        '''The pure path class (POSIX or Windows) used for remote paths.'''
        return self._path

    def close(self) -> None:
        '''Close the SSH/SFTP clients and the underlying transport.

        The transport is closed unconditionally; previously it stayed open
        whenever the ``ssh`` property had never been used.
        '''
        try:
            if self._ssh is not None:
                self._ssh.close()
            if self._sftp is not None:
                self._sftp.close()
        finally:
            self._ssh = self._sftp = None
            self._trans.close()

    def copy(
        self,
        local_path: PathLike, remote_path: PathLike, down: bool = True,
    ) -> None:
        '''Copy a single file over SFTP.
        - Argument:
            - down: local_path<-remote_path if down else local_path->remote_path
        '''
        transfer = self.sftp.get if down else self.sftp.put
        transfer(localpath=str(local_path), remotepath=str(remote_path))

    def exec(self, command: str, encoding: Optional[str] = None) -> Tuple[str, str]:
        '''Run ``command`` over SSH and return the decoded (stdout, stderr).

        ``encoding`` defaults to the encoding of the config file.
        '''
        _, stdout, stderr = self._exec(command)
        encoding = encoding or self._config._encoding
        return stdout.read().decode(encoding), stderr.read().decode(encoding)

    def exists(self, remote_path_or_directory: PathLike) -> bool:
        '''Whether the remote file or directory exists.
        '''
        try:
            self.sftp.stat(str(remote_path_or_directory))
        except OSError:
            return False
        return True

    def mkdir(self, remote_directory: PathLike) -> None:
        '''Create a remote directory.
        - Note:
            - parents=True, exist_ok=True
            - every ``OSError`` raised while creating a level is ignored
        '''
        target = self._path(remote_directory)
        # Outermost ancestor first, the requested directory last.
        for directory in reversed((target, *target.parents)):
            try:
                self.sftp.mkdir(str(directory))
            except OSError:
                pass  # best effort, as before (the level usually exists already)

    def stat(self, remote_path_or_directory: PathLike) -> paramiko.SFTPAttributes:
        '''Return the SFTP attributes of a remote file or directory.'''
        return self.sftp.stat(str(remote_path_or_directory))

    def sync(
        self,
        local_directory: PathLike, remote_directory: PathLike,
        down: bool = True, dry_run: bool = False, **_,  # verify
    ) -> None:
        '''Synchronise directories by running the rsync command.

        The command prefix is the ``rsync`` entry of the config's ``meta``
        table, formatted with ``port``.
        - Argument:
            - down: remote -> local if down else local -> remote
            - dry_run: add ``--dry-run`` to the command
        - Raises:
            - AssertionError: the command exited with a non-zero status
        '''
        local, remote = pathlib.Path(local_directory), self.Path(remote_directory)
        destination = f'{self._config["username"]}@{self._config.host}:{remote}'
        # parts of command
        rsync = self._config.meta['rsync'].format(port=self._config.port)
        options = ['--dry-run'] if dry_run else []
        from_, to = (destination, local) if down else (local, destination)
        parts = (rsync, *options, from_, to)
        # NOTE: the parts are joined unquoted and interpreted by the shell.
        process = subprocess.run(' '.join(map(str, parts)), cwd=os.getcwd(), shell=True)
        # Explicit raise: an ``assert`` would be stripped under ``python -O``
        # and a failed rsync would then go unnoticed.
        if process.returncode != 0:
            raise AssertionError(repr(parts))

    def syncpy(
        self,
        local_directory: PathLike, remote_directory: PathLike,
        down: bool = True, dry_run: bool = False, verify: bool = True, **_,
    ) -> None:
        '''Directory synchronisation implemented in Python.

        The source directory is copied *into* the destination directory,
        i.e. files end up under ``destination / source.name``.
        - Argument:
            - down: remote -> local if down else local -> remote
            - dry_run: only print what would be copied
            - verify: skip files that already exist with the same size
        - Note:
            - only the file size is compared to decide whether two files
              match (mtime may be considered later)
        - See Also
            - `Remote::copy`
        '''
        local_root = pathlib.Path(local_directory)
        remote_root = self._path(remote_directory)
        if down:
            from_directory, to_directory = remote_root, local_root
        else:
            from_directory, to_directory = local_root, remote_root
        # from -> to
        directory = to_directory / from_directory.name
        made = set()  # destination directories already created in this run
        for from_path in self._walk(from_directory, local=not down):
            to_path = directory / from_path.relative_to(from_directory)
            local_path, remote_path = (
                (to_path, from_path) if down else (from_path, to_path)
            )
            parent = to_path.parent
            if not dry_run and parent not in made:
                if down:
                    parent.mkdir(mode=0o777, parents=True, exist_ok=True)
                else:
                    self.mkdir(parent)
                made.add(parent)
            if verify:
                present = local_path.exists() if down else self.exists(remote_path)
                if present and local_path.stat().st_size == self.stat(remote_path).st_size:
                    continue
            if dry_run:
                print(local_path, '><'[down], remote_path)
            else:
                self.copy(local_path, remote_path, down)

    def walk(self, root: PathLike) -> Iterator[pathlib.Path]:
        '''Iterate over the files below ``root`` on the server.
        - Example:
            >>> for path in remote.walk(remote.Path(...)):
            ...     pass
        '''
        yield from self._walk(self._path(root), local=False)

    def _connect_kwargs(self) -> Dict[str, Any]:
        '''Authentication keywords for ``Transport.connect``.

        The password is preferred; otherwise the RSA private key file is
        loaded; otherwise no credentials are passed.
        '''
        if (password := self._config['password']) is not None:
            return {'password': password}
        if (path := self._config['private_key_path']) is not None:
            return {'pkey': paramiko.RSAKey.from_private_key_file(path)}
        return dict()

    def _exec(self, command: str) -> StandardIO:
        '''Low-level paramiko wrapper.
        '''
        return self.ssh.exec_command(command)  # stdin, stdout, stderr

    def _sync_down(self) -> None:
        '''Placeholder; does nothing.'''
        pass

    def _sync_up(self) -> None:
        '''Placeholder; does nothing.'''
        pass

    def _walk(self, root: pathlib.Path, local: bool = True) -> Iterator[pathlib.Path]:
        '''Yield every file below ``root``, locally or through SFTP.'''
        if local:  # an existing stdlib helper is available; avoids recursion
            for dirname, _, filenames in os.walk(root):
                base = pathlib.Path(dirname)
                for filename in filenames:
                    yield base / filename
        else:
            for attr in self.sftp.listdir_attr(str(root)):
                path = root / attr.filename
                if stat.S_ISDIR(attr.st_mode):
                    yield from self._walk(path, local=False)
                else:
                    yield path
class Scheduler:
    '''Service scheduler; owns a ``Remote`` built from a ``Config``.
    '''

    def __init__(self, config: Config) -> None:
        remote = Remote(config)
        self._remote = remote

    def __enter__(self) -> 'Scheduler':
        return self

    def __exit__(self, *_) -> None:
        # Leaving the ``with`` block closes the owned remote.
        self._remote.close()

    def __str__(self) -> str:
        body = indent(self._remote)
        return f'<Scheduler>\n{body}\n</Scheduler>'

    @classmethod
    def from_config(cls, *args, **kwargs) -> 'Scheduler':
        '''Build a scheduler from ``Config(*args, **kwargs).input()``.
        - See Also:
            - `Remote::from_config`
        '''
        config = Config(*args, **kwargs).input()
        return cls(config)

    @property
    def config(self) -> 'Config':
        '''The configuration held by the owned remote.'''
        remote = self._remote
        return remote._config

    @property
    def remote(self) -> 'Remote':
        '''The owned remote connection.'''
        return self._remote